From 5e9409aada5b2eadeaa82f9c8e31e0dfe48513e7 Mon Sep 17 00:00:00 2001 From: whm <973418690@qq.com> Date: Thu, 30 Oct 2025 11:43:20 +0800 Subject: [PATCH] =?UTF-8?q?1.=E8=87=AA=E5=8A=A8=E6=9B=B4=E6=96=B0=E4=BB=8A?= =?UTF-8?q?=E6=97=A5=E9=9C=80=E8=A6=81=E6=8A=93=E5=8F=96=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=202.=E5=B7=A5=E5=86=B5=E6=8E=A5=E5=8F=A3=E8=BF=94=E5=9B=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env | 7 +- app/api/comprehensive_data.py | 33 ++++++- app/core/config.py | 9 +- app/main.py | 2 + app/models/account.py | 11 ++- app/models/checkpoint.py | 10 +- app/models/daily.py | 12 +++ app/models/level_data.py | 10 +- app/models/section_data.py | 10 +- app/models/settlement_data.py | 15 ++- app/schemas/comprehensive_data.py | 40 ++++++-- app/services/account.py | 2 +- app/services/daily.py | 152 ++++++++++++++++++++++++++++ app/services/settlement_data.py | 86 +++++++++++++++- app/utils/construction_monitor.py | 159 ++++++++++++++++++++++++++++++ app/utils/scheduler.py | 117 +++++++++++++++++++++- 16 files changed, 652 insertions(+), 23 deletions(-) create mode 100644 app/models/daily.py create mode 100644 app/services/daily.py create mode 100644 app/utils/construction_monitor.py diff --git a/.env b/.env index d998e27..6a6eae4 100644 --- a/.env +++ b/.env @@ -1,11 +1,10 @@ # 云端数据库配置 -DATABASE_URL=mysql+pymysql://railway:Railway01.@172.17.0.1:3306/railway -DB_HOST=172.17.0.1 +DATABASE_URL=mysql+pymysql://railway:Railway01.@localhost:3306/railway +DB_HOST=localhost DB_PORT=3306 DB_USER=railway DB_PASSWORD=Railway01. 
DB_NAME=railway - # 本地配置 # DATABASE_URL=mysql+pymysql://railway:Railway01.@localhost:3306/railway # DB_HOST=localhost @@ -15,7 +14,7 @@ DB_NAME=railway # DB_NAME=railway # 应用配置 -APP_HOST=0.0.0.0 +APP_HOST=127.0.0.1 APP_PORT=8000 APP_DEBUG=True diff --git a/app/api/comprehensive_data.py b/app/api/comprehensive_data.py index d7faa14..478c536 100644 --- a/app/api/comprehensive_data.py +++ b/app/api/comprehensive_data.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, Depends, HTTPException, status +from fastapi import APIRouter, Depends, HTTPException, status,Query from sqlalchemy.orm import Session from typing import List, Optional from ..core.database import get_db @@ -15,7 +15,8 @@ from ..schemas.comprehensive_data import ( SettlementDataQueryRequest, OriginalDataQueryRequest, SettlementDataCheckpointQueryRequest, - LevelDataQueryRequest + LevelDataQueryRequest, + LinecodeRequest ) from ..services.section_data import SectionDataService from ..services.checkpoint import CheckpointService @@ -24,7 +25,6 @@ from ..services.level_data import LevelDataService from ..services.original_data import OriginalDataService from ..services.comprehensive import ComprehensiveDataService import logging - router = APIRouter(prefix="/comprehensive_data", tags=["综合数据管理"]) logger = logging.getLogger(__name__) @@ -318,3 +318,30 @@ def get_original(request: OriginalDataQueryRequest, db: Session = Depends(get_db total=0, data=[] ) +@router.post("/get_settlement_by_linecode", response_model=DataResponse) +def get_settlement_by_linecode( + request: LinecodeRequest, # 假设定义了接收linecode的请求模型 + db: Session = Depends(get_db) +): + try: + linecode = request.linecode # 从请求体中获取linecode + logger.info(f"接口请求:根据linecode={linecode}查询沉降数据") + + settlement_service = SettlementDataService() + result = settlement_service.get_settlement_by_linecode(db, linecode) + + return DataResponse( + code=ResponseCode.SUCCESS, + message=f"查询成功,共获取{len(result['settlement_data'])}条沉降数据", + 
total=len(result['settlement_data']), + data=result['settlement_data'] + ) + + except Exception as e: + logger.error(f"查询沉降数据失败:{str(e)}", exc_info=True) + return DataResponse( + code=ResponseCode.QUERY_FAILED, + message=f"查询失败:{str(e)}", + total=0, + data=[] + ) \ No newline at end of file diff --git a/app/core/config.py b/app/core/config.py index e29762c..144b2d7 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -15,4 +15,11 @@ class Settings: APP_PORT = int(os.getenv("APP_PORT", 8000)) APP_DEBUG = os.getenv("APP_DEBUG", "True").lower() == "true" -settings = Settings() \ No newline at end of file +settings = Settings() + + +# 打印配置验证 +print("读取到的数据库配置:") +print(f"DATABASE_URL: {settings.DATABASE_URL}") # 应显示包含 railway@localhost:3306 +print(f"DB_HOST: {settings.DB_HOST}") # 应显示 localhost +print(f"DB_USER: {settings.DB_USER}") # 应显示 railway \ No newline at end of file diff --git a/app/main.py b/app/main.py index 2468308..359bbe8 100644 --- a/app/main.py +++ b/app/main.py @@ -11,6 +11,7 @@ from .api.database import router as database_router from .api.task import router as task_router from .api.comprehensive_data import router as comprehensive_data_router from .utils.scheduler import task_scheduler +from .api.test import router as test_router # 初始化日志系统 setup_logging() @@ -68,6 +69,7 @@ app.include_router(account_router, prefix="/api") app.include_router(database_router, prefix="/api") app.include_router(task_router, prefix="/api") app.include_router(comprehensive_data_router, prefix="/api") +app.include_router(test_router, prefix="/api") # 根路径 @app.get("/") diff --git a/app/models/account.py b/app/models/account.py index b7a531d..f3e7563 100644 --- a/app/models/account.py +++ b/app/models/account.py @@ -13,4 +13,13 @@ class Account(Base): project_name = Column(String(1000), comment="项目名称") created_at = Column(DateTime, server_default=func.now(), comment="创建时间") updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment="更新时间") - 
update_time = Column(String(1000), nullable=False, comment="更新时间跨度") \ No newline at end of file + update_time = Column(String(1000), nullable=False, comment="更新时间跨度") + + + # 模型转字典 + def to_dict(self): + """将模型实例转换为字典,支持 Pydantic 序列化""" + return { + column.name: getattr(self, column.name) + for column in self.__table__.columns + } \ No newline at end of file diff --git a/app/models/checkpoint.py b/app/models/checkpoint.py index 1ba5669..cb639b4 100644 --- a/app/models/checkpoint.py +++ b/app/models/checkpoint.py @@ -8,4 +8,12 @@ class Checkpoint(Base): aname = Column(String(100), nullable=False, comment="观察点名称") burial_date = Column(String(100), comment="埋设日期") section_id = Column(String(100), nullable=False, comment="所属断面id") - point_id = Column(String(100), nullable=False, comment="观察点id") \ No newline at end of file + point_id = Column(String(100), nullable=False, comment="观察点id") + + # 模型转字典 + def to_dict(self): + """将模型实例转换为字典,支持 Pydantic 序列化""" + return { + column.name: getattr(self, column.name) + for column in self.__table__.columns + } \ No newline at end of file diff --git a/app/models/daily.py b/app/models/daily.py new file mode 100644 index 0000000..84ab4f5 --- /dev/null +++ b/app/models/daily.py @@ -0,0 +1,12 @@ +from sqlalchemy import Column, Integer, String +from ..core.database import Base + +class DailyData(Base): + __tablename__ = "daily" + + id = Column(Integer, primary_key=True, index=True, autoincrement=True) + account_id = Column(Integer, nullable=False, comment="账户id") + point_id = Column(String(100), comment="测点id") + NYID = Column(String(100), nullable=False, comment="期数id") + linecode = Column(String(255), nullable=False, comment="水准线路编码") + section_id = Column(String(255), nullable=False, comment="所属断面id") \ No newline at end of file diff --git a/app/models/level_data.py b/app/models/level_data.py index fe7b567..fda69db 100644 --- a/app/models/level_data.py +++ b/app/models/level_data.py @@ -10,4 +10,12 @@ class LevelData(Base): wsphigh 
= Column(String(100), comment="工作基点高程序列(m)") NYID = Column(String(100), nullable=False, comment="期数id", index=True) createDate = Column(DateTime, comment="上传时间") - mtype = Column(String(100), comment="水准观测类型") \ No newline at end of file + mtype = Column(String(100), comment="水准观测类型") + + # 模型转字典 + def to_dict(self): + """将模型实例转换为字典,支持 Pydantic 序列化""" + return { + column.name: getattr(self, column.name) + for column in self.__table__.columns + } \ No newline at end of file diff --git a/app/models/section_data.py b/app/models/section_data.py index 3b023fd..3c49473 100644 --- a/app/models/section_data.py +++ b/app/models/section_data.py @@ -18,4 +18,12 @@ class SectionData(Base): foundation_treatment_method = Column(String(100), comment="地基处理方法") rock_mass_classification = Column(String(100), comment="围岩级别") account_id = Column(String(100), nullable=True, comment="账号id", index=True) - section_id = Column(String(100), nullable=False, comment="断面id", index=True) \ No newline at end of file + section_id = Column(String(100), nullable=False, comment="断面id", index=True) + + # 模型转字典 + def to_dict(self): + """将模型实例转换为字典,支持 Pydantic 序列化""" + return { + column.name: getattr(self, column.name) + for column in self.__table__.columns + } \ No newline at end of file diff --git a/app/models/settlement_data.py b/app/models/settlement_data.py index 0be83d3..872cc2b 100644 --- a/app/models/settlement_data.py +++ b/app/models/settlement_data.py @@ -1,6 +1,6 @@ from sqlalchemy import Column, Integer, String, DateTime from ..core.database import Base - +from datetime import datetime class SettlementData(Base): __tablename__ = "settlement_data" @@ -23,4 +23,15 @@ class SettlementData(Base): sjName = Column(String(100), comment="司镜人员") useflag = Column(String(100)) workinfoname = Column(String(100), comment="观测阶段") - upd_remark = Column(String(1000), comment="备注") \ No newline at end of file + upd_remark = Column(String(1000), comment="备注") + def to_dict(self): + data = {} + for column in 
self.__table__.columns: + field_value = getattr(self, column.name) + # 时间字段格式化(不影响原生字段名) + if isinstance(field_value, datetime): + data[column.name] = field_value.strftime("%Y-%m-%d %H:%M:%S") + else: + data[column.name] = field_value + # 【关键】删除之前添加的max_nyid,只保留原生NYID字段 + return data \ No newline at end of file diff --git a/app/schemas/comprehensive_data.py b/app/schemas/comprehensive_data.py index 2bff7e3..30c1b76 100644 --- a/app/schemas/comprehensive_data.py +++ b/app/schemas/comprehensive_data.py @@ -1,6 +1,5 @@ from pydantic import BaseModel -from typing import Any, Dict, List, Optional - +from typing import Any, Dict, List, Optional, Union # 原始数据导入请求 class OriginalDataImportRequest(BaseModel): @@ -81,6 +80,7 @@ class OriginalDataQueryRequest(BaseModel): NYID: Optional[str] = None sort: Optional[int] = None + # 沉降数据查询请求 class SettlementDataQueryRequest(BaseModel): id: Optional[int] = None @@ -209,10 +209,26 @@ class DataImportResponse(BaseModel): } # 查询响应模型 +# class DataResponse(BaseModel): +# code: int = 0 # 响应状态码,0表示成功 +# message: str +# data: List[Dict[str, Any]] = [] +# total: Optional[int] = None # 总数 + +# class Config: +# schema_extra = { +# "example": { +# "code": 0, +# "message": "查询成功", +# "total": 10, +# "data": [] +# } +# } class DataResponse(BaseModel): code: int = 0 # 响应状态码,0表示成功 message: str - data: List[Dict[str, Any]] = [] + # 关键:用Union允许data为两种格式(列表套字典 或 列表套列表套字典) + data: Union[List[Dict[str, Any]], List[List[Dict[str, Any]]]] = [] total: Optional[int] = None # 总数 class Config: @@ -221,14 +237,26 @@ class DataResponse(BaseModel): "code": 0, "message": "查询成功", "total": 10, - "data": [] + "data": [ # 示例1:List[dict]格式 + {"id": 1, "point_id": 100, "NYID": 50}, + {"id": 2, "point_id": 200, "NYID": 60} + ] + }, + "another_example": { + "code": 0, + "message": "查询成功", + "total": 2, + "data": [ # 示例2:List[List[dict]]格式 + [{"id": 1, "point_id": 100, "NYID": 50}], + [{"id": 2, "point_id": 200, "NYID": 60}] + ] } } - # 兼容旧接口的模型 class 
ComprehensiveDataImportRequest(BaseModel): data: Dict[str, Any] - +class LinecodeRequest(BaseModel): + linecode: str class ComprehensiveDataImportResponse(BaseModel): success: bool message: str diff --git a/app/services/account.py b/app/services/account.py index daf0466..d544f15 100644 --- a/app/services/account.py +++ b/app/services/account.py @@ -90,4 +90,4 @@ class AccountService: db.delete(db_account) db.commit() return True - return False \ No newline at end of file + return False diff --git a/app/services/daily.py b/app/services/daily.py new file mode 100644 index 0000000..39bb2a2 --- /dev/null +++ b/app/services/daily.py @@ -0,0 +1,152 @@ +from sqlalchemy.orm import Session +from typing import List, Optional, Dict, Any, Set, Tuple,Union +from ..models.level_data import LevelData +from ..models.daily import DailyData +from .base import BaseService +from ..models.settlement_data import SettlementData +from sqlalchemy import func, select, desc,over +from sqlalchemy.orm import Session +import logging +logger = logging.getLogger(__name__) +class DailyDataService(BaseService[DailyData]): + def __init__(self): + super().__init__(DailyData) + + def _dict_to_instance(self, data_dict: Dict) -> DailyData: + """辅助方法:将单个字典转换为 DailyData 实例""" + model_fields = [col.name for col in DailyData.__table__.columns] + filtered_data = {k: v for k, v in data_dict.items() if k in model_fields} + return DailyData(**filtered_data) + + def _ensure_instances(self, data: Union[List[Dict], List[DailyData]]) -> List[DailyData]: + """确保输入数据是 DailyData 实例列表""" + if not isinstance(data, list): + raise TypeError(f"输入必须是列表,而非 {type(data)}") + + instances = [] + for item in data: + if isinstance(item, DailyData): + instances.append(item) + elif isinstance(item, dict): + instances.append(self._dict_to_instance(item)) + else: + raise TypeError(f"列表元素必须是 dict 或 DailyData 实例,而非 {type(item)}") + return instances + + def batch_create_by_account_nyid(self, db: Session, data: Union[List[Dict], 
List[DailyData]]) -> List[DailyData]: + """ + 批量创建记录,支持两种输入格式: + - List[DailyData]:模型实例列表 + - List[dict]:字典列表(自动转换为实例) + 通过 (account_id, NYID) 联合判断是否已存在,存在则忽略 + """ + try: + data_list = self._ensure_instances(data) + except TypeError as e: + logger.error(f"数据格式错误:{str(e)}") + raise + + target_pairs: List[Tuple[int, int]] = [ + (item.account_id, item.NYID) + for item in data_list + if item.account_id is not None and item.NYID is not None + ] + + if not target_pairs: + logger.warning("批量创建失败:所有记录缺少 account_id 或 NYID") + return [] + + existing_pairs: Set[Tuple[int, int]] = { + (item.account_id, item.NYID) + for item in db.query(DailyData.account_id, DailyData.NYID) + .filter(DailyData.account_id.in_([p[0] for p in target_pairs]), + DailyData.NYID.in_([p[1] for p in target_pairs])) + .all() + } + + to_create = [ + item for item in data_list + if (item.account_id, item.NYID) not in existing_pairs + ] + + ignored_count = len(data_list) - len(to_create) + if ignored_count > 0: + logger.info(f"批量创建时忽略{ignored_count}条已存在记录(account_id和NYID已存在)") + + if not to_create: + return [] + + # 修复点:使用 add_all 替代 bulk_save_objects,确保对象被会话跟踪 + db.add_all(to_create) # 这里是关键修改 + db.commit() + + # 现在可以安全地刷新实例了 + for item in to_create: + db.refresh(item) + + return to_create + + + def get_nyid_by_point_id( + self, + db: Session, + point_ids: List[int] = None, + max_num: int = 1 + ) -> List[List[dict]]: + """ + 获取指定point_id的记录,修复子查询中模型对象访问错误 + """ + # 处理参数默认值 + point_ids = point_ids or [] + max_num = max(max_num, 1) + + # 窗口函数:按point_id分组,每组内按NYID降序编号 + row_num = over( + func.row_number(), + partition_by=SettlementData.point_id, + order_by=desc(SettlementData.NYID) + ).label("row_num") + + # 子查询:查询模型的所有字段 + 行号(不保留模型对象,只展平字段) + # 先获取模型的所有字段列表 + model_columns = [getattr(SettlementData, col.name) for col in SettlementData.__table__.columns] + subquery = ( + select(*model_columns, row_num) # 展开所有字段 + 行号 + .where(SettlementData.point_id.in_(point_ids) if point_ids else True) + .subquery() + ) + + 
# 主查询:筛选行号<=max_num的记录 + query = ( + select(subquery) + .where(subquery.c.row_num <= max_num) + .order_by(subquery.c.point_id, subquery.c.row_num) + ) + + # 执行查询(结果为包含字段值的行对象) + results = db.execute(query).all() + grouped: Dict[int, List[dict]] = {} + + # 获取模型字段名列表(用于映射行对象到字典) + field_names = [col.name for col in SettlementData.__table__.columns] + + for row in results: + # 将行对象转换为字典(忽略最后一个字段row_num) + item_dict = { + field: getattr(row, field) + for field in field_names + } + pid = item_dict["point_id"] + + if pid not in grouped: + grouped[pid] = [] + grouped[pid].append(item_dict) + + # 按输入point_ids顺序整理结果 + if not point_ids: + point_ids = sorted(grouped.keys()) + + # 构建[[{}], [{}]]格式 + return [ + [record] for pid in point_ids for record in grouped.get(pid, []) + ] \ No newline at end of file diff --git a/app/services/settlement_data.py b/app/services/settlement_data.py index 05fa3d6..4f59e7d 100644 --- a/app/services/settlement_data.py +++ b/app/services/settlement_data.py @@ -3,7 +3,14 @@ from typing import List, Optional, Dict, Any from ..models.settlement_data import SettlementData from ..models.checkpoint import Checkpoint from .base import BaseService +from sqlalchemy.orm import Session +from typing import Dict, List +from ..models.settlement_data import SettlementData +from ..models.level_data import LevelData +import logging +from datetime import datetime +logger = logging.getLogger(__name__) class SettlementDataService(BaseService[SettlementData]): def __init__(self): super().__init__(SettlementData) @@ -340,4 +347,81 @@ class SettlementDataService(BaseService[SettlementData]): 'success_count': success_count, 'failed_count': failed_count, 'failed_items': failed_items - } \ No newline at end of file + } + + # 根据水准线路编码获取最新的NYID并获取对应的测点数据 + def get_settlement_by_linecode( + self, + db: Session, + linecode: str, + num: int = 1 # 新增参数:控制返回的期数,默认1(最新一期) +) -> Dict: + """ + 根据水准线路编码(linecode)查询对应沉降数据,支持按期数筛选 + 关联逻辑:LevelData.linecode → LevelData.NYID → 
SettlementData.NYID + :param db: 数据库会话 + :param linecode: 目标水准线路编码 + :param num: 返回的期数(按NYID从大到小排序),默认1(最新一期) + :return: 字典格式,包含沉降数据列表,键为 "settlement_data" + """ + try: + logger.info(f"开始查询linecode={linecode}对应的沉降数据(取前{num}期)") + + # 1. 根据linecode查询水准线路表,获取所有关联的NYID(去重后按NYID降序排序) + nyid_query = db.query(LevelData.NYID)\ + .filter(LevelData.linecode == linecode)\ + .distinct()\ + .order_by(LevelData.NYID.desc()) # 按NYID降序,确保最新的在前 + + # 根据num参数截取前N期的NYID + top_nyids = nyid_query.limit(num).all() + + if not top_nyids: + logger.warning(f"未查询到linecode={linecode}对应的水准线路记录") + return {"settlement_data": []} + + # 提取NYID字符串列表(按降序排列,保持最新的在前) + target_nyids = [item.NYID for item in top_nyids] + + # 2. 根据NYID列表查询沉降数据表,按NYID降序、观测时间升序排列 + settlement_records = db.query(SettlementData)\ + .filter(SettlementData.NYID.in_(target_nyids))\ + .order_by( + SettlementData.NYID.desc(), # 期数从新到旧 + SettlementData.MTIME_W.asc() # 同期内按观测时间从早到晚 + )\ + .all() + + # 3. 转换模型实例为字典列表(处理日期格式) + settlement_data = [] + for record in settlement_records: + record_dict = { + "id": record.id, + "point_id": record.point_id, + "CVALUE": record.CVALUE, + "MAVALUE": record.MAVALUE, + "MTIME_W": record.MTIME_W.strftime("%Y-%m-%d %H:%M:%S") if record.MTIME_W else None, + "NYID": record.NYID, + "PRELOADH": record.PRELOADH, + "PSTATE": record.PSTATE, + "REMARK": record.REMARK, + "WORKINFO": record.WORKINFO, + "createdate": record.createdate.strftime("%Y-%m-%d %H:%M:%S") if record.createdate else None, + "day": record.day, + "day_jg": record.day_jg, + "isgzjdxz": record.isgzjdxz, + "mavalue_bc": record.mavalue_bc, + "mavalue_lj": record.mavalue_lj, + "sjName": record.sjName, + "useflag": record.useflag, + "workinfoname": record.workinfoname, + "upd_remark": record.upd_remark + } + settlement_data.append(record_dict) + + logger.info(f"查询完成,linecode={linecode}的前{num}期对应{len(settlement_data)}条沉降数据") + return {"settlement_data": settlement_data} + + except Exception as e: + 
logger.error(f"查询linecode={linecode}的沉降数据失败:{str(e)}", exc_info=True) + raise e \ No newline at end of file diff --git a/app/utils/construction_monitor.py b/app/utils/construction_monitor.py new file mode 100644 index 0000000..d6cc001 --- /dev/null +++ b/app/utils/construction_monitor.py @@ -0,0 +1,159 @@ +from datetime import datetime +from typing import List, Dict +import warnings +import copy + +class ConstructionMonitorUtils: + def __init__(self): + # 原始工况周期映射表(保持不变) + self.base_periods = { + "仰拱(底板)施工完成后,第1个月": 7, + "仰拱(底板)施工完成后,第2至3个月": 14, + "仰拱(底板)施工完成后,3个月以后": 30, + "无砟轨道铺设后,第1至3个月": 30, + "无砟轨道铺设后,4至12个月": 90, + "无砟轨道铺设后,12个月以后": 180, + "墩台施工到一定高度": 30, + "墩台混凝土施工": 30, + "预制梁桥,架梁前": 30, + "预制梁桥,预制梁架设前": 1, + "预制梁桥,预制梁架设后": 7, + "桥位施工桥梁,制梁前": 30, + "桥位施工桥梁,上部结构施工中": 1, + "架桥机(运梁车)通过": 7, + "桥梁主体工程完工后,第1至3个月": 7, + "桥梁主体工程完工后,第4至6个月": 14, + "桥梁主体工程完工后,6个月以后": 30, + "轨道铺设期间,前": 30, + "轨道铺设期间,后": 14, + "轨道铺设完成后,第1个月": 14, + "轨道铺设完成后,2至3个月": 30, + "轨道铺设完成后,4至12个月": 90, + "轨道铺设完成后,12个月以后": 180, + "填筑或堆载,一般情况": 1, + "填筑或堆载,沉降量突变情况": 1, + "填筑或堆载,两次填筑间隔时间较长情况": 3, + "堆载预压或路基填筑完成,第1至3个月": 7, + "堆载预压或路基填筑完成,第4至6个月": 14, + "堆载预压或路基填筑完成,6个月以后": 30, + "架桥机(运梁车)首次通过前": 1, + "架桥机(运梁车)首次通过后,前3天": 1, + "架桥机(运梁车)首次通过后": 7, + "轨道板(道床)铺设后,第1个月": 14, + "轨道板(道床)铺设后,第2至3个月": 30, + "轨道板(道床)铺设后,3个月以后": 90 + } + # 构建中英文括号兼容映射表 + self.compatible_periods = self._build_compatible_brackets_map() + + def _build_compatible_brackets_map(self) -> Dict[str, int]: + """构建支持中英文括号的兼容映射表""" + compatible_map = {} + for original_key, period in self.base_periods.items(): + compatible_map[original_key] = period + # 生成中文括号版key并添加到映射表 + chinese_bracket_key = original_key.replace("(", "(").replace(")", ")") + if chinese_bracket_key != original_key: + compatible_map[chinese_bracket_key] = period + return compatible_map + + def get_due_data(self, input_data: List[List[Dict]], start: int = 0, end: int = 1, current_date: datetime = None) -> Dict[str, List[Dict]]: + result = {"winter": [], "data": [], 
"error_data": []} + if not input_data: + return result + + calc_date = current_date.date() if current_date else datetime.now().date() + + # ------------------------------ + # 原有核心逻辑(完全不变) + # ------------------------------ + for point_idx, point_data in enumerate(input_data): + if not point_data: + continue + + latest_item = point_data[0] + latest_condition = latest_item.get("workinfoname") + if not latest_condition: + result["error_data"].append(latest_item) + warnings.warn(f"【数据错误】测点{point_idx}的最新数据缺少'workinfoname'字段", UserWarning) + continue + + base_condition = None + if latest_condition != "冬休": + if latest_condition not in self.compatible_periods: + result["error_data"].append(latest_item) + warnings.warn(f"【数据错误】测点{point_idx}最新数据存在未定义工况: {latest_condition}", UserWarning) + continue + base_condition = latest_condition + else: + for history_item in point_data[1:]: + history_condition = history_item.get("workinfoname") + if not history_condition: + result["error_data"].append(history_item) + warnings.warn(f"【数据错误】测点{point_idx}的历史数据缺少'workinfoname'字段", UserWarning) + continue + if history_condition != "冬休": + if history_condition not in self.compatible_periods: + result["error_data"].append(history_item) + warnings.warn(f"【数据错误】测点{point_idx}历史数据存在未定义工况: {history_condition}", UserWarning) + base_condition = None + break + base_condition = history_condition + break + + item_copy = copy.deepcopy(latest_item) + create_date_val = latest_item.get("createdate") + if not create_date_val: + result["error_data"].append(item_copy) + warnings.warn(f"【数据错误】测点{point_idx}的最新数据缺少'createdate'字段", UserWarning) + continue + + try: + if isinstance(create_date_val, datetime): + create_date = create_date_val.date() + else: + create_date = datetime.strptime(create_date_val, "%Y-%m-%d %H:%M:%S").date() + except ValueError as e: + result["error_data"].append(item_copy) + warnings.warn(f"【数据错误】测点{point_idx}最新数据的日期格式错误:{create_date_val},错误:{str(e)}", UserWarning) + continue + + if not 
base_condition: + result["winter"].append(item_copy) + continue + + period = self.compatible_periods[base_condition] + days_passed = (calc_date - create_date).days + due_days = period - days_passed + + if due_days < 0: + item_copy["overdue"] = abs(due_days) + result["error_data"].append(item_copy) + warn_msg = ( + f"【超期警报】测点{point_idx} 最新工况'{latest_condition}'({create_date})" + f"已超期{abs(due_days)}天!基准工况:{base_condition},周期{period}天" + ) + warnings.warn(warn_msg, UserWarning) + elif start <= due_days <= end: + item_copy["remaining"] = due_days + result["data"].append(item_copy) + + # ------------------------------ + # 新增步骤:data中相同NYID保留剩余天数最少的记录 + # ------------------------------ + if result["data"]: + # 用字典临时存储:key=NYID,value=该NYID下剩余天数最少的记录 + nyid_min_remaining = {} + for record in result["data"]: + nyid = record.get("NYID") # 假设NYID字段名是"NYID",若实际不同需调整 + if not nyid: + continue # 无NYID的记录直接保留(或按需求处理) + + # 若该NYID未存储,或当前记录剩余天数更少,则更新 + if nyid not in nyid_min_remaining or record["remaining"] < nyid_min_remaining[nyid]["remaining"]: + nyid_min_remaining[nyid] = record + + # 将字典 values 转换为列表,作为去重后的data + result["data"] = list(nyid_min_remaining.values()) + + return result \ No newline at end of file diff --git a/app/utils/scheduler.py b/app/utils/scheduler.py index 16bd0a0..fd1e730 100644 --- a/app/utils/scheduler.py +++ b/app/utils/scheduler.py @@ -3,9 +3,20 @@ from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR from ..core.config import settings +from sqlalchemy.orm import Session from ..core.database import SessionLocal from ..core.logging_config import get_logger from ..models.account import Account +# from ..services.daily import get_nyid_by_point_id +from ..services.daily import DailyDataService +from ..services.level_data import LevelDataService +from ..services.checkpoint import CheckpointService +from 
..services.section_data import SectionDataService
+from ..services.account import AccountService
+from ..models.daily import DailyData
+from ..models.settlement_data import SettlementData
+from typing import List
+from ..utils.construction_monitor import ConstructionMonitorUtils
 
 # 获取日志记录器
 logger = get_logger(__name__)
@@ -70,6 +81,15 @@ class TaskScheduler:
         # 添加每天午夜12点重置today_updated字段的任务
+        # NOTE(review): the daily-fetch task must be its own add_job call; passing it
+        # as a second positional arg to the call below would be taken as the trigger.
+        self.scheduler.add_job(
+            scheduled_get_max_nyid_by_point_id,
+            'cron',
+            id='daily_get_max_nyid_by_point_id',
+            hour=0,
+            minute=0
+        )
         self.scheduler.add_job(
             reset_today_updated_task,
             'cron',
             id='daily_reset_today_updated',
             hour=0,
@@ -174,6 +194,7 @@ def reset_today_updated_task():
     finally:
         db.close()
 
+
 # 示例定时任务函数
 def example_task():
     """示例定时任务"""
@@ -185,4 +206,106 @@ def database_cleanup_task():
     """数据库清理任务示例"""
     logger.info("执行数据库清理任务")
     # 这里可以添加数据库清理逻辑
-    return "数据库清理完成"
\ No newline at end of file
+    return "数据库清理完成"
+
+# 每日自动写入获取最新工况信息
+def scheduled_get_max_nyid_by_point_id():
+    """定时任务:获取max NYID关联数据并批量创建DailyData记录"""
+    db: Session = None
+    try:
+        # 初始化数据库会话(替代接口的Depends依赖)
+        db = SessionLocal()
+        logger.info("定时任务开始执行:获取max NYID关联数据并处理")
+        # 核心新增:清空DailyData表所有数据
+        delete_count = db.query(DailyData).delete()
+        db.commit()
+        logger.info(f"DailyData表清空完成,共删除{delete_count}条历史记录")
+
+        # 1. 获取沉降数据(返回 List[List[dict]])
+        daily_service = DailyDataService()
+        result = daily_service.get_nyid_by_point_id(db, [], 1)
+
+        # 2. 计算到期数据
+        monitor = ConstructionMonitorUtils()
+        daily_data = monitor.get_due_data(result)
+        data = daily_data['data']
+        error_data = daily_data['error_data']
+        winters = daily_data['winter']
+        logger.info(f"首次获取数据完成,共{len(result)}条记录")
+
+        # 3. 循环处理冬休数据,追溯历史非冬休记录
+        max_num = 1
+        while winters:
+            max_num += 1
+            # 提取冬休数据的point_id列表
+            new_list = [w['point_id'] for w in winters]
+            # 获取更多历史记录
+            nyid_list = daily_service.get_nyid_by_point_id(db, new_list, max_num)
+            w_list = monitor.get_due_data(nyid_list)
+            # 更新冬休、待处理、错误数据
+            winters = w_list['winter']
+            data.extend(w_list['data'])
+            error_data.extend(w_list['error_data'])
+
+        # 4. 
初始化服务实例
+        level_service = LevelDataService()
+        checkpoint_db = CheckpointService()
+        section_db = SectionDataService()
+        account_service = AccountService()
+
+        # 5. 关联其他表数据(核心逻辑保留)
+        for d in data:
+            # 处理 LevelData(假设返回列表,取第一条)
+            level_results = level_service.get_by_nyid(db, d['NYID'])
+            level_instance = level_results[0] if isinstance(level_results, list) and level_results else level_results
+            d['level_data'] = level_instance.to_dict() if level_instance else None
+
+            # 处理 CheckpointData(返回单实例,直接使用)
+            checkpoint_instance = checkpoint_db.get_by_point_id(db, d['point_id'])
+            d['checkpoint_data'] = checkpoint_instance.to_dict() if checkpoint_instance else None
+
+            # 处理 SectionData(根据checkpoint_data关联)
+            if d['checkpoint_data']:
+                section_instance = section_db.get_by_section_id(db, d['checkpoint_data']['section_id'])
+                d['section_data'] = section_instance.to_dict() if section_instance else None
+            else:
+                d['section_data'] = None
+
+            # 处理 AccountData
+            if d.get('section_data') and d['section_data'].get('account_id'):
+                account_response = account_service.get_account(db, account_id=d['section_data']['account_id'])
+                d['account_data'] = account_response.to_dict() if account_response else None  # to_dict(), not __dict__: __dict__ leaks _sa_instance_state
+            else:
+                d['account_data'] = None
+
+        # 6. 
构造DailyData数据并批量创建 + daily_create_data = [] + for d in data: + # 过滤无效数据(避免缺失关键字段报错) + if all(key in d for key in ['NYID', 'point_id']) and d.get('level_data') and d.get('account_data') and d.get('section_data'): + tem = { + 'NYID': d['NYID'], + 'point_id': d['point_id'], + 'linecode': d['level_data']['linecode'], + 'account_id': d['account_data']['account_id'], + 'section_id': d['section_data']['id'] + } + daily_create_data.append(tem) + + # 批量创建记录 + if daily_create_data: + created_records = daily_service.batch_create_by_account_nyid(db, daily_create_data) + logger.info(f"定时任务完成:成功创建{len(created_records)}条DailyData记录,共处理{len(data)}个point_id数据") + else: + logger.warning("定时任务完成:无有效数据可创建DailyData记录") + + except Exception as e: + # 异常时回滚事务 + if db: + db.rollback() + logger.error(f"定时任务执行失败:{str(e)}", exc_info=True) + raise e # 抛出异常便于定时框架捕获告警 + finally: + # 确保数据库会话关闭 + if db: + db.close() \ No newline at end of file