from datetime import datetime, date from .operating_mode_config import ( BASE_PERIODS, OLD_TO_NEW_MAP, CONDITION_GROUP, TRANSITION_RULES, WINTER_BREAK_LABELS, ) class OperatingModePredictor: """ 工况预测类(处理二维倒序数据,返回一维列表,仅保留各内嵌列表最新记录) 功能:根据输入的带时间序列的工况数据,推导每个监测点(point_id)的下一阶段工况 特性: 1. 输入为二维列表,每个内嵌列表对应一个point_id,且为倒序排列(最新记录在索引0) 2. 输出为一维列表,仅保留每个内嵌列表的最新记录,新增工况推导结果字段 3. 推导规则:有等效映射返回新工况,无等效保留旧工况;切换下一工况以base_periods为准 4. 时间计算仅按日期(天)维度,忽略时分秒 5. 冬休场景:仅以冬休前上一有效工况为判断基准,切换规则与非冬休完全一致 6. 适配中英文括号、逗号、空格,内部标准化匹配,外部返回规范名称 """ def __init__(self): """初始化类,加载核心配置""" # 1. 基础工况配置(最终返回的规范名称,含所有新旧工况) self.base_periods = BASE_PERIODS.copy() # 2. 旧→新等效映射(优先返回新工况) self.old_to_new_map = OLD_TO_NEW_MAP.copy() # 3. 工况分组(同义工况归为同一分组,复用切换规则) self.condition_group = CONDITION_GROUP.copy() # 4. 切换触发规则(沿用原逻辑,触发天数+目标工况) self.transition_rules = TRANSITION_RULES.copy() # 5. 冬休标识 self.winter_break_labels = WINTER_BREAK_LABELS.copy() # 辅助映射:标准化名称→base_periods中的规范名称(用于最终返回) self.std_to_canonical = { self._standardize_name(name): name for name in self.base_periods.keys() } # 辅助映射:标准化名称→分组ID self.std_to_group = { self._standardize_name(name): group_id for name, group_id in self.condition_group.items() } def _standardize_name(self, name): """ 标准化工况名称(内部匹配用):去空格、统一中英文符号 :param name: 原始工况名称 :return: 标准化后的名称 """ if not name: return "" # 去所有空格 std_name = name.replace(" ", "").strip() # 统一中英文符号 replace_map = { "(": "(", ")": ")", ",": ",", "。": ",", "~": "至", ";": ";", ":": ":", "'": "'", """: "\"" } for old, new in replace_map.items(): std_name = std_name.replace(old, new) return std_name def _parse_to_date(self, time_str): """解析时间字符串为date对象(仅保留年月日)""" if not time_str: return None try: dt = datetime.strptime(str(time_str).strip(), "%Y-%m-%d %H:%M:%S") return dt.date() except ValueError: return None def _get_time_statistics(self, data, target_workinfo): """从倒序数据中提取目标工况的时间统计信息""" # 筛选目标工况的有效记录 target_records = [ d for d in data if self._standardize_name(d.get("workinfoname")) == self._standardize_name(target_workinfo) and d.get("workinfoname") not in self.winter_break_labels ] if not target_records: return None, 0, 0 # 解析日期 target_dates = [] for item in target_records: d = self._parse_to_date(item.get("MTIME_W")) if d: target_dates.append(d) if not target_dates: return None, 0, 0 # 计算时间差(倒序数据:最新在0,最早在-1) last_date = target_dates[0] first_date = target_dates[-1] cumulative_days = (last_date - first_date).days today = date.today() days_to_today = (today - first_date).days if first_date else 0 return first_date, cumulative_days, days_to_today def _get_pre_winter_break_workinfo(self, inner_data_list): """提取冬休前的上一个有效工况""" if not inner_data_list: return None # 倒序遍历,跳过冬休,找第一个有效工况 for record in inner_data_list: current_work = record.get("workinfoname") if current_work and current_work not in self.winter_break_labels: # 优先返回等效新工况 return self.old_to_new_map.get(current_work, current_work) return None def _validate_point_id(self, inner_list): """校验内嵌列表内point_id是否一致""" if not inner_list: return None base_point_id = inner_list[0].get("point_id") for item in inner_list: if item.get("point_id") != base_point_id: return None return base_point_id def predict(self, data_2d_list): """ 公有核心方法:执行工况预测 :param data_2d_list: 二维倒序数据列表,格式 [[{},{},{}], [{},{},{}]] :return: 一维结果列表,格式 [{}, {}, {}] """ final_result = [] for inner_data_list in data_2d_list: if not isinstance(inner_data_list, list) or len(inner_data_list) == 0: continue # 取最新记录并复制(避免修改原数据) latest_record = inner_data_list[0].copy() point_id = self._validate_point_id(inner_data_list) # 校验point_id if not point_id: latest_record.update({ "status": "fail", "current_workinfo": None, "first_measure_date": None, "days_from_first_to_today": None, "next_workinfo": None, "judge_based_workinfo": None, "error_msg": "point_id不一致或缺失" }) final_result.append(latest_record) continue # 获取当前工况并标准化 current_workinfo = latest_record.get("workinfoname") if not current_workinfo or self._standardize_name(current_workinfo) not in self.std_to_canonical: latest_record.update({ "status": "fail", "current_workinfo": None, "first_measure_date": None, "days_from_first_to_today": None, "next_workinfo": None, "judge_based_workinfo": None, "error_msg": "工况无效或缺失" }) final_result.append(latest_record) continue # 冬休逻辑:取冬休前工况作为判断基准 if current_workinfo in self.winter_break_labels: judge_based_workinfo = self._get_pre_winter_break_workinfo(inner_data_list) if not judge_based_workinfo: latest_record.update({ "status": "fail", "current_workinfo": current_workinfo, "first_measure_date": None, "days_from_first_to_today": None, "next_workinfo": None, "judge_based_workinfo": None, "error_msg": "冬休前未找到有效工况" }) final_result.append(latest_record) continue else: # 非冬休:优先使用等效新工况作为判断基准 judge_based_workinfo = self.old_to_new_map.get(current_workinfo, current_workinfo) # 计算时间统计信息 first_dt, cumulative_days, days_to_today = self._get_time_statistics( inner_data_list, judge_based_workinfo ) first_measure_date = first_dt.strftime("%Y-%m-%d") if first_dt else None # 推导下一工况(以base_periods中的名称为准) std_judge_work = self._standardize_name(judge_based_workinfo) group_id = self.std_to_group.get(std_judge_work, "STATIC") rule = self.transition_rules.get(group_id, {}) trigger_days = rule.get("trigger_days") next_candidates = rule.get("next", []) if trigger_days is not None and cumulative_days >= trigger_days and next_candidates: # 切换工况:取base_periods中的规范名称 next_workinfo = next_candidates[0] else: # 不切换:有等效则返回新工况,无则返回旧工况 next_workinfo = self.old_to_new_map.get(judge_based_workinfo, judge_based_workinfo) # 组装最终结果 latest_record.update({ "status": "success", "current_workinfo": current_workinfo, # 保留原始输入工况 "judge_based_workinfo": judge_based_workinfo, # 实际判断用的工况(新工况优先) "first_measure_date": first_measure_date, "days_from_first_to_today": days_to_today, "next_workinfo": next_workinfo, # 推导结果:有等效返回新工况,无则返回旧工况 "point_id": point_id, "error_msg": "" }) final_result.append(latest_record) return final_result