from datetime import datetime from typing import List, Dict import warnings import copy # 注意:根据实际项目路径调整导入,若本地测试可注释掉 from ..core.logging_config import get_logger logger = get_logger(__name__) class ConstructionMonitorUtils: def __init__(self): # 原始工况周期映射表(保持不变) self.base_periods = { "仰拱(底板)施工完成后,第1个月": 7, "仰拱(底板)施工完成后,第2至3个月": 14, "仰拱(底板)施工完成后,3个月以后": 30, "无砟轨道铺设后,第1至3个月": 30, "无砟轨道铺设后,4至12个月": 90, "无砟轨道铺设后,12个月以后": 180, "墩台施工到一定高度": 30, "墩台混凝土施工": 30, "预制梁桥,架梁前": 30, "预制梁桥,预制梁架设前": 1, "预制梁桥,预制梁架设后": 7, "桥位施工桥梁,制梁前": 30, "桥位施工桥梁,上部结构施工中": 1, "架桥机(运梁车)通过": 7, "桥梁主体工程完工后,第1至3个月": 7, # 包含英文逗号的原始数据 "桥梁主体工程完工后,第4至6个月": 14, "桥梁主体工程完工后,6个月以后": 30, "轨道铺设期间,前": 30, "轨道铺设期间,后": 14, "轨道铺设完成后,第1个月": 14, "轨道铺设完成后,2至3个月": 30, "轨道铺设完成后,4至12个月": 90, "轨道铺设完成后,12个月以后": 180, "填筑或堆载,一般情况": 1, "填筑或堆载,沉降量突变情况": 1, "填筑或堆载,两次填筑间隔时间较长情况": 3, # 该工况原周期为3天 "堆载预压或路基填筑完成,第1至3个月": 7, "堆载预压或路基填筑完成,第4至6个月": 14, "堆载预压或路基填筑完成,6个月以后": 30, "架桥机(运梁车)首次通过前": 1, "架桥机(运梁车)首次通过后,前3天": 1, "架桥机(运梁车)首次通过后": 7, "轨道板(道床)铺设后,第1个月": 14, "轨道板(道床)铺设后,第2至3个月": 30, "轨道板(道床)铺设后,3个月以后": 90, "架桥机(运梁车) 首次通过后": 7, "架桥机(运梁车) 首次通过前": 1, } # 构建中英文括号+逗号兼容映射表 self.compatible_periods = self._build_compatible_brackets_map() def _build_compatible_brackets_map(self) -> Dict[str, int]: """构建支持中英文括号、中英文逗号的兼容映射表""" compatible_map = {} for original_key, period in self.base_periods.items(): # 1. 保留原始key compatible_map[original_key] = period # 2. 生成中文括号版key chinese_bracket_key = original_key.replace("(", "(").replace(")", ")") if chinese_bracket_key != original_key: compatible_map[chinese_bracket_key] = period # 3. 生成英文逗号转中文逗号版key chinese_comma_key = original_key.replace(",", ",") if chinese_comma_key != original_key: compatible_map[chinese_comma_key] = period # 4. 生成中文括号+中文逗号混合版key mixed_key = chinese_bracket_key.replace(",", ",") if mixed_key != original_key and mixed_key not in compatible_map: compatible_map[mixed_key] = period return compatible_map def get_due_data(self, input_data: List[List[Dict]], start: int = 0, end: int = 0, current_date: datetime = None) -> Dict[str, List[Dict]]: result = {"winter": [], "data": [], "error_data": []} if not input_data: return result calc_date = current_date.date() if current_date else datetime.now().date() for point_idx, point_data in enumerate(input_data): if not point_data: continue # 过滤逻辑:仅保留 useflag 存在且值≠0 的记录 filtered_point_data = [ item for item in point_data if "useflag" in item and item["useflag"] != 0 # 核心条件:字段存在 + 非0 ] # 过滤后无数据则跳过当前测点 if not filtered_point_data: continue # 使用过滤后的数据处理 latest_item = filtered_point_data[0] latest_condition = latest_item.get("workinfoname") if not latest_condition: result["error_data"].append(latest_item) warnings.warn(f"【数据错误】测点{point_idx}的最新数据缺少'workinfoname'字段", UserWarning) continue base_condition = None # 新增:标记是否为冬休回溯到合法工况的场景 is_winter_break = False # 初始化冬休标识 if latest_condition != "冬休": if latest_condition not in self.compatible_periods: result["error_data"].append(latest_item) warnings.warn(f"【数据错误】测点{point_idx}最新数据存在未定义工况: {latest_condition}", UserWarning) continue base_condition = latest_condition else: # 遍历过滤后的历史数据 for history_item in filtered_point_data[1:]: history_condition = history_item.get("workinfoname") if not history_condition: result["error_data"].append(history_item) warnings.warn(f"【数据错误】测点{point_idx}的历史数据缺少'workinfoname'字段", UserWarning) continue if history_condition != "冬休": if history_condition not in self.compatible_periods: result["error_data"].append(history_item) warnings.warn(f"【数据错误】测点{point_idx}历史数据存在未定义工况: {history_condition}", UserWarning) base_condition = None break base_condition = history_condition is_winter_break = True # 触发:冬休且回溯到合法历史工况 break item_copy = copy.deepcopy(latest_item) create_date_val = latest_item.get("createdate") if not create_date_val: result["error_data"].append(item_copy) warnings.warn(f"【数据错误】测点{point_idx}的最新数据缺少'createdate'字段", UserWarning) continue try: if isinstance(create_date_val, datetime): create_date = create_date_val.date() else: create_date = datetime.strptime(create_date_val, "%Y-%m-%d %H:%M:%S").date() except ValueError as e: result["error_data"].append(item_copy) warnings.warn(f"【数据错误】测点{point_idx}最新数据的日期格式错误:{create_date_val},错误:{str(e)}", UserWarning) continue if not base_condition: result["winter"].append(item_copy) continue # 核心修改:冬休回溯场景下调整测量间隔(基准周期) original_period = self.compatible_periods[base_condition] if is_winter_break: # 规则1:原周期为3天则改为7天;规则2:其他周期直接翻倍 if original_period == 3: adjusted_period = 7 else: adjusted_period = original_period * 2 else: # 非冬休场景,使用原始周期 adjusted_period = original_period # 基于调整后的周期计算剩余天数 days_passed = (calc_date - create_date).days due_days = adjusted_period - days_passed if due_days < 0: item_copy["remaining"] = 0 - int(abs(due_days)) result["error_data"].append(item_copy) warn_msg = ( f"【超期警报】测点{point_idx} 最新工况'{latest_condition}'({create_date})" f"已超期{abs(due_days)}天!基准工况:{base_condition}," f"原始周期{original_period}天{',冬休调整后周期'+str(adjusted_period)+'天' if is_winter_break else ''}" ) logger.warning(warn_msg) warnings.warn(warn_msg, UserWarning) elif start <= due_days <= end: item_copy["remaining"] = due_days result["data"].append(item_copy) # 相同NYID保留剩余天数最少的记录 if result["data"]: nyid_min_remaining = {} for record in result["data"]: nyid = record.get("NYID") if not nyid: continue if nyid not in nyid_min_remaining or record["remaining"] < nyid_min_remaining[nyid]["remaining"]: nyid_min_remaining[nyid] = record result["data"] = list(nyid_min_remaining.values()) return result