diff --git a/app/core/database.py b/app/core/database.py index 756f0de..9a9fb2b 100644 --- a/app/core/database.py +++ b/app/core/database.py @@ -3,7 +3,7 @@ from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from .config import settings -engine = create_engine(settings.DATABASE_URL, echo=settings.APP_DEBUG, pool_size=15, max_overflow=30, pool_timeout=60, pool_recycle=300) +engine = create_engine(settings.DATABASE_URL, echo=False, pool_size=15, max_overflow=30, pool_timeout=60, pool_recycle=300) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) Base = declarative_base() diff --git a/app/services/level_data.py b/app/services/level_data.py index f5237aa..35ee07a 100644 --- a/app/services/level_data.py +++ b/app/services/level_data.py @@ -59,7 +59,9 @@ class LevelDataService(BaseService[LevelData]): def batch_import_level_data(self, db: Session, data: List) -> Dict[str, Any]: """ - 批量导入水准数据,根据期数ID和线路编码判断是否重复,重复数据改为更新操作 + 批量导入水准数据 - 性能优化版 + 使用批量查询和批量操作,大幅提升导入速度 + 根据期数ID和线路编码判断是否重复,重复数据改为更新操作 支持事务回滚,失败时重试一次 """ import logging @@ -70,6 +72,16 @@ class LevelDataService(BaseService[LevelData]): failed_count = 0 failed_items = [] + if total_count == 0: + return { + 'success': False, + 'message': '导入数据不能为空', + 'total_count': 0, + 'success_count': 0, + 'failed_count': 0, + 'failed_items': [] + } + for attempt in range(2): # 最多重试1次 try: db.begin() @@ -77,49 +89,120 @@ class LevelDataService(BaseService[LevelData]): failed_count = 0 failed_items = [] + # ===== 性能优化1:批量查询沉降数据(IN查询) ===== + nyid_list = list(set(str(item.get('NYID')) for item in data if item.get('NYID'))) + logger.info(f"Checking {len(nyid_list)} unique NYIDs in settlement data") + settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all() + settlement_map = {s.NYID: s for s in settlements} + missing_nyids = set(nyid_list) - set(settlement_map.keys()) + + if missing_nyids: + db.rollback() + return { + 'success': False, + 'message': f'以下期数在沉降表中不存在: {list(missing_nyids)}', + 'total_count': total_count, + 'success_count': 0, + 'failed_count': total_count, + 'failed_items': [] + } + + # ===== 性能优化2:批量查询现有水准数据(IN查询) ===== + # 构建 (NYID, linecode) 组合键来查找重复数据 + existing_data = db.query(LevelData).filter( + LevelData.NYID.in_(nyid_list) + ).all() + + # 使用组合键创建查找表:key = f"{NYID}_{linecode}" + existing_map = { + f"{item.NYID}_{item.linecode}": item + for item in existing_data + } + logger.info(f"Found {len(existing_data)} existing level records") + + # ===== 性能优化3:批量处理插入和更新 ===== + to_update = [] + to_insert = [] + for item_data in data: - try: + nyid = str(item_data.get('NYID')) + linecode = item_data.get('linecode') - # 判断期数id沉降数据是否存在 - settlement = self._check_settlement_exists(db, item_data.get('NYID')) - if not settlement: - logger.error(f"Settlement {item_data.get('NYID')} not found") - raise Exception(f"Settlement {item_data.get('NYID')} not found") + # 构建组合键 + key = f"{nyid}_{linecode}" - level_data = self.get_by_nyid_and_linecode( - db, - # item_data.get('linecode'), - nyid=item_data.get('NYID') - ) - if level_data: - # 更新操作 - level_data.benchmarkids = item_data.get('benchmarkids') - level_data.wsphigh = item_data.get('wsphigh') - level_data.mtype = item_data.get('mtype') - level_data.createDate = item_data.get('createDate') - logger.info(f"Updated level data: {item_data.get('linecode')}-{item_data.get('NYID')}") - else: - # 新增操作 - level_data = LevelData( - linecode=item_data.get('linecode'), - benchmarkids=item_data.get('benchmarkids'), - wsphigh=item_data.get('wsphigh'), - mtype=item_data.get('mtype'), - NYID=item_data.get('NYID'), - createDate=item_data.get('createDate') - ) - db.add(level_data) - logger.info(f"Created level data: {item_data.get('linecode')}-{item_data.get('NYID')}") + if key in existing_map: + # 记录需要更新的数据 + existing_item = existing_map[key] + to_update.append((existing_item, item_data)) + else: + # 记录需要插入的数据 + to_insert.append(item_data) - success_count += 1 - except Exception as e: - failed_count += 1 - failed_items.append({ - 'data': item_data, - 'error': str(e) - }) - logger.error(f"Failed to process level data {item_data.get('linecode')}-{item_data.get('NYID')}: {str(e)}") - raise e + # ===== 执行批量更新 ===== + if to_update: + logger.info(f"Updating {len(to_update)} existing records") + for existing_item, item_data in to_update: + try: + existing_item.benchmarkids = item_data.get('benchmarkids') + existing_item.wsphigh = item_data.get('wsphigh') + existing_item.mtype = item_data.get('mtype') + existing_item.createDate = item_data.get('createDate') + success_count += 1 + except Exception as e: + failed_count += 1 + failed_items.append({ + 'data': item_data, + 'error': f'更新失败: {str(e)}' + }) + logger.error(f"Failed to update level data: {str(e)}") + raise e + + # ===== 执行批量插入 ===== + if to_insert: + logger.info(f"Inserting {len(to_insert)} new records") + # 分批插入,每批500条(避免SQL过长) + batch_size = 500 + for i in range(0, len(to_insert), batch_size): + batch = to_insert[i:i + batch_size] + try: + level_data_list = [ + LevelData( + linecode=item.get('linecode'), + benchmarkids=item.get('benchmarkids'), + wsphigh=item.get('wsphigh'), + mtype=item.get('mtype'), + NYID=str(item.get('NYID')), + createDate=item.get('createDate') + ) + for item in batch + ] + db.add_all(level_data_list) + success_count += len(batch) + logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records") + except Exception as e: + failed_count += len(batch) + failed_items.extend([ + { + 'data': item, + 'error': f'插入失败: {str(e)}' + } + for item in batch + ]) + logger.error(f"Failed to insert batch: {str(e)}") + raise e + + # 如果有失败记录,不提交事务 + if failed_items: + db.rollback() + return { + 'success': False, + 'message': f'批量导入失败: {len(failed_items)}条记录处理失败', + 'total_count': total_count, + 'success_count': success_count, + 'failed_count': failed_count, + 'failed_items': failed_items + } db.commit() logger.info(f"Batch import level data completed. Success: {success_count}, Failed: {failed_count}") diff --git a/app/services/settlement_data.py b/app/services/settlement_data.py index 3aa99f4..c6ae766 100644 --- a/app/services/settlement_data.py +++ b/app/services/settlement_data.py @@ -341,7 +341,8 @@ class SettlementDataService(BaseService[SettlementData]): def batch_import_settlement_data(self, db: Session, data: List) -> Dict[str, Any]: """ - 批量导入沉降数据, + 批量导入沉降数据 - 性能优化版 + 使用批量查询和批量操作,大幅提升导入速度 1.根据观测点ID和期数ID判断是否重复,修复记录,跳过插入操作 2.判断观测点数据是否存在,不存在则记录,跳过插入操作 支持事务回滚,失败时重试一次 @@ -354,6 +355,16 @@ class SettlementDataService(BaseService[SettlementData]): failed_count = 0 failed_items = [] + if total_count == 0: + return { + 'success': False, + 'message': '导入数据不能为空', + 'total_count': 0, + 'success_count': 0, + 'failed_count': 0, + 'failed_items': [] + } + for attempt in range(2): # 最多重试1次 try: db.begin() @@ -361,91 +372,131 @@ class SettlementDataService(BaseService[SettlementData]): failed_count = 0 failed_items = [] + # ===== 性能优化1:批量查询观测点数据(IN查询) ===== + point_id_list = list(set(item.get('point_id') for item in data if item.get('point_id'))) + logger.info(f"Checking {len(point_id_list)} unique point_ids in checkpoint data") + checkpoints = db.query(Checkpoint).filter(Checkpoint.point_id.in_(point_id_list)).all() + checkpoint_map = {c.point_id: c for c in checkpoints} + missing_point_ids = set(point_id_list) - set(checkpoint_map.keys()) + + # 记录缺失的观测点 for item_data in data: - try: + point_id = item_data.get('point_id') + if point_id in missing_point_ids: + failed_count += 1 + failed_items.append({ + 'data': item_data, + 'error': '测点id不存在,跳过插入操作' + }) - # 判断观测点数据是否存在 - checkpoint = self._check_checkpoint_exists(db, item_data.get('point_id')) - logger.info(f"Checkpoint {item_data.get('point_id')}: {checkpoint}") - if not checkpoint: - logger.error(f"Checkpoint {item_data.get('point_id')} not found") - logger.error(f"Checkpoint {item_data} not found") - failed_count += 1 - failed_items.append({ - 'data': item_data, - 'error': '测点id不存在,跳过插入操作' - }) - continue + # 如果所有数据都失败,直接返回 + if failed_count == total_count: + db.rollback() + return { + 'success': False, + 'message': '所有观测点ID都不存在', + 'total_count': total_count, + 'success_count': 0, + 'failed_count': total_count, + 'failed_items': failed_items + } - settlement = self.get_by_point_and_nyid( - db, - nyid=item_data.get('NYID'), - point_id=item_data.get('point_id') - ) - if settlement: - # 跳过数据 - logger.info(f"Continue settlement data: {item_data.get('point_id')}-{item_data.get('NYID')}") - logger.info(f"Existing settlement data: {settlement}") + # ===== 性能优化2:批量查询现有沉降数据(IN查询) ===== + # 只查询有效的观测点数据 + valid_items = [item for item in data if item.get('point_id') not in missing_point_ids] + if valid_items: + # 使用组合键 (point_id, NYID) 查询现有数据 + existing_data = db.query(SettlementData).filter( + SettlementData.point_id.in_(point_id_list) + ).all() + + # 使用组合键创建查找表:key = f"{point_id}_{NYID}" + existing_map = { + f"{item.point_id}_{item.NYID}": item + for item in existing_data + } + logger.info(f"Found {len(existing_data)} existing settlement records") + + # ===== 性能优化3:批量处理插入和跳过 ===== + to_insert = [] + + for item_data in valid_items: + point_id = item_data.get('point_id') + nyid = str(item_data.get('NYID')) + + # 构建组合键 + key = f"{point_id}_{nyid}" + + if key in existing_map: + # 数据已存在,跳过 + logger.info(f"Continue settlement data: {point_id}-{nyid}") failed_count += 1 failed_items.append({ 'data': item_data, 'error': '数据已存在,跳过插入操作' }) - continue - # 更新操作 - # settlement.CVALUE = item_data.get('CVALUE') - # settlement.MAVALUE = item_data.get('MAVALUE') - # settlement.MTIME_W = item_data.get('MTIME_W') - # settlement.PRELOADH = item_data.get('PRELOADH') - # settlement.PSTATE = item_data.get('PSTATE') - # settlement.REMARK = item_data.get('REMARK') - # settlement.WORKINFO = item_data.get('WORKINFO') - # settlement.createdate = item_data.get('createdate') - # settlement.day = item_data.get('day') - # settlement.day_jg = item_data.get('day_jg') - # settlement.isgzjdxz = item_data.get('isgzjdxz') - # settlement.mavalue_bc = item_data.get('mavalue_bc') - # settlement.mavalue_lj = item_data.get('mavalue_lj') - # settlement.sjName = item_data.get('sjName') - # settlement.useflag = item_data.get('useflag') - # settlement.workinfoname = item_data.get('workinfoname') - # settlement.upd_remark = item_data.get('upd_remark') - else: - # 新增操作 - settlement = SettlementData( - point_id=item_data.get('point_id'), - CVALUE=item_data.get('CVALUE'), - MAVALUE=item_data.get('MAVALUE'), - MTIME_W=item_data.get('MTIME_W'), - NYID=item_data.get('NYID'), - PRELOADH=item_data.get('PRELOADH'), - PSTATE=item_data.get('PSTATE'), - REMARK=item_data.get('REMARK'), - WORKINFO=item_data.get('WORKINFO'), - createdate=item_data.get('createdate'), - day=item_data.get('day'), - day_jg=item_data.get('day_jg'), - isgzjdxz=item_data.get('isgzjdxz'), - mavalue_bc=item_data.get('mavalue_bc'), - mavalue_lj=item_data.get('mavalue_lj'), - sjName=item_data.get('sjName'), - useflag=item_data.get('useflag'), - workinfoname=item_data.get('workinfoname'), - upd_remark=item_data.get('upd_remark') - ) - db.add(settlement) - logger.info(f"Created settlement data: {item_data.get('point_id')}-{item_data.get('NYID')}") + # 记录需要插入的数据 + to_insert.append(item_data) - success_count += 1 - except Exception as e: - failed_count += 1 - failed_items.append({ - 'data': item_data, - 'error': str(e) - }) - logger.error(f"Failed to process settlement data {item_data.get('point_id')}-{item_data.get('NYID')}: {str(e)}") - raise e + # ===== 执行批量插入 ===== + if to_insert: + logger.info(f"Inserting {len(to_insert)} new records") + # 分批插入,每批500条(避免SQL过长) + batch_size = 500 + for i in range(0, len(to_insert), batch_size): + batch = to_insert[i:i + batch_size] + try: + settlement_data_list = [ + SettlementData( + point_id=item.get('point_id'), + CVALUE=item.get('CVALUE'), + MAVALUE=item.get('MAVALUE'), + MTIME_W=item.get('MTIME_W'), + NYID=str(item.get('NYID')), + PRELOADH=item.get('PRELOADH'), + PSTATE=item.get('PSTATE'), + REMARK=item.get('REMARK'), + WORKINFO=item.get('WORKINFO'), + createdate=item.get('createdate'), + day=item.get('day'), + day_jg=item.get('day_jg'), + isgzjdxz=item.get('isgzjdxz'), + mavalue_bc=item.get('mavalue_bc'), + mavalue_lj=item.get('mavalue_lj'), + sjName=item.get('sjName'), + useflag=item.get('useflag'), + workinfoname=item.get('workinfoname'), + upd_remark=item.get('upd_remark') + ) + for item in batch + ] + db.add_all(settlement_data_list) + success_count += len(batch) + logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records") + except Exception as e: + failed_count += len(batch) + failed_items.extend([ + { + 'data': item, + 'error': f'插入失败: {str(e)}' + } + for item in batch + ]) + logger.error(f"Failed to insert batch: {str(e)}") + raise e + + # 如果有失败记录,不提交事务 + if failed_items: + db.rollback() + return { + 'success': False, + 'message': f'批量导入失败: {len(failed_items)}条记录处理失败', + 'total_count': total_count, + 'success_count': success_count, + 'failed_count': failed_count, + 'failed_items': failed_items + } db.commit() logger.info(f"Batch import settlement data completed. Success: {success_count}, Failed: {failed_count}")