1.修改工况推理接口

This commit is contained in:
whm
2026-01-29 17:45:08 +08:00
parent 8b3796dd5a
commit 2eb7b9b5c1
2 changed files with 310 additions and 4 deletions

View File

@@ -30,6 +30,7 @@ from ..services.settlement_data import SettlementDataService
from ..services.level_data import LevelDataService from ..services.level_data import LevelDataService
from ..services.original_data import OriginalDataService from ..services.original_data import OriginalDataService
from ..services.comprehensive import ComprehensiveDataService from ..services.comprehensive import ComprehensiveDataService
from ..utils.get_operating_mode import OperatingModePredictor
import logging import logging
router = APIRouter(prefix="/comprehensive_data", tags=["综合数据管理"]) router = APIRouter(prefix="/comprehensive_data", tags=["综合数据管理"])
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -470,12 +471,14 @@ def get_settlement_by_linecode(
settlement_service = SettlementDataService() settlement_service = SettlementDataService()
result = settlement_service.get_settlement_by_linecode(db, linecode) result = settlement_service.get_settlement_by_linecode(db, linecode)
settlement_data = result['settlement_data']
predictor = OperatingModePredictor()
result_1d = predictor.predict(settlement_data)
return DataResponse( return DataResponse(
code=ResponseCode.SUCCESS, code=ResponseCode.SUCCESS,
message=f"查询成功,共获取{len(result['settlement_data'])}条沉降数据", message=f"查询成功,共获取{len(result['settlement_data'])}条沉降数据",
total=len(result['settlement_data']), total=len(result_1d),
data=result['settlement_data'] data=result_1d
) )
except Exception as e: except Exception as e:
@@ -724,3 +727,4 @@ def get_checkpoint_by_point(request: LevelDataQueryRequest, db: Session = Depend
total=0, total=0,
data=[] data=[]
) )

View File

@@ -0,0 +1,302 @@
from datetime import datetime, date
class OperatingModePredictor:
"""
工况预测类(处理二维倒序数据,返回一维列表,仅保留各内嵌列表最新记录)
功能根据输入的带时间序列的工况数据推导每个监测点point_id的下一阶段工况
特性:
1. 输入为二维列表每个内嵌列表对应一个point_id且为倒序排列最新记录在索引0
2. 输出为一维列表,仅保留每个内嵌列表的最新记录,新增工况推导结果字段
3. 无需切换工况时next_workinfo返回当前工况名称需要切换时返回目标工况名称
4. 时间计算仅按日期(天)维度,忽略时分秒
"""
def __init__(self):
"""初始化类,加载内置的工况配置、分组规则和切换触发规则"""
# 基础工况配置(键:工况名称,值:基础监测周期参考)
self.base_periods = self._load_base_periods()
# 工况分组(将同义不同格式的工况归类,共用切换规则)
self.condition_group = self._load_condition_group()
# 工况切换触发规则分组ID触发天数+目标工况候选)
self.group_transition_rules = self._load_group_transition_rules()
def _load_base_periods(self):
"""加载基础工况配置(私有方法,内部使用)"""
return {
"仰拱底板施工完成后第1个月": 7,
"仰拱底板施工完成后第2至3个月": 14,
"仰拱底板施工完成后3个月以后": 30,
"仰拱(底板)施工完成后第1个月": 7,
"仰拱(底板)施工完成后第2至3个月": 14,
"仰拱(底板)施工完成后3个月以后": 30,
"无砟轨道铺设后第1至3个月": 30,
"无砟轨道铺设后4至12个月": 90,
"无砟轨道铺设后12个月以后": 180,
"墩台施工到一定高度": 30,
"墩台混凝土施工": 30,
"预制梁桥,架梁前": 30,
"预制梁桥,预制梁架设前": 1,
"预制梁桥,预制梁架设后": 7,
"桥位施工桥梁,制梁前": 30,
"桥位施工桥梁,上部结构施工中": 1,
"架桥机(运梁车)通过": 7,
"桥梁主体工程完工后,第1至3个月": 7,
"桥梁主体工程完工后第4至6个月": 14,
"桥梁主体工程完工后,6个月以后": 30,
"轨道铺设期间,前": 30,
"轨道铺设期间,后": 14,
"轨道铺设完成后第1个月": 14,
"轨道铺设完成后2至3个月": 30,
"轨道铺设完成后4至12个月": 90,
"轨道铺设完成后12个月以后": 180,
"铺路或堆载,一般情况": 1,
"填筑或堆载,一般情况": 1,
"铺路或堆载,沉降量突变情况": 1,
"填筑或堆载,两次填筑间隔时间较长情况": 3,
"铺路或堆载,两次铺路间隔时间较长情况": 3,
"堆载预压或路基填筑完成第1至3个月": 7,
"堆载预压或路基填筑完成第4至6个月": 14,
"堆载预压或路基填筑完成6个月以后": 30,
"架桥机(运梁车) 首次通过前": 1,
"架桥机(运梁车) 首次通过后前3天": 1,
"架桥机(运梁车) 首次通过后": 7,
"轨道板(道床)铺设后第1个月": 14,
"轨道板(道床)铺设后第2至3个月": 30,
"轨道板(道床)铺设后3个月以后": 90
}
def _load_condition_group(self):
"""加载工况分组规则(私有方法,内部使用)"""
return {
"仰拱底板施工完成后第1个月": "YG_DIBAN_1",
"仰拱(底板)施工完成后第1个月": "YG_DIBAN_1",
"仰拱底板施工完成后第2至3个月": "YG_DIBAN_2_3",
"仰拱(底板)施工完成后第2至3个月": "YG_DIBAN_2_3",
"仰拱底板施工完成后3个月以后": "YG_DIBAN_AFTER_3",
"仰拱(底板)施工完成后3个月以后": "YG_DIBAN_AFTER_3",
"架桥机(运梁车) 首次通过前": "JQJ_FIRST_BEFORE",
"架桥机(运梁车) 首次通过后前3天": "JQJ_FIRST_AFTER_3D",
"架桥机(运梁车) 首次通过后": "JQJ_FIRST_AFTER",
"堆载预压或路基填筑完成第1至3个月": "DZYY_1_3",
"堆载预压或路基填筑完成第4至6个月": "DZYY_4_6",
"堆载预压或路基填筑完成6个月以后": "DZYY_AFTER_6",
"轨道板(道床)铺设后第1个月": "GDB_1",
"轨道板(道床)铺设后第2至3个月": "GDB_2_3",
"轨道板(道床)铺设后3个月以后": "GDB_AFTER_3",
"预制梁桥,预制梁架设前": "YZLQ_BEFORE_JS",
"预制梁桥,预制梁架设后": "YZLQ_AFTER_JS",
"桥梁主体工程完工后,第1至3个月": "QL_ZHUTI_1_3",
"桥梁主体工程完工后第4至6个月": "QL_ZHUTI_4_6",
"桥梁主体工程完工后,6个月以后": "QL_ZHUTI_AFTER_6",
"轨道铺设完成后第1个月": "GD_1",
"轨道铺设完成后2至3个月": "GD_2_3",
"轨道铺设完成后4至12个月": "GD_4_12",
"轨道铺设完成后12个月以后": "GD_AFTER_12",
"无砟轨道铺设后第1至3个月": "WZGD_1_3",
"无砟轨道铺设后4至12个月": "WZGD_4_12",
"无砟轨道铺设后12个月以后": "WZGD_AFTER_12",
"墩台施工到一定高度": "STATIC",
"墩台混凝土施工": "STATIC",
"预制梁桥,架梁前": "STATIC",
"桥位施工桥梁,制梁前": "STATIC",
"桥位施工桥梁,上部结构施工中": "STATIC",
"架桥机(运梁车)通过": "STATIC",
"轨道铺设期间,前": "STATIC",
"轨道铺设期间,后": "STATIC",
"铺路或堆载,一般情况": "STATIC",
"填筑或堆载,一般情况": "STATIC",
"铺路或堆载,沉降量突变情况": "STATIC",
"填筑或堆载,两次填筑间隔时间较长情况": "STATIC",
"铺路或堆载,两次铺路间隔时间较长情况": "STATIC"
}
def _load_group_transition_rules(self):
"""加载工况切换触发规则(私有方法,内部使用)"""
return {
"YG_DIBAN_1": {"trigger_days": 30, "next_candidates": ["仰拱底板施工完成后第2至3个月", "仰拱(底板)施工完成后第2至3个月"]},
"YG_DIBAN_2_3": {"trigger_days": 60, "next_candidates": ["仰拱底板施工完成后3个月以后", "仰拱(底板)施工完成后3个月以后"]},
"YG_DIBAN_AFTER_3": {"trigger_days": None, "next_candidates": None},
"JQJ_FIRST_BEFORE": {"trigger_days": 1, "next_candidates": ["架桥机(运梁车) 首次通过后前3天"]},
"JQJ_FIRST_AFTER_3D": {"trigger_days": 3, "next_candidates": ["架桥机(运梁车) 首次通过后"]},
"JQJ_FIRST_AFTER": {"trigger_days": None, "next_candidates": None},
"DZYY_1_3": {"trigger_days": 90, "next_candidates": ["堆载预压或路基填筑完成第4至6个月"]},
"DZYY_4_6": {"trigger_days": 90, "next_candidates": ["堆载预压或路基填筑完成6个月以后"]},
"DZYY_AFTER_6": {"trigger_days": None, "next_candidates": None},
"GDB_1": {"trigger_days": 30, "next_candidates": ["轨道板(道床)铺设后第2至3个月"]},
"GDB_2_3": {"trigger_days": 30, "next_candidates": ["轨道板(道床)铺设后3个月以后"]},
"GDB_AFTER_3": {"trigger_days": None, "next_candidates": None},
"YZLQ_BEFORE_JS": {"trigger_days": 1, "next_candidates": ["架桥机(运梁车)通过"]},
"YZLQ_AFTER_JS": {"trigger_days": 7, "next_candidates": ["桥梁主体工程完工后,第1至3个月"]},
"QL_ZHUTI_1_3": {"trigger_days": 90, "next_candidates": ["桥梁主体工程完工后第4至6个月"]},
"QL_ZHUTI_4_6": {"trigger_days": 90, "next_candidates": ["桥梁主体工程完工后,6个月以后"]},
"QL_ZHUTI_AFTER_6": {"trigger_days": None, "next_candidates": None},
"GD_1": {"trigger_days": 30, "next_candidates": ["轨道铺设完成后2至3个月"]},
"GD_2_3": {"trigger_days": 60, "next_candidates": ["轨道铺设完成后4至12个月"]},
"GD_4_12": {"trigger_days": 240, "next_candidates": ["轨道铺设完成后12个月以后"]},
"GD_AFTER_12": {"trigger_days": None, "next_candidates": None},
"WZGD_1_3": {"trigger_days": 90, "next_candidates": ["无砟轨道铺设后4至12个月"]},
"WZGD_4_12": {"trigger_days": 240, "next_candidates": ["无砟轨道铺设后12个月以后"]},
"WZGD_AFTER_12": {"trigger_days": None, "next_candidates": None},
"STATIC": {"trigger_days": None, "next_candidates": None}
}
def _parse_to_date(self, time_str):
"""
私有辅助方法:将时间字符串解析为日期对象,仅保留年月日,忽略时分秒
:param time_str: 时间字符串,格式为 "YYYY-MM-DD HH:MM:SS"
:return: date对象 / None解析失败时
"""
if not time_str:
return None
try:
dt = datetime.strptime(str(time_str).strip(), "%Y-%m-%d %H:%M:%S")
return dt.date()
except ValueError:
return None
def _get_time_statistics_from_reversed(self, data, workinfo):
"""
私有辅助方法:从倒序数据中提取时间统计信息
:param data: 单个point_id的倒序数据列表
:param workinfo: 当前工况名称
:return: 元组 (首次测量日期date对象, 首次到末次持续天数, 今日与首次测量天数差)
"""
# 筛选当前工况的有效记录
reversed_records = [d for d in data if d.get("workinfoname") == workinfo]
if not reversed_records:
return None, 0, 0
# 解析所有有效记录的日期
reversed_dates = []
for item in reversed_records:
d = self._parse_to_date(item.get("MTIME_W"))
if d:
reversed_dates.append(d)
if not reversed_dates:
return None, 0, 0
# 提取倒序数据的最新、最旧日期
last_date = reversed_dates[0]
first_date = reversed_dates[-1]
# 计算累计天数和今日与首次测量的天数差
cumulative_days = (last_date - first_date).days
today = date.today()
days_to_today = (today - first_date).days if first_date else 0
return first_date, cumulative_days, days_to_today
def _match_next_condition(self, current_name, candidates):
"""
私有辅助方法:按符号风格匹配下一个工况名称(全角/半角括号对应)
:param current_name: 当前工况名称
:param candidates: 目标工况候选列表
:return: 匹配的工况名称 / 当前工况名称(无候选时)
"""
if not candidates:
return current_name
# 优先匹配全角括号工况
if "" in current_name:
for cand in candidates:
if "" in cand:
return cand
# 再匹配半角括号工况
if "(" in current_name:
for cand in candidates:
if "(" in cand:
return cand
# 无对应符号时,返回第一个候选工况
return candidates[0]
def _validate_point_id(self, inner_list):
"""
私有辅助方法校验内嵌列表内所有元素的point_id是否一致
:param inner_list: 单个point_id的倒序数据列表
:return: point_id / None校验失败或列表为空时
"""
if not inner_list:
return None
base_point_id = inner_list[0].get("point_id")
for item in inner_list:
if item.get("point_id") != base_point_id:
return None
return base_point_id
def predict(self, data_2d_list):
"""
公有核心方法:执行工况预测,处理二维输入数据,返回一维结果列表
:param data_2d_list: 二维倒序数据列表,格式 [[{},{},{}], [{},{},{}]]
:return: 一维结果列表,格式 [{}, {}, {}],每个元素为对应内嵌列表的最新记录+推导字段
"""
final_result_1d = []
# 遍历二维列表逐个处理每个point_id的内嵌数据
for inner_data_list in data_2d_list:
# 跳过空列表
if not isinstance(inner_data_list, list) or len(inner_data_list) == 0:
continue
# 1. 提取当前内嵌列表的最新记录倒序数据索引0为最新
latest_record = inner_data_list[0].copy()
# 2. 校验point_id一致性
point_id = self._validate_point_id(inner_data_list)
if not point_id:
latest_record.update({
"status": "fail",
"current_workinfo": None,
"first_measure_date": None,
"days_from_first_to_today": None,
"next_workinfo": None,
"error_msg": "point_id不一致或缺失"
})
final_result_1d.append(latest_record)
continue
# 3. 提取并校验当前工况
current_workinfo = latest_record.get("workinfoname")
if not current_workinfo or current_workinfo not in self.base_periods:
latest_record.update({
"status": "fail",
"current_workinfo": None,
"first_measure_date": None,
"days_from_first_to_today": None,
"next_workinfo": None,
"error_msg": "工况无效或缺失"
})
final_result_1d.append(latest_record)
continue
# 4. 提取时间统计信息
first_dt, cumulative_days, days_to_today = self._get_time_statistics_from_reversed(
inner_data_list, current_workinfo
)
first_measure_date = first_dt.strftime("%Y-%m-%d") if first_dt else None
# 5. 判断工况切换条件,推导下一工况
group_id = self.condition_group.get(current_workinfo, "STATIC")
rule = self.group_transition_rules.get(group_id, {})
trigger_days = rule.get("trigger_days")
next_candidates = rule.get("next_candidates", [])
if trigger_days is not None and cumulative_days >= trigger_days:
# 满足切换条件:返回目标工况
next_workname = self._match_next_condition(current_workinfo, next_candidates)
else:
# 不满足切换条件:返回当前工况
next_workname = current_workinfo
# 6. 组装结果字段,更新最新记录
latest_record.update({
"status": "success",
"current_workinfo": current_workinfo,
"first_measure_date": first_measure_date,
"days_from_first_to_today": days_to_today,
"next_workinfo": next_workname,
"point_id": point_id
})
# 7. 加入最终结果列表
final_result_1d.append(latest_record)
return final_result_1d