Files
railway_cloud/app/utils/construction_monitor.py

199 lines
10 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime
from typing import List, Dict
import warnings
import copy
# 注意:根据实际项目路径调整导入,若本地测试可注释掉
from ..core.logging_config import get_logger
import json
from .operating_mode_config import BASE_PERIODS
logger = get_logger(__name__)
class ConstructionMonitorUtils:
def __init__(self):
# 使用公共配置的工况周期映射表
self.base_periods = BASE_PERIODS.copy()
# 构建中英文括号+逗号兼容映射表
self.compatible_periods = self._build_compatible_brackets_map()
def _build_compatible_brackets_map(self) -> Dict[str, int]:
"""构建支持中英文括号、中英文逗号,且忽略所有空格的兼容映射表"""
compatible_map = {}
for original_key, period in self.base_periods.items():
# ========== 第一步处理原始key的空格和符号生成基础变体 ==========
# 1. 清洗空格:去除首尾+全角空格+合并连续空格+最终删除所有空格
key_no_space = original_key.strip().replace(" ", " ").replace(" ", " ").replace(" ", "")
# 2. 原始key未清洗空格
compatible_map[original_key] = period
# 3. 无空格的原始符号key
if key_no_space not in compatible_map:
compatible_map[key_no_space] = period
# ========== 第二步:生成中文括号变体(含空格/无空格) ==========
# 带空格的中文括号key
chinese_bracket_key = original_key.replace("(", "").replace(")", "")
if chinese_bracket_key != original_key and chinese_bracket_key not in compatible_map:
compatible_map[chinese_bracket_key] = period
# 无空格的中文括号key
chinese_bracket_no_space = key_no_space.replace("(", "").replace(")", "")
if chinese_bracket_no_space not in compatible_map:
compatible_map[chinese_bracket_no_space] = period
# ========== 第三步:生成中文逗号变体(含空格/无空格) ==========
# 带空格的中文逗号key
chinese_comma_key = original_key.replace(",", "")
if chinese_comma_key != original_key and chinese_comma_key not in compatible_map:
compatible_map[chinese_comma_key] = period
# 无空格的中文逗号key
chinese_comma_no_space = key_no_space.replace(",", "")
if chinese_comma_no_space not in compatible_map:
compatible_map[chinese_comma_no_space] = period
# ========== 第四步:生成中文括号+逗号混合变体(含空格/无空格) ==========
# 带空格的混合变体key
mixed_key = chinese_bracket_key.replace(",", "")
if mixed_key != original_key and mixed_key not in compatible_map:
compatible_map[mixed_key] = period
# 无空格的混合变体key
mixed_no_space = chinese_bracket_no_space.replace(",", "")
if mixed_no_space not in compatible_map:
compatible_map[mixed_no_space] = period
return compatible_map
def get_due_data(self, input_data: List[List[Dict]], start: int = 0, end: int = 0, current_date: datetime = None) -> Dict[str, List[Dict]]:
result = {"winter": [], "data": [], "error_data": []}
if not input_data:
return result
calc_date = current_date.date() if current_date else datetime.now().date()
for point_idx, point_data in enumerate(input_data):
if not point_data:
continue
# 推理用最新一期:取按 NYID 排序后的第一条(上游已保证倒序),不因 useflag 排除最新期
latest_item = point_data[0]
# 用于冬休回溯等:仅 useflag 有效的历史记录
filtered_point_data = [
item for item in point_data
if "useflag" in item and item["useflag"] != 0
]
latest_condition = latest_item.get("workinfoname")
if not latest_condition:
result["error_data"].append(latest_item)
warnings.warn(f"【数据错误】测点{point_idx}的最新数据缺少'workinfoname'字段", UserWarning)
continue
base_condition = None
# 新增:标记是否为冬休回溯到合法工况的场景
is_winter_break = False # 初始化冬休标识
if latest_condition != "冬休":
if latest_condition not in self.compatible_periods:
result["error_data"].append(latest_item)
with open("error_data.txt", "a", encoding="utf-8") as f:
json.dump(latest_condition, f, ensure_ascii=False, indent=4)
f.write("\n")
warnings.warn(f"【数据错误】测点{point_idx}最新数据存在未定义工况: {latest_condition}", UserWarning)
continue
base_condition = latest_condition
else:
# 遍历过滤后的历史数据
for history_item in filtered_point_data[1:]:
history_condition = history_item.get("workinfoname")
if not history_condition:
result["error_data"].append(history_item)
warnings.warn(f"【数据错误】测点{point_idx}的历史数据缺少'workinfoname'字段", UserWarning)
continue
if history_condition != "冬休":
if history_condition not in self.compatible_periods:
result["error_data"].append(history_item)
with open("error_data.txt", "a", encoding="utf-8") as f:
json.dump(history_condition, f, ensure_ascii=False, indent=4)
f.write("\n")
warnings.warn(f"【数据错误】测点{point_idx}历史数据存在未定义工况: {history_condition}", UserWarning)
base_condition = None
break
base_condition = history_condition
is_winter_break = True # 触发:冬休且回溯到合法历史工况
break
item_copy = copy.deepcopy(latest_item)
create_date_val = latest_item.get("MTIME_W")
if not create_date_val:
result["error_data"].append(item_copy)
warnings.warn(f"【数据错误】测点{point_idx}的最新数据缺少'MTIME_W'字段", UserWarning)
continue
try:
if isinstance(create_date_val, datetime):
create_date = create_date_val.date()
else:
create_date = datetime.strptime(create_date_val, "%Y-%m-%d %H:%M:%S").date()
except ValueError as e:
result["error_data"].append(item_copy)
warnings.warn(f"【数据错误】测点{point_idx}最新数据的日期格式错误:{create_date_val},错误:{str(e)}", UserWarning)
continue
if not base_condition:
# 当前为冬休且历史全是冬休 → 视为数据未补全remaining 固定为 -365
if latest_condition == "冬休":
item_copy["remaining"] = -365
result["data"].append(item_copy)
else:
result["winter"].append(item_copy)
continue
# 核心修改:冬休回溯场景下调整测量间隔(基准周期)
original_period = self.compatible_periods[base_condition]
if is_winter_break:
# 规则1原周期为3天则改为7天规则2原周期为14天则改为30天规则3其他周期直接翻倍
if original_period == 3:
adjusted_period = 7
elif original_period == 14:
adjusted_period = 30
else:
adjusted_period = original_period * 2
else:
# 非冬休场景,使用原始周期
adjusted_period = original_period
# 基于调整后的周期计算剩余天数
days_passed = (calc_date - create_date).days
due_days = adjusted_period - days_passed
if due_days < 0:
item_copy["remaining"] = 0 - int(abs(due_days))
result["error_data"].append(item_copy)
warn_msg = (
f"【超期警报】测点{point_idx} 最新工况'{latest_condition}'{create_date}"
f"已超期{abs(due_days)}天!基准工况:{base_condition}"
f"原始周期{original_period}{',冬休调整后周期'+str(adjusted_period)+'' if is_winter_break else ''}"
)
logger.warning(warn_msg)
warnings.warn(warn_msg, UserWarning)
elif start <= due_days <= end:
item_copy["remaining"] = due_days
result["data"].append(item_copy)
# 相同NYID保留剩余天数最少的记录
if result["data"]:
nyid_min_remaining = {}
for record in result["data"]:
nyid = record.get("NYID")
if not nyid:
continue
if nyid not in nyid_min_remaining or record["remaining"] < nyid_min_remaining[nyid]["remaining"]:
nyid_min_remaining[nyid] = record
result["data"] = list(nyid_min_remaining.values())
# logger.warning(f"【数据错误】共发现{len(result['error_data'])}条错误数据")
# logger.warning(result)
# import json
# with open("./error_data.json", "w", encoding="utf-8") as f:
# json.dump(result["error_data"], f, ensure_ascii=False, indent=4)
return result