diff --git a/app/api/level_data.py b/app/api/level_data.py index 2c4ea2d..f36f21b 100644 --- a/app/api/level_data.py +++ b/app/api/level_data.py @@ -8,7 +8,11 @@ from ..schemas.level_data import ( LevelDataListResponse, LevelDataResponse, BatchDeleteByLinecodesRequest, - BatchDeleteByLinecodesResponse + BatchDeleteByLinecodesResponse, + LinecodeRequest, + NyidListResponse, + SyncLoseDataRequest, + SyncLoseDataResponse, ) from ..services.level_data import LevelDataService @@ -36,6 +40,31 @@ def get_level_data_by_project(request: LevelDataRequest, db: Session = Depends(g ) +@router.post("/get_nyids_by_linecode", response_model=NyidListResponse) +def get_nyids_by_linecode(request: LinecodeRequest, db: Session = Depends(get_db)): + """ + 通过水准线路编码返回该线路下所有 NYID,只返回 NYID 列表(去重、按 NYID 降序) + """ + try: + level_service = LevelDataService() + level_list = level_service.get_by_linecode(db, linecode=request.linecode) + nyids = sorted( + {str(item.NYID) for item in level_list if item.NYID}, + key=lambda x: int(x) if str(x).isdigit() else 0, + reverse=True, + ) + return NyidListResponse( + code=ResponseCode.SUCCESS, + message=ResponseMessage.SUCCESS, + data=nyids, + ) + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"查询失败: {str(e)}", + ) + + @router.post("/batch_delete_by_linecodes", response_model=BatchDeleteByLinecodesResponse) def batch_delete_by_linecodes(request: BatchDeleteByLinecodesRequest, db: Session = Depends(get_db)): """ @@ -58,4 +87,40 @@ def batch_delete_by_linecodes(request: BatchDeleteByLinecodesRequest, db: Sessio raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"批量删除失败: {str(e)}" + ) + + +@router.post("/sync_lose_data", response_model=SyncLoseDataResponse) +def sync_lose_data( + request: SyncLoseDataRequest = SyncLoseDataRequest(), + db: Session = Depends(get_db), +): + """ + 同步缺失数据到 lose_data 表。 + - 不传 linecode:按所有水准线路的 NYID 计算缺失并写入,仅返回是否处理成功。 + - 传 linecode:只处理该水准线路,并返回该线路的缺失数据记录列表。 + 缺失规则:原始数据无=1、沉降数据无=2,lose_data 为二者之和(0/1/2/3)。 + """ + try: + level_service = LevelDataService() + result = level_service.sync_lose_data(db, linecode=request.linecode) + if result.get("data") is None: + data = {"success": result["success"]} + if not result["success"]: + return SyncLoseDataResponse( + code=1, + message=result.get("message", "处理失败"), + data=data, + ) + else: + data = result["data"] + return SyncLoseDataResponse( + code=0 if result["success"] else 1, + message=ResponseMessage.SUCCESS if result["success"] else (result.get("message") or "处理失败"), + data=data, + ) + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"同步缺失数据失败: {str(e)}", ) \ No newline at end of file diff --git a/app/core/db_monitor.py b/app/core/db_monitor.py index 177b07b..17e78f5 100644 --- a/app/core/db_monitor.py +++ b/app/core/db_monitor.py @@ -235,10 +235,11 @@ def receive_after_cursor_execute(conn, cursor, statement, params, context, execu log_transaction_end(success=True) @event.listens_for(Engine, "handle_error") -def receive_handle_error(exception, context): - """错误监听""" - error_msg = str(exception) - sql = context.statement if context and hasattr(context, 'statement') else None +def receive_handle_error(context): + """错误监听:SQLAlchemy 只传入一个 ExceptionContext 参数""" + exception = getattr(context, "original_exception", None) or getattr(context, "sqlalchemy_exception", None) + error_msg = str(exception) if exception else str(context) + sql = getattr(context, "statement", None) log_connection_error(error_msg, sql) log_transaction_end(success=False, error=error_msg) diff --git a/app/schemas/level_data.py b/app/schemas/level_data.py index e9df574..c2f756a 100644 --- a/app/schemas/level_data.py +++ b/app/schemas/level_data.py @@ -51,4 +51,41 @@ class BatchDeleteByLinecodesResponse(BaseModel): message: str success: bool backup_file: Optional[str] = None - deleted_counts: Optional[dict] = None \ No newline at end of file + deleted_counts: Optional[dict] = None + + +class LinecodeRequest(BaseModel): + """按水准线路编码查询请求""" + linecode: str = Field(..., description="水准线路编码") + + +class NyidListResponse(BaseModel): + """仅返回 NYID 列表的响应""" + code: int = 0 + message: str + data: List[str] = Field(default_factory=list, description="NYID 列表") + + +class SyncLoseDataRequest(BaseModel): + """同步缺失数据请求:不传 linecode 表示全量同步,传则只处理该线路""" + linecode: Optional[str] = Field(None, description="水准线路编码,不传则处理全部线路") + + +class LoseDataItem(BaseModel): + """lose_data 表单条记录""" + id: int + account_id: int + NYID: str + linecode: str + lose_data: int + section_id: Optional[str] = None + point_id: str = "" + + model_config = ConfigDict(from_attributes=True) + + +class SyncLoseDataResponse(BaseModel): + """同步缺失数据响应:全量时 data 为是否成功,单线路时 data 为该线路缺失记录列表""" + code: int = 0 + message: str + data: Any = None # 全量时为 {"success": bool},单线路时为 List[LoseDataItem] \ No newline at end of file diff --git a/app/services/level_data.py b/app/services/level_data.py index 8ea6647..e7aeb2e 100644 --- a/app/services/level_data.py +++ b/app/services/level_data.py @@ -1,12 +1,13 @@ from sqlalchemy.orm import Session from sqlalchemy import text, inspect -from typing import List, Optional, Dict, Any +from typing import List, Optional, Dict, Any, Tuple from ..models.level_data import LevelData from .base import BaseService from ..models.settlement_data import SettlementData from ..models.checkpoint import Checkpoint from ..models.section_data import SectionData from ..models.account import Account +from ..models.lose_data import LoseData from ..core.database import engine import logging import os @@ -464,6 +465,137 @@ class LevelDataService(BaseService[LevelData]): return original_data_map + def _get_nyids_with_settlement(self, db: Session, nyid_list: List[str]) -> set: + """返回在沉降表中存在记录的 NYID 集合""" + if not nyid_list: + return set() + rows = db.query(SettlementData.NYID).filter(SettlementData.NYID.in_(nyid_list)).distinct().all() + return {r[0] for r in rows if r[0]} + + def _get_nyid_to_all_points_account_section( + self, db: Session, nyid_list: List[str] + ) -> Dict[str, List[Tuple[str, int, Optional[str]]]]: + """通过沉降 -> 观测点 -> 断面 得到每个 NYID 对应的【所有】测点列表 [(point_id, account_id, section_id), ...],无沉降的 NYID 返回 [('', 0, None)]""" + if not nyid_list: + return {} + default_list = [("", 0, None)] + settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all() + # 每个 NYID 对应该期数下所有出现过的 (point_id),去重但保留顺序 + nyid_to_points: Dict[str, List[Tuple[str, int, Optional[str]]]] = {} + point_ids = list({s.point_id for s in settlements if s.point_id}) + if not point_ids: + return {nyid: default_list for nyid in nyid_list} + checkpoints = db.query(Checkpoint).filter(Checkpoint.point_id.in_(point_ids)).all() + point_to_section = {c.point_id: (c.section_id or None) for c in checkpoints} + section_ids = list({c.section_id for c in checkpoints if c.section_id}) + point_to_account: Dict[str, int] = {} + if section_ids: + sections = db.query(SectionData).filter(SectionData.section_id.in_(section_ids)).all() + for c in checkpoints: + sec = next((s for s in sections if s.section_id == c.section_id), None) + if sec and sec.account_id is not None: + try: + point_to_account[c.point_id] = int(sec.account_id) + except (ValueError, TypeError): + point_to_account[c.point_id] = 0 + # 按 NYID 分组,每个 NYID 下该期数出现的所有 point_id(去重) + for nyid in nyid_list: + nyid_to_points[nyid] = [] + seen_per_nyid: Dict[str, set] = {nyid: set() for nyid in nyid_list} + for s in settlements: + pt_id = (s.point_id or "") if s.point_id else "" + if pt_id not in seen_per_nyid.get(s.NYID, set()): + seen_per_nyid[s.NYID].add(pt_id) + acc = point_to_account.get(s.point_id, 0) + sec_id = point_to_section.get(s.point_id) + nyid_to_points[s.NYID].append((pt_id, acc, sec_id)) + for nyid in nyid_list: + if not nyid_to_points[nyid]: + nyid_to_points[nyid] = default_list + return nyid_to_points + + def _sync_lose_data_for_one_linecode( + self, + db: Session, + linecode_val: str, + level_list: List[LevelData], + ) -> None: + """仅处理一个水准线路编码:查该线路的 NYID,查沉降/原始,写入 lose_data。""" + pairs = [(item.linecode, str(item.NYID)) for item in level_list if item.NYID] + if not pairs: + return + nyid_list = list({nyid for _, nyid in pairs}) + settlement_nyids = self._get_nyids_with_settlement(db, nyid_list) + original_data_map = self._find_original_data_by_nyids(db, nyid_list) + original_nyids = set() + for rows in original_data_map.values(): + for row in rows: + n = row.get("NYID") + if n is not None: + original_nyids.add(str(n)) + nyid_to_points_asp = self._get_nyid_to_all_points_account_section(db, nyid_list) + for linecode_, nyid in pairs: + has_original = nyid in original_nyids + has_settlement = nyid in settlement_nyids + lose_val = (0 if has_original else 1) + (0 if has_settlement else 2) + points_list = nyid_to_points_asp.get(nyid, [("", 0, None)]) + for point_id, acc_id, sec_id in points_list: + pt_id = point_id or "" + existing = db.query(LoseData).filter( + LoseData.linecode == linecode_, + LoseData.NYID == nyid, + LoseData.point_id == pt_id, + ).first() + if existing: + existing.lose_data = lose_val + existing.account_id = acc_id + existing.section_id = sec_id + else: + db.add(LoseData( + account_id=acc_id, + NYID=nyid, + linecode=linecode_, + lose_data=lose_val, + section_id=sec_id, + point_id=pt_id, + )) + + def sync_lose_data(self, db: Session, linecode: Optional[str] = None) -> Dict[str, Any]: + """ + 同步缺失数据到 lose_data 表。 + 无 linecode:按「每个水准线路编码」分批处理,每批只查该线路的 NYID 再查沉降/原始并插入,不返回明细。 + 有 linecode:只处理该线路,并返回该线路在 lose_data 中的记录列表。 + 缺失规则:原始数据无=1、有=0;沉降数据无=2、有=0;lose_data 字段为二者之和 0/1/2/3。 + 同一 (linecode, NYID) 不重复插入,存在则更新。 + """ + try: + if linecode: + level_list = self.get_by_linecode(db, linecode=linecode) + if not level_list: + return {"success": True, "data": []} + self._sync_lose_data_for_one_linecode(db, linecode, level_list) + db.commit() + records = db.query(LoseData).filter(LoseData.linecode == linecode).order_by(LoseData.NYID.desc()).all() + return {"success": True, "data": [r.to_dict() for r in records]} + + # 全量:先取所有不重复的 linecode,再按每个 linecode 分批处理 + linecode_rows = db.query(LevelData.linecode).distinct().all() + linecodes = [r[0] for r in linecode_rows if r[0]] + if not linecodes: + return {"success": True, "data": None} + + for lc in linecodes: + level_list = self.get_by_linecode(db, linecode=lc) + self._sync_lose_data_for_one_linecode(db, lc, level_list) + db.commit() + logger.info(f"sync_lose_data 已处理线路: {lc}") + + return {"success": True, "data": None} + except Exception as e: + db.rollback() + logger.error(f"sync_lose_data 失败: {str(e)}", exc_info=True) + return {"success": False, "data": [] if linecode else None, "message": str(e)} + def _backup_data_to_sql(self, db: Session, level_data_list: List[LevelData], settlement_list: List[SettlementData], checkpoint_list: List[Checkpoint], diff --git a/app/utils/construction_monitor.py b/app/utils/construction_monitor.py index b5790b0..0b9ec4b 100644 --- a/app/utils/construction_monitor.py +++ b/app/utils/construction_monitor.py @@ -139,7 +139,12 @@ class ConstructionMonitorUtils: continue if not base_condition: - result["winter"].append(item_copy) + # 当前为冬休且历史全是冬休 → 视为数据未补全,remaining 固定为 -365 + if latest_condition == "冬休": + item_copy["remaining"] = -365 + result["data"].append(item_copy) + else: + result["winter"].append(item_copy) continue # 核心修改:冬休回溯场景下调整测量间隔(基准周期)