From d80dd0f5ecb428627969661cd15377666a0b4cf7 Mon Sep 17 00:00:00 2001 From: whm <973418690@qq.com> Date: Mon, 16 Mar 2026 10:59:18 +0800 Subject: [PATCH] =?UTF-8?q?1.=E8=A1=A5=E5=85=85=E6=95=B0=E6=8D=AE=E5=8A=A0?= =?UTF-8?q?=E5=88=B0error=E8=A1=A8=E4=B8=AD=EF=BC=8C=E6=8E=A8=E7=90=86?= =?UTF-8?q?=E7=9A=84=E6=94=B9=E5=9B=9E=E4=B9=8B=E5=89=8D=E7=9A=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/comprehensive_data.py | 4 +- app/models/error_linecode.py | 10 +++++ app/models/lose_data.py | 1 + app/utils/construction_monitor.py | 12 +++--- app/utils/scheduler.py | 61 ++++++++++++++++++++++++------- 5 files changed, 66 insertions(+), 22 deletions(-) create mode 100644 app/models/error_linecode.py diff --git a/app/api/comprehensive_data.py b/app/api/comprehensive_data.py index 3f1ac23..d0b0a5d 100644 --- a/app/api/comprehensive_data.py +++ b/app/api/comprehensive_data.py @@ -613,8 +613,8 @@ def refresh_today_data(request: TodayDataRequest, db: Session = Depends(get_db)) return DataResponse( code=ResponseCode.QUERY_FAILED, message=f"定时任务触发失败:{str(e)}", - total=len(daily_data), - data={} + total=0, + data=[] ) # account_id获取所有断面数据 @router.post("/get_all_section_by_account", response_model=DataResponse) diff --git a/app/models/error_linecode.py b/app/models/error_linecode.py new file mode 100644 index 0000000..9b2f6a4 --- /dev/null +++ b/app/models/error_linecode.py @@ -0,0 +1,10 @@ +from sqlalchemy import Column, Integer, String +from ..core.database import Base + + +class ErrorLinecode(Base): + """推理推不出来的水准线路(如全为冬休),记录到该表。数据库表 id 必须为 AUTO_INCREMENT。""" + __tablename__ = "error_linecode" + + id = Column(Integer, primary_key=True, index=True, autoincrement=True) + linecode = Column(String(255), nullable=False, comment="水准线路编码", index=True) diff --git a/app/models/lose_data.py b/app/models/lose_data.py index 1c65003..34f00c6 100644 --- a/app/models/lose_data.py +++ b/app/models/lose_data.py @@ -5,6 +5,7 @@ from ..core.database import Base class LoseData(Base): """缺失数据记录表:记录各水准线路(期数)的原始/沉降数据缺失情况""" __tablename__ = "lose_data" + __table_args__ = {"extend_existing": True} id = Column(Integer, primary_key=True, index=True, autoincrement=True, comment="ID") account_id = Column(Integer, nullable=False, comment="账户id", index=True) diff --git a/app/utils/construction_monitor.py b/app/utils/construction_monitor.py index e2d5235..74f2d7d 100644 --- a/app/utils/construction_monitor.py +++ b/app/utils/construction_monitor.py @@ -65,7 +65,7 @@ class ConstructionMonitorUtils: return compatible_map def get_due_data(self, input_data: List[List[Dict]], start: int = 0, end: int = 0, current_date: datetime = None) -> Dict[str, List[Dict]]: - result = {"winter": [], "data": [], "error_data": []} + result = {"winter": [], "data": [], "error_data": [], "error_linecodes": []} if not input_data: return result @@ -139,12 +139,12 @@ class ConstructionMonitorUtils: continue if not base_condition: - # 当前为冬休且历史全是冬休 → 视为数据未补全,用 bu_all=-365 表示;remaining 保持原样不改 + # 当前为冬休或历史全是冬休 → 归入冬休;若本次是冬休且推不出,记入 error_linecodes 供写入 error_linecode 表 + result["winter"].append(item_copy) if latest_condition == "冬休": - item_copy["bu_all"] = -365 - result["data"].append(item_copy) - else: - result["winter"].append(item_copy) + linecode = latest_item.get("__linecode") + if linecode and linecode not in result["error_linecodes"]: + result["error_linecodes"].append(linecode) continue # 核心修改:冬休回溯场景下调整测量间隔(基准周期) diff --git a/app/utils/scheduler.py b/app/utils/scheduler.py index 820067d..1c5f7fe 100644 --- a/app/utils/scheduler.py +++ b/app/utils/scheduler.py @@ -17,6 +17,7 @@ from ..services.account import AccountService from ..models.daily import DailyData from ..models.settlement_data import SettlementData from ..models.level_data import LevelData +from ..models.error_linecode import ErrorLinecode from ..services.settlement_data import SettlementDataService from typing import List, Tuple, Any from ..utils.construction_monitor import ConstructionMonitorUtils @@ -243,21 +244,37 @@ def scheduled_get_max_nyid_by_point_id(start: int = 0,end: int = 0): account_service = AccountService() monitor = ConstructionMonitorUtils() + logger.info("正在查询水准线路列表…") linecodes = [r[0] for r in db.query(LevelData.linecode).distinct().all()] - linecode_level_settlement: List[Tuple[str, Any, dict]] = [] - for linecode in linecodes: - level_instance = level_service.get_last_by_linecode(db, linecode) - if not level_instance: + logger.info(f"共 {len(linecodes)} 个水准线路,开始拉取多期沉降…") + # 每个 linecode 传多期沉降(按 NYID 降序),供冬休「一直推」直到推出或推不出 + input_data: List[List[dict]] = [] + for idx, linecode in enumerate(linecodes): + level_list = level_service.get_by_linecode(db, linecode) + if not level_list: continue - nyid = level_instance.NYID - settlement_dict = settlement_service.get_one_dict_by_nyid(db, nyid) - if not settlement_dict: - continue - settlement_dict['__linecode'] = linecode - settlement_dict['__level_data'] = level_instance.to_dict() - linecode_level_settlement.append((linecode, level_instance, settlement_dict)) - - input_data = [[s] for (_, _, s) in linecode_level_settlement] + # 按 NYID 降序(最新在前),最多取 30 期 + level_list_sorted = sorted( + level_list, + key=lambda x: int(x.NYID) if str(x.NYID).isdigit() else 0, + reverse=True, + )[:30] + point_data_for_linecode: List[dict] = [] + level_latest = level_list_sorted[0] + for level_instance in level_list_sorted: + nyid = level_instance.NYID + settlement_dict = settlement_service.get_one_dict_by_nyid(db, nyid) + if not settlement_dict: + continue + settlement_dict["__linecode"] = linecode + # 仅最新一期带 __level_data,供后续写 daily 用 + settlement_dict["__level_data"] = level_latest.to_dict() + point_data_for_linecode.append(settlement_dict) + if point_data_for_linecode: + input_data.append(point_data_for_linecode) + if (idx + 1) % 50 == 0 or idx == len(linecodes) - 1: + logger.info(f"已处理线路 {idx + 1}/{len(linecodes)},当前共 {len(input_data)} 条可推理") + logger.info(f"多期沉降拉取完成,共 {len(input_data)} 条线路,开始推理…") if not input_data: logger.warning("未找到任何 linecode 对应的最新期沉降数据,跳过写 daily") db.execute(text(f"TRUNCATE TABLE {DailyData.__tablename__}")) @@ -267,7 +284,23 @@ def scheduled_get_max_nyid_by_point_id(start: int = 0,end: int = 0): # 2. 计算到期数据(remaining / 冬休等) daily_data = monitor.get_due_data(input_data, start=start, end=end) - data = daily_data['data'] + logger.info("推理完成") + data = daily_data["data"] + # 冬休一直推、推不出来的 linecode 写入 error_linecode 表(表 id 需为 AUTO_INCREMENT) + try: + for linecode in daily_data.get("error_linecodes", []): + if not linecode: + continue + exists = db.query(ErrorLinecode).filter(ErrorLinecode.linecode == linecode).first() + if not exists: + db.add(ErrorLinecode(linecode=linecode)) + if daily_data.get("error_linecodes"): + logger.info(f"推理推不出来的线路将写入 error_linecode: {daily_data['error_linecodes']}") + except Exception as elc: + db.rollback() + logger.warning( + f"error_linecode 写入失败(请确保表 id 为 AUTO_INCREMENT): {elc},已跳过,继续写 daily" + ) logger.info(f"按 level_data 最新期获取数据完成,共{len(data)}条有效记录") # 3. 关联 level / checkpoint / section / account(level 已带在 __level_data)