229 lines
9.6 KiB
Python
229 lines
9.6 KiB
Python
from datetime import datetime, date
|
||
|
||
from .operating_mode_config import (
|
||
BASE_PERIODS,
|
||
OLD_TO_NEW_MAP,
|
||
CONDITION_GROUP,
|
||
TRANSITION_RULES,
|
||
WINTER_BREAK_LABELS,
|
||
)
|
||
|
||
|
||
class OperatingModePredictor:
|
||
"""
|
||
工况预测类(处理二维倒序数据,返回一维列表,仅保留各内嵌列表最新记录)
|
||
功能:根据输入的带时间序列的工况数据,推导每个监测点(point_id)的下一阶段工况
|
||
特性:
|
||
1. 输入为二维列表,每个内嵌列表对应一个point_id,且为倒序排列(最新记录在索引0)
|
||
2. 输出为一维列表,仅保留每个内嵌列表的最新记录,新增工况推导结果字段
|
||
3. 推导规则:有等效映射返回新工况,无等效保留旧工况;切换下一工况以base_periods为准
|
||
4. 时间计算仅按日期(天)维度,忽略时分秒
|
||
5. 冬休场景:仅以冬休前上一有效工况为判断基准,切换规则与非冬休完全一致
|
||
6. 适配中英文括号、逗号、空格,内部标准化匹配,外部返回规范名称
|
||
"""
|
||
|
||
def __init__(self):
|
||
"""初始化类,加载核心配置"""
|
||
# 1. 基础工况配置(最终返回的规范名称,含所有新旧工况)
|
||
self.base_periods = BASE_PERIODS.copy()
|
||
# 2. 旧→新等效映射(优先返回新工况)
|
||
self.old_to_new_map = OLD_TO_NEW_MAP.copy()
|
||
# 3. 工况分组(同义工况归为同一分组,复用切换规则)
|
||
self.condition_group = CONDITION_GROUP.copy()
|
||
# 4. 切换触发规则(沿用原逻辑,触发天数+目标工况)
|
||
self.transition_rules = TRANSITION_RULES.copy()
|
||
# 5. 冬休标识
|
||
self.winter_break_labels = WINTER_BREAK_LABELS.copy()
|
||
|
||
# 辅助映射:标准化名称→base_periods中的规范名称(用于最终返回)
|
||
self.std_to_canonical = {
|
||
self._standardize_name(name): name for name in self.base_periods.keys()
|
||
}
|
||
# 辅助映射:标准化名称→分组ID
|
||
self.std_to_group = {
|
||
self._standardize_name(name): group_id
|
||
for name, group_id in self.condition_group.items()
|
||
}
|
||
|
||
def _standardize_name(self, name):
|
||
"""
|
||
标准化工况名称(内部匹配用):去空格、统一中英文符号
|
||
:param name: 原始工况名称
|
||
:return: 标准化后的名称
|
||
"""
|
||
if not name:
|
||
return ""
|
||
# 去所有空格
|
||
std_name = name.replace(" ", "").strip()
|
||
# 统一中英文符号
|
||
replace_map = {
|
||
"(": "(", ")": ")", ",": ",", "。": ",", "~": "至",
|
||
";": ";", ":": ":", "'": "'", """: "\""
|
||
}
|
||
for old, new in replace_map.items():
|
||
std_name = std_name.replace(old, new)
|
||
return std_name
|
||
|
||
def _parse_to_date(self, time_str):
|
||
"""解析时间字符串为date对象(仅保留年月日)"""
|
||
if not time_str:
|
||
return None
|
||
try:
|
||
dt = datetime.strptime(str(time_str).strip(), "%Y-%m-%d %H:%M:%S")
|
||
return dt.date()
|
||
except ValueError:
|
||
return None
|
||
|
||
def _get_time_statistics(self, data, target_workinfo):
|
||
"""从倒序数据中提取目标工况的时间统计信息"""
|
||
# 筛选目标工况的有效记录
|
||
target_records = [
|
||
d for d in data
|
||
if self._standardize_name(d.get("workinfoname")) == self._standardize_name(target_workinfo)
|
||
and d.get("workinfoname") not in self.winter_break_labels
|
||
]
|
||
if not target_records:
|
||
return None, 0, 0
|
||
|
||
# 解析日期
|
||
target_dates = []
|
||
for item in target_records:
|
||
d = self._parse_to_date(item.get("MTIME_W"))
|
||
if d:
|
||
target_dates.append(d)
|
||
if not target_dates:
|
||
return None, 0, 0
|
||
|
||
# 计算时间差(倒序数据:最新在0,最早在-1)
|
||
last_date = target_dates[0]
|
||
first_date = target_dates[-1]
|
||
cumulative_days = (last_date - first_date).days
|
||
today = date.today()
|
||
days_to_today = (today - first_date).days if first_date else 0
|
||
|
||
return first_date, cumulative_days, days_to_today
|
||
|
||
def _get_pre_winter_break_workinfo(self, inner_data_list):
|
||
"""提取冬休前的上一个有效工况"""
|
||
if not inner_data_list:
|
||
return None
|
||
# 倒序遍历,跳过冬休,找第一个有效工况
|
||
for record in inner_data_list:
|
||
current_work = record.get("workinfoname")
|
||
if current_work and current_work not in self.winter_break_labels:
|
||
# 优先返回等效新工况
|
||
return self.old_to_new_map.get(current_work, current_work)
|
||
return None
|
||
|
||
def _validate_point_id(self, inner_list):
|
||
"""校验内嵌列表内point_id是否一致"""
|
||
if not inner_list:
|
||
return None
|
||
base_point_id = inner_list[0].get("point_id")
|
||
for item in inner_list:
|
||
if item.get("point_id") != base_point_id:
|
||
return None
|
||
return base_point_id
|
||
|
||
def predict(self, data_2d_list):
|
||
"""
|
||
公有核心方法:执行工况预测
|
||
:param data_2d_list: 二维倒序数据列表,格式 [[{},{},{}], [{},{},{}]]
|
||
:return: 一维结果列表,格式 [{}, {}, {}]
|
||
"""
|
||
final_result = []
|
||
|
||
for inner_data_list in data_2d_list:
|
||
if not isinstance(inner_data_list, list) or len(inner_data_list) == 0:
|
||
continue
|
||
|
||
# 取最新记录并复制(避免修改原数据)
|
||
latest_record = inner_data_list[0].copy()
|
||
point_id = self._validate_point_id(inner_data_list)
|
||
|
||
# 校验point_id
|
||
if not point_id:
|
||
latest_record.update({
|
||
"status": "fail",
|
||
"current_workinfo": None,
|
||
"first_measure_date": None,
|
||
"days_from_first_to_today": None,
|
||
"next_workinfo": None,
|
||
"judge_based_workinfo": None,
|
||
"error_msg": "point_id不一致或缺失"
|
||
})
|
||
final_result.append(latest_record)
|
||
continue
|
||
|
||
# 获取当前工况并标准化
|
||
current_workinfo = latest_record.get("workinfoname")
|
||
if not current_workinfo or self._standardize_name(current_workinfo) not in self.std_to_canonical:
|
||
latest_record.update({
|
||
"status": "fail",
|
||
"current_workinfo": None,
|
||
"first_measure_date": None,
|
||
"days_from_first_to_today": None,
|
||
"next_workinfo": None,
|
||
"judge_based_workinfo": None,
|
||
"error_msg": "工况无效或缺失"
|
||
})
|
||
final_result.append(latest_record)
|
||
continue
|
||
|
||
# 冬休逻辑:取冬休前工况作为判断基准
|
||
if current_workinfo in self.winter_break_labels:
|
||
judge_based_workinfo = self._get_pre_winter_break_workinfo(inner_data_list)
|
||
if not judge_based_workinfo:
|
||
latest_record.update({
|
||
"status": "fail",
|
||
"current_workinfo": current_workinfo,
|
||
"first_measure_date": None,
|
||
"days_from_first_to_today": None,
|
||
"next_workinfo": None,
|
||
"judge_based_workinfo": None,
|
||
"error_msg": "冬休前未找到有效工况"
|
||
})
|
||
final_result.append(latest_record)
|
||
continue
|
||
else:
|
||
# 非冬休:优先使用等效新工况作为判断基准
|
||
judge_based_workinfo = self.old_to_new_map.get(current_workinfo, current_workinfo)
|
||
|
||
# 计算时间统计信息
|
||
first_dt, cumulative_days, days_to_today = self._get_time_statistics(
|
||
inner_data_list, judge_based_workinfo
|
||
)
|
||
first_measure_date = first_dt.strftime("%Y-%m-%d") if first_dt else None
|
||
|
||
# 推导下一工况(以base_periods中的名称为准)
|
||
std_judge_work = self._standardize_name(judge_based_workinfo)
|
||
group_id = self.std_to_group.get(std_judge_work, "STATIC")
|
||
rule = self.transition_rules.get(group_id, {})
|
||
trigger_days = rule.get("trigger_days")
|
||
next_candidates = rule.get("next", [])
|
||
|
||
if trigger_days is not None and cumulative_days >= trigger_days and next_candidates:
|
||
# 切换工况:取base_periods中的规范名称
|
||
next_workinfo = next_candidates[0]
|
||
else:
|
||
# 不切换:有等效则返回新工况,无则返回旧工况
|
||
next_workinfo = self.old_to_new_map.get(judge_based_workinfo, judge_based_workinfo)
|
||
|
||
# 组装最终结果
|
||
latest_record.update({
|
||
"status": "success",
|
||
"current_workinfo": current_workinfo, # 保留原始输入工况
|
||
"judge_based_workinfo": judge_based_workinfo, # 实际判断用的工况(新工况优先)
|
||
"first_measure_date": first_measure_date,
|
||
"days_from_first_to_today": days_to_today,
|
||
"next_workinfo": next_workinfo, # 推导结果:有等效返回新工况,无则返回旧工况
|
||
"point_id": point_id,
|
||
"error_msg": ""
|
||
})
|
||
|
||
final_result.append(latest_record)
|
||
|
||
return final_result
|
||
|
||
|