观测点接口逻辑

This commit is contained in:
lhx
2025-09-28 16:20:52 +08:00
parent 54d0b2a90e
commit af25665997

View File

@@ -2,10 +2,12 @@ from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any from typing import List, Optional, Dict, Any
from ..models.checkpoint import Checkpoint from ..models.checkpoint import Checkpoint
from .base import BaseService from .base import BaseService
from .section_data import SectionDataService
class CheckpointService(BaseService[Checkpoint]): class CheckpointService(BaseService[Checkpoint]):
def __init__(self): def __init__(self):
super().__init__(Checkpoint) super().__init__(Checkpoint)
self.section_service = SectionDataService()
def get_by_section_id(self, db: Session, section_id: str) -> List[Checkpoint]: def get_by_section_id(self, db: Session, section_id: str) -> List[Checkpoint]:
"""根据断面ID获取观测点""" """根据断面ID获取观测点"""
@@ -34,6 +36,7 @@ class CheckpointService(BaseService[Checkpoint]):
def batch_import_checkpoints(self, db: Session, data: List) -> Dict[str, Any]: def batch_import_checkpoints(self, db: Session, data: List) -> Dict[str, Any]:
""" """
批量导入观测点数据根据观测点ID判断是否重复重复数据改为更新操作 批量导入观测点数据根据观测点ID判断是否重复重复数据改为更新操作
判断断面id是否存在不存在则全部不导入
支持事务回滚,失败时重试一次 支持事务回滚,失败时重试一次
""" """
import logging import logging
@@ -55,12 +58,11 @@ class CheckpointService(BaseService[Checkpoint]):
try: try:
# 判断断面id是否存在 # 判断断面id是否存在
section = self.get_by_section_id(db, item_data.get('section_id')) section = self.section_service.get_by_section_id(db, item_data.get('section_id'))
if not section: if not section:
logger.error(f"Section {item_data.get('section_id')} not found") logger.error(f"Section {item_data.get('section_id')} not found")
raise Exception(f"Section {item_data.get('section_id')} not found") raise Exception(f"Section {item_data.get('section_id')} not found")
checkpoint = self.get_by_point_id(db, item_data.get('point_id')) checkpoint = self.get_by_point_id(db, item_data.get('point_id'))
if checkpoint: if checkpoint:
# 更新操作 # 更新操作