Files
railway_cloud/app/api/comprehensive_data.py
2025-10-16 11:43:00 +08:00

242 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List, Optional
from ..core.database import get_db
from ..schemas.comprehensive_data import (
BatchSectionDataImportRequest,
BatchCheckpointDataImportRequest,
BatchSettlementDataImportRequest,
BatchLevelDataImportRequest,
BatchOriginalDataImportRequest,
DataImportResponse,
DataResponse,
SectionDataQueryRequest,
SettlementDataQueryRequest,
OriginalDataQueryRequest,
LevelDataQueryRequest
)
from ..services.section_data import SectionDataService
from ..services.checkpoint import CheckpointService
from ..services.settlement_data import SettlementDataService
from ..services.level_data import LevelDataService
from ..services.original_data import OriginalDataService
from ..services.comprehensive import ComprehensiveDataService
import logging
# Router for all comprehensive-data endpoints; mounted under /comprehensive_data.
router = APIRouter(prefix="/comprehensive_data", tags=["综合数据管理"])
# Module-level logger named after this module, per logging convention.
logger = logging.getLogger(__name__)
# Instantiate the service-layer singletons shared by every endpoint below.
# NOTE(review): these appear stateless (each method takes `db` explicitly) —
# confirm before adding any per-request state to a service class.
section_service = SectionDataService()
checkpoint_service = CheckpointService()
settlement_service = SettlementDataService()
level_service = LevelDataService()
original_service = OriginalDataService()
comprehensive_service = ComprehensiveDataService()
@router.post("/batch_import_sections", response_model=DataImportResponse)
def batch_import_sections(request: BatchSectionDataImportRequest, db: Session = Depends(get_db)):
    """Batch-import section (断面) records.

    Delegates to ``SectionDataService.batch_import_sections`` and wraps the
    service result dict in a ``DataImportResponse``.

    Raises:
        HTTPException: 500 with a Chinese detail message on unexpected errors;
            HTTPExceptions raised downstream are re-raised unchanged.
    """
    try:
        logger.info("Starting batch import sections, count: %d", len(request.data))
        result = section_service.batch_import_sections(db, request.data)
        logger.info("Batch import sections completed: %s", result["message"])
        return DataImportResponse(**result)
    except HTTPException:
        # Preserve deliberate HTTP errors (e.g. 4xx validation failures from
        # the service layer) instead of masking them as 500s.
        raise
    except Exception as e:
        # logger.exception records the full traceback, not just str(e).
        logger.exception("Batch import sections failed")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"批量导入断面数据失败: {str(e)}"
        ) from e
@router.post("/batch_import_checkpoints", response_model=DataImportResponse)
def batch_import_checkpoints(request: BatchCheckpointDataImportRequest, db: Session = Depends(get_db)):
    """Batch-import checkpoint (观测点) records.

    Delegates to ``CheckpointService.batch_import_checkpoints`` and wraps the
    service result dict in a ``DataImportResponse``.

    Raises:
        HTTPException: 500 with a Chinese detail message on unexpected errors;
            HTTPExceptions raised downstream are re-raised unchanged.
    """
    try:
        logger.info("Starting batch import checkpoints, count: %d", len(request.data))
        result = checkpoint_service.batch_import_checkpoints(db, request.data)
        logger.info("Batch import checkpoints completed: %s", result["message"])
        return DataImportResponse(**result)
    except HTTPException:
        # Do not convert deliberate HTTP errors into 500s.
        raise
    except Exception as e:
        # logger.exception records the full traceback, not just str(e).
        logger.exception("Batch import checkpoints failed")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"批量导入观测点数据失败: {str(e)}"
        ) from e
@router.post("/batch_import_settlement_data", response_model=DataImportResponse)
def batch_import_settlement_data(request: BatchSettlementDataImportRequest, db: Session = Depends(get_db)):
    """Batch-import settlement (沉降) records.

    Delegates to ``SettlementDataService.batch_import_settlement_data`` and
    wraps the service result dict in a ``DataImportResponse``.

    Raises:
        HTTPException: 500 with a Chinese detail message on unexpected errors;
            HTTPExceptions raised downstream are re-raised unchanged.
    """
    try:
        logger.info("Starting batch import settlement data, count: %d", len(request.data))
        result = settlement_service.batch_import_settlement_data(db, request.data)
        logger.info("Batch import settlement data completed: %s", result["message"])
        return DataImportResponse(**result)
    except HTTPException:
        # Do not convert deliberate HTTP errors into 500s.
        raise
    except Exception as e:
        # logger.exception records the full traceback, not just str(e).
        logger.exception("Batch import settlement data failed")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"批量导入沉降数据失败: {str(e)}"
        ) from e
@router.post("/batch_import_level_data", response_model=DataImportResponse)
def batch_import_level_data(request: BatchLevelDataImportRequest, db: Session = Depends(get_db)):
    """Batch-import level/leveling (水准) records.

    Delegates to ``LevelDataService.batch_import_level_data`` and wraps the
    service result dict in a ``DataImportResponse``.

    Raises:
        HTTPException: 500 with a Chinese detail message on unexpected errors;
            HTTPExceptions raised downstream are re-raised unchanged.
    """
    try:
        logger.info("Starting batch import level data, count: %d", len(request.data))
        result = level_service.batch_import_level_data(db, request.data)
        logger.info("Batch import level data completed: %s", result["message"])
        return DataImportResponse(**result)
    except HTTPException:
        # Do not convert deliberate HTTP errors into 500s.
        raise
    except Exception as e:
        # logger.exception records the full traceback, not just str(e).
        logger.exception("Batch import level data failed")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"批量导入水准数据失败: {str(e)}"
        ) from e
@router.post("/batch_import_original_data", response_model=DataImportResponse)
def batch_import_original_data(request: BatchOriginalDataImportRequest, db: Session = Depends(get_db)):
    """Batch-import original/raw (原始) records.

    Delegates to ``OriginalDataService.batch_import_original_data`` and wraps
    the service result dict in a ``DataImportResponse``.

    Raises:
        HTTPException: 500 with a Chinese detail message on unexpected errors;
            HTTPExceptions raised downstream are re-raised unchanged.
    """
    try:
        logger.info("Starting batch import original data, count: %d", len(request.data))
        result = original_service.batch_import_original_data(db, request.data)
        logger.info("Batch import original data completed: %s", result["message"])
        return DataImportResponse(**result)
    except HTTPException:
        # Do not convert deliberate HTTP errors into 500s.
        raise
    except Exception as e:
        # logger.exception records the full traceback, not just str(e).
        logger.exception("Batch import original data failed")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"批量导入原始数据失败: {str(e)}"
        ) from e
# Query section records together with their associated checkpoints.
@router.post("/get_section", response_model=DataResponse)
def get_section(request: SectionDataQueryRequest, db: Session = Depends(get_db)):
    """Return section (断面) data joined with their checkpoints (观测点).

    All filter fields are forwarded verbatim to
    ``SectionDataService.search_sections_with_checkpoints``.

    Raises:
        HTTPException: 500 with a Chinese detail message on unexpected errors;
            HTTPExceptions raised downstream are re-raised unchanged.
    """
    try:
        logger.info("Querying section data with params: %s", request.dict())
        result_data = section_service.search_sections_with_checkpoints(
            db,
            id=request.id,
            section_id=request.section_id,
            mileage=request.mileage,
            work_site=request.work_site,
            number=request.number,
            status=request.status,
            account_id=request.account_id,
        )
        logger.info("Found %d sections with checkpoints", len(result_data))
        return DataResponse(
            success=True,
            message="查询成功",
            count=len(result_data),
            data=result_data,
        )
    except HTTPException:
        # Do not convert deliberate HTTP errors into 500s.
        raise
    except Exception as e:
        # logger.exception records the full traceback, not just str(e).
        logger.exception("Query section data failed")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"查询断面数据失败: {str(e)}"
        ) from e
# Query settlement records by checkpoint id (and other optional filters).
@router.post("/get_settlement", response_model=DataResponse)
def get_settlement(request: SettlementDataQueryRequest, db: Session = Depends(get_db)):
    """Return settlement (沉降) data, newest upload first; ``limit`` caps the count.

    Filters are forwarded to
    ``SettlementDataService.search_settlement_data_formatted``.

    Raises:
        HTTPException: 500 with a Chinese detail message on unexpected errors;
            HTTPExceptions raised downstream are re-raised unchanged.
    """
    try:
        logger.info("Querying settlement data with params: %s", request.dict())
        result_data = settlement_service.search_settlement_data_formatted(
            db,
            id=request.id,
            point_id=request.point_id,
            # Request field is upper-case NYID; the service expects lower-case.
            nyid=request.NYID,
            sjName=request.sjName,
            workinfoname=request.workinfoname,
            limit=request.limit,
        )
        logger.info("Found %d settlement records", len(result_data))
        return DataResponse(
            success=True,
            message="查询成功",
            count=len(result_data),
            data=result_data,
        )
    except HTTPException:
        # Do not convert deliberate HTTP errors into 500s.
        raise
    except Exception as e:
        # logger.exception records the full traceback, not just str(e).
        logger.exception("Query settlement data failed")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"查询沉降数据失败: {str(e)}"
        ) from e
# Fetch original/raw data by survey-period id, combined with level data.
@router.post("/get_original", response_model=DataResponse)
def get_original(request: OriginalDataQueryRequest, db: Session = Depends(get_db)):
    """Return combined level (水准) + original (原始) data.

    Unlike the other query endpoints, the service returns the full response
    envelope (``success``/``message``/``count``/``data``), which is forwarded
    field-by-field.

    Raises:
        HTTPException: 500 with a Chinese detail message on unexpected errors;
            HTTPExceptions raised downstream are re-raised unchanged.
    """
    try:
        logger.info("Querying original data with params: %s", request.dict())
        result = comprehensive_service.get_level_and_original_data(
            db,
            id=request.id,
            bfpcode=request.bfpcode,
            bffb=request.bffb,
            # Request field is upper-case NYID; the service expects lower-case.
            nyid=request.NYID,
            linecode=request.linecode,
            bfpl=request.bfpl,
        )
        return DataResponse(
            success=result["success"],
            message=result["message"],
            count=result["count"],
            data=result["data"],
        )
    except HTTPException:
        # Do not convert deliberate HTTP errors into 500s.
        raise
    except Exception as e:
        # logger.exception records the full traceback, not just str(e).
        logger.exception("Query original data failed")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"查询原始数据失败: {str(e)}"
        ) from e