from datetime import datetime from typing import List, Dict import warnings import copy # 注意:根据实际项目路径调整导入,若本地测试可注释掉 from ..core.logging_config import get_logger import json from .operating_mode_config import BASE_PERIODS logger = get_logger(__name__) class ConstructionMonitorUtils: def __init__(self): # 使用公共配置的工况周期映射表 self.base_periods = BASE_PERIODS.copy() # 构建中英文括号+逗号兼容映射表 self.compatible_periods = self._build_compatible_brackets_map() def _build_compatible_brackets_map(self) -> Dict[str, int]: """构建支持中英文括号、中英文逗号,且忽略所有空格的兼容映射表""" compatible_map = {} for original_key, period in self.base_periods.items(): # ========== 第一步:处理原始key的空格和符号,生成基础变体 ========== # 1. 清洗空格:去除首尾+全角空格+合并连续空格+最终删除所有空格 key_no_space = original_key.strip().replace(" ", " ").replace(" ", " ").replace(" ", "") # 2. 原始key(未清洗空格) compatible_map[original_key] = period # 3. 无空格的原始符号key if key_no_space not in compatible_map: compatible_map[key_no_space] = period # ========== 第二步:生成中文括号变体(含空格/无空格) ========== # 带空格的中文括号key chinese_bracket_key = original_key.replace("(", "(").replace(")", ")") if chinese_bracket_key != original_key and chinese_bracket_key not in compatible_map: compatible_map[chinese_bracket_key] = period # 无空格的中文括号key chinese_bracket_no_space = key_no_space.replace("(", "(").replace(")", ")") if chinese_bracket_no_space not in compatible_map: compatible_map[chinese_bracket_no_space] = period # ========== 第三步:生成中文逗号变体(含空格/无空格) ========== # 带空格的中文逗号key chinese_comma_key = original_key.replace(",", ",") if chinese_comma_key != original_key and chinese_comma_key not in compatible_map: compatible_map[chinese_comma_key] = period # 无空格的中文逗号key chinese_comma_no_space = key_no_space.replace(",", ",") if chinese_comma_no_space not in compatible_map: compatible_map[chinese_comma_no_space] = period # ========== 第四步:生成中文括号+逗号混合变体(含空格/无空格) ========== # 带空格的混合变体key mixed_key = chinese_bracket_key.replace(",", ",") if mixed_key != original_key and mixed_key not in compatible_map: compatible_map[mixed_key] = period # 无空格的混合变体key mixed_no_space = chinese_bracket_no_space.replace(",", ",") if mixed_no_space not in compatible_map: compatible_map[mixed_no_space] = period return compatible_map def get_due_data(self, input_data: List[List[Dict]], start: int = 0, end: int = 0, current_date: datetime = None) -> Dict[str, List[Dict]]: result = {"winter": [], "data": [], "error_data": []} if not input_data: return result calc_date = current_date.date() if current_date else datetime.now().date() for point_idx, point_data in enumerate(input_data): if not point_data: continue # 过滤逻辑:仅保留 useflag 存在且值≠0 的记录 filtered_point_data = [ item for item in point_data if "useflag" in item and item["useflag"] != 0 # 核心条件:字段存在 + 非0 ] # 过滤后无数据则跳过当前测点 if not filtered_point_data: continue # 使用过滤后的数据处理 latest_item = filtered_point_data[0] latest_condition = latest_item.get("workinfoname") if not latest_condition: result["error_data"].append(latest_item) warnings.warn(f"【数据错误】测点{point_idx}的最新数据缺少'workinfoname'字段", UserWarning) continue base_condition = None # 新增:标记是否为冬休回溯到合法工况的场景 is_winter_break = False # 初始化冬休标识 if latest_condition != "冬休": if latest_condition not in self.compatible_periods: result["error_data"].append(latest_item) with open("error_data.txt", "a", encoding="utf-8") as f: json.dump(latest_condition, f, ensure_ascii=False, indent=4) f.write("\n") warnings.warn(f"【数据错误】测点{point_idx}最新数据存在未定义工况: {latest_condition}", UserWarning) continue base_condition = latest_condition else: # 遍历过滤后的历史数据 for history_item in filtered_point_data[1:]: history_condition = history_item.get("workinfoname") if not history_condition: result["error_data"].append(history_item) warnings.warn(f"【数据错误】测点{point_idx}的历史数据缺少'workinfoname'字段", UserWarning) continue if history_condition != "冬休": if history_condition not in self.compatible_periods: result["error_data"].append(history_item) with open("error_data.txt", "a", encoding="utf-8") as f: json.dump(history_condition, f, ensure_ascii=False, indent=4) f.write("\n") warnings.warn(f"【数据错误】测点{point_idx}历史数据存在未定义工况: {history_condition}", UserWarning) base_condition = None break base_condition = history_condition is_winter_break = True # 触发:冬休且回溯到合法历史工况 break item_copy = copy.deepcopy(latest_item) create_date_val = latest_item.get("MTIME_W") if not create_date_val: result["error_data"].append(item_copy) warnings.warn(f"【数据错误】测点{point_idx}的最新数据缺少'MTIME_W'字段", UserWarning) continue try: if isinstance(create_date_val, datetime): create_date = create_date_val.date() else: create_date = datetime.strptime(create_date_val, "%Y-%m-%d %H:%M:%S").date() except ValueError as e: result["error_data"].append(item_copy) warnings.warn(f"【数据错误】测点{point_idx}最新数据的日期格式错误:{create_date_val},错误:{str(e)}", UserWarning) continue if not base_condition: result["winter"].append(item_copy) continue # 核心修改:冬休回溯场景下调整测量间隔(基准周期) original_period = self.compatible_periods[base_condition] if is_winter_break: # 规则1:原周期为3天则改为7天;规则2:原周期为14天则改为30天;规则3:其他周期直接翻倍 if original_period == 3: adjusted_period = 7 elif original_period == 14: adjusted_period = 30 else: adjusted_period = original_period * 2 else: # 非冬休场景,使用原始周期 adjusted_period = original_period # 基于调整后的周期计算剩余天数 days_passed = (calc_date - create_date).days due_days = adjusted_period - days_passed if due_days < 0: item_copy["remaining"] = 0 - int(abs(due_days)) result["error_data"].append(item_copy) warn_msg = ( f"【超期警报】测点{point_idx} 最新工况'{latest_condition}'({create_date})" f"已超期{abs(due_days)}天!基准工况:{base_condition}," f"原始周期{original_period}天{',冬休调整后周期'+str(adjusted_period)+'天' if is_winter_break else ''}" ) logger.warning(warn_msg) warnings.warn(warn_msg, UserWarning) elif start <= due_days <= end: item_copy["remaining"] = due_days result["data"].append(item_copy) # 相同NYID保留剩余天数最少的记录 if result["data"]: nyid_min_remaining = {} for record in result["data"]: nyid = record.get("NYID") if not nyid: continue if nyid not in nyid_min_remaining or record["remaining"] < nyid_min_remaining[nyid]["remaining"]: nyid_min_remaining[nyid] = record result["data"] = list(nyid_min_remaining.values()) # logger.warning(f"【数据错误】共发现{len(result['error_data'])}条错误数据") # logger.warning(result) # import json # with open("./error_data.json", "w", encoding="utf-8") as f: # json.dump(result["error_data"], f, ensure_ascii=False, indent=4) return result