import logging
from datetime import datetime
from typing import Any, Dict, List, Optional

from sqlalchemy.orm import Session

from ..models.checkpoint import Checkpoint
from ..models.level_data import LevelData
from ..models.settlement_data import SettlementData
from .base import BaseService

logger = logging.getLogger(__name__)


class SettlementDataService(BaseService[SettlementData]):
    """Query and bulk-import operations for ``SettlementData`` records."""

    # Columns copied from an incoming payload when a new record is created.
    _IMPORT_FIELDS = (
        "point_id",
        "CVALUE",
        "MAVALUE",
        "MTIME_W",
        "NYID",
        "PRELOADH",
        "PSTATE",
        "REMARK",
        "WORKINFO",
        "createdate",
        "day",
        "day_jg",
        "isgzjdxz",
        "mavalue_bc",
        "mavalue_lj",
        "sjName",
        "useflag",
        "workinfoname",
        "upd_remark",
    )
    # Columns exposed in serialized output, in output order.
    _SERIALIZED_FIELDS = ("id",) + _IMPORT_FIELDS
    # Columns rendered as text when date formatting is requested.
    _DATE_FIELDS = ("MTIME_W", "createdate")
    _DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self):
        super().__init__(SettlementData)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @classmethod
    def _to_dict(cls, record: Any, format_dates: bool = False) -> Dict[str, Any]:
        """Serialize one settlement record into a plain dict.

        :param record: object exposing the attributes in ``_SERIALIZED_FIELDS``
        :param format_dates: when True, ``MTIME_W`` and ``createdate`` are
            rendered with ``strftime`` (falsy values become ``None``);
            otherwise the raw attribute values are returned unchanged
        :return: dict keyed by column name, in ``_SERIALIZED_FIELDS`` order
        """
        payload = {name: getattr(record, name) for name in cls._SERIALIZED_FIELDS}
        if format_dates:
            for name in cls._DATE_FIELDS:
                value = payload[name]
                payload[name] = value.strftime(cls._DATETIME_FORMAT) if value else None
        return payload

    @staticmethod
    def _apply_search_filters(query: Any,
                              id: Optional[int] = None,
                              point_id: Optional[str] = None,
                              nyid: Optional[str] = None,
                              sjName: Optional[str] = None,
                              workinfoname: Optional[str] = None,
                              limit: Optional[int] = None) -> Any:
        """Add the optional equality filters, ordering and limit to ``query``.

        Only arguments that are not ``None`` become filters. Results are
        ordered by ``createdate`` descending, and ``limit`` is applied only
        when it is a positive number.
        """
        criteria = (
            (SettlementData.id, id),
            (SettlementData.point_id, point_id),
            (SettlementData.NYID, nyid),
            (SettlementData.sjName, sjName),
            (SettlementData.workinfoname, workinfoname),
        )
        for column, value in criteria:
            if value is not None:
                query = query.filter(column == value)

        query = query.order_by(SettlementData.createdate.desc())
        if limit is not None and limit > 0:
            query = query.limit(limit)
        return query

    # ------------------------------------------------------------------
    # Simple lookups
    # ------------------------------------------------------------------
    def get_by_point_id(self, db: Session, point_id: str) -> List[SettlementData]:
        """Return the settlement records for an observation point ID."""
        return self.get_by_field(db, "point_id", point_id)

    def get_by_nyid(self, db: Session, nyid: str) -> List[SettlementData]:
        """Return the settlement records for a period ID (NYID)."""
        return self.get_by_field(db, "NYID", nyid)

    def get_by_point_and_nyid(self, db: Session, point_id: Optional[str] = None,
                              nyid: Optional[str] = None) -> Optional[SettlementData]:
        """Return the first record matching the given point ID and/or NYID.

        A falsy ``point_id`` or ``nyid`` (``None`` or empty string) adds no
        filter for that column; with neither supplied, the first row of the
        table is returned.
        """
        query = db.query(SettlementData)
        # Only add a criterion when a value was supplied, instead of passing
        # the Python literal ``True`` to ``filter()``.
        if point_id:
            query = query.filter(SettlementData.point_id == point_id)
        if nyid:
            query = query.filter(SettlementData.NYID == nyid)
        return query.first()

    # ------------------------------------------------------------------
    # Searches
    # ------------------------------------------------------------------
    def search_settlement_data(self, db: Session,
                               id: Optional[int] = None,
                               point_id: Optional[str] = None,
                               nyid: Optional[str] = None,
                               sjName: Optional[str] = None,
                               workinfoname: Optional[str] = None,
                               limit: Optional[int] = None) -> List[SettlementData]:
        """Search settlement records by any combination of criteria.

        Results are ordered by ``createdate`` descending; ``limit`` is applied
        only when it is a positive number.
        """
        query = self._apply_search_filters(
            db.query(SettlementData),
            id=id,
            point_id=point_id,
            nyid=nyid,
            sjName=sjName,
            workinfoname=workinfoname,
            limit=limit,
        )
        return query.all()

    def search_settlement_data_formatted(self, db: Session,
                                         id: Optional[int] = None,
                                         point_id: Optional[str] = None,
                                         nyid: Optional[str] = None,
                                         sjName: Optional[str] = None,
                                         workinfoname: Optional[str] = None,
                                         limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """Run ``search_settlement_data`` and return the rows as plain dicts."""
        records = self.search_settlement_data(db, id, point_id, nyid, sjName, workinfoname, limit)
        return [self._to_dict(record) for record in records]

    def search_settlement_checkpoint_data_formatted(self, db: Session,
                                                    id: Optional[int] = None,
                                                    point_id: Optional[str] = None,
                                                    nyid: Optional[str] = None,
                                                    sjName: Optional[str] = None,
                                                    workinfoname: Optional[str] = None,
                                                    linecode: Optional[str] = None,
                                                    limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """Return settlement data grouped by observation point for a level line.

        Steps:
        1. Collect the NYIDs of the level data (all level data when
           ``linecode`` is ``None``).
        2. Query settlement data with those NYIDs plus the optional filters.
        3. Look up the checkpoint of every point ID found.
        4. Return one entry per point ID holding its settlement records.

        ``limit`` caps the number of settlement rows, not the number of
        points. Each returned entry has the keys ``point_id``, ``aname``,
        ``section_id``, ``burial_date`` and ``settlement_data``; the checkpoint
        attributes are ``None`` when no checkpoint matches the point ID.
        """
        # 1. Only the NYID column is needed from the level data.
        nyid_query = db.query(LevelData.NYID)
        if linecode is not None:
            nyid_query = nyid_query.filter(LevelData.linecode == linecode)
        nyid_list = [row.NYID for row in nyid_query.distinct().all() if row.NYID]
        if not nyid_list:
            return []

        # 2. Settlement rows for those periods. ``nyid`` used to be accepted
        #    but silently ignored here; it now narrows the result like the
        #    other optional criteria.
        settlement_query = self._apply_search_filters(
            db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)),
            id=id,
            point_id=point_id,
            nyid=nyid,
            sjName=sjName,
            workinfoname=workinfoname,
            limit=limit,
        )
        settlement_rows = settlement_query.all()
        if not settlement_rows:
            return []

        # 3. Checkpoints for every non-empty point ID; skip the lookup when
        #    there is nothing to look up.
        point_ids = {row.point_id for row in settlement_rows if row.point_id}
        checkpoints_by_point: Dict[Any, Any] = {}
        if point_ids:
            found = db.query(Checkpoint).filter(Checkpoint.point_id.in_(list(point_ids))).all()
            checkpoints_by_point = {cp.point_id: cp for cp in found}

        # 4. Group settlement rows per point, keeping first-seen order.
        grouped: Dict[Any, List[Dict[str, Any]]] = {}
        for row in settlement_rows:
            grouped.setdefault(row.point_id, []).append(self._to_dict(row))

        result = []
        for group_point_id, settlements in grouped.items():
            checkpoint = checkpoints_by_point.get(group_point_id)
            result.append({
                "point_id": group_point_id,
                "aname": checkpoint.aname if checkpoint else None,
                "section_id": checkpoint.section_id if checkpoint else None,
                "burial_date": checkpoint.burial_date if checkpoint else None,
                "settlement_data": settlements,
            })
        return result

    # ------------------------------------------------------------------
    # Import
    # ------------------------------------------------------------------
    def _check_checkpoint_exists(self, db: Session, point_id: str) -> bool:
        """Return True when a checkpoint with this point ID exists."""
        checkpoint = db.query(Checkpoint).filter(Checkpoint.point_id == point_id).first()
        return checkpoint is not None

    def batch_import_settlement_data(self, db: Session, data: List) -> Dict[str, Any]:
        """Bulk-insert settlement records inside one transaction.

        For every item in ``data`` (dict-like, read with ``.get``):
        1. If no checkpoint exists for its ``point_id``, the item is recorded
           as failed and skipped.
        2. If a record with the same ``point_id`` and ``NYID`` already exists,
           the item is recorded as failed and skipped (existing records are
           never updated).
        3. Otherwise a new record is added.

        Any exception rolls the whole transaction back and the batch is
        retried once.

        :return: dict with ``success``, ``message``, ``total_count``,
            ``success_count``, ``failed_count`` and ``failed_items``
        """
        total_count = len(data)
        success_count = 0
        failed_count = 0
        failed_items: List[Dict[str, Any]] = []

        for attempt in range(2):  # one initial try plus one retry
            try:
                # Session.begin() raises when a transaction is already open
                # (e.g. autobegun by an earlier query on this session), which
                # would burn the first attempt; only begin when none is active.
                # NOTE(review): in_transaction() requires SQLAlchemy 1.4+ - confirm.
                if not db.in_transaction():
                    db.begin()
                success_count = 0
                failed_count = 0
                failed_items = []

                for item_data in data:
                    try:
                        checkpoint = self._check_checkpoint_exists(db, item_data.get('point_id'))
                        logger.info(f"Checkpoint {item_data.get('point_id')}: {checkpoint}")
                        if not checkpoint:
                            logger.error(f"Checkpoint {item_data.get('point_id')} not found")
                            logger.error(f"Checkpoint {item_data} not found")
                            failed_count += 1
                            failed_items.append({
                                'data': item_data,
                                'error': '测点id不存在,跳过插入操作'
                            })
                            continue

                        settlement = self.get_by_point_and_nyid(
                            db,
                            nyid=item_data.get('NYID'),
                            point_id=item_data.get('point_id')
                        )
                        if settlement:
                            # Duplicate (point_id, NYID): skip, do not update.
                            logger.info(f"Continue settlement data: {item_data.get('point_id')}-{item_data.get('NYID')}")
                            logger.info(f"Existing settlement data: {settlement}")
                            failed_count += 1
                            failed_items.append({
                                'data': item_data,
                                'error': '数据已存在,跳过插入操作'
                            })
                            continue

                        settlement = SettlementData(
                            **{name: item_data.get(name) for name in self._IMPORT_FIELDS}
                        )
                        db.add(settlement)
                        logger.info(f"Created settlement data: {item_data.get('point_id')}-{item_data.get('NYID')}")
                        success_count += 1
                    except Exception as e:
                        failed_count += 1
                        failed_items.append({
                            'data': item_data,
                            'error': str(e)
                        })
                        logger.error(f"Failed to process settlement data {item_data.get('point_id')}-{item_data.get('NYID')}: {str(e)}")
                        # Abort the whole batch so the outer handler rolls back.
                        raise

                db.commit()
                logger.info(f"Batch import settlement data completed. Success: {success_count}, Failed: {failed_count}")
                break
            except Exception as e:
                db.rollback()
                logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}")
                if attempt == 1:  # the retry failed as well
                    logger.error("Batch import settlement data failed after retries")
                    return {
                        'success': False,
                        'message': f'批量导入失败: {str(e)}',
                        'total_count': total_count,
                        'success_count': 0,
                        'failed_count': total_count,
                        'failed_items': failed_items
                    }

        return {
            'success': True,
            'message': '批量导入完成' if failed_count == 0 else '部分导入失败',
            'total_count': total_count,
            'success_count': success_count,
            'failed_count': failed_count,
            'failed_items': failed_items
        }

    # ------------------------------------------------------------------
    # Latest periods for a level line
    # ------------------------------------------------------------------
    def get_settlement_by_linecode(
        self,
        db: Session,
        linecode: str,
        num: int = 1
    ) -> Dict:
        """Return the settlement data of the newest ``num`` periods of a level line.

        Join path: ``LevelData.linecode`` -> ``LevelData.NYID`` ->
        ``SettlementData.NYID``.

        :param db: database session
        :param linecode: level line code to look up
        :param num: number of periods to return, taken from the distinct NYIDs
            ordered descending (default 1)
        :return: ``{"settlement_data": [...]}``; rows are ordered by NYID
            descending then ``MTIME_W`` ascending, with ``MTIME_W`` and
            ``createdate`` formatted as ``%Y-%m-%d %H:%M:%S`` (``None`` when
            falsy)
        :raises Exception: any query/serialization error is logged and re-raised
        """
        try:
            logger.info(f"开始查询linecode={linecode}对应的沉降数据(取前{num}期)")

            # 1. Distinct NYIDs of this line, largest first, capped at ``num``.
            top_nyids = (
                db.query(LevelData.NYID)
                .filter(LevelData.linecode == linecode)
                .distinct()
                .order_by(LevelData.NYID.desc())
                .limit(num)
                .all()
            )
            if not top_nyids:
                logger.warning(f"未查询到linecode={linecode}对应的水准线路记录")
                return {"settlement_data": []}

            target_nyids = [item.NYID for item in top_nyids]

            # 2. Settlement rows of those periods.
            settlement_records = (
                db.query(SettlementData)
                .filter(SettlementData.NYID.in_(target_nyids))
                .order_by(
                    SettlementData.NYID.desc(),
                    SettlementData.MTIME_W.asc()
                )
                .all()
            )

            # 3. Serialize, rendering the date columns as text.
            settlement_data = [
                self._to_dict(record, format_dates=True)
                for record in settlement_records
            ]

            logger.info(f"查询完成,linecode={linecode}的前{num}期对应{len(settlement_data)}条沉降数据")
            return {"settlement_data": settlement_data}

        except Exception as e:
            logger.error(f"查询linecode={linecode}的沉降数据失败:{str(e)}", exc_info=True)
            raise