diff --git a/app/models/original_data.py b/app/models/original_data.py index b2dae41..5ef7647 100644 --- a/app/models/original_data.py +++ b/app/models/original_data.py @@ -6,7 +6,7 @@ class OriginalData(Base): id = Column(Integer, primary_key=True, index=True, autoincrement=True) bfpcode = Column(String(1000), nullable=False, comment="前(后)视点名称") - mtime = Column(DateTime, nullable=False, comment="测点观测时间") + mtime = Column(String(1000), nullable=False, comment="测点观测时间") bffb = Column(String(1000), nullable=False, comment="前(后)视标记符") bfpl = Column(String(1000), nullable=False, comment="前(后)视距离(m)") bfpvalue = Column(String(1000), nullable=False, comment="前(后)视尺读数(m)") diff --git a/app/schemas/comprehensive_data.py b/app/schemas/comprehensive_data.py index 83ea2fc..4a55e55 100644 --- a/app/schemas/comprehensive_data.py +++ b/app/schemas/comprehensive_data.py @@ -4,12 +4,12 @@ from typing import Any, Dict, List, Optional # 原始数据导入请求 class OriginalDataImportRequest(BaseModel): - bfpcode: str - mtime: str - bffb: str - bfpl: str - bfpvalue: str - times: str + bfpcode: Optional[str] = None + mtime: Optional[str] = None + bffb: Optional[str] = None + bfpl: Optional[str] = None + bfpvalue: Optional[str] = None + times: Optional[str] = None NYID: str sort: Optional[int] = None @@ -25,14 +25,14 @@ class LevelDataImportRequest(BaseModel): class SettlementDataImportRequest(BaseModel): point_id: str NYID: str - CVALUE: str - MAVALUE: str - MTIME_W: str - PRELOADH: str - PSTATE: str - createdate: str - day: str - day_jg: str + CVALUE: Optional[str] = None + MAVALUE: Optional[str] = None + MTIME_W: Optional[str] = None + PRELOADH: Optional[str] = None + PSTATE: Optional[str] = None + createdate: Optional[str] = None + day: Optional[str] = None + day_jg: Optional[str] = None REMARK: Optional[str] = None WORKINFO: Optional[str] = None useflag: Optional[str] = None @@ -46,17 +46,17 @@ class SettlementDataImportRequest(BaseModel): # 观测点数据导入请求 class CheckpointDataImportRequest(BaseModel): point_id: str - aname: str + aname: Optional[str] = None section_id: str burial_date: Optional[str] = None # 断面数据导入请求 class SectionDataImportRequest(BaseModel): section_id: str - mileage: str - work_site: str - status: str - number: str + mileage: Optional[str] = None + work_site: Optional[str] = None + status: Optional[str] = None + number: Optional[str] = None basic_types: Optional[str] = None height: Optional[str] = None transition_paragraph: Optional[str] = None diff --git a/app/services/checkpoint.py b/app/services/checkpoint.py index fd889e4..9773b57 100644 --- a/app/services/checkpoint.py +++ b/app/services/checkpoint.py @@ -1,13 +1,12 @@ from sqlalchemy.orm import Session from typing import List, Optional, Dict, Any from ..models.checkpoint import Checkpoint +from ..models.section_data import SectionData from .base import BaseService -from .section_data import SectionDataService class CheckpointService(BaseService[Checkpoint]): def __init__(self): super().__init__(Checkpoint) - self.section_service = SectionDataService() def get_by_section_id(self, db: Session, section_id: str) -> List[Checkpoint]: """根据断面ID获取观测点""" @@ -33,6 +32,11 @@ class CheckpointService(BaseService[Checkpoint]): return self.search_by_conditions(db, conditions) + def _check_section_exists(self, db: Session, section_id: str) -> bool: + """检查断面是否存在""" + section = db.query(SectionData).filter(SectionData.section_id == section_id).first() + return section is not None + def batch_import_checkpoints(self, db: Session, data: List) -> Dict[str, Any]: """ 批量导入观测点数据,根据观测点ID判断是否重复,重复数据改为更新操作 @@ -56,10 +60,8 @@ class CheckpointService(BaseService[Checkpoint]): for item_data in data: try: - # 判断断面id是否存在 - section = self.section_service.get_by_section_id(db, item_data.get('section_id')) - if not section: + if not self._check_section_exists(db, item_data.get('section_id')): logger.error(f"Section {item_data.get('section_id')} not found") raise Exception(f"Section {item_data.get('section_id')} not found") diff --git a/app/services/level_data.py b/app/services/level_data.py index 7c9ce7d..e6238ba 100644 --- a/app/services/level_data.py +++ b/app/services/level_data.py @@ -2,6 +2,7 @@ from sqlalchemy.orm import Session from typing import List, Optional, Dict, Any from ..models.level_data import LevelData from .base import BaseService +from ..models.settlement_data import SettlementData class LevelDataService(BaseService[LevelData]): def __init__(self): @@ -30,13 +31,18 @@ class LevelDataService(BaseService[LevelData]): return self.search_by_conditions(db, conditions) - def get_by_nyid_and_linecode(self, db: Session, nyid: str, linecode: str) -> Optional[LevelData]: + def get_by_nyid_and_linecode(self, db: Session, nyid: str, linecode: str = None) -> Optional[LevelData]: """根据期数ID和线路编码获取水准数据""" return db.query(LevelData).filter( - LevelData.NYID == nyid, - LevelData.linecode == linecode + LevelData.NYID == nyid if nyid else True, + LevelData.linecode == linecode if linecode else True ).first() + def _check_settlement_exists(self, db: Session, nyid: str) -> bool: + """检查期数id沉降数据是否存在""" + settlement = db.query(SettlementData).filter(SettlementData.NYID == nyid).first() + return settlement is not None + def batch_import_level_data(self, db: Session, data: List) -> Dict[str, Any]: """ 批量导入水准数据,根据期数ID和线路编码判断是否重复,重复数据改为更新操作 @@ -59,10 +65,17 @@ class LevelDataService(BaseService[LevelData]): for item_data in data: try: + + # 判断期数id沉降数据是否存在 + settlement = self._check_settlement_exists(db, item_data.get('NYID')) + if not settlement: + logger.error(f"Settlement {item_data.get('NYID')} not found") + raise Exception(f"Settlement {item_data.get('NYID')} not found") + level_data = self.get_by_nyid_and_linecode( db, - item_data.get('NYID'), - item_data.get('linecode') + # item_data.get('linecode'), + nyid=item_data.get('NYID') ) if level_data: # 更新操作 diff --git a/app/services/original_data.py b/app/services/original_data.py index 4789e24..2a3e009 100644 --- a/app/services/original_data.py +++ b/app/services/original_data.py @@ -2,6 +2,7 @@ from sqlalchemy.orm import Session from typing import List, Optional, Dict, Any from ..models.original_data import OriginalData from .base import BaseService +from ..models.settlement_data import SettlementData class OriginalDataService(BaseService[OriginalData]): def __init__(self): @@ -33,6 +34,11 @@ class OriginalDataService(BaseService[OriginalData]): return self.search_by_conditions(db, conditions) + def _check_settlement_exists(self, db: Session, nyid: str) -> bool: + """检查期数id沉降数据是否存在""" + settlement = db.query(SettlementData).filter(SettlementData.NYID == nyid).first() + return settlement is not None + def batch_import_original_data(self, db: Session, data: List) -> Dict[str, Any]: """ 批量导入原始数据,直接新增,无需检查重复 @@ -55,6 +61,14 @@ class OriginalDataService(BaseService[OriginalData]): for item_data in data: try: + + # 判断期数id是否存在 + settlement = self._check_settlement_exists(db, item_data.get('NYID')) + if not settlement: + logger.error(f"Settlement {item_data.get('NYID')} not found") + raise Exception(f"Settlement {item_data.get('NYID')} not found") + + # 直接新增操作 original_data = OriginalData( bfpcode=item_data.get('bfpcode'), diff --git a/app/services/settlement_data.py b/app/services/settlement_data.py index 3508f55..1c0f90a 100644 --- a/app/services/settlement_data.py +++ b/app/services/settlement_data.py @@ -1,6 +1,7 @@ from sqlalchemy.orm import Session from typing import List, Optional, Dict, Any from ..models.settlement_data import SettlementData +from ..models.checkpoint import Checkpoint from .base import BaseService class SettlementDataService(BaseService[SettlementData]): @@ -15,11 +16,11 @@ class SettlementDataService(BaseService[SettlementData]): """根据期数ID获取沉降数据""" return self.get_by_field(db, "NYID", nyid) - def get_by_point_and_nyid(self, db: Session, point_id: str, nyid: str) -> Optional[SettlementData]: + def get_by_point_and_nyid(self, db: Session, point_id: str = None, nyid: str = None) -> Optional[SettlementData]: """根据观测点ID和期数ID获取沉降数据""" return db.query(SettlementData).filter( - SettlementData.point_id == point_id, - SettlementData.NYID == nyid + SettlementData.point_id == point_id if point_id else True, + SettlementData.NYID == nyid if nyid else True ).first() def search_settlement_data(self, db: Session, @@ -40,9 +41,15 @@ class SettlementDataService(BaseService[SettlementData]): return self.search_by_conditions(db, conditions) + def _check_checkpoint_exists(self, db: Session, point_id: str) -> bool: + """检查观测点数据是否存在""" + checkpoint = db.query(Checkpoint).filter(Checkpoint.point_id == point_id).first() + return checkpoint is not None + def batch_import_settlement_data(self, db: Session, data: List) -> Dict[str, Any]: """ 批量导入沉降数据,根据观测点ID和期数ID判断是否重复,重复数据改为更新操作 + 判断观测点数据是否存在,不存在则全部不导入 支持事务回滚,失败时重试一次 """ import logging @@ -62,10 +69,18 @@ class SettlementDataService(BaseService[SettlementData]): for item_data in data: try: + + # 判断观测点数据是否存在 + checkpoint = self._check_checkpoint_exists(db, item_data.get('point_id')) + logger.info(f"Checkpoint {item_data.get('point_id')}: {checkpoint}") + if not checkpoint: + logger.error(f"Checkpoint {item_data.get('point_id')} not found") + raise Exception(f"Checkpoint {item_data.get('point_id')} not found") + settlement = self.get_by_point_and_nyid( db, - item_data.get('point_id'), - item_data.get('NYID') + # item_data.get('point_id'), + nyid=item_data.get('NYID') ) if settlement: # 更新操作