From d8b6247094fabbdf364c86a917799f1a04cbc045 Mon Sep 17 00:00:00 2001 From: lhx Date: Tue, 18 Nov 2025 09:41:52 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/account.py | 7 +++ app/services/checkpoint.py | 6 +++ app/services/section_data.py | 7 +++ app/utils/scheduler.py | 96 +++++++++++++++++++++++++----------- 4 files changed, 86 insertions(+), 30 deletions(-) diff --git a/app/services/account.py b/app/services/account.py index d544f15..270f688 100644 --- a/app/services/account.py +++ b/app/services/account.py @@ -58,6 +58,13 @@ class AccountService: return AccountResponse.from_orm_account(account) return None + @staticmethod + def get_accounts_batch(db: Session, account_ids: List[int]) -> List[Account]: + """批量根据ID列表获取账号数据(返回模型对象,用于批量关联)""" + if not account_ids: + return [] + return db.query(Account).filter(Account.id.in_(account_ids)).all() + @staticmethod def get_account_by_username(db: Session, username: str) -> Optional[Account]: """根据用户名获取账号""" diff --git a/app/services/checkpoint.py b/app/services/checkpoint.py index 0d035d3..e2198e6 100644 --- a/app/services/checkpoint.py +++ b/app/services/checkpoint.py @@ -15,6 +15,12 @@ class CheckpointService(BaseService[Checkpoint]): checkpoints = self.get_by_field(db, "point_id", point_id) return checkpoints[0] if checkpoints else None + def get_by_point_ids_batch(self, db: Session, point_ids: List[str]) -> List[Checkpoint]: + """批量根据观测点ID列表获取观测点数据(使用IN查询优化性能)""" + if not point_ids: + return [] + return db.query(Checkpoint).filter(Checkpoint.point_id.in_(point_ids)).all() + def search_checkpoints(self, db: Session, aname: Optional[str] = None, section_id: Optional[str] = None, diff --git a/app/services/section_data.py b/app/services/section_data.py index a467b1b..33d206a 100644 --- a/app/services/section_data.py +++ b/app/services/section_data.py @@ -30,6 +30,13 @@ class SectionDataService(BaseService[SectionData]): if not account_ids: return [] return db.query(SectionData).filter(SectionData.account_id.in_(account_ids)).all() + + def get_by_section_ids_batch(self, db: Session, section_ids: List[str]) -> List[SectionData]: + """批量根据断面ID列表获取断面数据(使用IN查询优化性能)""" + if not section_ids: + return [] + return db.query(SectionData).filter(SectionData.section_id.in_(section_ids)).all() + def get_by_number(self, db: Session, number: str) -> List[SectionData]: """根据桥梁墩(台)编号获取断面数据""" return self.get_by_field(db, "number", number) diff --git a/app/utils/scheduler.py b/app/utils/scheduler.py index 0d6c83b..93a7c83 100644 --- a/app/utils/scheduler.py +++ b/app/utils/scheduler.py @@ -7,7 +7,7 @@ from sqlalchemy.orm import Session from ..core.database import SessionLocal from ..core.logging_config import get_logger from ..models.account import Account -# from ..services.daily import get_nyid_by_point_id +# from ..services.daily import get_nyid_by_point_id from ..services.daily import DailyDataService from ..services.level_data import LevelDataService from ..services.checkpoint import CheckpointService @@ -76,7 +76,7 @@ class TaskScheduler: """设置系统定时任务""" try: # 检查是否已存在每日重置任务 - + existing_job = self.scheduler.get_job("daily_reset_today_updated") # existing_job = self.scheduler.get_job("get_max_nyid") if not existing_job: @@ -232,16 +232,16 @@ def scheduled_get_max_nyid_by_point_id(start: int = 0, end: int = 0): # 1. 获取沉降数据(返回 List[List[dict]]) daily_service = DailyDataService() result = daily_service.get_nyid_by_point_id(db, [], 1) - + # 2. 计算到期数据 monitor = ConstructionMonitorUtils() daily_data = monitor.get_due_data(result, start, end) data = daily_data['data'] error_data = daily_data['error_data'] - + winters = daily_data['winter'] logger.info(f"首次获取数据完成,共{len(result)}条记录") - + # 3. 循环处理冬休数据,追溯历史非冬休记录 max_num = 1 while winters: @@ -263,33 +263,69 @@ def scheduled_get_max_nyid_by_point_id(start: int = 0, end: int = 0): checkpoint_db = CheckpointService() section_db = SectionDataService() account_service = AccountService() - - # 5. 关联其他表数据(核心逻辑保留) + + # 5. 批量查询优化 + logger.info("批量获取关联数据") + + # 提取所有需要的ID列表 + nyid_list = list(set(d['NYID'] for d in data if d.get('NYID'))) + point_id_list = list(set(d['point_id'] for d in data if d.get('point_id'))) + + # 批量查询LevelData + logger.info(f"批量查询LevelData,nyid数量: {len(nyid_list)}") + level_results = level_service.get_by_nyids(db, nyid_list) + level_dict = {level.NYID: level for level in level_results} + + # 批量查询CheckpointData + logger.info(f"批量查询CheckpointData,point_id数量: {len(point_id_list)}") + checkpoint_results = checkpoint_db.get_by_point_ids_batch(db, point_id_list) + checkpoint_dict = {cp.point_id: cp for cp in checkpoint_results} + + # 提取所有section_id + section_id_list = list(set( + cp.section_id for cp in checkpoint_results + if cp.section_id and isinstance(cp, object) + )) + + # 批量查询SectionData + logger.info(f"批量查询SectionData,section_id数量: {len(section_id_list)}") + section_results = section_db.get_by_section_ids_batch(db, section_id_list) + section_dict = {s.section_id: s for s in section_results} + + # 提取所有account_id + account_id_list = list(set( + s.account_id for s in section_results + if s.account_id and isinstance(s, object) + )) + + # 批量查询AccountData + logger.info(f"批量查询AccountData,account_id数量: {len(account_id_list)}") + account_results = account_service.get_accounts_batch(db, account_id_list) + account_dict = {acc.id: acc for acc in account_results} + + logger.info("批量查询完成,开始关联数据") + + # 6. 关联数据到原记录 for d in data: - # 处理 LevelData(假设返回列表,取第一条) - level_results = level_service.get_by_nyid(db, d['NYID']) - level_instance = level_results[0] if isinstance(level_results, list) and level_results else level_results + # 关联LevelData + level_instance = level_dict.get(d['NYID']) d['level_data'] = level_instance.to_dict() if level_instance else None - - # 处理 CheckpointData(返回单实例,直接使用) - checkpoint_instance = checkpoint_db.get_by_point_id(db, d['point_id']) + + # 关联CheckpointData + checkpoint_instance = checkpoint_dict.get(d['point_id']) d['checkpoint_data'] = checkpoint_instance.to_dict() if checkpoint_instance else None - - # 处理 SectionData(根据checkpoint_data关联) - if d['checkpoint_data']: - section_instance = section_db.get_by_section_id(db, d['checkpoint_data']['section_id']) - d['section_data'] = section_instance.to_dict() if section_instance else None - else: - d['section_data'] = None - - # 处理 AccountData - if d.get('section_data') and d['section_data'].get('account_id'): - account_response = account_service.get_account(db, account_id=d['section_data']['account_id']) - d['account_data'] = account_response.__dict__ if account_response else None - else: - d['account_data'] = None - - # 6. 构造DailyData数据并批量创建 + + # 关联SectionData + section_id = d['checkpoint_data']['section_id'] if d.get('checkpoint_data') else None + section_instance = section_dict.get(section_id) if section_id else None + d['section_data'] = section_instance.to_dict() if section_instance else None + + # 关联AccountData + account_id = d.get('section_data', {}).get('account_id') if d.get('section_data') else None + account_instance = account_dict.get(account_id) if account_id else None + d['account_data'] = account_instance.__dict__ if account_instance else None + + # 7. 构造DailyData数据并批量创建 daily_create_data = [] for d in data: # 过滤无效数据(避免缺失关键字段报错) @@ -303,7 +339,7 @@ def scheduled_get_max_nyid_by_point_id(start: int = 0, end: int = 0): 'remaining': (0-int(d['overdue'])) if 'overdue' in d else d['remaining'], } daily_create_data.append(tem) - + # 批量创建记录 if daily_create_data: created_records = daily_service.batch_create_by_account_nyid(db, daily_create_data)