From 83925382ed2e017653d3bb37a30fab40c1e837f8 Mon Sep 17 00:00:00 2001 From: whm <973418690@qq.com> Date: Tue, 18 Nov 2025 12:09:26 +0800 Subject: [PATCH] =?UTF-8?q?1.=E4=BC=98=E5=8C=96daily=E7=9A=84=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E7=BB=93=E6=9E=84=202.=E4=BF=AE=E6=94=B9=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/daily.py | 62 ++++++++-------- app/utils/construction_monitor.py | 4 +- app/utils/scheduler.py | 114 ++++++++++++------------------ 3 files changed, 76 insertions(+), 104 deletions(-) diff --git a/app/services/daily.py b/app/services/daily.py index 50b621e..12fadc6 100644 --- a/app/services/daily.py +++ b/app/services/daily.py @@ -90,15 +90,17 @@ class DailyDataService(BaseService[DailyData]): def get_nyid_by_point_id( self, db: Session, - point_ids: List[int] = None, + point_ids: Optional[List[int]] = None, max_num: int = 1 ) -> List[List[dict]]: """ - 获取指定point_id的记录,修复子查询中模型对象访问错误,同时过滤useflag=0的数据 + 获取指定point_id的记录,每个point_id的前max_num条记录放在同一个子列表中 + 返回格式:[[point1_records...], [point2_records...]] """ # 处理参数默认值 point_ids = point_ids or [] - max_num = max(max_num, 1) + if max_num <= 0: + return [] # 窗口函数:按point_id分组,每组内按NYID降序编号 row_num = over( @@ -107,60 +109,54 @@ class DailyDataService(BaseService[DailyData]): order_by=desc(SettlementData.NYID) ).label("row_num") - # 子查询:查询模型的所有字段 + 行号(不保留模型对象,只展平字段) - # 先获取模型的所有字段列表 + # 模型字段列表 model_columns = [getattr(SettlementData, col.name) for col in SettlementData.__table__.columns] - # -------------------------- 新增过滤条件:useflag IS NOT NULL AND useflag != 0 -------------------------- - # 基础条件:过滤useflag为0或不存在的记录 + + # 基础条件 base_conditions = [ - SettlementData.useflag.isnot(None), # 确保useflag字段存在(非NULL) - SettlementData.useflag != 0 # 确保useflag值不为0 + SettlementData.useflag.isnot(None), + SettlementData.useflag != 0 ] - # 若指定了point_ids,添加point_id过滤条件 if point_ids: base_conditions.append(SettlementData.point_id.in_(point_ids)) + # 子查询 subquery = ( - select(*model_columns, row_num) # 展开所有字段 + 行号 - .where(*base_conditions) # 应用组合条件(包含useflag过滤) + select(*model_columns, row_num) + .where(*base_conditions) .subquery() ) - # ------------------------------------------------------------------------------------------------------ - # 主查询:筛选行号<=max_num的记录 + # 主查询:筛选每个point_id的前max_num条 query = ( select(subquery) .where(subquery.c.row_num <= max_num) .order_by(subquery.c.point_id, subquery.c.row_num) ) - # 执行查询(结果为包含字段值的行对象) + # 执行查询 results = db.execute(query).all() - grouped: Dict[int, List[dict]] = {} - - # 获取模型字段名列表(用于映射行对象到字典) + grouped: Dict[int, List[dict]] = {} # 键:point_id,值:该point_id的所有记录(子列表) field_names = [col.name for col in SettlementData.__table__.columns] for row in results: - # 将行对象转换为字典(忽略最后一个字段row_num) - item_dict = { - field: getattr(row, field) - for field in field_names - } - pid = item_dict["point_id"] - + item_dict = {field: getattr(row, field) for field in field_names} + try: + pid = int(item_dict["point_id"]) # 确保point_id为整数 + except (KeyError, ValueError): + continue # 跳过无效记录 + + # 同一point_id的记录放入同一个列表 if pid not in grouped: - grouped[pid] = [] - grouped[pid].append(item_dict) + grouped[pid] = [] # 初始化子列表 + grouped[pid].append(item_dict) # 追加到子列表 - # 按输入point_ids顺序整理结果 + # 按输入point_ids顺序整理结果(关键:每个point_id对应一个子列表) if not point_ids: - point_ids = sorted(grouped.keys()) + point_ids = sorted(grouped.keys()) # 若无指定,按point_id排序 - # 构建[[{}], [{}]]格式 - return [ - [record] for pid in point_ids for record in grouped.get(pid, []) - ] + # 构建最终结果:每个point_id的记录作为一个子列表 + return [grouped.get(pid, []) for pid in point_ids] # 获取所有的今日数据 def get_all_daily_data( diff --git a/app/utils/construction_monitor.py b/app/utils/construction_monitor.py index 36fd12e..8ffeaa9 100644 --- a/app/utils/construction_monitor.py +++ b/app/utils/construction_monitor.py @@ -2,7 +2,8 @@ from datetime import datetime from typing import List, Dict import warnings import copy - +from ..core.logging_config import get_logger +logger = get_logger(__name__) class ConstructionMonitorUtils: def __init__(self): # 原始工况周期映射表(保持不变) @@ -155,6 +156,7 @@ class ConstructionMonitorUtils: f"【超期警报】测点{point_idx} 最新工况'{latest_condition}'({create_date})" f"已超期{abs(due_days)}天!基准工况:{base_condition},周期{period}天" ) + logger.warning(warn_msg) warnings.warn(warn_msg, UserWarning) elif start <= due_days <= end: item_copy["remaining"] = due_days diff --git a/app/utils/scheduler.py b/app/utils/scheduler.py index 93a7c83..55dcd01 100644 --- a/app/utils/scheduler.py +++ b/app/utils/scheduler.py @@ -1,6 +1,7 @@ from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore +from sqlalchemy import text from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR from ..core.config import settings from sqlalchemy.orm import Session @@ -17,6 +18,8 @@ from ..models.daily import DailyData from ..models.settlement_data import SettlementData from typing import List from ..utils.construction_monitor import ConstructionMonitorUtils +import time +import json # 获取日志记录器 logger = get_logger(__name__) @@ -216,7 +219,7 @@ def database_cleanup_task(): return "数据库清理完成" # 每日自动写入获取最新工况信息 -def scheduled_get_max_nyid_by_point_id(start: int = 0, end: int = 0): +def scheduled_get_max_nyid_by_point_id(start: int = 0,end: int = 0): """定时任务:获取max NYID关联数据并批量创建DailyData记录""" db: Session = None try: @@ -225,23 +228,25 @@ def scheduled_get_max_nyid_by_point_id(start: int = 0, end: int = 0): db = SessionLocal() logger.info("定时任务开始执行:获取max NYID关联数据并处理") # 核心新增:清空DailyData表所有数据 - delete_count = db.query(DailyData).delete() - db.commit() - logger.info(f"DailyData表清空完成,共删除{delete_count}条历史记录") + # delete_count = db.query(DailyData).delete() + # db.commit() + db.execute(text(f"TRUNCATE TABLE {DailyData.__tablename__}")) + db.commit() # 必须提交事务 + # logger.info(f"DailyData表清空完成,共删除{delete_count}条历史记录") # 1. 获取沉降数据(返回 List[List[dict]]) daily_service = DailyDataService() result = daily_service.get_nyid_by_point_id(db, [], 1) - + # 2. 计算到期数据 monitor = ConstructionMonitorUtils() - daily_data = monitor.get_due_data(result, start, end) + daily_data = monitor.get_due_data(result,start=start,end=end) data = daily_data['data'] error_data = daily_data['error_data'] - + winters = daily_data['winter'] logger.info(f"首次获取数据完成,共{len(result)}条记录") - + # 3. 循环处理冬休数据,追溯历史非冬休记录 max_num = 1 while winters: @@ -250,86 +255,55 @@ def scheduled_get_max_nyid_by_point_id(start: int = 0, end: int = 0): new_list = [w['point_id'] for w in winters] # 获取更多历史记录 nyid_list = daily_service.get_nyid_by_point_id(db, new_list, max_num) - w_list = monitor.get_due_data(nyid_list, start, end) + w_list = monitor.get_due_data(nyid_list,start=start,end=end) # 更新冬休、待处理、错误数据 winters = w_list['winter'] data.extend(w_list['data']) # 过期数据一并处理 # data.extend(w_list['error_data']) error_data.extend(w_list['error_data']) + print(w_list) data.extend(error_data) # 4. 初始化服务实例 level_service = LevelDataService() checkpoint_db = CheckpointService() section_db = SectionDataService() account_service = AccountService() - - # 5. 批量查询优化 - logger.info("批量获取关联数据") - - # 提取所有需要的ID列表 - nyid_list = list(set(d['NYID'] for d in data if d.get('NYID'))) - point_id_list = list(set(d['point_id'] for d in data if d.get('point_id'))) - - # 批量查询LevelData - logger.info(f"批量查询LevelData,nyid数量: {len(nyid_list)}") - level_results = level_service.get_by_nyids(db, nyid_list) - level_dict = {level.NYID: level for level in level_results} - - # 批量查询CheckpointData - logger.info(f"批量查询CheckpointData,point_id数量: {len(point_id_list)}") - checkpoint_results = checkpoint_db.get_by_point_ids_batch(db, point_id_list) - checkpoint_dict = {cp.point_id: cp for cp in checkpoint_results} - - # 提取所有section_id - section_id_list = list(set( - cp.section_id for cp in checkpoint_results - if cp.section_id and isinstance(cp, object) - )) - - # 批量查询SectionData - logger.info(f"批量查询SectionData,section_id数量: {len(section_id_list)}") - section_results = section_db.get_by_section_ids_batch(db, section_id_list) - section_dict = {s.section_id: s for s in section_results} - - # 提取所有account_id - account_id_list = list(set( - s.account_id for s in section_results - if s.account_id and isinstance(s, object) - )) - - # 批量查询AccountData - logger.info(f"批量查询AccountData,account_id数量: {len(account_id_list)}") - account_results = account_service.get_accounts_batch(db, account_id_list) - account_dict = {acc.id: acc for acc in account_results} - - logger.info("批量查询完成,开始关联数据") - - # 6. 关联数据到原记录 + print(len(data)) + # 5. 关联其他表数据(核心逻辑保留) for d in data: - # 关联LevelData - level_instance = level_dict.get(d['NYID']) + # 处理 LevelData(假设返回列表,取第一条) + level_results = level_service.get_by_nyid(db, d['NYID']) + level_instance = level_results[0] if isinstance(level_results, list) and level_results else level_results d['level_data'] = level_instance.to_dict() if level_instance else None - - # 关联CheckpointData - checkpoint_instance = checkpoint_dict.get(d['point_id']) + + # 处理 CheckpointData(返回单实例,直接使用) + checkpoint_instance = checkpoint_db.get_by_point_id(db, d['point_id']) d['checkpoint_data'] = checkpoint_instance.to_dict() if checkpoint_instance else None - - # 关联SectionData - section_id = d['checkpoint_data']['section_id'] if d.get('checkpoint_data') else None - section_instance = section_dict.get(section_id) if section_id else None - d['section_data'] = section_instance.to_dict() if section_instance else None - - # 关联AccountData - account_id = d.get('section_data', {}).get('account_id') if d.get('section_data') else None - account_instance = account_dict.get(account_id) if account_id else None - d['account_data'] = account_instance.__dict__ if account_instance else None - - # 7. 构造DailyData数据并批量创建 + + # 处理 SectionData(根据checkpoint_data关联) + if d['checkpoint_data']: + section_instance = section_db.get_by_section_id(db, d['checkpoint_data']['section_id']) + d['section_data'] = section_instance.to_dict() if section_instance else None + else: + d['section_data'] = None + + # 处理 AccountData + if d.get('section_data') and d['section_data'].get('account_id'): + account_response = account_service.get_account(db, account_id=d['section_data']['account_id']) + d['account_data'] = account_response.__dict__ if account_response else None + else: + d['account_data'] = None + + # 6. 构造DailyData数据并批量创建 + # daily_create_data1 = set() daily_create_data = [] + nyids = [] for d in data: # 过滤无效数据(避免缺失关键字段报错) if all(key in d for key in ['NYID', 'point_id']) and d.get('level_data') and d.get('account_data') and d.get('section_data'): + if d['NYID'] in nyids: + continue tem = { 'NYID': d['NYID'], 'point_id': d['point_id'], @@ -338,8 +312,8 @@ def scheduled_get_max_nyid_by_point_id(start: int = 0, end: int = 0): 'section_id': d['section_data']['section_id'], 'remaining': (0-int(d['overdue'])) if 'overdue' in d else d['remaining'], } + nyids.append(d['NYID']) daily_create_data.append(tem) - # 批量创建记录 if daily_create_data: created_records = daily_service.batch_create_by_account_nyid(db, daily_create_data)