Files
railway_cloud/app/utils/get_operating_mode.py

229 lines
9.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime, date
from .operating_mode_config import (
BASE_PERIODS,
OLD_TO_NEW_MAP,
CONDITION_GROUP,
TRANSITION_RULES,
WINTER_BREAK_LABELS,
)
class OperatingModePredictor:
"""
工况预测类(处理二维倒序数据,返回一维列表,仅保留各内嵌列表最新记录)
功能根据输入的带时间序列的工况数据推导每个监测点point_id的下一阶段工况
特性:
1. 输入为二维列表每个内嵌列表对应一个point_id且为倒序排列最新记录在索引0
2. 输出为一维列表,仅保留每个内嵌列表的最新记录,新增工况推导结果字段
3. 推导规则有等效映射返回新工况无等效保留旧工况切换下一工况以base_periods为准
4. 时间计算仅按日期(天)维度,忽略时分秒
5. 冬休场景:仅以冬休前上一有效工况为判断基准,切换规则与非冬休完全一致
6. 适配中英文括号、逗号、空格,内部标准化匹配,外部返回规范名称
"""
def __init__(self):
"""初始化类,加载核心配置"""
# 1. 基础工况配置(最终返回的规范名称,含所有新旧工况)
self.base_periods = BASE_PERIODS.copy()
# 2. 旧→新等效映射(优先返回新工况)
self.old_to_new_map = OLD_TO_NEW_MAP.copy()
# 3. 工况分组(同义工况归为同一分组,复用切换规则)
self.condition_group = CONDITION_GROUP.copy()
# 4. 切换触发规则(沿用原逻辑,触发天数+目标工况)
self.transition_rules = TRANSITION_RULES.copy()
# 5. 冬休标识
self.winter_break_labels = WINTER_BREAK_LABELS.copy()
# 辅助映射标准化名称→base_periods中的规范名称用于最终返回
self.std_to_canonical = {
self._standardize_name(name): name for name in self.base_periods.keys()
}
# 辅助映射标准化名称→分组ID
self.std_to_group = {
self._standardize_name(name): group_id
for name, group_id in self.condition_group.items()
}
def _standardize_name(self, name):
"""
标准化工况名称(内部匹配用):去空格、统一中英文符号
:param name: 原始工况名称
:return: 标准化后的名称
"""
if not name:
return ""
# 去所有空格
std_name = name.replace(" ", "").strip()
# 统一中英文符号
replace_map = {
"": "(", "": ")", "": ",", "": ",", "~": "",
"": ";", "": ":", "": "'", "": "\""
}
for old, new in replace_map.items():
std_name = std_name.replace(old, new)
return std_name
def _parse_to_date(self, time_str):
"""解析时间字符串为date对象仅保留年月日"""
if not time_str:
return None
try:
dt = datetime.strptime(str(time_str).strip(), "%Y-%m-%d %H:%M:%S")
return dt.date()
except ValueError:
return None
def _get_time_statistics(self, data, target_workinfo):
"""从倒序数据中提取目标工况的时间统计信息"""
# 筛选目标工况的有效记录
target_records = [
d for d in data
if self._standardize_name(d.get("workinfoname")) == self._standardize_name(target_workinfo)
and d.get("workinfoname") not in self.winter_break_labels
]
if not target_records:
return None, 0, 0
# 解析日期
target_dates = []
for item in target_records:
d = self._parse_to_date(item.get("MTIME_W"))
if d:
target_dates.append(d)
if not target_dates:
return None, 0, 0
# 计算时间差倒序数据最新在0最早在-1
last_date = target_dates[0]
first_date = target_dates[-1]
cumulative_days = (last_date - first_date).days
today = date.today()
days_to_today = (today - first_date).days if first_date else 0
return first_date, cumulative_days, days_to_today
def _get_pre_winter_break_workinfo(self, inner_data_list):
"""提取冬休前的上一个有效工况"""
if not inner_data_list:
return None
# 倒序遍历,跳过冬休,找第一个有效工况
for record in inner_data_list:
current_work = record.get("workinfoname")
if current_work and current_work not in self.winter_break_labels:
# 优先返回等效新工况
return self.old_to_new_map.get(current_work, current_work)
return None
def _validate_point_id(self, inner_list):
"""校验内嵌列表内point_id是否一致"""
if not inner_list:
return None
base_point_id = inner_list[0].get("point_id")
for item in inner_list:
if item.get("point_id") != base_point_id:
return None
return base_point_id
def predict(self, data_2d_list):
"""
公有核心方法:执行工况预测
:param data_2d_list: 二维倒序数据列表,格式 [[{},{},{}], [{},{},{}]]
:return: 一维结果列表,格式 [{}, {}, {}]
"""
final_result = []
for inner_data_list in data_2d_list:
if not isinstance(inner_data_list, list) or len(inner_data_list) == 0:
continue
# 取最新记录并复制(避免修改原数据)
latest_record = inner_data_list[0].copy()
point_id = self._validate_point_id(inner_data_list)
# 校验point_id
if not point_id:
latest_record.update({
"status": "fail",
"current_workinfo": None,
"first_measure_date": None,
"days_from_first_to_today": None,
"next_workinfo": None,
"judge_based_workinfo": None,
"error_msg": "point_id不一致或缺失"
})
final_result.append(latest_record)
continue
# 获取当前工况并标准化
current_workinfo = latest_record.get("workinfoname")
if not current_workinfo or self._standardize_name(current_workinfo) not in self.std_to_canonical:
latest_record.update({
"status": "fail",
"current_workinfo": None,
"first_measure_date": None,
"days_from_first_to_today": None,
"next_workinfo": None,
"judge_based_workinfo": None,
"error_msg": "工况无效或缺失"
})
final_result.append(latest_record)
continue
# 冬休逻辑:取冬休前工况作为判断基准
if current_workinfo in self.winter_break_labels:
judge_based_workinfo = self._get_pre_winter_break_workinfo(inner_data_list)
if not judge_based_workinfo:
latest_record.update({
"status": "fail",
"current_workinfo": current_workinfo,
"first_measure_date": None,
"days_from_first_to_today": None,
"next_workinfo": None,
"judge_based_workinfo": None,
"error_msg": "冬休前未找到有效工况"
})
final_result.append(latest_record)
continue
else:
# 非冬休:优先使用等效新工况作为判断基准
judge_based_workinfo = self.old_to_new_map.get(current_workinfo, current_workinfo)
# 计算时间统计信息
first_dt, cumulative_days, days_to_today = self._get_time_statistics(
inner_data_list, judge_based_workinfo
)
first_measure_date = first_dt.strftime("%Y-%m-%d") if first_dt else None
# 推导下一工况以base_periods中的名称为准
std_judge_work = self._standardize_name(judge_based_workinfo)
group_id = self.std_to_group.get(std_judge_work, "STATIC")
rule = self.transition_rules.get(group_id, {})
trigger_days = rule.get("trigger_days")
next_candidates = rule.get("next", [])
if trigger_days is not None and cumulative_days >= trigger_days and next_candidates:
# 切换工况取base_periods中的规范名称
next_workinfo = next_candidates[0]
else:
# 不切换:有等效则返回新工况,无则返回旧工况
next_workinfo = self.old_to_new_map.get(judge_based_workinfo, judge_based_workinfo)
# 组装最终结果
latest_record.update({
"status": "success",
"current_workinfo": current_workinfo, # 保留原始输入工况
"judge_based_workinfo": judge_based_workinfo, # 实际判断用的工况(新工况优先)
"first_measure_date": first_measure_date,
"days_from_first_to_today": days_to_today,
"next_workinfo": next_workinfo, # 推导结果:有等效返回新工况,无则返回旧工况
"point_id": point_id,
"error_msg": ""
})
final_result.append(latest_record)
return final_result