Files
railway_cloud/app/api/comprehensive_data.py
2025-11-01 13:52:18 +08:00

428 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import APIRouter, Depends, HTTPException, status,Query
from sqlalchemy.orm import Session
from typing import List, Optional
from ..core.database import get_db
from ..core.response_code import ResponseCode, ResponseMessage
from ..schemas.comprehensive_data import (
BatchSectionDataImportRequest,
BatchCheckpointDataImportRequest,
BatchSettlementDataImportRequest,
BatchLevelDataImportRequest,
BatchOriginalDataImportRequest,
DataImportResponse,
DataResponse,
SectionDataQueryRequest,
SettlementDataQueryRequest,
OriginalDataQueryRequest,
SettlementDataCheckpointQueryRequest,
LevelDataQueryRequest,
LinecodeRequest,
NYIDRequest,
SectionByAccountRequest
)
from ..services.daily import DailyDataService
from ..services.section_data import SectionDataService
from ..services.checkpoint import CheckpointService
from ..services.settlement_data import SettlementDataService
from ..services.level_data import LevelDataService
from ..services.original_data import OriginalDataService
from ..services.comprehensive import ComprehensiveDataService
import logging
# All routes declared in this module are mounted under the /comprehensive_data prefix.
router = APIRouter(prefix="/comprehensive_data", tags=["综合数据管理"])
logger = logging.getLogger(__name__)
# Instantiate the service classes once at import time; the route handlers
# below use these module-level instances.
section_service = SectionDataService()
checkpoint_service = CheckpointService()
settlement_service = SettlementDataService()
level_service = LevelDataService()
original_service = OriginalDataService()
comprehensive_service = ComprehensiveDataService()
@router.post("/batch_import_sections", response_model=DataImportResponse)
def batch_import_sections(request: BatchSectionDataImportRequest, db: Session = Depends(get_db)):
    """Batch-import section data.

    Passes ``request.data`` to ``section_service.batch_import_sections`` and
    wraps the result dict it returns in a ``DataImportResponse``.

    Args:
        request: Request body; ``request.data`` holds the records to import.
        db: Database session injected through ``get_db``.

    Returns:
        A ``DataImportResponse``. ``code`` is ``ResponseCode.SUCCESS`` when the
        service result has a truthy ``success`` entry, otherwise
        ``ResponseCode.IMPORT_FAILED``. Exceptions are never propagated: they
        are logged and reported as ``IMPORT_FAILED`` with zeroed counters.
    """
    try:
        logger.info(f"Starting batch import sections, count: {len(request.data)}")
        result = section_service.batch_import_sections(db, request.data)
        logger.info(f"Batch import sections completed: {result['message']}")
        # Unified response format shared by all batch-import endpoints.
        return DataImportResponse(
            code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED,
            message=result['message'],
            data={
                'total_count': result.get('total_count', 0),
                'success_count': result.get('success_count', 0),
                'failed_count': result.get('failed_count', 0),
                'failed_items': result.get('failed_items', []),
            },
        )
    except Exception as e:
        # The exception is turned into an error response instead of being
        # re-raised, so exc_info=True is the only place the traceback survives.
        logger.error(f"Batch import sections failed: {str(e)}", exc_info=True)
        return DataImportResponse(
            code=ResponseCode.IMPORT_FAILED,
            message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}",
            data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []},
        )
@router.post("/batch_import_checkpoints", response_model=DataImportResponse)
def batch_import_checkpoints(request: BatchCheckpointDataImportRequest, db: Session = Depends(get_db)):
    """Batch-import observation-point (checkpoint) data.

    Passes ``request.data`` to ``checkpoint_service.batch_import_checkpoints``
    and wraps the result dict it returns in a ``DataImportResponse``.

    Args:
        request: Request body; ``request.data`` holds the records to import.
        db: Database session injected through ``get_db``.

    Returns:
        A ``DataImportResponse``. ``code`` is ``ResponseCode.SUCCESS`` when the
        service result has a truthy ``success`` entry, otherwise
        ``ResponseCode.IMPORT_FAILED``. Exceptions are never propagated: they
        are logged and reported as ``IMPORT_FAILED`` with zeroed counters.
    """
    try:
        logger.info(f"Starting batch import checkpoints, count: {len(request.data)}")
        result = checkpoint_service.batch_import_checkpoints(db, request.data)
        logger.info(f"Batch import checkpoints completed: {result['message']}")
        return DataImportResponse(
            code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED,
            message=result['message'],
            data={
                'total_count': result.get('total_count', 0),
                'success_count': result.get('success_count', 0),
                'failed_count': result.get('failed_count', 0),
                'failed_items': result.get('failed_items', []),
            },
        )
    except Exception as e:
        # The exception is turned into an error response instead of being
        # re-raised, so exc_info=True is the only place the traceback survives.
        logger.error(f"Batch import checkpoints failed: {str(e)}", exc_info=True)
        return DataImportResponse(
            code=ResponseCode.IMPORT_FAILED,
            message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}",
            data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []},
        )
@router.post("/batch_import_settlement_data", response_model=DataImportResponse)
def batch_import_settlement_data(request: BatchSettlementDataImportRequest, db: Session = Depends(get_db)):
    """Batch-import settlement data.

    Passes ``request.data`` to
    ``settlement_service.batch_import_settlement_data`` and wraps the result
    dict it returns in a ``DataImportResponse``.

    Args:
        request: Request body; ``request.data`` holds the records to import.
        db: Database session injected through ``get_db``.

    Returns:
        A ``DataImportResponse``. ``code`` is ``ResponseCode.SUCCESS`` when the
        service result has a truthy ``success`` entry, otherwise
        ``ResponseCode.IMPORT_FAILED``. Exceptions are never propagated: they
        are logged and reported as ``IMPORT_FAILED`` with zeroed counters.
    """
    try:
        logger.info(f"Starting batch import settlement data, count: {len(request.data)}")
        result = settlement_service.batch_import_settlement_data(db, request.data)
        logger.info(f"Batch import settlement data completed: {result['message']}")
        return DataImportResponse(
            code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED,
            message=result['message'],
            data={
                'total_count': result.get('total_count', 0),
                'success_count': result.get('success_count', 0),
                'failed_count': result.get('failed_count', 0),
                'failed_items': result.get('failed_items', []),
            },
        )
    except Exception as e:
        # The exception is turned into an error response instead of being
        # re-raised, so exc_info=True is the only place the traceback survives.
        logger.error(f"Batch import settlement data failed: {str(e)}", exc_info=True)
        return DataImportResponse(
            code=ResponseCode.IMPORT_FAILED,
            message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}",
            data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []},
        )
@router.post("/batch_import_level_data", response_model=DataImportResponse)
def batch_import_level_data(request: BatchLevelDataImportRequest, db: Session = Depends(get_db)):
    """Batch-import level (leveling survey) data.

    Passes ``request.data`` to ``level_service.batch_import_level_data`` and
    wraps the result dict it returns in a ``DataImportResponse``.

    Args:
        request: Request body; ``request.data`` holds the records to import.
        db: Database session injected through ``get_db``.

    Returns:
        A ``DataImportResponse``. ``code`` is ``ResponseCode.SUCCESS`` when the
        service result has a truthy ``success`` entry, otherwise
        ``ResponseCode.IMPORT_FAILED``. Exceptions are never propagated: they
        are logged and reported as ``IMPORT_FAILED`` with zeroed counters.
    """
    try:
        logger.info(f"Starting batch import level data, count: {len(request.data)}")
        result = level_service.batch_import_level_data(db, request.data)
        logger.info(f"Batch import level data completed: {result['message']}")
        return DataImportResponse(
            code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED,
            message=result['message'],
            data={
                'total_count': result.get('total_count', 0),
                'success_count': result.get('success_count', 0),
                'failed_count': result.get('failed_count', 0),
                'failed_items': result.get('failed_items', []),
            },
        )
    except Exception as e:
        # The exception is turned into an error response instead of being
        # re-raised, so exc_info=True is the only place the traceback survives.
        logger.error(f"Batch import level data failed: {str(e)}", exc_info=True)
        return DataImportResponse(
            code=ResponseCode.IMPORT_FAILED,
            message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}",
            data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []},
        )
@router.post("/batch_import_original_data", response_model=DataImportResponse)
def batch_import_original_data(request: BatchOriginalDataImportRequest, db: Session = Depends(get_db)):
    """Batch-import original data; the records must carry an ``account_id`` field.

    Rejects an empty payload and a payload whose first record has no
    ``account_id`` key with ``ResponseCode.BAD_REQUEST``; otherwise passes
    ``request.data`` to ``original_service.batch_import_original_data`` and
    wraps the result dict it returns in a ``DataImportResponse``.

    Args:
        request: Request body; ``request.data`` holds the records to import.
        db: Database session injected through ``get_db``.

    Returns:
        A ``DataImportResponse``. ``code`` is ``BAD_REQUEST`` for the two
        validation failures above, ``SUCCESS`` when the service result has a
        truthy ``success`` entry, and ``IMPORT_FAILED`` otherwise (including
        when an exception is raised; exceptions are logged, not propagated).
    """
    try:
        records = request.data
        # Validate emptiness BEFORE calling len(): a missing (None) payload
        # must be answered with BAD_REQUEST, not fall into the generic
        # exception handler through a TypeError raised by the log line.
        if not records:
            return DataImportResponse(
                code=ResponseCode.BAD_REQUEST,
                message="导入数据不能为空",
                data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []},
            )
        logger.info(f"Starting batch import original data, count: {len(records)}")
        # Only the first record is inspected for account_id; the remaining
        # records are handed to the service as-is.
        if 'account_id' not in records[0]:
            return DataImportResponse(
                code=ResponseCode.BAD_REQUEST,
                message="数据中必须包含account_id字段",
                data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []},
            )
        result = original_service.batch_import_original_data(db, records)
        logger.info(f"Batch import original data completed: {result['message']}")
        return DataImportResponse(
            code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED,
            message=result['message'],
            data={
                'total_count': result.get('total_count', 0),
                'success_count': result.get('success_count', 0),
                'failed_count': result.get('failed_count', 0),
                'failed_items': result.get('failed_items', []),
            },
        )
    except Exception as e:
        # The exception is turned into an error response instead of being
        # re-raised, so exc_info=True is the only place the traceback survives.
        logger.error(f"Batch import original data failed: {str(e)}", exc_info=True)
        return DataImportResponse(
            code=ResponseCode.IMPORT_FAILED,
            message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}",
            data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []},
        )
# Query section data together with the matching observation-point data
@router.post("/get_section", response_model=DataResponse)
def get_section(request: SectionDataQueryRequest, db: Session = Depends(get_db)):
    """Get section data plus observation points.

    Forwards the filter fields of the request to
    ``section_service.search_sections_with_checkpoints``.

    Args:
        request: Query filters (``id``, ``section_id``, ``mileage``,
            ``work_site``, ``number``, ``status``, ``account_id``).
        db: Database session injected through ``get_db``.

    Returns:
        A ``DataResponse`` whose ``data`` is the service result and whose
        ``total`` is its length. On any exception the error is logged and a
        ``QUERY_FAILED`` response with an empty list is returned instead.
    """
    try:
        logger.info(f"Querying section data with params: {request.dict()}")
        result_data = section_service.search_sections_with_checkpoints(
            db,
            id=request.id,
            section_id=request.section_id,
            mileage=request.mileage,
            work_site=request.work_site,
            number=request.number,
            status=request.status,
            account_id=request.account_id,
        )
        logger.info(f"Found {len(result_data)} sections with checkpoints")
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message="查询成功",
            total=len(result_data),
            data=result_data,
        )
    except Exception as e:
        # The exception is turned into an error response instead of being
        # re-raised, so exc_info=True is the only place the traceback survives.
        logger.error(f"Query section data failed: {str(e)}", exc_info=True)
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
            total=0,
            data=[],
        )
# Query settlement data by observation-point id
@router.post("/get_settlement", response_model=DataResponse)
def get_settlement(request: SettlementDataQueryRequest, db: Session = Depends(get_db)):
    """Get settlement data, optionally capped by ``limit``.

    Forwards the filter fields of the request to
    ``settlement_service.search_settlement_data_formatted``. According to the
    original docstring the records come back ordered by upload time,
    newest first; that ordering happens inside the service and is not
    visible in this handler.

    Args:
        request: Query filters (``id``, ``point_id``, ``NYID``, ``sjName``,
            ``workinfoname``) plus ``limit``.
        db: Database session injected through ``get_db``.

    Returns:
        A ``DataResponse`` whose ``data`` is the service result and whose
        ``total`` is its length. On any exception the error is logged and a
        ``QUERY_FAILED`` response with an empty list is returned instead.
    """
    try:
        logger.info(f"Querying settlement data with params: {request.dict()}")
        result_data = settlement_service.search_settlement_data_formatted(
            db,
            id=request.id,
            point_id=request.point_id,
            nyid=request.NYID,
            sjName=request.sjName,
            workinfoname=request.workinfoname,
            limit=request.limit,
        )
        logger.info(f"Found {len(result_data)} settlement records")
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message="查询成功",
            total=len(result_data),
            data=result_data,
        )
    except Exception as e:
        # The exception is turned into an error response instead of being
        # re-raised, so exc_info=True is the only place the traceback survives.
        logger.error(f"Query settlement data failed: {str(e)}", exc_info=True)
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
            total=0,
            data=[],
        )
# Query settlement data plus observation-point data
@router.post("/get_settlement_checkpoint", response_model=DataResponse)
def get_settlement_checkpoint(request: SettlementDataCheckpointQueryRequest, db: Session = Depends(get_db)):
    """Get settlement data plus observation-point data, optionally capped by ``limit``.

    Forwards the filter fields of the request to
    ``settlement_service.search_settlement_checkpoint_data_formatted``.
    According to the original docstring the records come back ordered by
    upload time, newest first; that ordering happens inside the service and
    is not visible in this handler.

    Args:
        request: Query filters (``id``, ``point_id``, ``NYID``, ``sjName``,
            ``workinfoname``, ``linecode``) plus ``limit``.
        db: Database session injected through ``get_db``.

    Returns:
        A ``DataResponse`` whose ``data`` is the service result and whose
        ``total`` is its length. On any exception the error is logged and a
        ``QUERY_FAILED`` response with an empty list is returned instead.
    """
    try:
        logger.info(f"Querying settlement data with params: {request.dict()}")
        result_data = settlement_service.search_settlement_checkpoint_data_formatted(
            db,
            id=request.id,
            point_id=request.point_id,
            nyid=request.NYID,
            sjName=request.sjName,
            workinfoname=request.workinfoname,
            linecode=request.linecode,
            limit=request.limit,
        )
        logger.info(f"Found {len(result_data)} settlement records")
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message="查询成功",
            total=len(result_data),
            data=result_data,
        )
    except Exception as e:
        # The exception is turned into an error response instead of being
        # re-raised, so exc_info=True is the only place the traceback survives.
        logger.error(f"Query settlement data failed: {str(e)}", exc_info=True)
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
            total=0,
            data=[],
        )
# Fetch original data by period id (NYID)
@router.post("/get_original", response_model=DataResponse)
def get_original(request: OriginalDataQueryRequest, db: Session = Depends(get_db)):
    """Get level data plus original data.

    Forwards the filter fields of the request to
    ``comprehensive_service.get_level_and_original_data``. ``account_id`` is
    optional; per the original docstring, leaving it out makes the service
    search all sharded tables (behaviour of the service, not visible here).

    Args:
        request: Query filters (``account_id``, ``id``, ``bfpcode``, ``bffb``,
            ``NYID``, ``linecode``, ``bfpl``).
        db: Database session injected through ``get_db``.

    Returns:
        A ``DataResponse`` built from the ``message``, ``count`` and ``data``
        entries of the service result. On any exception the error is logged
        and a ``QUERY_FAILED`` response with an empty list is returned.
    """
    try:
        logger.info(f"Querying original data with params: {request.dict()}")
        result = comprehensive_service.get_level_and_original_data(
            db,
            account_id=request.account_id,  # optional
            id=request.id,
            bfpcode=request.bfpcode,
            bffb=request.bffb,
            nyid=request.NYID,
            linecode=request.linecode,
            bfpl=request.bfpl,
        )
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message=result["message"],
            total=result["count"],
            data=result["data"],
        )
    except Exception as e:
        # The exception is turned into an error response instead of being
        # re-raised, so exc_info=True is the only place the traceback survives.
        logger.error(f"Query original data failed: {str(e)}", exc_info=True)
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
            total=0,
            data=[],
        )
@router.post("/get_settlement_by_linecode", response_model=DataResponse)
def get_settlement_by_linecode(
    request: LinecodeRequest,
    db: Session = Depends(get_db)
):
    """Get settlement data for one ``linecode``.

    Calls ``settlement_service.get_settlement_by_linecode`` and returns the
    ``settlement_data`` entry of its result.

    Args:
        request: Request body carrying ``linecode``.
        db: Database session injected through ``get_db``.

    Returns:
        A ``DataResponse`` whose ``data`` is the ``settlement_data`` list and
        whose ``total`` is its length. On any exception the error is logged
        and a ``QUERY_FAILED`` response with an empty list is returned.
    """
    try:
        linecode = request.linecode
        logger.info(f"接口请求根据linecode={linecode}查询沉降数据")
        # Use the module-level service instance, like the other settlement
        # endpoints, instead of building a new one on every request.
        result = settlement_service.get_settlement_by_linecode(db, linecode)
        settlement_rows = result['settlement_data']
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message=f"查询成功,共获取{len(settlement_rows)}条沉降数据",
            total=len(settlement_rows),
            data=settlement_rows,
        )
    except Exception as e:
        logger.error(f"查询沉降数据失败:{str(e)}", exc_info=True)
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"查询失败:{str(e)}",
            total=0,
            data=[],
        )
@router.post("/get_settlement_by_nyid", response_model=DataResponse)
def get_settlement_by_nyid(
    request: NYIDRequest,
    db: Session = Depends(get_db)
):
    """Get settlement data for one ``NYID``.

    Calls ``settlement_service.get_by_nyid`` and converts every returned
    model instance into a plain dict for the response.

    Args:
        request: Request body carrying ``NYID``.
        db: Database session injected through ``get_db``.

    Returns:
        A ``DataResponse`` whose ``data`` is the list of row dicts and whose
        ``total`` is its length. On any exception the error is logged and a
        ``QUERY_FAILED`` response with an empty list is returned.
    """
    try:
        nyid = request.NYID
        logger.info(f"接口请求根据nyid={nyid}查询沉降数据")
        # Use the module-level service instance, like the other settlement
        # endpoints, instead of building a new one on every request.
        settlement_instances = settlement_service.get_by_nyid(db, nyid=nyid)
        # Copy each instance's attribute dict, dropping SQLAlchemy's internal
        # bookkeeping entries (keys starting with '_sa_').
        settlement_rows = [
            {key: value for key, value in instance.__dict__.items() if not key.startswith('_sa_')}
            for instance in settlement_instances
        ]
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message=f"查询成功,共获取{len(settlement_rows)}条沉降数据nyid={nyid}",
            total=len(settlement_rows),
            data=settlement_rows,
        )
    except Exception as e:
        logger.error(f"查询沉降数据失败:{str(e)}", exc_info=True)
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"查询失败:{str(e)}",
            total=0,
            data=[],
        )
@router.get("/get_today_data", response_model=DataResponse)
def get_today_data(db: Session = Depends(get_db), account_id: int = 1):
    """Return the daily data of one account.

    Calls ``DailyDataService.get_daily_data_by_account`` for ``account_id``.
    It does NOT invoke the scheduler job
    ``scheduled_get_max_nyid_by_point_id``.

    NOTE(review): the response messages still speak of triggering a scheduled
    task; they are kept byte-for-byte because clients may display or match
    them. ``total`` is likewise kept at the fixed value 1 because the shape of
    the service result is not visible here.

    Args:
        db: Database session injected through ``get_db``.
        account_id: Account whose daily data is fetched. Exposed as an
            optional query parameter; defaults to 1, the value that was
            previously hard-coded.

    Returns:
        A ``DataResponse`` carrying the service result in ``data``. On any
        exception the error is logged and a ``QUERY_FAILED`` response with an
        empty dict is returned.
    """
    try:
        daily_service = DailyDataService()
        daily_data = daily_service.get_daily_data_by_account(db, account_id=account_id)
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message="定时任务触发执行成功!任务已开始处理(具体结果查看系统日志)",
            total=1,
            data=daily_data,
        )
    except Exception as e:
        logger.error(f"接口触发定时任务失败:{str(e)}", exc_info=True)
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"定时任务触发失败:{str(e)}",
            total=0,
            data={},
        )
# Query the section data that belongs to one account
@router.post("/get_all_section_by_account", response_model=DataResponse)
def get_all_section_by_account(request: SectionByAccountRequest, db: Session = Depends(get_db)):
    """Get the section data of one account.

    Calls ``section_service.get_by_section_id`` with the request's
    ``account_id``. NOTE(review): despite its name, the service method is
    called with an account id only; confirm against the service what it
    actually filters on and whether observation points are included.

    Args:
        request: Request body carrying ``account_id``.
        db: Database session injected through ``get_db``.

    Returns:
        A ``DataResponse`` whose ``data`` is the service result and whose
        ``total`` is its length. On any exception the error is logged and a
        ``QUERY_FAILED`` response with an empty list is returned instead.
    """
    try:
        # Use the module-level service instance, like get_section does,
        # instead of building a new one on every request.
        result_data = section_service.get_by_section_id(db, account_id=request.account_id)
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message="查询成功",
            total=len(result_data),
            data=result_data,
        )
    except Exception as e:
        # The exception is turned into an error response instead of being
        # re-raised, so exc_info=True is the only place the traceback survives.
        logger.error(f"Query section data failed: {str(e)}", exc_info=True)
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
            total=0,
            data=[],
        )