1.筛选代码回退

This commit is contained in:
whm
2025-11-14 21:56:47 +08:00
parent f54fb2871e
commit acff776a15

View File

@@ -1,532 +1,176 @@
from fastapi import APIRouter, Depends, HTTPException, status,Query from datetime import datetime
from sqlalchemy.orm import Session from typing import List, Dict
from typing import List, Optional import warnings
from ..core.database import get_db import copy
from ..core.response_code import ResponseCode, ResponseMessage
from ..schemas.comprehensive_data import (
BatchSectionDataImportRequest,
BatchCheckpointDataImportRequest,
BatchSettlementDataImportRequest,
BatchLevelDataImportRequest,
BatchOriginalDataImportRequest,
DataImportResponse,
DataResponse,
SectionDataQueryRequest,
SettlementDataQueryRequest,
OriginalDataQueryRequest,
SettlementDataCheckpointQueryRequest,
LevelDataQueryRequest,
LinecodeRequest,
NYIDRequest,
SectionByAccountRequest,
PointByAccountRequest,
TodayDataRequest
)
from ..services.daily import DailyDataService
from ..services.section_data import SectionDataService
from ..services.checkpoint import CheckpointService
from ..services.settlement_data import SettlementDataService
from ..services.level_data import LevelDataService
from ..services.original_data import OriginalDataService
from ..services.comprehensive import ComprehensiveDataService
import logging
router = APIRouter(prefix="/comprehensive_data", tags=["综合数据管理"])
logger = logging.getLogger(__name__)
# 实例化服务类 class ConstructionMonitorUtils:
section_service = SectionDataService() def __init__(self):
checkpoint_service = CheckpointService() # 原始工况周期映射表(保持不变)
settlement_service = SettlementDataService() self.base_periods = {
level_service = LevelDataService() "仰拱(底板)施工完成后第1个月": 7,
original_service = OriginalDataService() "仰拱(底板)施工完成后第2至3个月": 14,
comprehensive_service = ComprehensiveDataService() "仰拱(底板)施工完成后3个月以后": 30,
"无砟轨道铺设后第1至3个月": 30,
"无砟轨道铺设后4至12个月": 90,
"无砟轨道铺设后12个月以后": 180,
"墩台施工到一定高度": 30,
"墩台混凝土施工": 30,
"预制梁桥,架梁前": 30,
"预制梁桥,预制梁架设前": 1,
"预制梁桥,预制梁架设后": 7,
"桥位施工桥梁,制梁前": 30,
"桥位施工桥梁,上部结构施工中": 1,
"架桥机(运梁车)通过": 7,
"桥梁主体工程完工后,第1至3个月": 7, # 包含英文逗号的原始数据
"桥梁主体工程完工后第4至6个月": 14,
"桥梁主体工程完工后6个月以后": 30,
"轨道铺设期间,前": 30,
"轨道铺设期间,后": 14,
"轨道铺设完成后第1个月": 14,
"轨道铺设完成后2至3个月": 30,
"轨道铺设完成后4至12个月": 90,
"轨道铺设完成后12个月以后": 180,
"填筑或堆载,一般情况": 1,
"填筑或堆载,沉降量突变情况": 1,
"填筑或堆载,两次填筑间隔时间较长情况": 3,
"堆载预压或路基填筑完成第1至3个月": 7,
"堆载预压或路基填筑完成第4至6个月": 14,
"堆载预压或路基填筑完成6个月以后": 30,
"架桥机(运梁车)首次通过前": 1,
"架桥机(运梁车)首次通过后前3天": 1,
"架桥机(运梁车)首次通过后": 7,
"轨道板(道床)铺设后第1个月": 14,
"轨道板(道床)铺设后第2至3个月": 30,
"轨道板(道床)铺设后3个月以后": 90,
"架桥机(运梁车) 首次通过后": 7,
}
# 构建中英文括号+逗号兼容映射表
self.compatible_periods = self._build_compatible_brackets_map()
@router.post("/batch_import_sections", response_model=DataImportResponse) def _build_compatible_brackets_map(self) -> Dict[str, int]:
def batch_import_sections(request: BatchSectionDataImportRequest, db: Session = Depends(get_db)): """构建支持中英文括号、中英文逗号的兼容映射表"""
"""批量导入断面数据""" compatible_map = {}
try: for original_key, period in self.base_periods.items():
logger.info(f"Starting batch import sections, count: {len(request.data)}") # 1. 保留原始key
data_list = request.data compatible_map[original_key] = period
result = section_service.batch_import_sections(db, data_list)
logger.info(f"Batch import sections completed: {result['message']}") # 2. 生成中文括号版key
chinese_bracket_key = original_key.replace("(", "").replace(")", "")
# 统一响应格式 if chinese_bracket_key != original_key:
return DataImportResponse( compatible_map[chinese_bracket_key] = period
code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED,
message=result['message'], # 3. 生成英文逗号转中文逗号版key
data={ chinese_comma_key = original_key.replace(",", "")
'total_count': result.get('total_count', 0), if chinese_comma_key != original_key:
'success_count': result.get('success_count', 0), compatible_map[chinese_comma_key] = period
'failed_count': result.get('failed_count', 0),
'failed_items': result.get('failed_items', []) # 4. 生成中文括号+中文逗号混合版key
} mixed_key = chinese_bracket_key.replace(",", "")
) if mixed_key != original_key and mixed_key not in compatible_map:
except Exception as e: compatible_map[mixed_key] = period
logger.error(f"Batch import sections failed: {str(e)}")
return DataImportResponse(
code=ResponseCode.IMPORT_FAILED,
message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}",
data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []}
)
@router.post("/batch_import_checkpoints", response_model=DataImportResponse)
def batch_import_checkpoints(request: BatchCheckpointDataImportRequest, db: Session = Depends(get_db)):
"""批量导入观测点数据"""
try:
logger.info(f"Starting batch import checkpoints, count: {len(request.data)}")
data_list = request.data
result = checkpoint_service.batch_import_checkpoints(db, data_list)
logger.info(f"Batch import checkpoints completed: {result['message']}")
return DataImportResponse(
code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED,
message=result['message'],
data={
'total_count': result.get('total_count', 0),
'success_count': result.get('success_count', 0),
'failed_count': result.get('failed_count', 0),
'failed_items': result.get('failed_items', [])
}
)
except Exception as e:
logger.error(f"Batch import checkpoints failed: {str(e)}")
return DataImportResponse(
code=ResponseCode.IMPORT_FAILED,
message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}",
data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []}
)
@router.post("/batch_import_settlement_data", response_model=DataImportResponse)
def batch_import_settlement_data(request: BatchSettlementDataImportRequest, db: Session = Depends(get_db)):
"""批量导入沉降数据"""
try:
logger.info(f"Starting batch import settlement data, count: {len(request.data)}")
data_list = request.data
result = settlement_service.batch_import_settlement_data(db, data_list)
logger.info(f"Batch import settlement data completed: {result['message']}")
return DataImportResponse(
code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED,
message=result['message'],
data={
'total_count': result.get('total_count', 0),
'success_count': result.get('success_count', 0),
'failed_count': result.get('failed_count', 0),
'failed_items': result.get('failed_items', [])
}
)
except Exception as e:
logger.error(f"Batch import settlement data failed: {str(e)}")
return DataImportResponse(
code=ResponseCode.IMPORT_FAILED,
message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}",
data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []}
)
@router.post("/batch_import_level_data", response_model=DataImportResponse)
def batch_import_level_data(request: BatchLevelDataImportRequest, db: Session = Depends(get_db)):
"""批量导入水准数据"""
try:
logger.info(f"Starting batch import level data, count: {len(request.data)}")
data_list = request.data
result = level_service.batch_import_level_data(db, data_list)
logger.info(f"Batch import level data completed: {result['message']}")
return DataImportResponse(
code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED,
message=result['message'],
data={
'total_count': result.get('total_count', 0),
'success_count': result.get('success_count', 0),
'failed_count': result.get('failed_count', 0),
'failed_items': result.get('failed_items', [])
}
)
except Exception as e:
logger.error(f"Batch import level data failed: {str(e)}")
return DataImportResponse(
code=ResponseCode.IMPORT_FAILED,
message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}",
data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []}
)
@router.post("/batch_import_original_data", response_model=DataImportResponse)
def batch_import_original_data(request: BatchOriginalDataImportRequest, db: Session = Depends(get_db)):
"""批量导入原始数据 - 数据中必须包含account_id字段"""
try:
logger.info(f"Starting batch import original data, count: {len(request.data)}")
# 验证数据中是否包含account_id
if not request.data or len(request.data) == 0:
return DataImportResponse(
code=ResponseCode.BAD_REQUEST,
message="导入数据不能为空",
data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []}
)
# 检查第一条数据是否包含account_id
if 'account_id' not in request.data[0]:
return DataImportResponse(
code=ResponseCode.BAD_REQUEST,
message="数据中必须包含account_id字段",
data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []}
)
data_list = request.data
result = original_service.batch_import_original_data(db, data_list)
logger.info(f"Batch import original data completed: {result['message']}")
return DataImportResponse(
code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED,
message=result['message'],
data={
'total_count': result.get('total_count', 0),
'success_count': result.get('success_count', 0),
'failed_count': result.get('failed_count', 0),
'failed_items': result.get('failed_items', [])
}
)
except Exception as e:
logger.error(f"Batch import original data failed: {str(e)}")
return DataImportResponse(
code=ResponseCode.IMPORT_FAILED,
message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}",
data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []}
)
# 查询断面数据对应观察点数据
@router.post("/get_section", response_model=DataResponse)
def get_section(request: SectionDataQueryRequest, db: Session = Depends(get_db)):
"""获取断面数据 + 观测点(支持分页)"""
try:
logger.info(f"Querying section data with params: {request.dict()}")
result = section_service.search_sections_with_checkpoints(
db,
id=request.id,
section_id=request.section_id,
mileage=request.mileage,
work_site=request.work_site,
number=request.number,
status=request.status,
account_id=request.account_id,
skip=request.skip,
limit=request.limit
)
logger.info(f"Found {result['total']} sections with checkpoints, returning {len(result['data'])} records")
return DataResponse(
code=ResponseCode.SUCCESS,
message="查询成功",
total=result['total'],
data=result['data']
)
except Exception as e:
logger.error(f"Query section data failed: {str(e)}")
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
total=0,
data=[]
)
# 根据观测点id查询沉降数据
@router.post("/get_settlement", response_model=DataResponse)
def get_settlement(request: SettlementDataQueryRequest, db: Session = Depends(get_db)):
"""获取沉降数据按上传时间倒序排序支持分页参数skip、limit"""
try:
logger.info(f"Querying settlement data with params: {request.dict()}")
result = settlement_service.search_settlement_data_formatted(
db,
id=request.id,
point_id=request.point_id,
nyid=request.NYID,
sjName=request.sjName,
workinfoname=request.workinfoname,
skip=request.skip,
limit=request.limit
)
logger.info(f"Found {result['total']} settlement records, returning {len(result['data'])} records")
return DataResponse(
code=ResponseCode.SUCCESS,
message="查询成功",
total=result['total'],
data=result['data']
)
except Exception as e:
logger.error(f"Query settlement data failed: {str(e)}")
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
total=0,
data=[]
)
# 查询沉降数据+观测点数据
@router.post("/get_settlement_checkpoint", response_model=DataResponse)
def get_settlement_checkpoint(request: SettlementDataCheckpointQueryRequest, db: Session = Depends(get_db)):
"""获取沉降数据+观测点数据按上传时间倒序排序支持limit参数限制返回数量"""
try:
logger.info(f"Querying settlement data with params: {request.dict()}")
result_data = settlement_service.search_settlement_checkpoint_data_formatted(
db,
id=request.id,
point_id=request.point_id,
nyid=request.NYID,
sjName=request.sjName,
workinfoname=request.workinfoname,
linecode=request.linecode,
limit=request.limit
)
logger.info(f"Found {len(result_data)} settlement records")
return DataResponse(
code=ResponseCode.SUCCESS,
message="查询成功",
total=len(result_data),
data=result_data
)
except Exception as e:
logger.error(f"Query settlement data failed: {str(e)}")
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
total=0,
data=[]
)
# 根据期数id获取原始数据
@router.post("/get_original", response_model=DataResponse)
def get_original(request: OriginalDataQueryRequest, db: Session = Depends(get_db)):
"""获取水准数据+原始数据 - account_id可选不填则查询所有分表"""
try:
logger.info(f"Querying original data with params: {request.dict()}")
result = comprehensive_service.get_level_and_original_data(
db,
account_id=request.account_id, # 可选
id=request.id,
bfpcode=request.bfpcode,
bffb=request.bffb,
nyid=request.NYID,
linecode=request.linecode,
bfpl=request.bfpl
)
return DataResponse(
code=ResponseCode.SUCCESS,
message=result["message"],
total=result["count"],
data=result["data"]
)
except Exception as e:
logger.error(f"Query original data failed: {str(e)}")
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
total=0,
data=[]
)
@router.post("/get_settlement_by_linecode", response_model=DataResponse)
def get_settlement_by_linecode(
request: LinecodeRequest, # 假设定义了接收linecode的请求模型
db: Session = Depends(get_db)
):
try:
linecode = request.linecode # 从请求体中获取linecode
logger.info(f"接口请求根据linecode={linecode}查询沉降数据")
settlement_service = SettlementDataService() return compatible_map
result = settlement_service.get_settlement_by_linecode(db, linecode)
return DataResponse(
code=ResponseCode.SUCCESS,
message=f"查询成功,共获取{len(result['settlement_data'])}条沉降数据",
total=len(result['settlement_data']),
data=result['settlement_data']
)
except Exception as e:
logger.error(f"查询沉降数据失败:{str(e)}", exc_info=True)
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"查询失败:{str(e)}",
total=0,
data=[]
)
@router.post("/get_settlement_by_nyid", response_model=DataResponse)
def get_settlement_by_nyid(
request: NYIDRequest, # 假设定义了接收nyid的请求模型
db: Session = Depends(get_db)
):
try:
nyid = request.NYID # 从请求体中获取nyid
logger.info(f"接口请求根据nyid={nyid}查询沉降数据")
settlement = SettlementDataService()
# 获取模型实例列表
checkpoint_instances = settlement.get_by_nyid(db, nyid=nyid)
# 转为字典列表(核心修正)
checkpoint_data = [instance.__dict__ for instance in checkpoint_instances]
# 清理 SQLAlchemy 内部属性(可选,避免多余字段)
checkpoint_data = [{k: v for k, v in item.items() if not k.startswith('_sa_')} for item in checkpoint_data]
return DataResponse(
code=ResponseCode.SUCCESS,
message=f"查询成功,共获取{len(checkpoint_data)}条沉降数据nyid={nyid}",
total=len(checkpoint_data),
data=checkpoint_data
)
except Exception as e:
logger.error(f"查询沉降数据失败:{str(e)}", exc_info=True)
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"查询失败:{str(e)}",
total=0,
data=[]
)
@router.post("/get_today_data", response_model=DataResponse)
def get_today_data(request: TodayDataRequest, db: Session = Depends(get_db)):
"""接口通过POST请求触发调度器中的 scheduled_get_max_nyid_by_point_id 定时任务"""
try:
# 获取请求参数如果需要从请求体中接收参数可通过request获取
# 示例如需接收account_id可通过 request.account_id 获取
# account_id = request.account_id # 根据根据实际需求决定是否需要
# 触发定时任务(如果需要传入参数,可在这里添加)
from ..utils.scheduler import scheduled_get_max_nyid_by_point_id
scheduled_get_max_nyid_by_point_id()
# 调用服务层获取数据
account_id = request.account_id
daily_service = DailyDataService()
# 如需使用请求参数,可修改为 daily_service.get_daily_data_by_account(db, account_id=account_id)
daily_data = daily_service.get_daily_data_by_account(db, account_id=account_id)
return DataResponse(
code=ResponseCode.SUCCESS,
message="定时时任务触发执行成功!任务已开始处理(具体结果查看系统日志)",
total=1 if daily_data else 0, # 根据实际数据是否存在调整total
data=daily_data
)
except Exception as e:
logger.error(f"接口触发定时任务失败:{str(e)}", exc_info=True)
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"定时任务触发失败:{str(e)}",
total=0,
data={}
)
# account_id获取所有断面数据
@router.post("/get_all_section_by_account", response_model=DataResponse)
def get_all_section_by_account(request: SectionByAccountRequest, db: Session = Depends(get_db)):
"""获取断面数据 + 观测点"""
try:
account_id = request.account_id
section_service = SectionDataService()
result_data = section_service.get_by_account_id(db, account_id=account_id)
data_list = [item.to_dict() for item in result_data] if result_data else []
return DataResponse(
code=ResponseCode.SUCCESS,
message="查询成功",
total=len(data_list),
data=data_list
)
except Exception as e:
logger.error(f"Query section data failed: {str(e)}")
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
total=0,
data=[]
)
# section_id 获取所有观测点数据
@router.post("/get_all_checkpoint_by_section", response_model=DataResponse)
def get_all_checkpoint_by_section(request: SectionByAccountRequest, db: Session = Depends(get_db)):
"""获取断面数据 + 观测点"""
try:
section_id = request.section_id
checkpoint_service = CheckpointService()
result_data = checkpoint_service.get_by_section_id(db, section_id=section_id)
data_list = [item.to_dict() for item in result_data] if result_data else []
return DataResponse(
code=ResponseCode.SUCCESS,
message="查询成功",
total=len(data_list),
data=data_list
)
except Exception as e:
logger.error(f"Query section data failed: {str(e)}")
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
total=0,
data=[]
)
@router.post("/get_checkpoint_by_point", response_model=DataResponse)
def get_checkpoint_by_point(request: PointByAccountRequest, db: Session = Depends(get_db)):
"""根据观测点ID获取观测点"""
try:
point_id = request.point_id
checkpoint_service = CheckpointService()
result_data = checkpoint_service.get_by_point_id(db, point_id=point_id)
# 使用 __dict__ 转换(过滤内部属性)
if result_data:
# 复制字典并排除 SQLAlchemy 内部属性
data_dict = result_data.__dict__.copy()
data_dict.pop('_sa_instance_state', None) # 移除ORM内部状态属性
data_list = [data_dict]
else:
data_list = []
return DataResponse(
code=ResponseCode.SUCCESS,
message="查询成功",
total=len(data_list),
data=data_list
)
except Exception as e:
logger.error(f"Query section data failed: {str(e)}")
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
total=0,
data=[]
)
# 根据水准线路获取所有的测点id def get_due_data(self, input_data: List[List[Dict]], start: int = 0, end: int = 1, current_date: datetime = None) -> Dict[str, List[Dict]]:
@router.post("/get_checkpoint_by_linecode", response_model=DataResponse) result = {"winter": [], "data": [], "error_data": []}
def get_checkpoint_by_point(request: LevelDataQueryRequest, db: Session = Depends(get_db)): if not input_data:
"""根据观测点ID获取观测点""" return result
try:
linecode = request.linecode calc_date = current_date.date() if current_date else datetime.now().date()
level_service = LevelDataService()
result_data = level_service.get_last_by_linecode(db, linecode=linecode) for point_idx, point_data in enumerate(input_data):
NYID = result_data.NYID if result_data else None if not point_data:
settlement_service = SettlementDataService() continue
result_data = settlement_service.get_by_nyid(db, nyid=NYID)
# 过滤逻辑:仅保留 useflag 存在且值≠0 的记录
# 使用 __dict__ 转换(过滤内部属性) filtered_point_data = [
if result_data: item for item in point_data
# 复制字典并排除 SQLAlchemy 内部属性 if "useflag" in item and item["useflag"] != 0 # 核心条件:字段存在 + 非0
data_dict = result_data.__dict__.copy() ]
data_dict.pop('_sa_instance_state', None) # 移除ORM内部状态属性 # 过滤后无数据则跳过当前测点
data_list = [data_dict] if not filtered_point_data:
else: continue
data_list = []
# 使用过滤后的数据处理
return DataResponse( latest_item = filtered_point_data[0]
code=ResponseCode.SUCCESS, latest_condition = latest_item.get("workinfoname")
message="查询成功", if not latest_condition:
total=len(data_list), result["error_data"].append(latest_item)
data=data_list warnings.warn(f"【数据错误】测点{point_idx}的最新数据缺少'workinfoname'字段", UserWarning)
) continue
except Exception as e:
logger.error(f"Query section data failed: {str(e)}") base_condition = None
return DataResponse( if latest_condition != "冬休":
code=ResponseCode.QUERY_FAILED, if latest_condition not in self.compatible_periods:
message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}", result["error_data"].append(latest_item)
total=0, warnings.warn(f"【数据错误】测点{point_idx}最新数据存在未定义工况: {latest_condition}", UserWarning)
data=[] continue
) base_condition = latest_condition
else:
# 遍历过滤后的历史数据
for history_item in filtered_point_data[1:]:
history_condition = history_item.get("workinfoname")
if not history_condition:
result["error_data"].append(history_item)
warnings.warn(f"【数据错误】测点{point_idx}的历史数据缺少'workinfoname'字段", UserWarning)
continue
if history_condition != "冬休":
if history_condition not in self.compatible_periods:
result["error_data"].append(history_item)
warnings.warn(f"【数据错误】测点{point_idx}历史数据存在未定义工况: {history_condition}", UserWarning)
base_condition = None
break
base_condition = history_condition
break
item_copy = copy.deepcopy(latest_item)
create_date_val = latest_item.get("createdate")
if not create_date_val:
result["error_data"].append(item_copy)
warnings.warn(f"【数据错误】测点{point_idx}的最新数据缺少'createdate'字段", UserWarning)
continue
try:
if isinstance(create_date_val, datetime):
create_date = create_date_val.date()
else:
create_date = datetime.strptime(create_date_val, "%Y-%m-%d %H:%M:%S").date()
except ValueError as e:
result["error_data"].append(item_copy)
warnings.warn(f"【数据错误】测点{point_idx}最新数据的日期格式错误:{create_date_val},错误:{str(e)}", UserWarning)
continue
if not base_condition:
result["winter"].append(item_copy)
continue
period = self.compatible_periods[base_condition]
days_passed = (calc_date - create_date).days
due_days = period - days_passed
if due_days < 0:
item_copy["remaining"] = 0 - int(abs(due_days))
result["error_data"].append(item_copy)
warn_msg = (
f"【超期警报】测点{point_idx} 最新工况'{latest_condition}'{create_date}"
f"已超期{abs(due_days)}天!基准工况:{base_condition},周期{period}"
)
warnings.warn(warn_msg, UserWarning)
elif start <= due_days <= end:
item_copy["remaining"] = due_days
result["data"].append(item_copy)
# 相同NYID保留剩余天数最少的记录
if result["data"]:
nyid_min_remaining = {}
for record in result["data"]:
nyid = record.get("NYID")
if not nyid:
continue
if nyid not in nyid_min_remaining or record["remaining"] < nyid_min_remaining[nyid]["remaining"]:
nyid_min_remaining[nyid] = record
result["data"] = list(nyid_min_remaining.values())
return result