This commit is contained in:
lhx
2025-11-18 17:49:38 +08:00
3 changed files with 76 additions and 104 deletions

View File

@@ -90,15 +90,17 @@ class DailyDataService(BaseService[DailyData]):
def get_nyid_by_point_id( def get_nyid_by_point_id(
self, self,
db: Session, db: Session,
point_ids: List[int] = None, point_ids: Optional[List[int]] = None,
max_num: int = 1 max_num: int = 1
) -> List[List[dict]]: ) -> List[List[dict]]:
""" """
获取指定point_id的记录修复子查询中模型对象访问错误同时过滤useflag=0的数据 获取指定point_id的记录每个point_id的前max_num条记录放在同一个子列表中
返回格式:[[point1_records...], [point2_records...]]
""" """
# 处理参数默认值 # 处理参数默认值
point_ids = point_ids or [] point_ids = point_ids or []
max_num = max(max_num, 1) if max_num <= 0:
return []
# 窗口函数按point_id分组每组内按NYID降序编号 # 窗口函数按point_id分组每组内按NYID降序编号
row_num = over( row_num = over(
@@ -107,60 +109,54 @@ class DailyDataService(BaseService[DailyData]):
order_by=desc(SettlementData.NYID) order_by=desc(SettlementData.NYID)
).label("row_num") ).label("row_num")
# 子查询:查询模型的所有字段 + 行号(不保留模型对象,只展平字段) # 模型字段列表
# 先获取模型的所有字段列表
model_columns = [getattr(SettlementData, col.name) for col in SettlementData.__table__.columns] model_columns = [getattr(SettlementData, col.name) for col in SettlementData.__table__.columns]
# -------------------------- 新增过滤条件useflag IS NOT NULL AND useflag != 0 --------------------------
# 基础条件过滤useflag为0或不存在的记录 # 基础条件
base_conditions = [ base_conditions = [
SettlementData.useflag.isnot(None), # 确保useflag字段存在非NULL SettlementData.useflag.isnot(None),
SettlementData.useflag != 0 # 确保useflag值不为0 SettlementData.useflag != 0
] ]
# 若指定了point_ids添加point_id过滤条件
if point_ids: if point_ids:
base_conditions.append(SettlementData.point_id.in_(point_ids)) base_conditions.append(SettlementData.point_id.in_(point_ids))
# 子查询
subquery = ( subquery = (
select(*model_columns, row_num) # 展开所有字段 + 行号 select(*model_columns, row_num)
.where(*base_conditions) # 应用组合条件包含useflag过滤 .where(*base_conditions)
.subquery() .subquery()
) )
# ------------------------------------------------------------------------------------------------------
# 主查询:筛选行号<=max_num的记录 # 主查询:筛选每个point_id的前max_num
query = ( query = (
select(subquery) select(subquery)
.where(subquery.c.row_num <= max_num) .where(subquery.c.row_num <= max_num)
.order_by(subquery.c.point_id, subquery.c.row_num) .order_by(subquery.c.point_id, subquery.c.row_num)
) )
# 执行查询(结果为包含字段值的行对象) # 执行查询
results = db.execute(query).all() results = db.execute(query).all()
grouped: Dict[int, List[dict]] = {} grouped: Dict[int, List[dict]] = {} # 键point_id该point_id的所有记录子列表
# 获取模型字段名列表(用于映射行对象到字典)
field_names = [col.name for col in SettlementData.__table__.columns] field_names = [col.name for col in SettlementData.__table__.columns]
for row in results: for row in results:
# 将行对象转换为字典忽略最后一个字段row_num item_dict = {field: getattr(row, field) for field in field_names}
item_dict = { try:
field: getattr(row, field) pid = int(item_dict["point_id"]) # 确保point_id为整数
for field in field_names except (KeyError, ValueError):
} continue # 跳过无效记录
pid = item_dict["point_id"]
# 同一point_id的记录放入同一个列表
if pid not in grouped: if pid not in grouped:
grouped[pid] = [] grouped[pid] = [] # 初始化子列表
grouped[pid].append(item_dict) grouped[pid].append(item_dict) # 追加到子列表
# 按输入point_ids顺序整理结果 # 按输入point_ids顺序整理结果关键每个point_id对应一个子列表
if not point_ids: if not point_ids:
point_ids = sorted(grouped.keys()) point_ids = sorted(grouped.keys()) # 若无指定按point_id排序
# 构建[[{}], [{}]]格式 # 构建最终结果每个point_id的记录作为一个子列表
return [ return [grouped.get(pid, []) for pid in point_ids]
[record] for pid in point_ids for record in grouped.get(pid, [])
]
# 获取所有的今日数据 # 获取所有的今日数据
def get_all_daily_data( def get_all_daily_data(

View File

@@ -2,7 +2,8 @@ from datetime import datetime
from typing import List, Dict from typing import List, Dict
import warnings import warnings
import copy import copy
from ..core.logging_config import get_logger
logger = get_logger(__name__)
class ConstructionMonitorUtils: class ConstructionMonitorUtils:
def __init__(self): def __init__(self):
# 原始工况周期映射表(保持不变) # 原始工况周期映射表(保持不变)
@@ -155,6 +156,7 @@ class ConstructionMonitorUtils:
f"【超期警报】测点{point_idx} 最新工况'{latest_condition}'{create_date}" f"【超期警报】测点{point_idx} 最新工况'{latest_condition}'{create_date}"
f"已超期{abs(due_days)}天!基准工况:{base_condition},周期{period}" f"已超期{abs(due_days)}天!基准工况:{base_condition},周期{period}"
) )
logger.warning(warn_msg)
warnings.warn(warn_msg, UserWarning) warnings.warn(warn_msg, UserWarning)
elif start <= due_days <= end: elif start <= due_days <= end:
item_copy["remaining"] = due_days item_copy["remaining"] = due_days

View File

@@ -1,6 +1,7 @@
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from sqlalchemy import text
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from ..core.config import settings from ..core.config import settings
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@@ -17,6 +18,8 @@ from ..models.daily import DailyData
from ..models.settlement_data import SettlementData from ..models.settlement_data import SettlementData
from typing import List from typing import List
from ..utils.construction_monitor import ConstructionMonitorUtils from ..utils.construction_monitor import ConstructionMonitorUtils
import time
import json
# 获取日志记录器 # 获取日志记录器
logger = get_logger(__name__) logger = get_logger(__name__)
@@ -216,7 +219,7 @@ def database_cleanup_task():
return "数据库清理完成" return "数据库清理完成"
# 每日自动写入获取最新工况信息 # 每日自动写入获取最新工况信息
def scheduled_get_max_nyid_by_point_id(start: int = 0, end: int = 0): def scheduled_get_max_nyid_by_point_id(start: int = 0,end: int = 0):
"""定时任务获取max NYID关联数据并批量创建DailyData记录""" """定时任务获取max NYID关联数据并批量创建DailyData记录"""
db: Session = None db: Session = None
try: try:
@@ -225,23 +228,25 @@ def scheduled_get_max_nyid_by_point_id(start: int = 0, end: int = 0):
db = SessionLocal() db = SessionLocal()
logger.info("定时任务开始执行获取max NYID关联数据并处理") logger.info("定时任务开始执行获取max NYID关联数据并处理")
# 核心新增清空DailyData表所有数据 # 核心新增清空DailyData表所有数据
delete_count = db.query(DailyData).delete() # delete_count = db.query(DailyData).delete()
db.commit() # db.commit()
logger.info(f"DailyData表清空完成共删除{delete_count}条历史记录") db.execute(text(f"TRUNCATE TABLE {DailyData.__tablename__}"))
db.commit() # 必须提交事务
# logger.info(f"DailyData表清空完成共删除{delete_count}条历史记录")
# 1. 获取沉降数据(返回 List[List[dict]] # 1. 获取沉降数据(返回 List[List[dict]]
daily_service = DailyDataService() daily_service = DailyDataService()
result = daily_service.get_nyid_by_point_id(db, [], 1) result = daily_service.get_nyid_by_point_id(db, [], 1)
# 2. 计算到期数据 # 2. 计算到期数据
monitor = ConstructionMonitorUtils() monitor = ConstructionMonitorUtils()
daily_data = monitor.get_due_data(result, start, end) daily_data = monitor.get_due_data(result,start=start,end=end)
data = daily_data['data'] data = daily_data['data']
error_data = daily_data['error_data'] error_data = daily_data['error_data']
winters = daily_data['winter'] winters = daily_data['winter']
logger.info(f"首次获取数据完成,共{len(result)}条记录") logger.info(f"首次获取数据完成,共{len(result)}条记录")
# 3. 循环处理冬休数据,追溯历史非冬休记录 # 3. 循环处理冬休数据,追溯历史非冬休记录
max_num = 1 max_num = 1
while winters: while winters:
@@ -250,86 +255,55 @@ def scheduled_get_max_nyid_by_point_id(start: int = 0, end: int = 0):
new_list = [w['point_id'] for w in winters] new_list = [w['point_id'] for w in winters]
# 获取更多历史记录 # 获取更多历史记录
nyid_list = daily_service.get_nyid_by_point_id(db, new_list, max_num) nyid_list = daily_service.get_nyid_by_point_id(db, new_list, max_num)
w_list = monitor.get_due_data(nyid_list, start, end) w_list = monitor.get_due_data(nyid_list,start=start,end=end)
# 更新冬休、待处理、错误数据 # 更新冬休、待处理、错误数据
winters = w_list['winter'] winters = w_list['winter']
data.extend(w_list['data']) data.extend(w_list['data'])
# 过期数据一并处理 # 过期数据一并处理
# data.extend(w_list['error_data']) # data.extend(w_list['error_data'])
error_data.extend(w_list['error_data']) error_data.extend(w_list['error_data'])
print(w_list)
data.extend(error_data) data.extend(error_data)
# 4. 初始化服务实例 # 4. 初始化服务实例
level_service = LevelDataService() level_service = LevelDataService()
checkpoint_db = CheckpointService() checkpoint_db = CheckpointService()
section_db = SectionDataService() section_db = SectionDataService()
account_service = AccountService() account_service = AccountService()
print(len(data))
# 5. 批量查询优化 # 5. 关联其他表数据(核心逻辑保留)
logger.info("批量获取关联数据")
# 提取所有需要的ID列表
nyid_list = list(set(d['NYID'] for d in data if d.get('NYID')))
point_id_list = list(set(d['point_id'] for d in data if d.get('point_id')))
# 批量查询LevelData
logger.info(f"批量查询LevelDatanyid数量: {len(nyid_list)}")
level_results = level_service.get_by_nyids(db, nyid_list)
level_dict = {level.NYID: level for level in level_results}
# 批量查询CheckpointData
logger.info(f"批量查询CheckpointDatapoint_id数量: {len(point_id_list)}")
checkpoint_results = checkpoint_db.get_by_point_ids_batch(db, point_id_list)
checkpoint_dict = {cp.point_id: cp for cp in checkpoint_results}
# 提取所有section_id
section_id_list = list(set(
cp.section_id for cp in checkpoint_results
if cp.section_id and isinstance(cp, object)
))
# 批量查询SectionData
logger.info(f"批量查询SectionDatasection_id数量: {len(section_id_list)}")
section_results = section_db.get_by_section_ids_batch(db, section_id_list)
section_dict = {s.section_id: s for s in section_results}
# 提取所有account_id
account_id_list = list(set(
s.account_id for s in section_results
if s.account_id and isinstance(s, object)
))
# 批量查询AccountData
logger.info(f"批量查询AccountDataaccount_id数量: {len(account_id_list)}")
account_results = account_service.get_accounts_batch(db, account_id_list)
account_dict = {acc.id: acc for acc in account_results}
logger.info("批量查询完成,开始关联数据")
# 6. 关联数据到原记录
for d in data: for d in data:
# 关联LevelData # 处理 LevelData(假设返回列表,取第一条)
level_instance = level_dict.get(d['NYID']) level_results = level_service.get_by_nyid(db, d['NYID'])
level_instance = level_results[0] if isinstance(level_results, list) and level_results else level_results
d['level_data'] = level_instance.to_dict() if level_instance else None d['level_data'] = level_instance.to_dict() if level_instance else None
# 关联CheckpointData # 处理 CheckpointData(返回单实例,直接使用)
checkpoint_instance = checkpoint_dict.get(d['point_id']) checkpoint_instance = checkpoint_db.get_by_point_id(db, d['point_id'])
d['checkpoint_data'] = checkpoint_instance.to_dict() if checkpoint_instance else None d['checkpoint_data'] = checkpoint_instance.to_dict() if checkpoint_instance else None
# 关联SectionData # 处理 SectionData根据checkpoint_data关联
section_id = d['checkpoint_data']['section_id'] if d.get('checkpoint_data') else None if d['checkpoint_data']:
section_instance = section_dict.get(section_id) if section_id else None section_instance = section_db.get_by_section_id(db, d['checkpoint_data']['section_id'])
d['section_data'] = section_instance.to_dict() if section_instance else None d['section_data'] = section_instance.to_dict() if section_instance else None
else:
# 关联AccountData d['section_data'] = None
account_id = d.get('section_data', {}).get('account_id') if d.get('section_data') else None
account_instance = account_dict.get(account_id) if account_id else None # 处理 AccountData
d['account_data'] = account_instance.__dict__ if account_instance else None if d.get('section_data') and d['section_data'].get('account_id'):
account_response = account_service.get_account(db, account_id=d['section_data']['account_id'])
# 7. 构造DailyData数据并批量创建 d['account_data'] = account_response.__dict__ if account_response else None
else:
d['account_data'] = None
# 6. 构造DailyData数据并批量创建
# daily_create_data1 = set()
daily_create_data = [] daily_create_data = []
nyids = []
for d in data: for d in data:
# 过滤无效数据(避免缺失关键字段报错) # 过滤无效数据(避免缺失关键字段报错)
if all(key in d for key in ['NYID', 'point_id']) and d.get('level_data') and d.get('account_data') and d.get('section_data'): if all(key in d for key in ['NYID', 'point_id']) and d.get('level_data') and d.get('account_data') and d.get('section_data'):
if d['NYID'] in nyids:
continue
tem = { tem = {
'NYID': d['NYID'], 'NYID': d['NYID'],
'point_id': d['point_id'], 'point_id': d['point_id'],
@@ -338,8 +312,8 @@ def scheduled_get_max_nyid_by_point_id(start: int = 0, end: int = 0):
'section_id': d['section_data']['section_id'], 'section_id': d['section_data']['section_id'],
'remaining': (0-int(d['overdue'])) if 'overdue' in d else d['remaining'], 'remaining': (0-int(d['overdue'])) if 'overdue' in d else d['remaining'],
} }
nyids.append(d['NYID'])
daily_create_data.append(tem) daily_create_data.append(tem)
# 批量创建记录 # 批量创建记录
if daily_create_data: if daily_create_data:
created_records = daily_service.batch_create_by_account_nyid(db, daily_create_data) created_records = daily_service.batch_create_by_account_nyid(db, daily_create_data)