import logging
from datetime import datetime
from typing import Any, Dict, List, Optional

from sqlalchemy import distinct
from sqlalchemy.orm import Session

from ..models.checkpoint import Checkpoint
from ..models.level_data import LevelData
from ..models.section_data import SectionData
from ..models.settlement_data import SettlementData
from .base import BaseService

logger = logging.getLogger(__name__)


class SettlementDataService(BaseService[SettlementData]):
    """Service layer for settlement (沉降) measurement data.

    Provides lookups by observation point / survey period (NYID), filtered and
    paginated searches (optionally joined to ``Checkpoint`` for the point name
    ``aname``), a high-volume batch import with duplicate handling, and a
    level-line (linecode) driven query that joins settlement data to its
    checkpoint and section metadata.
    """

    # Columns copied verbatim from an import row on both insert and overwrite.
    # point_id / NYID are handled separately because they form the duplicate key
    # and are coerced to str (the DB columns are VARCHAR).
    _IMPORT_VALUE_FIELDS = (
        "CVALUE", "MAVALUE", "MTIME_W", "PRELOADH", "PSTATE", "REMARK",
        "WORKINFO", "createdate", "day", "day_jg", "isgzjdxz", "mavalue_bc",
        "mavalue_lj", "sjName", "useflag", "workinfoname", "upd_remark",
    )

    def __init__(self):
        super().__init__(SettlementData)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _apply_settlement_filters(query,
                                  id: Optional[int] = None,
                                  point_id: Optional[str] = None,
                                  nyid: Optional[str] = None,
                                  sjName: Optional[str] = None,
                                  workinfoname: Optional[str] = None):
        """Apply the common optional equality filters to *query*.

        Each filter is added only when the corresponding argument is not None,
        so ``None`` means "don't filter on this column".
        """
        if id is not None:
            query = query.filter(SettlementData.id == id)
        if point_id is not None:
            query = query.filter(SettlementData.point_id == point_id)
        if nyid is not None:
            query = query.filter(SettlementData.NYID == nyid)
        if sjName is not None:
            query = query.filter(SettlementData.sjName == sjName)
        if workinfoname is not None:
            query = query.filter(SettlementData.workinfoname == workinfoname)
        return query

    @staticmethod
    def _settlement_base_dict(settlement: SettlementData) -> Dict[str, Any]:
        """Serialize one settlement record into the canonical response dict.

        Datetime columns are returned as-is (callers that need string
        formatting, e.g. ``get_settlement_by_linecode``, format them
        themselves).
        """
        return {
            "id": settlement.id,
            "point_id": settlement.point_id,
            "CVALUE": settlement.CVALUE,
            "MAVALUE": settlement.MAVALUE,
            "MTIME_W": settlement.MTIME_W,
            "NYID": settlement.NYID,
            "PRELOADH": settlement.PRELOADH,
            "PSTATE": settlement.PSTATE,
            "REMARK": settlement.REMARK,
            "WORKINFO": settlement.WORKINFO,
            "createdate": settlement.createdate,
            "day": settlement.day,
            "day_jg": settlement.day_jg,
            "isgzjdxz": settlement.isgzjdxz,
            "mavalue_bc": settlement.mavalue_bc,
            "mavalue_lj": settlement.mavalue_lj,
            "sjName": settlement.sjName,
            "useflag": settlement.useflag,
            "workinfoname": settlement.workinfoname,
            "upd_remark": settlement.upd_remark,
        }

    # ------------------------------------------------------------------
    # Simple lookups
    # ------------------------------------------------------------------

    def get_by_point_id(self, db: Session, point_id: str) -> List[SettlementData]:
        """Return all settlement rows for one observation point."""
        return self.get_by_field(db, "point_id", point_id)

    def get_by_point_ids(self, db: Session, point_ids: List[str]) -> List[SettlementData]:
        """Return settlement rows for a batch of observation points (IN query)."""
        if not point_ids:
            return []
        return (
            db.query(SettlementData)
            .filter(SettlementData.point_id.in_(point_ids))
            .all()
        )

    def get_by_nyid(self, db: Session, nyid: str) -> List[SettlementData]:
        """Return all settlement rows for one survey period (NYID)."""
        return self.get_by_field(db, "NYID", nyid)

    def get_by_nyid_and_point_id(self, db: Session, nyid: str, point_id: str) -> List[SettlementData]:
        """Return settlement rows matching both a survey period and a point.

        Note: the original implementation passed two field/value pairs
        positionally to ``get_by_field`` (which is used everywhere else with a
        single pair); rewritten as an explicit two-condition query.
        """
        return (
            db.query(SettlementData)
            .filter(SettlementData.NYID == nyid,
                    SettlementData.point_id == point_id)
            .all()
        )

    def get_by_point_and_nyid(self, db: Session,
                              point_id: str = None,
                              nyid: str = None) -> Optional[SettlementData]:
        """Return the first settlement row matching the given point/period.

        A falsy argument (None or "") disables that condition — the filter
        degenerates to ``True`` — mirroring the original behavior.
        """
        return db.query(SettlementData).filter(
            SettlementData.point_id == point_id if point_id else True,
            SettlementData.NYID == nyid if nyid else True,
        ).first()

    # ------------------------------------------------------------------
    # Filtered searches
    # ------------------------------------------------------------------

    def search_settlement_data(self, db: Session,
                               id: Optional[int] = None,
                               point_id: Optional[str] = None,
                               nyid: Optional[str] = None,
                               sjName: Optional[str] = None,
                               workinfoname: Optional[str] = None,
                               skip: int = 0,
                               limit: Optional[int] = None) -> List[SettlementData]:
        """Search settlement data by multiple optional criteria.

        Results are ordered by upload time (``createdate``) descending and the
        checkpoint name is attached to each row as a dynamic ``aname``
        attribute (None when the point has no matching ``Checkpoint`` row,
        thanks to the outer join).
        """
        query = db.query(SettlementData, Checkpoint.aname).outerjoin(
            Checkpoint, SettlementData.point_id == Checkpoint.point_id
        )
        query = self._apply_settlement_filters(
            query, id=id, point_id=point_id, nyid=nyid,
            sjName=sjName, workinfoname=workinfoname,
        )
        query = query.order_by(SettlementData.createdate.desc())
        if skip > 0:
            query = query.offset(skip)
        if limit is not None and limit > 0:
            query = query.limit(limit)

        results: List[SettlementData] = []
        for settlement, aname in query.all():
            # Attach the joined checkpoint name; None when no match.
            setattr(settlement, "aname", aname)
            results.append(settlement)
        return results

    def search_settlement_data_by_point_ids_formatted(
            self, db: Session,
            point_ids: List[str],
            id: Optional[int] = None,
            nyid: Optional[str] = None,
            sjName: Optional[str] = None,
            workinfoname: Optional[str] = None,
            skip: int = 0,
            limit: Optional[int] = None) -> Dict[str, Any]:
        """Batch query settlement data for multiple point_ids (single round trip).

        Returns ``{"data": [...], "total": n, "skip": skip, "limit": limit}``
        where ``total`` is the unpaginated count under the same filters.
        """
        if not point_ids:
            return {"data": [], "total": 0, "skip": skip, "limit": limit}

        # Total count under identical filters, ignoring pagination.
        count_query = self._apply_settlement_filters(
            db.query(SettlementData),
            id=id, nyid=nyid, sjName=sjName, workinfoname=workinfoname,
        ).filter(SettlementData.point_id.in_(point_ids))
        total_count = count_query.count()

        # Paginated page of data with the same filters.
        query = self._apply_settlement_filters(
            db.query(SettlementData),
            id=id, nyid=nyid, sjName=sjName, workinfoname=workinfoname,
        ).filter(SettlementData.point_id.in_(point_ids))
        query = query.order_by(SettlementData.createdate.desc())
        if skip > 0:
            query = query.offset(skip)
        if limit is not None and limit > 0:
            query = query.limit(limit)

        result = [self._settlement_base_dict(s) for s in query.all()]
        return {"data": result, "total": total_count, "skip": skip, "limit": limit}

    def search_settlement_data_formatted(self, db: Session,
                                         id: Optional[int] = None,
                                         point_id: Optional[str] = None,
                                         nyid: Optional[str] = None,
                                         sjName: Optional[str] = None,
                                         workinfoname: Optional[str] = None,
                                         skip: int = 0,
                                         limit: Optional[int] = None) -> Dict[str, Any]:
        """Search settlement data and return serialized dicts with pagination.

        Each record additionally carries ``aname`` (checkpoint name from the
        outer join performed by :meth:`search_settlement_data`).
        """
        # Count distinct settlement ids under the same join + filters,
        # ignoring pagination.
        count_query = db.query(distinct(SettlementData.id)).outerjoin(
            Checkpoint, SettlementData.point_id == Checkpoint.point_id
        )
        count_query = self._apply_settlement_filters(
            count_query, id=id, point_id=point_id, nyid=nyid,
            sjName=sjName, workinfoname=workinfoname,
        )
        total_count = count_query.count()

        settlement_data = self.search_settlement_data(
            db, id, point_id, nyid, sjName, workinfoname, skip, limit
        )
        result = []
        for settlement in settlement_data:
            settlement_dict = self._settlement_base_dict(settlement)
            # aname was attached dynamically by search_settlement_data.
            settlement_dict["aname"] = settlement.aname
            result.append(settlement_dict)

        return {"data": result, "total": total_count, "skip": skip, "limit": limit}

    def search_settlement_checkpoint_data_formatted(
            self, db: Session,
            id: Optional[int] = None,
            point_id: Optional[str] = None,
            nyid: Optional[str] = None,
            sjName: Optional[str] = None,
            workinfoname: Optional[str] = None,
            linecode: Optional[str] = None,
            limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """Query settlement data grouped under their checkpoints, via a level line.

        Steps:
          1. find level-survey rows for *linecode*;
          2. use their NYIDs to find settlement rows (plus optional filters);
          3. load the checkpoints referenced by those settlement rows;
          4. return one entry per checkpoint, each holding its settlement list.
        """
        # 1. Level-survey rows for the requested line code.
        query = db.query(LevelData)
        if linecode is not None:
            query = query.filter(LevelData.linecode == linecode)
        level_data_list = query.all()
        if not level_data_list:
            return []

        # 2. Collect the survey-period ids (NYID) present on the line.
        nyid_list = [level.NYID for level in level_data_list if level.NYID]
        if not nyid_list:
            return []

        # 3. Settlement rows for those periods, plus the optional filters.
        settlement_query = db.query(SettlementData).filter(
            SettlementData.NYID.in_(nyid_list)
        )
        settlement_query = self._apply_settlement_filters(
            settlement_query, id=id, point_id=point_id,
            sjName=sjName, workinfoname=workinfoname,
        )
        settlement_query = settlement_query.order_by(SettlementData.createdate.desc())
        if limit is not None and limit > 0:
            settlement_query = settlement_query.limit(limit)
        settlement_data_list = settlement_query.all()
        if not settlement_data_list:
            return []

        # 4-5. Load all referenced checkpoints in one IN query.
        point_id_set = {s.point_id for s in settlement_data_list if s.point_id}
        checkpoints = db.query(Checkpoint).filter(
            Checkpoint.point_id.in_(list(point_id_set))
        ).all()
        checkpoint_dict = {cp.point_id: cp for cp in checkpoints}

        # 6. Group settlement dicts by point_id.
        checkpoint_settlement_map: Dict[str, List[Dict[str, Any]]] = {}
        for settlement in settlement_data_list:
            checkpoint_settlement_map.setdefault(settlement.point_id, []).append(
                self._settlement_base_dict(settlement)
            )

        # 7. One output entry per checkpoint; checkpoint fields are None when
        # the settlement row references a point with no Checkpoint record.
        result = []
        for pid, settlements in checkpoint_settlement_map.items():
            checkpoint = checkpoint_dict.get(pid)
            result.append({
                "point_id": pid,
                "aname": checkpoint.aname if checkpoint else None,
                "section_id": checkpoint.section_id if checkpoint else None,
                "burial_date": checkpoint.burial_date if checkpoint else None,
                "settlement_data": settlements,
            })
        return result

    def _check_checkpoint_exists(self, db: Session, point_id: str) -> bool:
        """Return True when a Checkpoint row exists for *point_id*."""
        checkpoint = db.query(Checkpoint).filter(
            Checkpoint.point_id == point_id
        ).first()
        return checkpoint is not None

    # ------------------------------------------------------------------
    # Batch import
    # ------------------------------------------------------------------

    def batch_import_settlement_data(self, db: Session, data: List,
                                     duplicate_action: str = "skip") -> Dict[str, Any]:
        """Batch-import settlement rows with bulk lookups and one retry.

        Duplicate detection uses the (point_id, NYID) pair:
          - ``duplicate_action="skip"`` (default): duplicates are reported as
            failed items and skipped;
          - ``duplicate_action="overwrite"``: duplicates update the existing row.

        Rows whose point_id has no Checkpoint record are reported and skipped.
        The whole import runs in one transaction; on any error it rolls back
        and retries once.

        Returns a summary dict with success/updated/failed counts and the
        failed items (including skip reasons).
        """
        total_count = len(data)
        success_count = 0
        failed_count = 0
        failed_items: List[Dict[str, Any]] = []
        updated_count = 0

        if total_count == 0:
            return {
                'success': False,
                'message': '导入数据不能为空',
                'total_count': 0,
                'success_count': 0,
                'failed_count': 0,
                'failed_items': [],
            }

        for attempt in range(2):  # at most one retry
            try:
                # NOTE(review): Session.begin() raises if a transaction is
                # already active (SQLAlchemy autobegin) — confirm the session
                # passed in is always transaction-free at this point.
                db.begin()
                # Reset counters so a retry starts from a clean slate.
                success_count = 0
                failed_count = 0
                failed_items = []
                updated_count = 0

                # --- Optimization 1: bulk checkpoint existence check (IN) ---
                # point_id is coerced to str throughout (DB column is VARCHAR).
                point_id_list = list({
                    str(item.get('point_id'))
                    for item in data if item.get('point_id')
                })
                logger.info(
                    f"Checking {len(point_id_list)} unique point_ids in checkpoint data"
                )
                checkpoints = db.query(Checkpoint).filter(
                    Checkpoint.point_id.in_(point_id_list)
                ).all()
                checkpoint_map = {c.point_id: c for c in checkpoints}
                missing_point_ids = set(point_id_list) - set(checkpoint_map.keys())

                # Report rows whose observation point does not exist.
                for item_data in data:
                    if str(item_data.get('point_id')) in missing_point_ids:
                        failed_count += 1
                        failed_items.append({
                            'data': item_data,
                            'error': '测点id不存在,跳过插入操作',
                        })

                if failed_count == total_count:
                    db.rollback()
                    return {
                        'success': False,
                        'message': '所有观测点ID都不存在',
                        'total_count': total_count,
                        'success_count': 0,
                        'failed_count': total_count,
                        'failed_items': failed_items,
                    }

                # --- Optimization 2: bulk duplicate lookup (IN) ---
                valid_items = [
                    item for item in data
                    if str(item.get('point_id')) not in missing_point_ids
                ]
                # Defined unconditionally so the insert/update phases below are
                # always well-formed even if valid_items were empty.
                to_insert: List[Any] = []
                to_update: List[Any] = []
                if valid_items:
                    existing_data = db.query(SettlementData).filter(
                        SettlementData.point_id.in_(point_id_list)
                    ).all()
                    # Duplicate key is the (point_id, NYID) combination.
                    existing_map = {
                        f"{row.point_id}_{row.NYID}": row for row in existing_data
                    }
                    logger.info(f"Found {len(existing_data)} existing settlement records")

                    for item_data in valid_items:
                        pid = str(item_data.get('point_id'))
                        nyid = str(item_data.get('NYID'))
                        key = f"{pid}_{nyid}"
                        if key in existing_map:
                            if duplicate_action == "overwrite":
                                to_update.append((existing_map[key], item_data))
                            else:
                                logger.info(f"Continue settlement data: {pid}-{nyid}")
                                failed_count += 1
                                failed_items.append({
                                    'data': item_data,
                                    'error': '数据已存在,跳过插入操作',
                                })
                        else:
                            to_insert.append(item_data)

                # --- Bulk update (overwrite mode) ---
                if to_update:
                    logger.info(
                        f"Updating {len(to_update)} existing records (overwrite mode)"
                    )
                    for existing_record, item_data in to_update:
                        try:
                            for field in self._IMPORT_VALUE_FIELDS:
                                setattr(existing_record, field, item_data.get(field))
                            updated_count += 1
                        except Exception as e:
                            failed_count += 1
                            failed_items.append({
                                'data': item_data,
                                'error': f'更新失败: {str(e)}',
                            })
                            logger.error(f"Failed to update record: {str(e)}")
                    db.flush()
                    logger.info(f"Updated {updated_count} records successfully")

                # --- Bulk insert, in chunks of 500 to keep statements bounded ---
                if to_insert:
                    logger.info(f"Inserting {len(to_insert)} new records")
                    batch_size = 500
                    for i in range(0, len(to_insert), batch_size):
                        batch = to_insert[i:i + batch_size]
                        try:
                            settlement_data_list = [
                                SettlementData(
                                    point_id=str(item.get('point_id')),
                                    NYID=str(item.get('NYID')),
                                    **{f: item.get(f) for f in self._IMPORT_VALUE_FIELDS},
                                )
                                for item in batch
                            ]
                            db.add_all(settlement_data_list)
                            success_count += len(batch)
                            logger.info(
                                f"Inserted batch {i//batch_size + 1}: {len(batch)} records"
                            )
                        except Exception as e:
                            failed_count += len(batch)
                            failed_items.extend([
                                {'data': item, 'error': f'插入失败: {str(e)}'}
                                for item in batch
                            ])
                            logger.error(f"Failed to insert batch: {str(e)}")
                            raise e

                # Only genuine insert/update failures block the commit;
                # skip/duplicate entries do not.
                operation_failed_items = [
                    item for item in failed_items
                    if '插入失败' in item.get('error', '')
                    or '更新失败' in item.get('error', '')
                ]
                if operation_failed_items:
                    db.rollback()
                    return {
                        'success': False,
                        'message': f'批量导入失败: {len(operation_failed_items)}条记录操作失败',
                        'total_count': total_count,
                        'success_count': success_count,
                        'failed_count': failed_count,
                        'updated_count': updated_count,
                        'failed_items': failed_items,
                    }

                db.commit()
                logger.info(
                    f"Batch import settlement data completed. "
                    f"Success: {success_count}, Updated: {updated_count}, "
                    f"Failed: {failed_count}"
                )
                break
            except Exception as e:
                db.rollback()
                logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}")
                if attempt == 1:  # final retry also failed
                    logger.error("Batch import settlement data failed after retries")
                    return {
                        'success': False,
                        'message': f'批量导入失败: {str(e)}',
                        'total_count': total_count,
                        'success_count': 0,
                        'failed_count': total_count,
                        'updated_count': 0,
                        'failed_items': failed_items,
                    }

        # Build the summary message.
        if duplicate_action == "overwrite" and updated_count > 0:
            message = f'批量导入完成,新增{success_count}条,更新{updated_count}条'
        elif failed_count == 0:
            message = '批量导入完成'
        else:
            message = '部分导入失败'

        return {
            'success': True,
            'message': message,
            'total_count': total_count,
            'success_count': success_count,
            'updated_count': updated_count,
            'failed_count': failed_count,
            'failed_items': failed_items,
        }

    # ------------------------------------------------------------------
    # Level-line driven query
    # ------------------------------------------------------------------

    def get_settlement_by_linecode(
            self,
            db: Session,
            linecode: str,
            num: int = 1  # number of survey periods to return, newest first
    ) -> Dict:
        """Return settlement data for the latest *num* periods of a level line.

        Join chain:
          LevelData.linecode -> LevelData.NYID -> SettlementData.NYID;
          SettlementData.point_id -> Checkpoint.point_id ->
          Checkpoint.section_id -> SectionData.section_id -> SectionData.work_site

        :param db: database session
        :param linecode: target level-line code
        :param num: number of periods (NYID descending), default 1 (latest)
        :return: ``{"settlement_data": [...]}``
        :raises: re-raises any database error after logging it
        """
        try:
            logger.info(f"开始查询linecode={linecode}对应的沉降数据(取前{num}期)")

            # 1. Latest N distinct NYIDs for the line.
            nyid_query = (
                db.query(LevelData.NYID)
                .filter(LevelData.linecode == linecode)
                .distinct()
                .order_by(LevelData.NYID.desc())
            )
            top_nyids = nyid_query.limit(num).all()
            if not top_nyids:
                logger.warning(f"未查询到linecode={linecode}对应的水准线路记录")
                return {"settlement_data": []}
            target_nyids = [item.NYID for item in top_nyids]

            # 2. Settlement -> Checkpoint -> SectionData, both joins left-outer
            # so rows without checkpoint/section metadata are not dropped.
            settlement_records = (
                db.query(
                    SettlementData,
                    Checkpoint.section_id,
                    Checkpoint.aname,
                    SectionData.work_site,
                )
                .join(
                    Checkpoint,
                    SettlementData.point_id == Checkpoint.point_id,
                    isouter=True,
                )
                .join(
                    SectionData,
                    Checkpoint.section_id == SectionData.section_id,
                    isouter=True,
                )
                .filter(SettlementData.NYID.in_(target_nyids))
                .order_by(
                    SettlementData.NYID.desc(),    # newest period first
                    SettlementData.MTIME_W.asc(),  # within a period: by time
                )
                .all()
            )

            # 3. Serialize rows and derive work_type from the point name.
            settlement_data = []
            for settlement, section_id, aname, work_site in settlement_records:
                # work_type classification keys off letters in aname, gated on
                # work_site being present. 0 means unmatched / no data.
                # NOTE(review): mapping of letters to types taken from the
                # original comments (涵洞H 沉降板L 观测桩G/Z B 路基) — confirm.
                work_type = 0
                if work_site:
                    # Guard: outer join can leave aname as None; the original
                    # would raise TypeError on `"S" in None`.
                    aname_str = str(aname) if aname is not None else ""
                    if "S" in aname_str:
                        work_type = 1
                    elif any(ch in aname_str for ch in ("L", "G", "Z", "B")):
                        work_type = 2
                    # NOTE(review): "C " below contains a trailing space,
                    # reproduced from the original — confirm it is intended.
                    elif "T" in aname_str or "D" in aname_str or "C " in aname_str:
                        work_type = 3
                    elif "H" in aname_str:
                        work_type = 4

                record_dict = self._settlement_base_dict(settlement)
                # Datetimes are string-formatted in this endpoint's payload.
                record_dict["MTIME_W"] = (
                    settlement.MTIME_W.strftime("%Y-%m-%d %H:%M:%S")
                    if settlement.MTIME_W else None
                )
                record_dict["createdate"] = (
                    settlement.createdate.strftime("%Y-%m-%d %H:%M:%S")
                    if settlement.createdate else None
                )
                record_dict["aname"] = aname           # point name from Checkpoint
                record_dict["section_id"] = section_id  # section the point belongs to
                record_dict["work_site"] = work_site    # section's work-site info
                record_dict["work_type"] = work_type    # derived type code
                settlement_data.append(record_dict)

            logger.info(
                f"查询完成,linecode={linecode}的前{num}期对应{len(settlement_data)}条沉降数据"
            )
            return {"settlement_data": settlement_data}
        except Exception as e:
            logger.error(
                f"查询linecode={linecode}的沉降数据失败:{str(e)}", exc_info=True
            )
            raise e