from sqlalchemy.orm import Session from typing import List, Optional, Dict, Any from ..models.settlement_data import SettlementData from ..models.checkpoint import Checkpoint from .base import BaseService class SettlementDataService(BaseService[SettlementData]): def __init__(self): super().__init__(SettlementData) def get_by_point_id(self, db: Session, point_id: str) -> List[SettlementData]: """根据观测点ID获取沉降数据""" return self.get_by_field(db, "point_id", point_id) def get_by_nyid(self, db: Session, nyid: str) -> List[SettlementData]: """根据期数ID获取沉降数据""" return self.get_by_field(db, "NYID", nyid) def get_by_point_and_nyid(self, db: Session, point_id: str = None, nyid: str = None) -> Optional[SettlementData]: """根据观测点ID和期数ID获取沉降数据""" return db.query(SettlementData).filter( SettlementData.point_id == point_id if point_id else True, SettlementData.NYID == nyid if nyid else True ).first() def search_settlement_data(self, db: Session, id: Optional[int] = None, point_id: Optional[str] = None, nyid: Optional[str] = None, sjName: Optional[str] = None, workinfoname: Optional[str] = None) -> List[SettlementData]: """根据多个条件搜索沉降数据""" conditions = {} if id is not None: conditions["id"] = id if point_id is not None: conditions["point_id"] = point_id if nyid is not None: conditions["NYID"] = nyid if sjName is not None: conditions["sjName"] = sjName if workinfoname is not None: conditions["workinfoname"] = workinfoname return self.search_by_conditions(db, conditions) def search_settlement_data_formatted(self, db: Session, id: Optional[int] = None, point_id: Optional[str] = None, nyid: Optional[str] = None, sjName: Optional[str] = None, workinfoname: Optional[str] = None) -> List[Dict[str, Any]]: """查询沉降数据并返回格式化结果""" settlement_data = self.search_settlement_data(db, id, point_id, nyid, sjName, workinfoname) result = [] for settlement in settlement_data: settlement_dict = { "id": settlement.id, "point_id": settlement.point_id, "CVALUE": settlement.CVALUE, "MAVALUE": settlement.MAVALUE, "MTIME_W": settlement.MTIME_W, "NYID": settlement.NYID, "PRELOADH": settlement.PRELOADH, "PSTATE": settlement.PSTATE, "REMARK": settlement.REMARK, "WORKINFO": settlement.WORKINFO, "createdate": settlement.createdate, "day": settlement.day, "day_jg": settlement.day_jg, "isgzjdxz": settlement.isgzjdxz, "mavalue_bc": settlement.mavalue_bc, "mavalue_lj": settlement.mavalue_lj, "sjName": settlement.sjName, "useflag": settlement.useflag, "workinfoname": settlement.workinfoname, "upd_remark": settlement.upd_remark } result.append(settlement_dict) return result def _check_checkpoint_exists(self, db: Session, point_id: str) -> bool: """检查观测点数据是否存在""" checkpoint = db.query(Checkpoint).filter(Checkpoint.point_id == point_id).first() return checkpoint is not None def batch_import_settlement_data(self, db: Session, data: List) -> Dict[str, Any]: """ 批量导入沉降数据,根据观测点ID和期数ID判断是否重复,重复数据改为更新操作 判断观测点数据是否存在,不存在则全部不导入 支持事务回滚,失败时重试一次 """ import logging logger = logging.getLogger(__name__) total_count = len(data) success_count = 0 failed_count = 0 failed_items = [] for attempt in range(2): # 最多重试1次 try: db.begin() success_count = 0 failed_count = 0 failed_items = [] for item_data in data: try: # 判断观测点数据是否存在 checkpoint = self._check_checkpoint_exists(db, item_data.get('point_id')) logger.info(f"Checkpoint {item_data.get('point_id')}: {checkpoint}") if not checkpoint: logger.error(f"Checkpoint {item_data.get('point_id')} not found") raise Exception(f"Checkpoint {item_data.get('point_id')} not found") settlement = self.get_by_point_and_nyid( db, # item_data.get('point_id'), nyid=item_data.get('NYID') ) if settlement: # 更新操作 settlement.CVALUE = item_data.get('CVALUE') settlement.MAVALUE = item_data.get('MAVALUE') settlement.MTIME_W = item_data.get('MTIME_W') settlement.PRELOADH = item_data.get('PRELOADH') settlement.PSTATE = item_data.get('PSTATE') settlement.REMARK = item_data.get('REMARK') settlement.WORKINFO = item_data.get('WORKINFO') settlement.createdate = item_data.get('createdate') settlement.day = item_data.get('day') settlement.day_jg = item_data.get('day_jg') settlement.isgzjdxz = item_data.get('isgzjdxz') settlement.mavalue_bc = item_data.get('mavalue_bc') settlement.mavalue_lj = item_data.get('mavalue_lj') settlement.sjName = item_data.get('sjName') settlement.useflag = item_data.get('useflag') settlement.workinfoname = item_data.get('workinfoname') settlement.upd_remark = item_data.get('upd_remark') logger.info(f"Updated settlement data: {item_data.get('point_id')}-{item_data.get('NYID')}") else: # 新增操作 settlement = SettlementData( point_id=item_data.get('point_id'), CVALUE=item_data.get('CVALUE'), MAVALUE=item_data.get('MAVALUE'), MTIME_W=item_data.get('MTIME_W'), NYID=item_data.get('NYID'), PRELOADH=item_data.get('PRELOADH'), PSTATE=item_data.get('PSTATE'), REMARK=item_data.get('REMARK'), WORKINFO=item_data.get('WORKINFO'), createdate=item_data.get('createdate'), day=item_data.get('day'), day_jg=item_data.get('day_jg'), isgzjdxz=item_data.get('isgzjdxz'), mavalue_bc=item_data.get('mavalue_bc'), mavalue_lj=item_data.get('mavalue_lj'), sjName=item_data.get('sjName'), useflag=item_data.get('useflag'), workinfoname=item_data.get('workinfoname'), upd_remark=item_data.get('upd_remark') ) db.add(settlement) logger.info(f"Created settlement data: {item_data.get('point_id')}-{item_data.get('NYID')}") success_count += 1 except Exception as e: failed_count += 1 failed_items.append({ 'data': item_data, 'error': str(e) }) logger.error(f"Failed to process settlement data {item_data.get('point_id')}-{item_data.get('NYID')}: {str(e)}") raise e db.commit() logger.info(f"Batch import settlement data completed. Success: {success_count}, Failed: {failed_count}") break except Exception as e: db.rollback() logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}") if attempt == 1: # 最后一次重试失败 logger.error("Batch import settlement data failed after retries") return { 'success': False, 'message': f'批量导入失败: {str(e)}', 'total_count': total_count, 'success_count': 0, 'failed_count': total_count, 'failed_items': failed_items } return { 'success': failed_count == 0, 'message': '批量导入完成' if failed_count == 0 else f'部分导入失败', 'total_count': total_count, 'success_count': success_count, 'failed_count': failed_count, 'failed_items': failed_items }