1.补充数据加到error表中,推理的改回之前的
This commit is contained in:
@@ -17,6 +17,7 @@ from ..services.account import AccountService
|
||||
from ..models.daily import DailyData
|
||||
from ..models.settlement_data import SettlementData
|
||||
from ..models.level_data import LevelData
|
||||
from ..models.error_linecode import ErrorLinecode
|
||||
from ..services.settlement_data import SettlementDataService
|
||||
from typing import List, Tuple, Any
|
||||
from ..utils.construction_monitor import ConstructionMonitorUtils
|
||||
@@ -243,21 +244,37 @@ def scheduled_get_max_nyid_by_point_id(start: int = 0,end: int = 0):
|
||||
account_service = AccountService()
|
||||
monitor = ConstructionMonitorUtils()
|
||||
|
||||
logger.info("正在查询水准线路列表…")
|
||||
linecodes = [r[0] for r in db.query(LevelData.linecode).distinct().all()]
|
||||
linecode_level_settlement: List[Tuple[str, Any, dict]] = []
|
||||
for linecode in linecodes:
|
||||
level_instance = level_service.get_last_by_linecode(db, linecode)
|
||||
if not level_instance:
|
||||
logger.info(f"共 {len(linecodes)} 个水准线路,开始拉取多期沉降…")
|
||||
# 每个 linecode 传多期沉降(按 NYID 降序),供冬休「一直推」直到推出或推不出
|
||||
input_data: List[List[dict]] = []
|
||||
for idx, linecode in enumerate(linecodes):
|
||||
level_list = level_service.get_by_linecode(db, linecode)
|
||||
if not level_list:
|
||||
continue
|
||||
nyid = level_instance.NYID
|
||||
settlement_dict = settlement_service.get_one_dict_by_nyid(db, nyid)
|
||||
if not settlement_dict:
|
||||
continue
|
||||
settlement_dict['__linecode'] = linecode
|
||||
settlement_dict['__level_data'] = level_instance.to_dict()
|
||||
linecode_level_settlement.append((linecode, level_instance, settlement_dict))
|
||||
|
||||
input_data = [[s] for (_, _, s) in linecode_level_settlement]
|
||||
# 按 NYID 降序(最新在前),最多取 30 期
|
||||
level_list_sorted = sorted(
|
||||
level_list,
|
||||
key=lambda x: int(x.NYID) if str(x.NYID).isdigit() else 0,
|
||||
reverse=True,
|
||||
)[:30]
|
||||
point_data_for_linecode: List[dict] = []
|
||||
level_latest = level_list_sorted[0]
|
||||
for level_instance in level_list_sorted:
|
||||
nyid = level_instance.NYID
|
||||
settlement_dict = settlement_service.get_one_dict_by_nyid(db, nyid)
|
||||
if not settlement_dict:
|
||||
continue
|
||||
settlement_dict["__linecode"] = linecode
|
||||
# 仅最新一期带 __level_data,供后续写 daily 用
|
||||
settlement_dict["__level_data"] = level_latest.to_dict()
|
||||
point_data_for_linecode.append(settlement_dict)
|
||||
if point_data_for_linecode:
|
||||
input_data.append(point_data_for_linecode)
|
||||
if (idx + 1) % 50 == 0 or idx == len(linecodes) - 1:
|
||||
logger.info(f"已处理线路 {idx + 1}/{len(linecodes)},当前共 {len(input_data)} 条可推理")
|
||||
logger.info(f"多期沉降拉取完成,共 {len(input_data)} 条线路,开始推理…")
|
||||
if not input_data:
|
||||
logger.warning("未找到任何 linecode 对应的最新期沉降数据,跳过写 daily")
|
||||
db.execute(text(f"TRUNCATE TABLE {DailyData.__tablename__}"))
|
||||
@@ -267,7 +284,23 @@ def scheduled_get_max_nyid_by_point_id(start: int = 0,end: int = 0):
|
||||
|
||||
# 2. 计算到期数据(remaining / 冬休等)
|
||||
daily_data = monitor.get_due_data(input_data, start=start, end=end)
|
||||
data = daily_data['data']
|
||||
logger.info("推理完成")
|
||||
data = daily_data["data"]
|
||||
# 冬休一直推、推不出来的 linecode 写入 error_linecode 表(表 id 需为 AUTO_INCREMENT)
|
||||
try:
|
||||
for linecode in daily_data.get("error_linecodes", []):
|
||||
if not linecode:
|
||||
continue
|
||||
exists = db.query(ErrorLinecode).filter(ErrorLinecode.linecode == linecode).first()
|
||||
if not exists:
|
||||
db.add(ErrorLinecode(linecode=linecode))
|
||||
if daily_data.get("error_linecodes"):
|
||||
logger.info(f"推理推不出来的线路将写入 error_linecode: {daily_data['error_linecodes']}")
|
||||
except Exception as elc:
|
||||
db.rollback()
|
||||
logger.warning(
|
||||
f"error_linecode 写入失败(请确保表 id 为 AUTO_INCREMENT): {elc},已跳过,继续写 daily"
|
||||
)
|
||||
logger.info(f"按 level_data 最新期获取数据完成,共{len(data)}条有效记录")
|
||||
|
||||
# 3. 关联 level / checkpoint / section / account(level 已带在 __level_data)
|
||||
|
||||
Reference in New Issue
Block a user