222 lines
13 KiB
Python
222 lines
13 KiB
Python
from datetime import datetime
|
||
from typing import List, Dict
|
||
import warnings
|
||
import copy
|
||
# 注意:根据实际项目路径调整导入,若本地测试可注释掉
|
||
from ..core.logging_config import get_logger
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
class ConstructionMonitorUtils:
|
||
def __init__(self):
|
||
# 原始工况周期映射表(保持不变)
|
||
self.base_periods = {
|
||
"仰拱(底板)施工完成后,第1个月": 7, # 原:仰拱(底板)施工完成后,第1个月
|
||
"仰拱(底板)施工完成后,第2至3个月": 14, # 原:仰拱(底板)施工完成后,第2至3个月
|
||
"仰拱(底板)施工完成后,3个月以后": 30, # 原:仰拱(底板)施工完成后,3个月以后
|
||
"无砟轨道铺设后,第1至3个月": 30, # 原:无砟轨道铺设后,第1至3个月
|
||
"无砟轨道铺设后,4至12个月": 90, # 原:无砟轨道铺设后,4至12个月
|
||
"无砟轨道铺设后,12个月以后": 180, # 原:无砟轨道铺设后,12个月以后
|
||
"墩台施工到一定高度": 30, # 无格式差异,保留原样
|
||
"墩台混凝土施工": 30, # 无格式差异,保留原样
|
||
"预制梁桥,架梁前": 30, # 原:预制梁桥,架梁前
|
||
"预制梁桥,预制梁架设前": 1, # 原:预制梁桥,预制梁架设前
|
||
"预制梁桥,预制梁架设后": 7, # 原:预制梁桥,预制梁架设后
|
||
"桥位施工桥梁,制梁前": 30, # 原:桥位施工桥梁,制梁前
|
||
"桥位施工桥梁,上部结构施工中": 1, # 原:桥位施工桥梁,上部结构施工中
|
||
"架桥机(运梁车)通过": 7, # 无格式差异,保留原样
|
||
"桥梁主体工程完工后,第1至3个月": 7, # 原:桥梁主体工程完工后,第1至3个月
|
||
"桥梁主体工程完工后,第4至6个月": 14, # 原:桥梁主体工程完工后,第4至6个月
|
||
"桥梁主体工程完工后,6个月以后": 30, # 原:桥梁主体工程完工后,6个月以后
|
||
"轨道铺设期间,前": 30,
|
||
"轨道铺设期间,后": 14,
|
||
"轨道铺设完成后,第1个月": 14,
|
||
"轨道铺设完成后,2至3个月": 30,
|
||
"轨道铺设完成后,4至12个月": 90,
|
||
"轨道铺设完成后,12个月以后": 180,
|
||
"填筑或堆载,一般情况": 1,
|
||
"填筑或堆载,沉降量突变情况": 1,
|
||
"填筑或堆载,两次填筑间隔时间较长情况": 3,
|
||
"堆载预压或路基填筑完成,第1至3个月": 7, # 原:堆载预压或路基填筑完成,第1至3个月
|
||
"堆载预压或路基填筑完成,第4至6个月": 14, # 原:堆载预压或路基填筑完成,第4至6个月
|
||
"堆载预压或路基填筑完成,6个月以后": 30, # 原:堆载预压或路基填筑完成,6个月以后
|
||
"架桥机(运梁车) 首次通过前": 1, # 原:架桥机(运梁车)首次通过前(仅加空格)
|
||
"架桥机(运梁车) 首次通过后,前3天": 1, # 原:架桥机(运梁车)首次通过后,前3天
|
||
"架桥机(运梁车) 首次通过后": 7, # 原:架桥机(运梁车)首次通过后(仅加空格)
|
||
"轨道板(道床)铺设后,第1个月": 14, # 原:轨道板(道床)铺设后,第1个月
|
||
"轨道板(道床)铺设后,第2至3个月": 30, # 原:轨道板(道床)铺设后,第2至3个月
|
||
"轨道板(道床)铺设后,3个月以后": 90 # 未出现在待处理集,保留原始格式
|
||
}
|
||
# 构建中英文括号+逗号兼容映射表
|
||
self.compatible_periods = self._build_compatible_brackets_map()
|
||
|
||
|
||
def _build_compatible_brackets_map(self) -> Dict[str, int]:
|
||
"""构建支持中英文括号、中英文逗号,且忽略所有空格的兼容映射表"""
|
||
compatible_map = {}
|
||
for original_key, period in self.base_periods.items():
|
||
# ========== 第一步:处理原始key的空格和符号,生成基础变体 ==========
|
||
# 1. 清洗空格:去除首尾+全角空格+合并连续空格+最终删除所有空格
|
||
key_no_space = original_key.strip().replace(" ", " ").replace(" ", " ").replace(" ", "")
|
||
# 2. 原始key(未清洗空格)
|
||
compatible_map[original_key] = period
|
||
# 3. 无空格的原始符号key
|
||
if key_no_space not in compatible_map:
|
||
compatible_map[key_no_space] = period
|
||
|
||
# ========== 第二步:生成中文括号变体(含空格/无空格) ==========
|
||
# 带空格的中文括号key
|
||
chinese_bracket_key = original_key.replace("(", "(").replace(")", ")")
|
||
if chinese_bracket_key != original_key and chinese_bracket_key not in compatible_map:
|
||
compatible_map[chinese_bracket_key] = period
|
||
# 无空格的中文括号key
|
||
chinese_bracket_no_space = key_no_space.replace("(", "(").replace(")", ")")
|
||
if chinese_bracket_no_space not in compatible_map:
|
||
compatible_map[chinese_bracket_no_space] = period
|
||
|
||
# ========== 第三步:生成中文逗号变体(含空格/无空格) ==========
|
||
# 带空格的中文逗号key
|
||
chinese_comma_key = original_key.replace(",", ",")
|
||
if chinese_comma_key != original_key and chinese_comma_key not in compatible_map:
|
||
compatible_map[chinese_comma_key] = period
|
||
# 无空格的中文逗号key
|
||
chinese_comma_no_space = key_no_space.replace(",", ",")
|
||
if chinese_comma_no_space not in compatible_map:
|
||
compatible_map[chinese_comma_no_space] = period
|
||
|
||
# ========== 第四步:生成中文括号+逗号混合变体(含空格/无空格) ==========
|
||
# 带空格的混合变体key
|
||
mixed_key = chinese_bracket_key.replace(",", ",")
|
||
if mixed_key != original_key and mixed_key not in compatible_map:
|
||
compatible_map[mixed_key] = period
|
||
# 无空格的混合变体key
|
||
mixed_no_space = chinese_bracket_no_space.replace(",", ",")
|
||
if mixed_no_space not in compatible_map:
|
||
compatible_map[mixed_no_space] = period
|
||
|
||
return compatible_map
|
||
|
||
def get_due_data(self, input_data: List[List[Dict]], start: int = 0, end: int = 0, current_date: datetime = None) -> Dict[str, List[Dict]]:
|
||
result = {"winter": [], "data": [], "error_data": []}
|
||
if not input_data:
|
||
return result
|
||
|
||
calc_date = current_date.date() if current_date else datetime.now().date()
|
||
|
||
for point_idx, point_data in enumerate(input_data):
|
||
if not point_data:
|
||
continue
|
||
|
||
# 过滤逻辑:仅保留 useflag 存在且值≠0 的记录
|
||
filtered_point_data = [
|
||
item for item in point_data
|
||
if "useflag" in item and item["useflag"] != 0 # 核心条件:字段存在 + 非0
|
||
]
|
||
# 过滤后无数据则跳过当前测点
|
||
if not filtered_point_data:
|
||
continue
|
||
|
||
# 使用过滤后的数据处理
|
||
latest_item = filtered_point_data[0]
|
||
latest_condition = latest_item.get("workinfoname")
|
||
if not latest_condition:
|
||
result["error_data"].append(latest_item)
|
||
warnings.warn(f"【数据错误】测点{point_idx}的最新数据缺少'workinfoname'字段", UserWarning)
|
||
continue
|
||
|
||
base_condition = None
|
||
# 新增:标记是否为冬休回溯到合法工况的场景
|
||
is_winter_break = False # 初始化冬休标识
|
||
if latest_condition != "冬休":
|
||
if latest_condition not in self.compatible_periods:
|
||
result["error_data"].append(latest_item)
|
||
warnings.warn(f"【数据错误】测点{point_idx}最新数据存在未定义工况: {latest_condition}", UserWarning)
|
||
continue
|
||
base_condition = latest_condition
|
||
else:
|
||
# 遍历过滤后的历史数据
|
||
for history_item in filtered_point_data[1:]:
|
||
history_condition = history_item.get("workinfoname")
|
||
if not history_condition:
|
||
result["error_data"].append(history_item)
|
||
warnings.warn(f"【数据错误】测点{point_idx}的历史数据缺少'workinfoname'字段", UserWarning)
|
||
continue
|
||
if history_condition != "冬休":
|
||
if history_condition not in self.compatible_periods:
|
||
result["error_data"].append(history_item)
|
||
warnings.warn(f"【数据错误】测点{point_idx}历史数据存在未定义工况: {history_condition}", UserWarning)
|
||
base_condition = None
|
||
break
|
||
base_condition = history_condition
|
||
is_winter_break = True # 触发:冬休且回溯到合法历史工况
|
||
break
|
||
|
||
item_copy = copy.deepcopy(latest_item)
|
||
create_date_val = latest_item.get("MTIME_W")
|
||
if not create_date_val:
|
||
result["error_data"].append(item_copy)
|
||
warnings.warn(f"【数据错误】测点{point_idx}的最新数据缺少'MTIME_W'字段", UserWarning)
|
||
continue
|
||
|
||
try:
|
||
if isinstance(create_date_val, datetime):
|
||
create_date = create_date_val.date()
|
||
else:
|
||
create_date = datetime.strptime(create_date_val, "%Y-%m-%d %H:%M:%S").date()
|
||
except ValueError as e:
|
||
result["error_data"].append(item_copy)
|
||
warnings.warn(f"【数据错误】测点{point_idx}最新数据的日期格式错误:{create_date_val},错误:{str(e)}", UserWarning)
|
||
continue
|
||
|
||
if not base_condition:
|
||
result["winter"].append(item_copy)
|
||
continue
|
||
|
||
# 核心修改:冬休回溯场景下调整测量间隔(基准周期)
|
||
original_period = self.compatible_periods[base_condition]
|
||
if is_winter_break:
|
||
# 规则1:原周期为3天则改为7天;规则2:其他周期直接翻倍
|
||
if original_period == 3:
|
||
adjusted_period = 7
|
||
else:
|
||
adjusted_period = original_period * 2
|
||
else:
|
||
# 非冬休场景,使用原始周期
|
||
adjusted_period = original_period
|
||
|
||
# 基于调整后的周期计算剩余天数
|
||
days_passed = (calc_date - create_date).days
|
||
due_days = adjusted_period - days_passed
|
||
|
||
if due_days < 0:
|
||
item_copy["remaining"] = 0 - int(abs(due_days))
|
||
result["error_data"].append(item_copy)
|
||
warn_msg = (
|
||
f"【超期警报】测点{point_idx} 最新工况'{latest_condition}'({create_date})"
|
||
f"已超期{abs(due_days)}天!基准工况:{base_condition},"
|
||
f"原始周期{original_period}天{',冬休调整后周期'+str(adjusted_period)+'天' if is_winter_break else ''}"
|
||
)
|
||
logger.warning(warn_msg)
|
||
warnings.warn(warn_msg, UserWarning)
|
||
elif start <= due_days <= end:
|
||
item_copy["remaining"] = due_days
|
||
result["data"].append(item_copy)
|
||
|
||
# 相同NYID保留剩余天数最少的记录
|
||
if result["data"]:
|
||
nyid_min_remaining = {}
|
||
for record in result["data"]:
|
||
nyid = record.get("NYID")
|
||
if not nyid:
|
||
continue
|
||
|
||
if nyid not in nyid_min_remaining or record["remaining"] < nyid_min_remaining[nyid]["remaining"]:
|
||
nyid_min_remaining[nyid] = record
|
||
|
||
result["data"] = list(nyid_min_remaining.values())
|
||
# logger.warning(f"【数据错误】共发现{len(result['error_data'])}条错误数据")
|
||
# logger.warning(result)
|
||
# import json
|
||
# with open("./error_data.json", "w", encoding="utf-8") as f:
|
||
# json.dump(result["error_data"], f, ensure_ascii=False, indent=4)
|
||
return result |