From f8ed6cb5d30a98aa6abd39f222b912fe50a78d41 Mon Sep 17 00:00:00 2001 From: lhx Date: Sat, 15 Nov 2025 16:36:40 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=A0=87=E6=AE=B5=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E6=9C=80=E5=A4=A7=E6=B0=B4=E5=87=86=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/comprehensive_data.py | 27 +++++++++++++-------------- app/api/level_data.py | 2 +- app/schemas/level_data.py | 1 + app/services/level_data.py | 18 ++++++++++++------ app/services/original_data.py | 3 +-- 5 files changed, 28 insertions(+), 23 deletions(-) diff --git a/app/api/comprehensive_data.py b/app/api/comprehensive_data.py index 7f9462b..e8eb66d 100644 --- a/app/api/comprehensive_data.py +++ b/app/api/comprehensive_data.py @@ -130,7 +130,6 @@ def batch_import_level_data(request: BatchLevelDataImportRequest, db: Session = logger.info(f"Starting batch import level data, count: {len(request.data)}") data_list = request.data result = level_service.batch_import_level_data(db, data_list) - logger.info(f"Batch import level data completed: {result['message']}") return DataImportResponse( code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED, @@ -390,17 +389,17 @@ def get_settlement_by_linecode( try: linecode = request.linecode # 从请求体中获取linecode logger.info(f"接口请求:根据linecode={linecode}查询沉降数据") - + settlement_service = SettlementDataService() result = settlement_service.get_settlement_by_linecode(db, linecode) - + return DataResponse( code=ResponseCode.SUCCESS, message=f"查询成功,共获取{len(result['settlement_data'])}条沉降数据", total=len(result['settlement_data']), data=result['settlement_data'] ) - + except Exception as e: logger.error(f"查询沉降数据失败:{str(e)}", exc_info=True) return DataResponse( @@ -417,7 +416,7 @@ def get_settlement_by_nyid( try: nyid = request.NYID # 从请求体中获取nyid logger.info(f"接口请求:根据nyid={nyid}查询沉降数据") - + settlement = SettlementDataService() # 获取模型实例列表 checkpoint_instances = settlement.get_by_nyid(db, nyid=nyid) @@ -425,14 +424,14 @@ def get_settlement_by_nyid( checkpoint_data = [instance.__dict__ for instance in checkpoint_instances] # 清理 SQLAlchemy 内部属性(可选,避免多余字段) checkpoint_data = [{k: v for k, v in item.items() if not k.startswith('_sa_')} for item in checkpoint_data] - + return DataResponse( code=ResponseCode.SUCCESS, message=f"查询成功,共获取{len(checkpoint_data)}条沉降数据,nyid={nyid}", total=len(checkpoint_data), data=checkpoint_data ) - + except Exception as e: logger.error(f"查询沉降数据失败:{str(e)}", exc_info=True) return DataResponse( @@ -448,17 +447,17 @@ def get_today_data(request: TodayDataRequest, db: Session = Depends(get_db)): # 获取请求参数(如果需要从请求体中接收参数,可通过request获取) # 示例:如需接收account_id,可通过 request.account_id 获取 # account_id = request.account_id # 根据根据实际需求决定是否需要 - + # 触发定时任务(如果需要传入参数,可在这里添加) from ..utils.scheduler import scheduled_get_max_nyid_by_point_id scheduled_get_max_nyid_by_point_id() - + # 调用服务层获取数据 account_id = request.account_id daily_service = DailyDataService() # 如需使用请求参数,可修改为 daily_service.get_daily_data_by_account(db, account_id=account_id) daily_data = daily_service.get_daily_data_by_account(db, account_id=account_id) - + return DataResponse( code=ResponseCode.SUCCESS, message="定时时任务触发执行成功!任务已开始处理(具体结果查看系统日志)", @@ -526,7 +525,7 @@ def get_checkpoint_by_point(request: PointByAccountRequest, db: Session = Depend point_id = request.point_id checkpoint_service = CheckpointService() result_data = checkpoint_service.get_by_point_id(db, point_id=point_id) - + # 使用 __dict__ 转换(过滤内部属性) if result_data: # 复制字典并排除 SQLAlchemy 内部属性 @@ -535,7 +534,7 @@ def get_checkpoint_by_point(request: PointByAccountRequest, db: Session = Depend data_list = [data_dict] else: data_list = [] - + return DataResponse( code=ResponseCode.SUCCESS, message="查询成功", @@ -562,7 +561,7 @@ def get_checkpoint_by_point(request: LevelDataQueryRequest, db: Session = Depend NYID = result_data.NYID if result_data else None settlement_service = SettlementDataService() result_data = settlement_service.get_by_nyid(db, nyid=NYID) - + # 使用 __dict__ 转换(过滤内部属性) if result_data: # 复制字典并排除 SQLAlchemy 内部属性 @@ -571,7 +570,7 @@ def get_checkpoint_by_point(request: LevelDataQueryRequest, db: Session = Depend data_list = [data_dict] else: data_list = [] - + return DataResponse( code=ResponseCode.SUCCESS, message="查询成功", diff --git a/app/api/level_data.py b/app/api/level_data.py index 9b2f32f..fef76a4 100644 --- a/app/api/level_data.py +++ b/app/api/level_data.py @@ -19,7 +19,7 @@ def get_level_data_by_project(request: LevelDataRequest, db: Session = Depends(g """ try: level_service = LevelDataService() - level_data_list = level_service.get_level_data_by_project_name(db, request.project_name) + level_data_list = level_service.get_level_data_by_project_name(db, project_name=request.project_name, nyid_max=request.nyid_max) return LevelDataListResponse( code=ResponseCode.SUCCESS, diff --git a/app/schemas/level_data.py b/app/schemas/level_data.py index e7e3c20..4c9c312 100644 --- a/app/schemas/level_data.py +++ b/app/schemas/level_data.py @@ -22,6 +22,7 @@ class LevelDataResponse(LevelDataBase): class LevelDataRequest(BaseModel): """水准数据请求模型""" project_name: str = Field(..., description="标段名称") + nyid_max: Optional[bool] = Field(False, description="是否只获取最新期数的水准数据") class LevelDataListResponse(BaseModel): """水准数据列表响应格式""" diff --git a/app/services/level_data.py b/app/services/level_data.py index 6e2798a..f5237aa 100644 --- a/app/services/level_data.py +++ b/app/services/level_data.py @@ -148,7 +148,7 @@ class LevelDataService(BaseService[LevelData]): 'failed_items': failed_items } - def get_level_data_by_project_name(self, db: Session, project_name: str) -> List[Dict[str, Any]]: + def get_level_data_by_project_name(self, db: Session, project_name: str, nyid_max: bool = False) -> List[Dict[str, Any]]: """ 通过project_name获取全部水准线路 业务逻辑: @@ -198,11 +198,17 @@ class LevelDataService(BaseService[LevelData]): nyid_list = list(set([settlement.NYID for settlement in settlements if settlement.NYID])) logger.info(f"查询到{len(nyid_list)}个期数ID") - # 5. 查询水准数据表获取水准数据 (通过NYID) - level_data_list = db.query(LevelData).filter(LevelData.NYID.in_(nyid_list)).all() - if not level_data_list: - logger.warning(f"未查询到对应的水准数据") - return [] + if nyid_max: + # 只获取最新期数的水准数据 + nyid_list = [max(nyid_list, key=int)] + logger.info(f"筛选后只获取最新期数ID: {nyid_list}") + level_data_list = db.query(LevelData).filter(LevelData.NYID.in_(nyid_list)).all() + else: + # 5. 查询水准数据表获取水准数据 (通过NYID) + level_data_list = db.query(LevelData).filter(LevelData.NYID.in_(nyid_list)).all() + if not level_data_list: + logger.warning(f"未查询到对应的水准数据") + return [] # 6. 将水准数据依照linecode去重(同linecode只需保留一个) linecode_seen = set() diff --git a/app/services/original_data.py b/app/services/original_data.py index c698c64..8e25f83 100644 --- a/app/services/original_data.py +++ b/app/services/original_data.py @@ -337,13 +337,12 @@ class OriginalDataService(BaseService[OriginalData]): # ===== 性能优化:批量查询沉降数据 ===== # 统一转换为字符串处理(数据库NYID字段是VARCHAR类型) nyid_list = list(set(str(item.get('NYID')) for item in data if item.get('NYID'))) - logger.info(f"Querying settlement data for nyid list: {nyid_list}") settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all() - logger.info(f"Found {len(settlements)} settlement records") settlement_map = {s.NYID: s for s in settlements} missing_nyids = set(nyid_list) - set(settlement_map.keys()) if missing_nyids: + logger.warning(f"[批量导入原始数据] 批量查询settlement数据失败, Nyid: {list(missing_nyids)}") db.rollback() return { 'success': False,