数据接口完善

This commit is contained in:
lhx
2025-09-28 17:30:40 +08:00
parent af25665997
commit 2e735e587b
6 changed files with 79 additions and 35 deletions

View File

@@ -1,6 +1,7 @@
from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any
from ..models.settlement_data import SettlementData
from ..models.checkpoint import Checkpoint
from .base import BaseService
class SettlementDataService(BaseService[SettlementData]):
@@ -15,11 +16,11 @@ class SettlementDataService(BaseService[SettlementData]):
"""根据期数ID获取沉降数据"""
return self.get_by_field(db, "NYID", nyid)
def get_by_point_and_nyid(self, db: Session, point_id: str, nyid: str) -> Optional[SettlementData]:
def get_by_point_and_nyid(self, db: Session, point_id: str = None, nyid: str = None) -> Optional[SettlementData]:
"""根据观测点ID和期数ID获取沉降数据"""
return db.query(SettlementData).filter(
SettlementData.point_id == point_id,
SettlementData.NYID == nyid
SettlementData.point_id == point_id if point_id else True,
SettlementData.NYID == nyid if nyid else True
).first()
def search_settlement_data(self, db: Session,
@@ -40,9 +41,15 @@ class SettlementDataService(BaseService[SettlementData]):
return self.search_by_conditions(db, conditions)
def _check_checkpoint_exists(self, db: Session, point_id: str) -> bool:
"""检查观测点数据是否存在"""
checkpoint = db.query(Checkpoint).filter(Checkpoint.point_id == point_id).first()
return checkpoint is not None
def batch_import_settlement_data(self, db: Session, data: List) -> Dict[str, Any]:
"""
批量导入沉降数据根据观测点ID和期数ID判断是否重复重复数据改为更新操作
判断观测点数据是否存在,不存在则全部不导入
支持事务回滚,失败时重试一次
"""
import logging
@@ -62,10 +69,18 @@ class SettlementDataService(BaseService[SettlementData]):
for item_data in data:
try:
# 判断观测点数据是否存在
checkpoint = self._check_checkpoint_exists(db, item_data.get('point_id'))
logger.info(f"Checkpoint {item_data.get('point_id')}: {checkpoint}")
if not checkpoint:
logger.error(f"Checkpoint {item_data.get('point_id')} not found")
raise Exception(f"Checkpoint {item_data.get('point_id')} not found")
settlement = self.get_by_point_and_nyid(
db,
item_data.get('point_id'),
item_data.get('NYID')
# item_data.get('point_id'),
nyid=item_data.get('NYID')
)
if settlement:
# 更新操作