From f89f2eef370798d4139f42ecab22571422b8fa31 Mon Sep 17 00:00:00 2001 From: lhx Date: Sun, 28 Sep 2025 15:17:09 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86=E5=9F=BA?= =?UTF-8?q?=E7=A1=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + app/api/comprehensive_data.py | 153 +++++++++++++++++++ app/main.py | 2 + app/models/checkpoint.py | 11 ++ app/models/level_data.py | 12 ++ app/models/original_data.py | 15 ++ app/models/section_data.py | 15 ++ app/models/settlement_data.py | 26 ++++ app/schemas/comprehensive_data.py | 95 ++++++++++++ app/services/base.py | 65 ++++++++ app/services/checkpoint.py | 109 ++++++++++++++ app/services/comprehensive.py | 190 +++++++++++++++++++++++ app/services/level_data.py | 120 +++++++++++++++ app/services/original_data.py | 106 +++++++++++++ app/services/section_data.py | 243 ++++++++++++++++++++++++++++++ app/services/settlement_data.py | 151 +++++++++++++++++++ 16 files changed, 1314 insertions(+) create mode 100644 app/api/comprehensive_data.py create mode 100644 app/models/checkpoint.py create mode 100644 app/models/level_data.py create mode 100644 app/models/original_data.py create mode 100644 app/models/section_data.py create mode 100644 app/models/settlement_data.py create mode 100644 app/schemas/comprehensive_data.py create mode 100644 app/services/base.py create mode 100644 app/services/checkpoint.py create mode 100644 app/services/comprehensive.py create mode 100644 app/services/level_data.py create mode 100644 app/services/original_data.py create mode 100644 app/services/section_data.py create mode 100644 app/services/settlement_data.py diff --git a/.gitignore b/.gitignore index 9864d9d..8a91539 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ *.pyc logs/ *.log +temp/ !README.md diff --git a/app/api/comprehensive_data.py b/app/api/comprehensive_data.py new file mode 100644 index 0000000..cafa808 --- /dev/null +++ b/app/api/comprehensive_data.py @@ -0,0 +1,153 @@ +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy.orm import Session +from typing import List, Optional +from ..core.database import get_db +from ..schemas.comprehensive_data import ( + BatchSectionDataImportRequest, + BatchCheckpointDataImportRequest, + BatchSettlementDataImportRequest, + BatchLevelDataImportRequest, + BatchOriginalDataImportRequest, + DataImportResponse, + ComprehensiveDataImportRequest, + ComprehensiveDataImportResponse +) +from ..services.section_data import SectionDataService +from ..services.checkpoint import CheckpointService +from ..services.settlement_data import SettlementDataService +from ..services.level_data import LevelDataService +from ..services.original_data import OriginalDataService +import logging + +router = APIRouter(prefix="/comprehensive_data", tags=["综合数据管理"]) +logger = logging.getLogger(__name__) + +# 实例化服务类 +section_service = SectionDataService() +checkpoint_service = CheckpointService() +settlement_service = SettlementDataService() +level_service = LevelDataService() +original_service = OriginalDataService() + +@router.post("/batch_import_sections", response_model=DataImportResponse) +def batch_import_sections(request: BatchSectionDataImportRequest, db: Session = Depends(get_db)): + """批量导入断面数据""" + try: + logger.info(f"Starting batch import sections, count: {len(request.data)}") + + # 转换为字典格式 + data_list = [item.dict() for item in request.data] + + result = section_service.batch_import_sections(db, data_list) + + logger.info(f"Batch import sections completed: {result['message']}") + return DataImportResponse(**result) + + except Exception as e: + logger.error(f"Batch import sections failed: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"批量导入断面数据失败: {str(e)}" + ) + +@router.post("/batch_import_checkpoints", response_model=DataImportResponse) +def batch_import_checkpoints(request: BatchCheckpointDataImportRequest, db: Session = Depends(get_db)): + """批量导入观测点数据""" + try: + logger.info(f"Starting batch import checkpoints, count: {len(request.data)}") + + # 转换为字典格式 + data_list = [item.dict() for item in request.data] + + result = checkpoint_service.batch_import_checkpoints(db, data_list) + + logger.info(f"Batch import checkpoints completed: {result['message']}") + return DataImportResponse(**result) + + except Exception as e: + logger.error(f"Batch import checkpoints failed: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"批量导入观测点数据失败: {str(e)}" + ) + +@router.post("/batch_import_settlement_data", response_model=DataImportResponse) +def batch_import_settlement_data(request: BatchSettlementDataImportRequest, db: Session = Depends(get_db)): + """批量导入沉降数据""" + try: + logger.info(f"Starting batch import settlement data, count: {len(request.data)}") + + # 转换为字典格式 + data_list = [item.dict() for item in request.data] + + result = settlement_service.batch_import_settlement_data(db, data_list) + + logger.info(f"Batch import settlement data completed: {result['message']}") + return DataImportResponse(**result) + + except Exception as e: + logger.error(f"Batch import settlement data failed: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"批量导入沉降数据失败: {str(e)}" + ) + +@router.post("/batch_import_level_data", response_model=DataImportResponse) +def batch_import_level_data(request: BatchLevelDataImportRequest, db: Session = Depends(get_db)): + """批量导入水准数据""" + try: + logger.info(f"Starting batch import level data, count: {len(request.data)}") + + # 转换为字典格式 + data_list = [item.dict() for item in request.data] + + result = level_service.batch_import_level_data(db, data_list) + + logger.info(f"Batch import level data completed: {result['message']}") + return DataImportResponse(**result) + + except Exception as e: + logger.error(f"Batch import level data failed: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"批量导入水准数据失败: {str(e)}" + ) + +@router.post("/batch_import_original_data", response_model=DataImportResponse) +def batch_import_original_data(request: BatchOriginalDataImportRequest, db: Session = Depends(get_db)): + """批量导入原始数据""" + try: + logger.info(f"Starting batch import original data, count: {len(request.data)}") + + # 转换为字典格式 + data_list = [item.dict() for item in request.data] + + result = original_service.batch_import_original_data(db, data_list) + + logger.info(f"Batch import original data completed: {result['message']}") + return DataImportResponse(**result) + + except Exception as e: + logger.error(f"Batch import original data failed: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"批量导入原始数据失败: {str(e)}" + ) + +# 保留原有接口以保持兼容性 +@router.post("/data_settlement_import", response_model=ComprehensiveDataImportResponse) +def data_import(request: ComprehensiveDataImportRequest, db: Session = Depends(get_db)): + """导入综合数据 (兼容旧接口)""" + try: + logger.info("Using legacy data import interface") + # 这里可以根据需要实现旧接口的兼容逻辑 + return ComprehensiveDataImportResponse( + success=True, + message="请使用新的批量导入接口" + ) + except Exception as e: + logger.error(f"Legacy data import failed: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"数据导入失败: {str(e)}" + ) \ No newline at end of file diff --git a/app/main.py b/app/main.py index 55ecc2f..2468308 100644 --- a/app/main.py +++ b/app/main.py @@ -9,6 +9,7 @@ from .core.database import init_db from .api.account import router as account_router from .api.database import router as database_router from .api.task import router as task_router +from .api.comprehensive_data import router as comprehensive_data_router from .utils.scheduler import task_scheduler # 初始化日志系统 @@ -66,6 +67,7 @@ app.add_middleware( app.include_router(account_router, prefix="/api") app.include_router(database_router, prefix="/api") app.include_router(task_router, prefix="/api") +app.include_router(comprehensive_data_router, prefix="/api") # 根路径 @app.get("/") diff --git a/app/models/checkpoint.py b/app/models/checkpoint.py new file mode 100644 index 0000000..1ba5669 --- /dev/null +++ b/app/models/checkpoint.py @@ -0,0 +1,11 @@ +from sqlalchemy import Column, Integer, String +from ..core.database import Base + +class Checkpoint(Base): + __tablename__ = "checkpoint" + + id = Column(Integer, primary_key=True, index=True, autoincrement=True) + aname = Column(String(100), nullable=False, comment="观察点名称") + burial_date = Column(String(100), comment="埋设日期") + section_id = Column(String(100), nullable=False, comment="所属断面id") + point_id = Column(String(100), nullable=False, comment="观察点id") \ No newline at end of file diff --git a/app/models/level_data.py b/app/models/level_data.py new file mode 100644 index 0000000..64c0c01 --- /dev/null +++ b/app/models/level_data.py @@ -0,0 +1,12 @@ +from sqlalchemy import Column, Integer, String +from ..core.database import Base + +class LevelData(Base): + __tablename__ = "level_data" + + id = Column(Integer, primary_key=True, index=True, autoincrement=True) + linecode = Column(String(100), nullable=False, comment="水准线路编码", index=True) + benchmarkids = Column(String(100), comment="工作基点名称序列") + wsphigh = Column(String(100), comment="工作基点高程序列(m)") + NYID = Column(String(100), nullable=False, comment="期数id", index=True) + createDate = Column(String(100), comment="上传时间") \ No newline at end of file diff --git a/app/models/original_data.py b/app/models/original_data.py new file mode 100644 index 0000000..b2dae41 --- /dev/null +++ b/app/models/original_data.py @@ -0,0 +1,15 @@ +from sqlalchemy import Column, Integer, String, DateTime +from ..core.database import Base + +class OriginalData(Base): + __tablename__ = "original_data" + + id = Column(Integer, primary_key=True, index=True, autoincrement=True) + bfpcode = Column(String(1000), nullable=False, comment="前(后)视点名称") + mtime = Column(DateTime, nullable=False, comment="测点观测时间") + bffb = Column(String(1000), nullable=False, comment="前(后)视标记符") + bfpl = Column(String(1000), nullable=False, comment="前(后)视距离(m)") + bfpvalue = Column(String(1000), nullable=False, comment="前(后)视尺读数(m)") + times = Column(String(1000), nullable=False, comment="上传时间") + NYID = Column(String(100), nullable=False, comment="期数id", index=True) + sort = Column(Integer, comment="序号") \ No newline at end of file diff --git a/app/models/section_data.py b/app/models/section_data.py new file mode 100644 index 0000000..95011ea --- /dev/null +++ b/app/models/section_data.py @@ -0,0 +1,15 @@ +from sqlalchemy import Column, Integer, String +from ..core.database import Base + +class SectionData(Base): + __tablename__ = "section_data" + + id = Column(Integer, primary_key=True, index=True, autoincrement=True) + mileage = Column(String(100), nullable=False, comment="断面里程") + work_site = Column(String(100), nullable=False, comment="工点") + basic_types = Column(String(100), comment="基础类型") + height = Column(String(100), comment="桥墩台高度") + status = Column(String(100), nullable=False, comment="断面状态") + number = Column(String(100), nullable=False, comment="所在桥梁墩(台)编号", index=True) + transition_paragraph = Column(String(100), comment="过渡段") + section_id = Column(String(100), nullable=False, comment="断面id", index=True) \ No newline at end of file diff --git a/app/models/settlement_data.py b/app/models/settlement_data.py new file mode 100644 index 0000000..02c0d86 --- /dev/null +++ b/app/models/settlement_data.py @@ -0,0 +1,26 @@ +from sqlalchemy import Column, Integer, String +from ..core.database import Base + +class SettlementData(Base): + __tablename__ = "settlement_data" + + id = Column(Integer, primary_key=True, index=True, autoincrement=True) + point_id = Column(String(100), nullable=False, comment="观测点id", index=True) + CVALUE = Column(String(100), nullable=False, comment="修正量(m)") + MAVALUE = Column(String(100), nullable=False, comment="成果值(m)") + MTIME_W = Column(String(100), nullable=False, comment="观测时间") + NYID = Column(String(100), nullable=False, comment="期数id", index=True) + PRELOADH = Column(String(100), nullable=False) + PSTATE = Column(String(100), nullable=False) + REMARK = Column(String(100), comment="备注") + WORKINFO = Column(String(100)) + createdate = Column(String(100), nullable=False, comment="上传时间") + day = Column(String(100), nullable=False, comment="累计天数") + day_jg = Column(String(100), nullable=False, comment="两次观测时") + isgzjdxz = Column(String(100)) + mavalue_bc = Column(String(100), comment="本次沉降") + mavalue_lj = Column(String(100), comment="累计沉降") + sjName = Column(String(100), comment="司镜人员") + useflag = Column(String(100)) + workinfoname = Column(String(100), comment="观测阶段") + upd_remark = Column(String(1000), comment="备注") \ No newline at end of file diff --git a/app/schemas/comprehensive_data.py b/app/schemas/comprehensive_data.py new file mode 100644 index 0000000..83ea2fc --- /dev/null +++ b/app/schemas/comprehensive_data.py @@ -0,0 +1,95 @@ +from pydantic import BaseModel +from typing import Any, Dict, List, Optional + + +# 原始数据导入请求 +class OriginalDataImportRequest(BaseModel): + bfpcode: str + mtime: str + bffb: str + bfpl: str + bfpvalue: str + times: str + NYID: str + sort: Optional[int] = None + +# 水准数据导入请求 +class LevelDataImportRequest(BaseModel): + linecode: str + NYID: str + benchmarkids: Optional[str] = None + wsphigh: Optional[str] = None + createDate: Optional[str] = None + +# 沉降数据导入请求 +class SettlementDataImportRequest(BaseModel): + point_id: str + NYID: str + CVALUE: str + MAVALUE: str + MTIME_W: str + PRELOADH: str + PSTATE: str + createdate: str + day: str + day_jg: str + REMARK: Optional[str] = None + WORKINFO: Optional[str] = None + useflag: Optional[str] = None + mavalue_lj: Optional[str] = None + mavalue_bc: Optional[str] = None + sjName: Optional[str] = None + workinfoname: Optional[str] = None + isgzjdxz: Optional[str] = None + upd_remark: Optional[str] = None + +# 观测点数据导入请求 +class CheckpointDataImportRequest(BaseModel): + point_id: str + aname: str + section_id: str + burial_date: Optional[str] = None + +# 断面数据导入请求 +class SectionDataImportRequest(BaseModel): + section_id: str + mileage: str + work_site: str + status: str + number: str + basic_types: Optional[str] = None + height: Optional[str] = None + transition_paragraph: Optional[str] = None + +# 批量导入请求 +class BatchSectionDataImportRequest(BaseModel): + data: List[SectionDataImportRequest] + +class BatchCheckpointDataImportRequest(BaseModel): + data: List[CheckpointDataImportRequest] + +class BatchSettlementDataImportRequest(BaseModel): + data: List[SettlementDataImportRequest] + +class BatchLevelDataImportRequest(BaseModel): + data: List[LevelDataImportRequest] + +class BatchOriginalDataImportRequest(BaseModel): + data: List[OriginalDataImportRequest] + +# 响应模型 +class DataImportResponse(BaseModel): + success: bool + message: str + total_count: int + success_count: int + failed_count: int + failed_items: List[Dict[str, Any]] = [] + +# 兼容旧接口的模型 +class ComprehensiveDataImportRequest(BaseModel): + data: Dict[str, Any] + +class ComprehensiveDataImportResponse(BaseModel): + success: bool + message: str diff --git a/app/services/base.py b/app/services/base.py new file mode 100644 index 0000000..393bf7b --- /dev/null +++ b/app/services/base.py @@ -0,0 +1,65 @@ +from typing import Type, TypeVar, Generic, List, Optional, Any, Dict +from sqlalchemy.orm import Session +from sqlalchemy.ext.declarative import DeclarativeMeta + +ModelType = TypeVar("ModelType") + +class BaseService(Generic[ModelType]): + def __init__(self, model: Type[ModelType]): + self.model = model + + def create(self, db: Session, obj_data: Dict[str, Any]) -> ModelType: + """创建记录""" + db_obj = self.model(**obj_data) + db.add(db_obj) + db.commit() + db.refresh(db_obj) + return db_obj + + def get_by_id(self, db: Session, obj_id: int) -> Optional[ModelType]: + """根据ID获取记录""" + return db.query(self.model).filter(self.model.id == obj_id).first() + + def get_all(self, db: Session, skip: int = 0, limit: int = 100) -> List[ModelType]: + """获取所有记录""" + return db.query(self.model).offset(skip).limit(limit).all() + + def update(self, db: Session, obj_id: int, update_data: Dict[str, Any]) -> Optional[ModelType]: + """更新记录""" + db_obj = self.get_by_id(db, obj_id) + if db_obj: + for field, value in update_data.items(): + if hasattr(db_obj, field): + setattr(db_obj, field, value) + db.commit() + db.refresh(db_obj) + return db_obj + return None + + def delete(self, db: Session, obj_id: int) -> bool: + """删除记录""" + db_obj = self.get_by_id(db, obj_id) + if db_obj: + db.delete(db_obj) + db.commit() + return True + return False + + def get_by_field(self, db: Session, field_name: str, field_value: Any) -> List[ModelType]: + """根据字段值查询记录""" + if hasattr(self.model, field_name): + field = getattr(self.model, field_name) + return db.query(self.model).filter(field == field_value).all() + return [] + + def search_by_conditions(self, db: Session, conditions: Dict[str, Any]) -> List[ModelType]: + """根据多个条件搜索记录""" + query = db.query(self.model) + for field_name, field_value in conditions.items(): + if hasattr(self.model, field_name) and field_value is not None: + field = getattr(self.model, field_name) + if isinstance(field_value, str): + query = query.filter(field.like(f"%{field_value}%")) + else: + query = query.filter(field == field_value) + return query.all() \ No newline at end of file diff --git a/app/services/checkpoint.py b/app/services/checkpoint.py new file mode 100644 index 0000000..ce86c5d --- /dev/null +++ b/app/services/checkpoint.py @@ -0,0 +1,109 @@ +from sqlalchemy.orm import Session +from typing import List, Optional, Dict, Any +from ..models.checkpoint import Checkpoint +from .base import BaseService + +class CheckpointService(BaseService[Checkpoint]): + def __init__(self): + super().__init__(Checkpoint) + + def get_by_section_id(self, db: Session, section_id: str) -> List[Checkpoint]: + """根据断面ID获取观测点""" + return self.get_by_field(db, "section_id", section_id) + + def get_by_point_id(self, db: Session, point_id: str) -> Optional[Checkpoint]: + """根据观测点ID获取观测点""" + checkpoints = self.get_by_field(db, "point_id", point_id) + return checkpoints[0] if checkpoints else None + + def search_checkpoints(self, db: Session, + aname: Optional[str] = None, + section_id: Optional[str] = None, + point_id: Optional[str] = None) -> List[Checkpoint]: + """根据多个条件搜索观测点""" + conditions = {} + if aname is not None: + conditions["aname"] = aname + if section_id is not None: + conditions["section_id"] = section_id + if point_id is not None: + conditions["point_id"] = point_id + + return self.search_by_conditions(db, conditions) + + def batch_import_checkpoints(self, db: Session, data: List) -> Dict[str, Any]: + """ + 批量导入观测点数据,根据观测点ID判断是否重复,重复数据改为更新操作 + 支持事务回滚,失败时重试一次 + """ + import logging + logger = logging.getLogger(__name__) + + total_count = len(data) + success_count = 0 + failed_count = 0 + failed_items = [] + + for attempt in range(2): # 最多重试1次 + try: + db.begin() + success_count = 0 + failed_count = 0 + failed_items = [] + + for item_data in data: + try: + checkpoint = self.get_by_point_id(db, item_data.get('point_id')) + if checkpoint: + # 更新操作 + checkpoint.aname = item_data.get('aname') + checkpoint.section_id = item_data.get('section_id') + checkpoint.burial_date = item_data.get('burial_date') + logger.info(f"Updated checkpoint: {item_data.get('point_id')}") + else: + # 新增操作 + checkpoint = Checkpoint( + point_id=item_data.get('point_id'), + aname=item_data.get('aname'), + section_id=item_data.get('section_id'), + burial_date=item_data.get('burial_date') + ) + db.add(checkpoint) + logger.info(f"Created checkpoint: {item_data.get('point_id')}") + + success_count += 1 + except Exception as e: + failed_count += 1 + failed_items.append({ + 'data': item_data, + 'error': str(e) + }) + logger.error(f"Failed to process checkpoint {item_data.get('point_id')}: {str(e)}") + raise e + + db.commit() + logger.info(f"Batch import checkpoints completed. Success: {success_count}, Failed: {failed_count}") + break + + except Exception as e: + db.rollback() + logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}") + if attempt == 1: # 最后一次重试失败 + logger.error("Batch import checkpoints failed after retries") + return { + 'success': False, + 'message': f'批量导入失败: {str(e)}', + 'total_count': total_count, + 'success_count': 0, + 'failed_count': total_count, + 'failed_items': failed_items + } + + return { + 'success': failed_count == 0, + 'message': '批量导入完成' if failed_count == 0 else f'部分导入失败', + 'total_count': total_count, + 'success_count': success_count, + 'failed_count': failed_count, + 'failed_items': failed_items + } \ No newline at end of file diff --git a/app/services/comprehensive.py b/app/services/comprehensive.py new file mode 100644 index 0000000..f97106a --- /dev/null +++ b/app/services/comprehensive.py @@ -0,0 +1,190 @@ +from sqlalchemy.orm import Session +from typing import List, Optional, Dict, Any +from .section_data import SectionDataService +from .checkpoint import CheckpointService +from .settlement_data import SettlementDataService +from .level_data import LevelDataService +from .original_data import OriginalDataService + +class ComprehensiveDataService: + """综合数据服务类 - 提供跨表关系查询和业务分析功能""" + + def __init__(self): + self.section_service = SectionDataService() + self.checkpoint_service = CheckpointService() + self.settlement_service = SettlementDataService() + self.level_service = LevelDataService() + self.original_service = OriginalDataService() + + def get_complete_section_tree(self, db: Session, section_id: str) -> Dict[str, Any]: + """获取完整的断面数据树结构""" + return self.section_service.get_section_with_all_data(db, section_id) + + def get_nyid_related_data(self, db: Session, nyid: str) -> Dict[str, Any]: + """根据期数ID获取所有相关数据""" + settlement_data = self.settlement_service.get_by_nyid(db, nyid) + level_data = self.level_service.get_by_nyid(db, nyid) + original_data = self.original_service.get_by_nyid(db, nyid) + + related_sections = [] + related_checkpoints = [] + + for settlement in settlement_data: + point_id = settlement.point_id + checkpoint = self.checkpoint_service.get_by_point_id(db, point_id) + if checkpoint and checkpoint not in related_checkpoints: + related_checkpoints.append(checkpoint) + + section = self.section_service.get_by_section_id(db, checkpoint.section_id) + if section and section not in related_sections: + related_sections.append(section) + + return { + "nyid": nyid, + "settlement_data": settlement_data, + "level_data": level_data, + "original_data": original_data, + "related_checkpoints": related_checkpoints, + "related_sections": related_sections, + "summary": { + "settlement_count": len(settlement_data), + "level_count": len(level_data), + "original_count": len(original_data), + "checkpoint_count": len(related_checkpoints), + "section_count": len(related_sections) + } + } + + def get_point_monitoring_history(self, db: Session, point_id: str) -> Dict[str, Any]: + """获取观测点的完整监测历史""" + checkpoint = self.checkpoint_service.get_by_point_id(db, point_id) + if not checkpoint: + return {} + + settlement_data = self.settlement_service.get_by_point_id(db, point_id) + section = self.section_service.get_by_section_id(db, checkpoint.section_id) + + all_level_data = [] + all_original_data = [] + + for settlement in settlement_data: + nyid = settlement.NYID + level_data = self.level_service.get_by_nyid(db, nyid) + original_data = self.original_service.get_by_nyid(db, nyid) + all_level_data.extend(level_data) + all_original_data.extend(original_data) + + return { + "checkpoint": checkpoint, + "section": section, + "settlement_history": settlement_data, + "level_data": all_level_data, + "original_data": all_original_data, + "summary": { + "monitoring_periods": len(settlement_data), + "level_records": len(all_level_data), + "original_records": len(all_original_data) + } + } + + def search_by_multiple_ids(self, db: Session, + section_ids: Optional[List[str]] = None, + point_ids: Optional[List[str]] = None, + nyids: Optional[List[str]] = None) -> Dict[str, Any]: + """根据多种ID类型进行综合搜索""" + result = { + "sections": [], + "checkpoints": [], + "settlement_data": [], + "level_data": [], + "original_data": [] + } + + if section_ids: + for section_id in section_ids: + section = self.section_service.get_by_section_id(db, section_id) + if section: + result["sections"].append(section) + + if point_ids: + for point_id in point_ids: + checkpoint = self.checkpoint_service.get_by_point_id(db, point_id) + if checkpoint: + result["checkpoints"].append(checkpoint) + + settlement_data = self.settlement_service.get_by_point_id(db, point_id) + result["settlement_data"].extend(settlement_data) + + if nyids: + for nyid in nyids: + settlement_data = self.settlement_service.get_by_nyid(db, nyid) + level_data = self.level_service.get_by_nyid(db, nyid) + original_data = self.original_service.get_by_nyid(db, nyid) + + result["settlement_data"].extend(settlement_data) + result["level_data"].extend(level_data) + result["original_data"].extend(original_data) + + result["summary"] = { + "section_count": len(result["sections"]), + "checkpoint_count": len(result["checkpoints"]), + "settlement_count": len(result["settlement_data"]), + "level_count": len(result["level_data"]), + "original_count": len(result["original_data"]) + } + + return result + + def get_work_site_overview(self, db: Session, work_site: str) -> Dict[str, Any]: + """获取工点的全览数据""" + sections = self.section_service.search_section_data(db, work_site=work_site) + + all_checkpoints = [] + all_settlement_data = [] + all_level_data = [] + all_original_data = [] + + for section in sections: + section_data = self.section_service.get_section_with_all_data(db, section.section_id) + all_checkpoints.extend(section_data.get("checkpoints", [])) + all_settlement_data.extend(section_data.get("settlement_data", [])) + all_level_data.extend(section_data.get("level_data", [])) + all_original_data.extend(section_data.get("original_data", [])) + + return { + "work_site": work_site, + "sections": sections, + "checkpoints": all_checkpoints, + "settlement_data": all_settlement_data, + "level_data": all_level_data, + "original_data": all_original_data, + "summary": { + "section_count": len(sections), + "checkpoint_count": len(all_checkpoints), + "settlement_count": len(all_settlement_data), + "level_count": len(all_level_data), + "original_count": len(all_original_data) + } + } + + def get_statistics_summary(self, db: Session) -> Dict[str, Any]: + """获取全局统计摘要""" + all_sections = self.section_service.get_all(db, limit=10000) + all_checkpoints = self.checkpoint_service.get_all(db, limit=10000) + all_settlement = self.settlement_service.get_all(db, limit=10000) + all_level = self.level_service.get_all(db, limit=10000) + all_original = self.original_service.get_all(db, limit=10000) + + work_sites = list(set([s.work_site for s in all_sections if s.work_site])) + + return { + "total_counts": { + "sections": len(all_sections), + "checkpoints": len(all_checkpoints), + "settlement_records": len(all_settlement), + "level_records": len(all_level), + "original_records": len(all_original), + "work_sites": len(work_sites) + }, + "work_sites": work_sites + } \ No newline at end of file diff --git a/app/services/level_data.py b/app/services/level_data.py new file mode 100644 index 0000000..7c9ce7d --- /dev/null +++ b/app/services/level_data.py @@ -0,0 +1,120 @@ +from sqlalchemy.orm import Session +from typing import List, Optional, Dict, Any +from ..models.level_data import LevelData +from .base import BaseService + +class LevelDataService(BaseService[LevelData]): + def __init__(self): + super().__init__(LevelData) + + def get_by_nyid(self, db: Session, nyid: str) -> List[LevelData]: + """根据期数ID获取水准数据""" + return self.get_by_field(db, "NYID", nyid) + + def get_by_linecode(self, db: Session, linecode: str) -> List[LevelData]: + """根据水准线路编码获取水准数据""" + return self.get_by_field(db, "linecode", linecode) + + def search_level_data(self, db: Session, + linecode: Optional[str] = None, + nyid: Optional[str] = None, + benchmarkids: Optional[str] = None) -> List[LevelData]: + """根据多个条件搜索水准数据""" + conditions = {} + if linecode is not None: + conditions["linecode"] = linecode + if nyid is not None: + conditions["NYID"] = nyid + if benchmarkids is not None: + conditions["benchmarkids"] = benchmarkids + + return self.search_by_conditions(db, conditions) + + def get_by_nyid_and_linecode(self, db: Session, nyid: str, linecode: str) -> Optional[LevelData]: + """根据期数ID和线路编码获取水准数据""" + return db.query(LevelData).filter( + LevelData.NYID == nyid, + LevelData.linecode == linecode + ).first() + + def batch_import_level_data(self, db: Session, data: List) -> Dict[str, Any]: + """ + 批量导入水准数据,根据期数ID和线路编码判断是否重复,重复数据改为更新操作 + 支持事务回滚,失败时重试一次 + """ + import logging + logger = logging.getLogger(__name__) + + total_count = len(data) + success_count = 0 + failed_count = 0 + failed_items = [] + + for attempt in range(2): # 最多重试1次 + try: + db.begin() + success_count = 0 + failed_count = 0 + failed_items = [] + + for item_data in data: + try: + level_data = self.get_by_nyid_and_linecode( + db, + item_data.get('NYID'), + item_data.get('linecode') + ) + if level_data: + # 更新操作 + level_data.benchmarkids = item_data.get('benchmarkids') + level_data.wsphigh = item_data.get('wsphigh') + level_data.createDate = item_data.get('createDate') + logger.info(f"Updated level data: {item_data.get('linecode')}-{item_data.get('NYID')}") + else: + # 新增操作 + level_data = LevelData( + linecode=item_data.get('linecode'), + benchmarkids=item_data.get('benchmarkids'), + wsphigh=item_data.get('wsphigh'), + NYID=item_data.get('NYID'), + createDate=item_data.get('createDate') + ) + db.add(level_data) + logger.info(f"Created level data: {item_data.get('linecode')}-{item_data.get('NYID')}") + + success_count += 1 + except Exception as e: + failed_count += 1 + failed_items.append({ + 'data': item_data, + 'error': str(e) + }) + logger.error(f"Failed to process level data {item_data.get('linecode')}-{item_data.get('NYID')}: {str(e)}") + raise e + + db.commit() + logger.info(f"Batch import level data completed. Success: {success_count}, Failed: {failed_count}") + break + + except Exception as e: + db.rollback() + logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}") + if attempt == 1: # 最后一次重试失败 + logger.error("Batch import level data failed after retries") + return { + 'success': False, + 'message': f'批量导入失败: {str(e)}', + 'total_count': total_count, + 'success_count': 0, + 'failed_count': total_count, + 'failed_items': failed_items + } + + return { + 'success': failed_count == 0, + 'message': '批量导入完成' if failed_count == 0 else f'部分导入失败', + 'total_count': total_count, + 'success_count': success_count, + 'failed_count': failed_count, + 'failed_items': failed_items + } \ No newline at end of file diff --git a/app/services/original_data.py b/app/services/original_data.py new file mode 100644 index 0000000..4789e24 --- /dev/null +++ b/app/services/original_data.py @@ -0,0 +1,106 @@ +from sqlalchemy.orm import Session +from typing import List, Optional, Dict, Any +from ..models.original_data import OriginalData +from .base import BaseService + +class OriginalDataService(BaseService[OriginalData]): + def __init__(self): + super().__init__(OriginalData) + + def get_by_nyid(self, db: Session, nyid: str) -> List[OriginalData]: + """根据期数ID获取原始数据""" + return self.get_by_field(db, "NYID", nyid) + + def get_by_bfpcode(self, db: Session, bfpcode: str) -> List[OriginalData]: + """根据前(后)视点名称获取原始数据""" + return self.get_by_field(db, "bfpcode", bfpcode) + + def search_original_data(self, db: Session, + bfpcode: Optional[str] = None, + bffb: Optional[str] = None, + nyid: Optional[str] = None, + bfpl: Optional[str] = None) -> List[OriginalData]: + """根据多个条件搜索原始数据""" + conditions = {} + if bfpcode is not None: + conditions["bfpcode"] = bfpcode + if bffb is not None: + conditions["bffb"] = bffb + if nyid is not None: + conditions["NYID"] = nyid + if bfpl is not None: + conditions["bfpl"] = bfpl + + return self.search_by_conditions(db, conditions) + + def batch_import_original_data(self, db: Session, data: List) -> Dict[str, Any]: + """ + 批量导入原始数据,直接新增,无需检查重复 + 支持事务回滚,失败时重试一次 + """ + import logging + logger = logging.getLogger(__name__) + + total_count = len(data) + success_count = 0 + failed_count = 0 + failed_items = [] + + for attempt in range(2): # 最多重试1次 + try: + db.begin() + success_count = 0 + failed_count = 0 + failed_items = [] + + for item_data in data: + try: + # 直接新增操作 + original_data = OriginalData( + bfpcode=item_data.get('bfpcode'), + mtime=item_data.get('mtime'), + bffb=item_data.get('bffb'), + bfpl=item_data.get('bfpl'), + bfpvalue=item_data.get('bfpvalue'), + times=item_data.get('times'), + NYID=item_data.get('NYID'), + sort=item_data.get('sort') + ) + db.add(original_data) + logger.info(f"Created original data: {item_data.get('bfpcode')}-{item_data.get('NYID')}") + success_count += 1 + except Exception as e: + failed_count += 1 + failed_items.append({ + 'data': item_data, + 'error': str(e) + }) + logger.error(f"Failed to process original data {item_data.get('bfpcode')}-{item_data.get('NYID')}: {str(e)}") + raise e + + db.commit() + logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}") + break + + except Exception as e: + db.rollback() + logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}") + if attempt == 1: # 最后一次重试失败 + logger.error("Batch import original data failed after retries") + return { + 'success': False, + 'message': f'批量导入失败: {str(e)}', + 'total_count': total_count, + 'success_count': 0, + 'failed_count': total_count, + 'failed_items': failed_items + } + + return { + 'success': failed_count == 0, + 'message': '批量导入完成' if failed_count == 0 else f'部分导入失败', + 'total_count': total_count, + 'success_count': success_count, + 'failed_count': failed_count, + 'failed_items': failed_items + } \ No newline at end of file diff --git a/app/services/section_data.py b/app/services/section_data.py new file mode 100644 index 0000000..533e026 --- /dev/null +++ b/app/services/section_data.py @@ -0,0 +1,243 @@ +from sqlalchemy.orm import Session +from typing import List, Optional, Dict, Any +from ..models.section_data import SectionData +from .base import BaseService +from .checkpoint import CheckpointService +from .settlement_data import SettlementDataService +from .level_data import LevelDataService +from .original_data import OriginalDataService +from typing import Dict + +class SectionDataService(BaseService[SectionData]): + def __init__(self): + super().__init__(SectionData) + self.checkpoint_service = CheckpointService() + self.settlement_service = SettlementDataService() + self.level_service = LevelDataService() + self.original_service = OriginalDataService() + + def get_by_section_id(self, db: Session, section_id: str) -> Optional[SectionData]: + """根据断面ID获取断面数据""" + sections = self.get_by_field(db, "section_id", section_id) + return sections[0] if sections else None + + def get_by_number(self, db: Session, number: str) -> List[SectionData]: + """根据桥梁墩(台)编号获取断面数据""" + return self.get_by_field(db, "number", number) + + def search_section_data(self, db: Session, + section_id: Optional[str] = None, + work_site: Optional[str] = None, + number: Optional[str] = None, + status: Optional[str] = None, + basic_types: Optional[str] = None) -> List[SectionData]: + """根据多个条件搜索断面数据""" + conditions = {} + if section_id is not None: + conditions["section_id"] = section_id + if work_site is not None: + conditions["work_site"] = work_site + if number is not None: + conditions["number"] = number + if status is not None: + conditions["status"] = status + if basic_types is not None: + conditions["basic_types"] = basic_types + + return self.search_by_conditions(db, conditions) + + def get_section_with_checkpoints(self, db: Session, section_id: str) -> Dict[str, Any]: + """获取断面数据及其关联的观测点""" + section = self.get_by_section_id(db, section_id) + if not section: + return {} + + checkpoints = self.checkpoint_service.get_by_section_id(db, section_id) + + return { + "section": section, + "checkpoints": checkpoints, + "checkpoint_count": len(checkpoints) + } + + def get_section_with_all_data(self, db: Session, section_id: str) -> Dict[str, Any]: + """获取断面数据及其所有关联数据(观测点、沉降数据、原始数据)""" + section = self.get_by_section_id(db, section_id) + if not section: + return {} + + checkpoints = self.checkpoint_service.get_by_section_id(db, section_id) + + all_settlement_data = [] + all_original_data = [] + all_level_data = [] + + for checkpoint in checkpoints: + point_id = checkpoint.point_id + + settlement_data = self.settlement_service.get_by_point_id(db, point_id) + all_settlement_data.extend(settlement_data) + + for settlement in settlement_data: + nyid = settlement.NYID + + level_data = self.level_service.get_by_nyid(db, nyid) + all_level_data.extend(level_data) + + original_data = self.original_service.get_by_nyid(db, nyid) + all_original_data.extend(original_data) + + return { + "section": section, + "checkpoints": checkpoints, + "settlement_data": all_settlement_data, + "level_data": all_level_data, + "original_data": all_original_data, + "summary": { + "checkpoint_count": len(checkpoints), + "settlement_data_count": len(all_settlement_data), + "level_data_count": len(all_level_data), + "original_data_count": len(all_original_data) + } + } + + def get_sections_by_checkpoint_point_id(self, db: Session, point_id: str) -> List[SectionData]: + """根据观测点ID反向查找断面数据""" + checkpoint = self.checkpoint_service.get_by_point_id(db, point_id) + if checkpoint: + return [self.get_by_section_id(db, checkpoint.section_id)] + return [] + + def get_sections_by_settlement_nyid(self, db: Session, nyid: str) -> List[SectionData]: + """根据期数ID反向查找相关的断面数据""" + settlement_data = self.settlement_service.get_by_nyid(db, nyid) + sections = [] + + for settlement in settlement_data: + point_id = settlement.point_id + checkpoint = self.checkpoint_service.get_by_point_id(db, point_id) + if checkpoint: + section = self.get_by_section_id(db, checkpoint.section_id) + if section and section not in sections: + sections.append(section) + + return sections + + def get_settlement_data_by_section(self, db: Session, section_id: str) -> List: + """获取指定断面的所有沉降数据""" + checkpoints = self.checkpoint_service.get_by_section_id(db, section_id) + all_settlement_data = [] + + for checkpoint in checkpoints: + settlement_data = self.settlement_service.get_by_point_id(db, checkpoint.point_id) + all_settlement_data.extend(settlement_data) + + return all_settlement_data + + def get_original_data_by_section(self, db: Session, section_id: str) -> List: + """获取指定断面的所有原始数据""" + settlement_data = self.get_settlement_data_by_section(db, section_id) + all_original_data = [] + + for settlement in settlement_data: + original_data = self.original_service.get_by_nyid(db, settlement.NYID) + all_original_data.extend(original_data) + + return all_original_data + + def get_level_data_by_section(self, db: Session, section_id: str) -> List: + """获取指定断面的所有水准数据""" + settlement_data = self.get_settlement_data_by_section(db, section_id) + all_level_data = [] + + for settlement in settlement_data: + level_data = self.level_service.get_by_nyid(db, settlement.NYID) + all_level_data.extend(level_data) + + return all_level_data + + def batch_import_sections(self, db: Session, data: List) -> Dict[str, Any]: + """ + 批量导入断面数据,根据断面id判断是否重复,重复数据改为更新操作 + 支持事务回滚,失败时重试一次 + """ + import logging + logger = logging.getLogger(__name__) + + total_count = len(data) + success_count = 0 + failed_count = 0 + failed_items = [] + + for attempt in range(2): # 最多重试1次 + try: + db.begin() + success_count = 0 + failed_count = 0 + failed_items = [] + + for item_data in data: + try: + section = self.get_by_section_id(db, item_data.get('section_id')) + if section: + # 更新操作 + section.mileage = item_data.get('mileage') + section.work_site = item_data.get('work_site') + section.basic_types = item_data.get('basic_types') + section.height = item_data.get('height') + section.status = item_data.get('status') + section.number = item_data.get('number') + section.transition_paragraph = item_data.get('transition_paragraph') + logger.info(f"Updated section: {item_data.get('section_id')}") + else: + # 新增操作 + from ..models.section_data import SectionData + section = SectionData( + section_id=item_data.get('section_id'), + mileage=item_data.get('mileage'), + work_site=item_data.get('work_site'), + basic_types=item_data.get('basic_types'), + height=item_data.get('height'), + status=item_data.get('status'), + number=item_data.get('number'), + transition_paragraph=item_data.get('transition_paragraph') + ) + db.add(section) + logger.info(f"Created section: {item_data.get('section_id')}") + + success_count += 1 + except Exception as e: + failed_count += 1 + failed_items.append({ + 'data': item_data, + 'error': str(e) + }) + logger.error(f"Failed to process section {item_data.get('section_id')}: {str(e)}") + raise e + + db.commit() + logger.info(f"Batch import sections completed. Success: {success_count}, Failed: {failed_count}") + break + + except Exception as e: + db.rollback() + logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}") + if attempt == 1: # 最后一次重试失败 + logger.error("Batch import sections failed after retries") + return { + 'success': False, + 'message': f'批量导入失败: {str(e)}', + 'total_count': total_count, + 'success_count': 0, + 'failed_count': total_count, + 'failed_items': failed_items + } + + return { + 'success': failed_count == 0, + 'message': '批量导入完成' if failed_count == 0 else f'部分导入失败', + 'total_count': total_count, + 'success_count': success_count, + 'failed_count': failed_count, + 'failed_items': failed_items + } \ No newline at end of file diff --git a/app/services/settlement_data.py b/app/services/settlement_data.py new file mode 100644 index 0000000..3508f55 --- /dev/null +++ b/app/services/settlement_data.py @@ -0,0 +1,151 @@ +from sqlalchemy.orm import Session +from typing import List, Optional, Dict, Any +from ..models.settlement_data import SettlementData +from .base import BaseService + +class SettlementDataService(BaseService[SettlementData]): + def __init__(self): + super().__init__(SettlementData) + + def get_by_point_id(self, db: Session, point_id: str) -> List[SettlementData]: + """根据观测点ID获取沉降数据""" + return self.get_by_field(db, "point_id", point_id) + + def get_by_nyid(self, db: Session, nyid: str) -> List[SettlementData]: + """根据期数ID获取沉降数据""" + return self.get_by_field(db, "NYID", nyid) + + def get_by_point_and_nyid(self, db: Session, point_id: str, nyid: str) -> Optional[SettlementData]: + """根据观测点ID和期数ID获取沉降数据""" + return db.query(SettlementData).filter( + SettlementData.point_id == point_id, + SettlementData.NYID == nyid + ).first() + + def search_settlement_data(self, db: Session, + point_id: Optional[str] = None, + nyid: Optional[str] = None, + sjName: Optional[str] = None, + workinfoname: Optional[str] = None) -> List[SettlementData]: + """根据多个条件搜索沉降数据""" + conditions = {} + if point_id is not None: + conditions["point_id"] = point_id + if nyid is not None: + conditions["NYID"] = nyid + if sjName is not None: + conditions["sjName"] = sjName + if workinfoname is not None: + conditions["workinfoname"] = workinfoname + + return self.search_by_conditions(db, conditions) + + def batch_import_settlement_data(self, db: Session, data: List) -> Dict[str, Any]: + """ + 批量导入沉降数据,根据观测点ID和期数ID判断是否重复,重复数据改为更新操作 + 支持事务回滚,失败时重试一次 + """ + import logging + logger = logging.getLogger(__name__) + + total_count = len(data) + success_count = 0 + failed_count = 0 + failed_items = [] + + for attempt in range(2): # 最多重试1次 + try: + db.begin() + success_count = 0 + failed_count = 0 + failed_items = [] + + for item_data in data: + try: + settlement = self.get_by_point_and_nyid( + db, + item_data.get('point_id'), + item_data.get('NYID') + ) + if settlement: + # 更新操作 + settlement.CVALUE = item_data.get('CVALUE') + settlement.MAVALUE = item_data.get('MAVALUE') + settlement.MTIME_W = item_data.get('MTIME_W') + settlement.PRELOADH = item_data.get('PRELOADH') + settlement.PSTATE = item_data.get('PSTATE') + settlement.REMARK = item_data.get('REMARK') + settlement.WORKINFO = item_data.get('WORKINFO') + settlement.createdate = item_data.get('createdate') + settlement.day = item_data.get('day') + settlement.day_jg = item_data.get('day_jg') + settlement.isgzjdxz = item_data.get('isgzjdxz') + settlement.mavalue_bc = item_data.get('mavalue_bc') + settlement.mavalue_lj = item_data.get('mavalue_lj') + settlement.sjName = item_data.get('sjName') + settlement.useflag = item_data.get('useflag') + settlement.workinfoname = item_data.get('workinfoname') + settlement.upd_remark = item_data.get('upd_remark') + logger.info(f"Updated settlement data: {item_data.get('point_id')}-{item_data.get('NYID')}") + else: + # 新增操作 + settlement = SettlementData( + point_id=item_data.get('point_id'), + CVALUE=item_data.get('CVALUE'), + MAVALUE=item_data.get('MAVALUE'), + MTIME_W=item_data.get('MTIME_W'), + NYID=item_data.get('NYID'), + PRELOADH=item_data.get('PRELOADH'), + PSTATE=item_data.get('PSTATE'), + REMARK=item_data.get('REMARK'), + WORKINFO=item_data.get('WORKINFO'), + createdate=item_data.get('createdate'), + day=item_data.get('day'), + day_jg=item_data.get('day_jg'), + isgzjdxz=item_data.get('isgzjdxz'), + mavalue_bc=item_data.get('mavalue_bc'), + mavalue_lj=item_data.get('mavalue_lj'), + sjName=item_data.get('sjName'), + useflag=item_data.get('useflag'), + workinfoname=item_data.get('workinfoname'), + upd_remark=item_data.get('upd_remark') + ) + db.add(settlement) + logger.info(f"Created settlement data: {item_data.get('point_id')}-{item_data.get('NYID')}") + + success_count += 1 + except Exception as e: + failed_count += 1 + failed_items.append({ + 'data': item_data, + 'error': str(e) + }) + logger.error(f"Failed to process settlement data {item_data.get('point_id')}-{item_data.get('NYID')}: {str(e)}") + raise e + + db.commit() + logger.info(f"Batch import settlement data completed. Success: {success_count}, Failed: {failed_count}") + break + + except Exception as e: + db.rollback() + logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}") + if attempt == 1: # 最后一次重试失败 + logger.error("Batch import settlement data failed after retries") + return { + 'success': False, + 'message': f'批量导入失败: {str(e)}', + 'total_count': total_count, + 'success_count': 0, + 'failed_count': total_count, + 'failed_items': failed_items + } + + return { + 'success': failed_count == 0, + 'message': '批量导入完成' if failed_count == 0 else f'部分导入失败', + 'total_count': total_count, + 'success_count': success_count, + 'failed_count': failed_count, + 'failed_items': failed_items + } \ No newline at end of file