from datetime import datetime from typing import List, Dict import warnings import copy # 注意:根据实际项目路径调整导入,若本地测试可注释掉 from ..core.logging_config import get_logger import json logger = get_logger(__name__) class ConstructionMonitorUtils: def __init__(self): # 原始工况周期映射表(保持不变) self.base_periods = { "仰拱(底板)施工完成后,第1个月": 7, "仰拱(底板)施工完成后,第2至3个月": 14, "仰拱(底板)施工完成后,3个月以后": 30, "仰拱(底板)施工完成后,第1个月": 7, # 原:仰拱(底板)施工完成后,第1个月 "仰拱(底板)施工完成后,第2至3个月": 14, # 原:仰拱(底板)施工完成后,第2至3个月 "仰拱(底板)施工完成后,3个月以后": 30, # 原:仰拱(底板)施工完成后,3个月以后 "无砟轨道铺设后,第1至3个月": 30, # 原:无砟轨道铺设后,第1至3个月 "无砟轨道铺设后,4至12个月": 90, # 原:无砟轨道铺设后,4至12个月 "无砟轨道铺设后,12个月以后": 180, # 原:无砟轨道铺设后,12个月以后 "墩台施工到一定高度": 30, # 无格式差异,保留原样 "墩台混凝土施工": 30, # 无格式差异,保留原样 "预制梁桥,架梁前": 30, # 原:预制梁桥,架梁前 "预制梁桥,预制梁架设前": 1, # 原:预制梁桥,预制梁架设前 "预制梁桥,预制梁架设后": 7, # 原:预制梁桥,预制梁架设后 "桥位施工桥梁,制梁前": 30, # 原:桥位施工桥梁,制梁前 "桥位施工桥梁,上部结构施工中": 1, # 原:桥位施工桥梁,上部结构施工中 "架桥机(运梁车)通过": 7, # 无格式差异,保留原样 "桥梁主体工程完工后,第1至3个月": 7, # 原:桥梁主体工程完工后,第1至3个月 "桥梁主体工程完工后,第4至6个月": 14, # 原:桥梁主体工程完工后,第4至6个月 "桥梁主体工程完工后,6个月以后": 30, # 原:桥梁主体工程完工后,6个月以后 '' "轨道铺设期间,前": 30, "轨道铺设期间,后": 14, "轨道铺设完成后,第1个月": 14, "轨道铺设完成后,2至3个月": 30, "轨道铺设完成后,4至12个月": 90, "轨道铺设完成后,12个月以后": 180, "铺路或堆载,一般情况": 1, "填筑或堆载,一般情况": 1, "铺路或堆载,沉降量突变情况": 1, "填筑或堆载,两次填筑间隔时间较长情况":3, "铺路或堆载,两次铺路间隔时间较长情况": 3, "堆载预压或路基填筑完成,第1至3个月":7, # 原:堆载预压或路基铺路完成,第1至3个月 "堆载预压或路基填筑完成,第4至6个月": 14, # 原:堆载预压或路基铺路完成,第4至6个月 "堆载预压或路基填筑完成,6个月以后": 30, # 原:堆载预压或路基铺路完成,6个月以后 "架桥机(运梁车) 首次通过前": 1, # 原:架桥机(运梁车)首次通过前(仅加空格) "架桥机(运梁车) 首次通过后,前3天": 1, # 原:架桥机(运梁车)首次通过后,前3天 "架桥机(运梁车) 首次通过后": 7, # 原:架桥机(运梁车)首次通过后(仅加空格) "轨道板(道床)铺设后,第1个月": 14, # 原:轨道板(道床)铺设后,第1个月 "轨道板(道床)铺设后,第2至3个月": 30, # 原:轨道板(道床)铺设后,第2至3个月 "轨道板(道床)铺设后,3个月以后": 90 # 未出现在待处理集,保留原始格式 } # 构建中英文括号+逗号兼容映射表 self.compatible_periods = self._build_compatible_brackets_map() def _build_compatible_brackets_map(self) -> Dict[str, int]: """构建支持中英文括号、中英文逗号,且忽略所有空格的兼容映射表""" compatible_map = {} for original_key, period in self.base_periods.items(): # ========== 第一步:处理原始key的空格和符号,生成基础变体 ========== # 1. 清洗空格:去除首尾+全角空格+合并连续空格+最终删除所有空格 key_no_space = original_key.strip().replace(" ", " ").replace(" ", " ").replace(" ", "") # 2. 原始key(未清洗空格) compatible_map[original_key] = period # 3. 无空格的原始符号key if key_no_space not in compatible_map: compatible_map[key_no_space] = period # ========== 第二步:生成中文括号变体(含空格/无空格) ========== # 带空格的中文括号key chinese_bracket_key = original_key.replace("(", "(").replace(")", ")") if chinese_bracket_key != original_key and chinese_bracket_key not in compatible_map: compatible_map[chinese_bracket_key] = period # 无空格的中文括号key chinese_bracket_no_space = key_no_space.replace("(", "(").replace(")", ")") if chinese_bracket_no_space not in compatible_map: compatible_map[chinese_bracket_no_space] = period # ========== 第三步:生成中文逗号变体(含空格/无空格) ========== # 带空格的中文逗号key chinese_comma_key = original_key.replace(",", ",") if chinese_comma_key != original_key and chinese_comma_key not in compatible_map: compatible_map[chinese_comma_key] = period # 无空格的中文逗号key chinese_comma_no_space = key_no_space.replace(",", ",") if chinese_comma_no_space not in compatible_map: compatible_map[chinese_comma_no_space] = period # ========== 第四步:生成中文括号+逗号混合变体(含空格/无空格) ========== # 带空格的混合变体key mixed_key = chinese_bracket_key.replace(",", ",") if mixed_key != original_key and mixed_key not in compatible_map: compatible_map[mixed_key] = period # 无空格的混合变体key mixed_no_space = chinese_bracket_no_space.replace(",", ",") if mixed_no_space not in compatible_map: compatible_map[mixed_no_space] = period return compatible_map def get_due_data(self, input_data: List[List[Dict]], start: int = 0, end: int = 0, current_date: datetime = None) -> Dict[str, List[Dict]]: result = {"winter": [], "data": [], "error_data": []} if not input_data: return result calc_date = current_date.date() if current_date else datetime.now().date() for point_idx, point_data in enumerate(input_data): if not point_data: continue # 过滤逻辑:仅保留 useflag 存在且值≠0 的记录 filtered_point_data = [ item for item in point_data if "useflag" in item and item["useflag"] != 0 # 核心条件:字段存在 + 非0 ] # 过滤后无数据则跳过当前测点 if not filtered_point_data: continue # 使用过滤后的数据处理 latest_item = filtered_point_data[0] latest_condition = latest_item.get("workinfoname") if not latest_condition: result["error_data"].append(latest_item) warnings.warn(f"【数据错误】测点{point_idx}的最新数据缺少'workinfoname'字段", UserWarning) continue base_condition = None # 新增:标记是否为冬休回溯到合法工况的场景 is_winter_break = False # 初始化冬休标识 if latest_condition != "冬休": if latest_condition not in self.compatible_periods: result["error_data"].append(latest_item) with open("error_data.txt", "a", encoding="utf-8") as f: json.dump(latest_condition, f, ensure_ascii=False, indent=4) f.write("\n") warnings.warn(f"【数据错误】测点{point_idx}最新数据存在未定义工况: {latest_condition}", UserWarning) continue base_condition = latest_condition else: # 遍历过滤后的历史数据 for history_item in filtered_point_data[1:]: history_condition = history_item.get("workinfoname") if not history_condition: result["error_data"].append(history_item) warnings.warn(f"【数据错误】测点{point_idx}的历史数据缺少'workinfoname'字段", UserWarning) continue if history_condition != "冬休": if history_condition not in self.compatible_periods: result["error_data"].append(history_item) with open("error_data.txt", "a", encoding="utf-8") as f: json.dump(history_condition, f, ensure_ascii=False, indent=4) f.write("\n") warnings.warn(f"【数据错误】测点{point_idx}历史数据存在未定义工况: {history_condition}", UserWarning) base_condition = None break base_condition = history_condition is_winter_break = True # 触发:冬休且回溯到合法历史工况 break item_copy = copy.deepcopy(latest_item) create_date_val = latest_item.get("MTIME_W") if not create_date_val: result["error_data"].append(item_copy) warnings.warn(f"【数据错误】测点{point_idx}的最新数据缺少'MTIME_W'字段", UserWarning) continue try: if isinstance(create_date_val, datetime): create_date = create_date_val.date() else: create_date = datetime.strptime(create_date_val, "%Y-%m-%d %H:%M:%S").date() except ValueError as e: result["error_data"].append(item_copy) warnings.warn(f"【数据错误】测点{point_idx}最新数据的日期格式错误:{create_date_val},错误:{str(e)}", UserWarning) continue if not base_condition: result["winter"].append(item_copy) continue # 核心修改:冬休回溯场景下调整测量间隔(基准周期) original_period = self.compatible_periods[base_condition] if is_winter_break: # 规则1:原周期为3天则改为7天;规则2:原周期为14天则改为30天;规则3:其他周期直接翻倍 if original_period == 3: adjusted_period = 7 elif original_period == 14: adjusted_period = 30 else: adjusted_period = original_period * 2 else: # 非冬休场景,使用原始周期 adjusted_period = original_period # 基于调整后的周期计算剩余天数 days_passed = (calc_date - create_date).days due_days = adjusted_period - days_passed if due_days < 0: item_copy["remaining"] = 0 - int(abs(due_days)) result["error_data"].append(item_copy) warn_msg = ( f"【超期警报】测点{point_idx} 最新工况'{latest_condition}'({create_date})" f"已超期{abs(due_days)}天!基准工况:{base_condition}," f"原始周期{original_period}天{',冬休调整后周期'+str(adjusted_period)+'天' if is_winter_break else ''}" ) logger.warning(warn_msg) warnings.warn(warn_msg, UserWarning) elif start <= due_days <= end: item_copy["remaining"] = due_days result["data"].append(item_copy) # 相同NYID保留剩余天数最少的记录 if result["data"]: nyid_min_remaining = {} for record in result["data"]: nyid = record.get("NYID") if not nyid: continue if nyid not in nyid_min_remaining or record["remaining"] < nyid_min_remaining[nyid]["remaining"]: nyid_min_remaining[nyid] = record result["data"] = list(nyid_min_remaining.values()) # logger.warning(f"【数据错误】共发现{len(result['error_data'])}条错误数据") # logger.warning(result) # import json # with open("./error_data.json", "w", encoding="utf-8") as f: # json.dump(result["error_data"], f, ensure_ascii=False, indent=4) return result