from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any
import logging

from ..models.settlement_data import SettlementData
from ..models.checkpoint import Checkpoint
from .base import BaseService

logger = logging.getLogger(__name__)


class SettlementDataService(BaseService[SettlementData]):
    """Query and batch-import service for settlement measurement records."""

    def __init__(self):
        super().__init__(SettlementData)

    def get_by_point_id(self, db: Session, point_id: str) -> List[SettlementData]:
        """Return all settlement records for one observation point."""
        return self.get_by_field(db, "point_id", point_id)

    def get_by_nyid(self, db: Session, nyid: str) -> List[SettlementData]:
        """Return all settlement records for one survey period (NYID)."""
        return self.get_by_field(db, "NYID", nyid)

    def get_by_point_and_nyid(self, db: Session, point_id: str = None,
                              nyid: str = None) -> Optional[SettlementData]:
        """Return the first record matching the given point_id and/or nyid.

        A filter is only applied for arguments that are truthy; a missing
        argument matches everything.  (The original passed a literal ``True``
        into ``Query.filter()`` for the missing side, which SQLAlchemy 1.4+
        deprecates/rejects — conditional filter construction is equivalent.)
        """
        query = db.query(SettlementData)
        if point_id:
            query = query.filter(SettlementData.point_id == point_id)
        if nyid:
            query = query.filter(SettlementData.NYID == nyid)
        return query.first()

    @staticmethod
    def _settlement_to_dict(settlement: SettlementData) -> Dict[str, Any]:
        """Serialize one SettlementData row to a plain dict.

        Shared by both *_formatted query methods so the key list is defined
        in exactly one place.
        """
        return {
            "id": settlement.id,
            "point_id": settlement.point_id,
            "CVALUE": settlement.CVALUE,
            "MAVALUE": settlement.MAVALUE,
            "MTIME_W": settlement.MTIME_W,
            "NYID": settlement.NYID,
            "PRELOADH": settlement.PRELOADH,
            "PSTATE": settlement.PSTATE,
            "REMARK": settlement.REMARK,
            "WORKINFO": settlement.WORKINFO,
            "createdate": settlement.createdate,
            "day": settlement.day,
            "day_jg": settlement.day_jg,
            "isgzjdxz": settlement.isgzjdxz,
            "mavalue_bc": settlement.mavalue_bc,
            "mavalue_lj": settlement.mavalue_lj,
            "sjName": settlement.sjName,
            "useflag": settlement.useflag,
            "workinfoname": settlement.workinfoname,
            "upd_remark": settlement.upd_remark
        }

    def search_settlement_data(self, db: Session,
                               id: Optional[int] = None,
                               point_id: Optional[str] = None,
                               nyid: Optional[str] = None,
                               sjName: Optional[str] = None,
                               workinfoname: Optional[str] = None,
                               limit: Optional[int] = None) -> List[SettlementData]:
        """Search settlement records by any combination of filters.

        Results are ordered by upload time (``createdate``) descending;
        ``limit`` > 0 caps the number of rows returned.
        """
        query = db.query(SettlementData)
        if id is not None:
            query = query.filter(SettlementData.id == id)
        if point_id is not None:
            query = query.filter(SettlementData.point_id == point_id)
        if nyid is not None:
            query = query.filter(SettlementData.NYID == nyid)
        if sjName is not None:
            query = query.filter(SettlementData.sjName == sjName)
        if workinfoname is not None:
            query = query.filter(SettlementData.workinfoname == workinfoname)
        # Newest uploads first.
        query = query.order_by(SettlementData.createdate.desc())
        if limit is not None and limit > 0:
            query = query.limit(limit)
        return query.all()

    def search_settlement_data_formatted(self, db: Session,
                                         id: Optional[int] = None,
                                         point_id: Optional[str] = None,
                                         nyid: Optional[str] = None,
                                         sjName: Optional[str] = None,
                                         workinfoname: Optional[str] = None,
                                         limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """Search settlement records and return them as plain dicts,
        ordered by upload time descending (see ``search_settlement_data``)."""
        settlement_data = self.search_settlement_data(
            db, id, point_id, nyid, sjName, workinfoname, limit)
        return [self._settlement_to_dict(s) for s in settlement_data]

    def search_settlement_checkpoint_data_formatted(self, db: Session,
                                                    id: Optional[int] = None,
                                                    point_id: Optional[str] = None,
                                                    nyid: Optional[str] = None,
                                                    sjName: Optional[str] = None,
                                                    workinfoname: Optional[str] = None,
                                                    linecode: Optional[str] = None,
                                                    limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """Query settlement data joined with checkpoint data via a leveling-line code.

        Pipeline:
        1. fetch leveling (level) records, filtered by ``linecode`` if given;
        2. use their NYIDs to fetch settlement records (plus the other filters);
        3. fetch the checkpoints referenced by those settlement records;
        4. return a list of checkpoint dicts, each carrying its own
           ``settlement_data`` list.

        NOTE(review): the ``nyid`` parameter is accepted but never applied —
        NYIDs come exclusively from the leveling records.  Preserved as-is;
        confirm with callers whether it should also filter step 2.
        """
        # Local import: LevelData is only needed here (presumably avoids an
        # import cycle — confirm before hoisting to module level).
        from ..models.level_data import LevelData

        # 1. Leveling records for the requested line.
        level_query = db.query(LevelData)
        if linecode is not None:
            level_query = level_query.filter(LevelData.linecode == linecode)
        level_data_list = level_query.all()
        if not level_data_list:
            return []

        # 2. Collect the survey-period ids carried by those records.
        nyid_list = [level.NYID for level in level_data_list if level.NYID]
        if not nyid_list:
            return []

        # 3. Settlement records for those periods, with the optional filters.
        settlement_query = db.query(SettlementData).filter(
            SettlementData.NYID.in_(nyid_list))
        if id is not None:
            settlement_query = settlement_query.filter(SettlementData.id == id)
        if point_id is not None:
            settlement_query = settlement_query.filter(
                SettlementData.point_id == point_id)
        if sjName is not None:
            settlement_query = settlement_query.filter(
                SettlementData.sjName == sjName)
        if workinfoname is not None:
            settlement_query = settlement_query.filter(
                SettlementData.workinfoname == workinfoname)
        # Newest uploads first.
        settlement_query = settlement_query.order_by(
            SettlementData.createdate.desc())
        if limit is not None and limit > 0:
            settlement_query = settlement_query.limit(limit)
        settlement_data_list = settlement_query.all()
        if not settlement_data_list:
            return []

        # 4./5. Fetch every referenced checkpoint in one query.
        point_id_set = {s.point_id for s in settlement_data_list if s.point_id}
        checkpoints = db.query(Checkpoint).filter(
            Checkpoint.point_id.in_(list(point_id_set))).all()
        checkpoint_dict = {cp.point_id: cp for cp in checkpoints}

        # 6. Group settlement rows under their observation point.
        checkpoint_settlement_map: Dict[Any, List[Dict[str, Any]]] = {}
        for settlement in settlement_data_list:
            checkpoint_settlement_map.setdefault(settlement.point_id, []).append(
                self._settlement_to_dict(settlement))

        # 7. Checkpoint-first result structure; checkpoint fields are None
        # when the referenced checkpoint row is missing.
        result = []
        for pid, settlements in checkpoint_settlement_map.items():
            checkpoint = checkpoint_dict.get(pid)
            result.append({
                "point_id": pid,
                "aname": checkpoint.aname if checkpoint else None,
                "section_id": checkpoint.section_id if checkpoint else None,
                "burial_date": checkpoint.burial_date if checkpoint else None,
                "settlement_data": settlements
            })
        return result

    def _check_checkpoint_exists(self, db: Session, point_id: str) -> bool:
        """Return True if a checkpoint row exists for ``point_id``."""
        checkpoint = db.query(Checkpoint).filter(
            Checkpoint.point_id == point_id).first()
        return checkpoint is not None

    def batch_import_settlement_data(self, db: Session, data: List) -> Dict[str, Any]:
        """Batch-import settlement records in one transaction.

        Rules (unchanged from the original behavior):
        * every item's observation point must already exist — a missing
          checkpoint aborts and rolls back the entire batch;
        * an item whose (point_id, NYID) pair already exists is skipped and
          reported in ``failed_items`` (it is NOT updated);
        * on failure the whole import is retried once; a second failure
          returns a failure summary.

        Returns a dict with ``success``, ``message``, ``total_count``,
        ``success_count``, ``failed_count`` and ``failed_items``.
        """
        total_count = len(data)
        success_count = 0
        failed_count = 0
        failed_items: List[Dict[str, Any]] = []

        for attempt in range(2):  # initial attempt + at most one retry
            try:
                # NOTE: the original called db.begin() here.  With SQLAlchemy
                # 1.4+ "autobegin" semantics an explicit begin() raises
                # InvalidRequestError whenever a transaction is already
                # active (e.g. after any earlier query on this session), so
                # we rely on autobegin plus the commit()/rollback() below.
                success_count = 0
                failed_count = 0
                failed_items = []

                for item_data in data:
                    try:
                        point_id = item_data.get('point_id')
                        # A missing checkpoint aborts the whole batch.
                        exists = self._check_checkpoint_exists(db, point_id)
                        logger.info(f"Checkpoint {point_id}: {exists}")
                        if not exists:
                            logger.error(f"Checkpoint {point_id} not found")
                            raise Exception(f"Checkpoint {point_id} not found")

                        settlement = self.get_by_point_and_nyid(
                            db,
                            nyid=item_data.get('NYID'),
                            point_id=point_id
                        )
                        if settlement:
                            # Duplicate (point_id, NYID): skip the row and
                            # report it among the failed items.
                            logger.info(f"Continue settlement data: {point_id}-{item_data.get('NYID')}")
                            logger.info(f"Existing settlement data: {settlement}")
                            failed_count += 1
                            failed_items.append({
                                'data': item_data,
                                'error': '数据已存在,跳过插入操作'
                            })
                            continue

                        db.add(SettlementData(
                            point_id=point_id,
                            CVALUE=item_data.get('CVALUE'),
                            MAVALUE=item_data.get('MAVALUE'),
                            MTIME_W=item_data.get('MTIME_W'),
                            NYID=item_data.get('NYID'),
                            PRELOADH=item_data.get('PRELOADH'),
                            PSTATE=item_data.get('PSTATE'),
                            REMARK=item_data.get('REMARK'),
                            WORKINFO=item_data.get('WORKINFO'),
                            createdate=item_data.get('createdate'),
                            day=item_data.get('day'),
                            day_jg=item_data.get('day_jg'),
                            isgzjdxz=item_data.get('isgzjdxz'),
                            mavalue_bc=item_data.get('mavalue_bc'),
                            mavalue_lj=item_data.get('mavalue_lj'),
                            sjName=item_data.get('sjName'),
                            useflag=item_data.get('useflag'),
                            workinfoname=item_data.get('workinfoname'),
                            upd_remark=item_data.get('upd_remark')
                        ))
                        logger.info(f"Created settlement data: {point_id}-{item_data.get('NYID')}")
                        success_count += 1
                    except Exception as e:
                        # Record the offending item, then abort the batch —
                        # the re-raise triggers the rollback/retry below.
                        failed_count += 1
                        failed_items.append({
                            'data': item_data,
                            'error': str(e)
                        })
                        logger.error(f"Failed to process settlement data {item_data.get('point_id')}-{item_data.get('NYID')}: {str(e)}")
                        raise

                db.commit()
                logger.info(f"Batch import settlement data completed. Success: {success_count}, Failed: {failed_count}")
                break
            except Exception as e:
                db.rollback()
                logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}")
                if attempt == 1:  # second attempt also failed — give up
                    logger.error("Batch import settlement data failed after retries")
                    return {
                        'success': False,
                        'message': f'批量导入失败: {str(e)}',
                        'total_count': total_count,
                        'success_count': 0,
                        'failed_count': total_count,
                        'failed_items': failed_items
                    }

        return {
            'success': failed_count == 0,
            'message': '批量导入完成' if failed_count == 0 else '部分导入失败',
            'total_count': total_count,
            'success_count': success_count,
            'failed_count': failed_count,
            'failed_items': failed_items
        }