From 54ac4037d5033c3b4a96f17d5ce086c740186ea7 Mon Sep 17 00:00:00 2001 From: lhx Date: Mon, 17 Nov 2025 16:14:12 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=B9=E9=87=8F=E5=AF=BC=E5=85=A5=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/checkpoint.py | 146 ++++++++++++++++++++++++++------- app/services/level_data.py | 153 +++++++++++++++++------------------ app/services/section_data.py | 138 ++++++++++++++++++++----------- 3 files changed, 283 insertions(+), 154 deletions(-) diff --git a/app/services/checkpoint.py b/app/services/checkpoint.py index 5fa9ee6..83ed0ee 100644 --- a/app/services/checkpoint.py +++ b/app/services/checkpoint.py @@ -37,8 +37,10 @@ class CheckpointService(BaseService[Checkpoint]): def batch_import_checkpoints(self, db: Session, data: List) -> Dict[str, Any]: """ - 批量导入观测点数据,根据观测点ID判断是否重复,重复数据改为更新操作 - 判断断面id是否存在,不存在则全部不导入 + 批量导入观测点数据 - 性能优化版 + 使用批量查询和批量操作,大幅提升导入速度 + 1.判断断面id是否存在,不存在则跳过该条数据 + 2.根据观测点ID判断是否重复,重复数据跳过,不进行更新操作 支持事务回滚,失败时重试一次 """ import logging @@ -49,6 +51,16 @@ class CheckpointService(BaseService[Checkpoint]): failed_count = 0 failed_items = [] + if total_count == 0: + return { + 'success': False, + 'message': '导入数据不能为空', + 'total_count': 0, + 'success_count': 0, + 'failed_count': 0, + 'failed_items': [] + } + for attempt in range(2): # 最多重试1次 try: db.begin() @@ -56,40 +68,112 @@ class CheckpointService(BaseService[Checkpoint]): failed_count = 0 failed_items = [] + # ===== 性能优化1:批量查询断面数据(IN查询) ===== + # 统一转换为字符串处理(数据库section_id字段是VARCHAR类型) + section_id_list = list(set(str(item.get('section_id')) for item in data if item.get('section_id'))) + logger.info(f"Checking {len(section_id_list)} unique section_ids in section data") + sections = db.query(SectionData).filter(SectionData.section_id.in_(section_id_list)).all() + section_map = {s.section_id: s for s in sections} + missing_section_ids = set(section_id_list) - set(section_map.keys()) + + # 记录缺失的断面 for item_data in data: - try: - # 判断断面id是否存在 - if not self._check_section_exists(db, item_data.get('section_id')): - logger.error(f"Section {item_data.get('section_id')} not found") - raise Exception(f"Section {item_data.get('section_id')} not found") - - checkpoint = self.get_by_point_id(db, item_data.get('point_id')) - if checkpoint: - # 更新操作 - checkpoint.aname = item_data.get('aname') - checkpoint.section_id = item_data.get('section_id') - checkpoint.burial_date = item_data.get('burial_date') - logger.info(f"Updated checkpoint: {item_data.get('point_id')}") - else: - # 新增操作 - checkpoint = Checkpoint( - point_id=item_data.get('point_id'), - aname=item_data.get('aname'), - section_id=item_data.get('section_id'), - burial_date=item_data.get('burial_date'), - ) - db.add(checkpoint) - logger.info(f"Created checkpoint: {item_data.get('point_id')}") - - success_count += 1 - except Exception as e: + section_id = str(item_data.get('section_id')) # 统一转换为字符串 + if section_id in missing_section_ids: failed_count += 1 failed_items.append({ 'data': item_data, - 'error': str(e) + 'error': '断面ID不存在,跳过插入操作' }) - logger.error(f"Failed to process checkpoint {item_data.get('point_id')}: {str(e)}") - raise e + + # 如果所有数据都失败,直接返回 + if failed_count == total_count: + db.rollback() + return { + 'success': False, + 'message': '所有断面ID都不存在', + 'total_count': total_count, + 'success_count': 0, + 'failed_count': total_count, + 'failed_items': failed_items + } + + # ===== 性能优化2:批量查询现有观测点数据(IN查询) ===== + # 只查询有效的断面数据 + valid_items = [item for item in data if str(item.get('section_id')) not in missing_section_ids] + if valid_items: + # 统一转换为字符串处理(数据库point_id字段是VARCHAR类型) + point_id_list = list(set(str(item.get('point_id')) for item in valid_items if item.get('point_id'))) + existing_checkpoints = db.query(Checkpoint).filter(Checkpoint.point_id.in_(point_id_list)).all() + + # 使用point_id创建查找表 + existing_map = { + checkpoint.point_id: checkpoint + for checkpoint in existing_checkpoints + } + logger.info(f"Found {len(existing_checkpoints)} existing checkpoints") + + # ===== 性能优化3:批量处理插入和跳过 ===== + to_insert = [] + + for item_data in valid_items: + point_id = str(item_data.get('point_id')) # 统一转换为字符串 + + if point_id in existing_map: + # 数据已存在,跳过 + logger.info(f"Continue checkpoint data: {point_id}") + failed_count += 1 + failed_items.append({ + 'data': item_data, + 'error': '数据已存在,跳过插入操作' + }) + else: + # 记录需要插入的数据 + to_insert.append(item_data) + + # ===== 执行批量插入 ===== + if to_insert: + logger.info(f"Inserting {len(to_insert)} new records") + # 分批插入,每批500条(避免SQL过长) + batch_size = 500 + for i in range(0, len(to_insert), batch_size): + batch = to_insert[i:i + batch_size] + try: + checkpoint_list = [ + Checkpoint( + point_id=str(item.get('point_id')), # 统一转换为字符串 + aname=item.get('aname'), + section_id=str(item.get('section_id')), # 统一转换为字符串 + burial_date=item.get('burial_date') + ) + for item in batch + ] + db.add_all(checkpoint_list) + success_count += len(batch) + logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records") + except Exception as e: + failed_count += len(batch) + failed_items.extend([ + { + 'data': item, + 'error': f'插入失败: {str(e)}' + } + for item in batch + ]) + logger.error(f"Failed to insert batch: {str(e)}") + raise e + + # 如果有失败记录,不提交事务 + if failed_items: + db.rollback() + return { + 'success': False, + 'message': f'批量导入失败: {len(failed_items)}条记录处理失败', + 'total_count': total_count, + 'success_count': success_count, + 'failed_count': failed_count, + 'failed_items': failed_items + } db.commit() logger.info(f"Batch import checkpoints completed. Success: {success_count}, Failed: {failed_count}") diff --git a/app/services/level_data.py b/app/services/level_data.py index 5c7588b..1b02c2a 100644 --- a/app/services/level_data.py +++ b/app/services/level_data.py @@ -61,7 +61,8 @@ class LevelDataService(BaseService[LevelData]): """ 批量导入水准数据 - 性能优化版 使用批量查询和批量操作,大幅提升导入速度 - 根据期数ID和线路编码判断是否重复,重复数据改为更新操作 + 1.根据期数ID和线路编码判断是否重复,跳过重复数据,不进行更新 + 2.判断沉降数据是否存在,不存在则记录并跳过插入操作 支持事务回滚,失败时重试一次 """ import logging @@ -96,101 +97,99 @@ class LevelDataService(BaseService[LevelData]): settlement_map = {s.NYID: s for s in settlements} missing_nyids = set(nyid_list) - set(settlement_map.keys()) - if missing_nyids: + # 记录缺失的NYID + for item_data in data: + nyid = str(item_data.get('NYID')) # 统一转换为字符串 + if nyid in missing_nyids: + failed_count += 1 + failed_items.append({ + 'data': item_data, + 'error': '期数ID在沉降表中不存在,跳过插入操作' + }) + + # 如果所有数据都失败,直接返回 + if failed_count == total_count: db.rollback() return { 'success': False, - 'message': f'以下期数在沉降表中不存在: {list(missing_nyids)}', + 'message': '所有期数ID在沉降表中都不存在', 'total_count': total_count, 'success_count': 0, 'failed_count': total_count, - 'failed_items': [] + 'failed_items': failed_items } # ===== 性能优化2:批量查询现有水准数据(IN查询) ===== - # 构建 (NYID, linecode) 组合键来查找重复数据 - existing_data = db.query(LevelData).filter( - LevelData.NYID.in_(nyid_list) - ).all() + # 只查询有效的NYID数据 + valid_items = [item for item in data if str(item.get('NYID')) not in missing_nyids] + if valid_items: + # 构建 (NYID, linecode) 组合键来查找重复数据 + existing_data = db.query(LevelData).filter( + LevelData.NYID.in_(nyid_list) + ).all() - # 使用组合键创建查找表:key = f"{NYID}_{linecode}" - existing_map = { - f"{item.NYID}_{item.linecode}": item - for item in existing_data - } - logger.info(f"Found {len(existing_data)} existing level records") + # 使用组合键创建查找表:key = f"{NYID}_{linecode}" + existing_map = { + f"{item.NYID}_{item.linecode}": item + for item in existing_data + } + logger.info(f"Found {len(existing_data)} existing level records") - # ===== 性能优化3:批量处理插入和更新 ===== - to_update = [] - to_insert = [] + # ===== 性能优化3:批量处理插入和跳过 ===== + to_insert = [] - for item_data in data: - nyid = str(item_data.get('NYID')) - linecode = item_data.get('linecode') + for item_data in valid_items: + nyid = str(item_data.get('NYID')) # 统一转换为字符串 + linecode = item_data.get('linecode') - # 构建组合键 - key = f"{nyid}_{linecode}" + # 构建组合键 + key = f"{nyid}_{linecode}" - if key in existing_map: - # 记录需要更新的数据 - existing_item = existing_map[key] - to_update.append((existing_item, item_data)) - else: - # 记录需要插入的数据 - to_insert.append(item_data) - - # ===== 执行批量更新 ===== - if to_update: - logger.info(f"Updating {len(to_update)} existing records") - for existing_item, item_data in to_update: - try: - existing_item.benchmarkids = item_data.get('benchmarkids') - existing_item.wsphigh = item_data.get('wsphigh') - existing_item.mtype = item_data.get('mtype') - existing_item.createDate = item_data.get('createDate') - success_count += 1 - except Exception as e: + if key in existing_map: + # 数据已存在,跳过 + logger.info(f"Continue level data: {nyid}-{linecode}") failed_count += 1 failed_items.append({ 'data': item_data, - 'error': f'更新失败: {str(e)}' + 'error': '数据已存在,跳过插入操作' }) - logger.error(f"Failed to update level data: {str(e)}") - raise e + else: + # 记录需要插入的数据 + to_insert.append(item_data) - # ===== 执行批量插入 ===== - if to_insert: - logger.info(f"Inserting {len(to_insert)} new records") - # 分批插入,每批500条(避免SQL过长) - batch_size = 500 - for i in range(0, len(to_insert), batch_size): - batch = to_insert[i:i + batch_size] - try: - level_data_list = [ - LevelData( - linecode=str(item.get('linecode')), # 统一转换为字符串 - benchmarkids=item.get('benchmarkids'), - wsphigh=item.get('wsphigh'), - mtype=item.get('mtype'), - NYID=str(item.get('NYID')), - createDate=item.get('createDate') - ) - for item in batch - ] - db.add_all(level_data_list) - success_count += len(batch) - logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records") - except Exception as e: - failed_count += len(batch) - failed_items.extend([ - { - 'data': item, - 'error': f'插入失败: {str(e)}' - } - for item in batch - ]) - logger.error(f"Failed to insert batch: {str(e)}") - raise e + # ===== 执行批量插入 ===== + if to_insert: + logger.info(f"Inserting {len(to_insert)} new records") + # 分批插入,每批500条(避免SQL过长) + batch_size = 500 + for i in range(0, len(to_insert), batch_size): + batch = to_insert[i:i + batch_size] + try: + level_data_list = [ + LevelData( + linecode=str(item.get('linecode')), # 统一转换为字符串 + benchmarkids=item.get('benchmarkids'), + wsphigh=item.get('wsphigh'), + mtype=item.get('mtype'), + NYID=str(item.get('NYID')), + createDate=item.get('createDate') + ) + for item in batch + ] + db.add_all(level_data_list) + success_count += len(batch) + logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records") + except Exception as e: + failed_count += len(batch) + failed_items.extend([ + { + 'data': item, + 'error': f'插入失败: {str(e)}' + } + for item in batch + ]) + logger.error(f"Failed to insert batch: {str(e)}") + raise e # 如果有失败记录,不提交事务 if failed_items: diff --git a/app/services/section_data.py b/app/services/section_data.py index cb78c33..e9a0b56 100644 --- a/app/services/section_data.py +++ b/app/services/section_data.py @@ -251,7 +251,9 @@ class SectionDataService(BaseService[SectionData]): def batch_import_sections(self, db: Session, data: List) -> Dict[str, Any]: """ - 批量导入断面数据,根据断面id判断是否重复,重复数据改为更新操作 + 批量导入断面数据 - 性能优化版 + 使用批量查询和批量操作,大幅提升导入速度 + 根据断面ID判断是否重复,重复数据跳过,不进行更新操作 支持事务回滚,失败时重试一次 """ import logging @@ -262,6 +264,16 @@ class SectionDataService(BaseService[SectionData]): failed_count = 0 failed_items = [] + if total_count == 0: + return { + 'success': False, + 'message': '导入数据不能为空', + 'total_count': 0, + 'success_count': 0, + 'failed_count': 0, + 'failed_items': [] + } + for attempt in range(2): # 最多重试1次 try: db.begin() @@ -269,56 +281,90 @@ class SectionDataService(BaseService[SectionData]): failed_count = 0 failed_items = [] - for item_data in data: - try: - section = self.get_by_section_id(db, item_data.get('section_id')) - if section: - # 更新操作 - section.mileage = item_data.get('mileage') - section.work_site = item_data.get('work_site') - section.basic_types = item_data.get('basic_types') - section.height = item_data.get('height') - section.status = item_data.get('status') - section.number = item_data.get('number') - section.transition_paragraph = item_data.get('transition_paragraph') - section.design_fill_height = item_data.get('design_fill_height') - section.compression_layer_thickness = item_data.get('compression_layer_thickness') - section.treatment_depth = item_data.get('treatment_depth') - section.foundation_treatment_method = item_data.get('foundation_treatment_method') - section.rock_mass_classification = item_data.get('rock_mass_classification') - section.account_id = item_data.get('account_id') - logger.info(f"Updated section: {item_data.get('section_id')}") - else: - # 新增操作 - from ..models.section_data import SectionData - section = SectionData( - section_id=item_data.get('section_id'), - mileage=item_data.get('mileage'), - work_site=item_data.get('work_site'), - basic_types=item_data.get('basic_types'), - height=item_data.get('height'), - status=item_data.get('status'), - number=item_data.get('number'), - transition_paragraph=item_data.get('transition_paragraph'), - design_fill_height=item_data.get('design_fill_height'), - compression_layer_thickness=item_data.get('compression_layer_thickness'), - treatment_depth=item_data.get('treatment_depth'), - foundation_treatment_method=item_data.get('foundation_treatment_method'), - rock_mass_classification=item_data.get('rock_mass_classification'), - account_id=item_data.get('account_id') - ) - db.add(section) - logger.info(f"Created section: {item_data.get('section_id')}") + # ===== 性能优化1:批量查询现有断面数据(IN查询) ===== + # 统一转换为字符串处理(数据库section_id字段是VARCHAR类型) + section_id_list = list(set(str(item.get('section_id')) for item in data if item.get('section_id'))) + logger.info(f"Checking {len(section_id_list)} unique section_ids") + existing_sections = db.query(SectionData).filter(SectionData.section_id.in_(section_id_list)).all() - success_count += 1 - except Exception as e: + # 使用section_id创建查找表 + existing_map = { + section.section_id: section + for section in existing_sections + } + logger.info(f"Found {len(existing_sections)} existing sections") + + # ===== 性能优化2:批量处理插入和跳过 ===== + to_insert = [] + + for item_data in data: + section_id = str(item_data.get('section_id')) # 统一转换为字符串 + + if section_id in existing_map: + # 数据已存在,跳过 + logger.info(f"Continue section data: {section_id}") failed_count += 1 failed_items.append({ 'data': item_data, - 'error': str(e) + 'error': '数据已存在,跳过插入操作' }) - logger.error(f"Failed to process section {item_data.get('section_id')}: {str(e)}") - raise e + else: + # 记录需要插入的数据 + to_insert.append(item_data) + + # ===== 执行批量插入 ===== + if to_insert: + logger.info(f"Inserting {len(to_insert)} new records") + # 分批插入,每批500条(避免SQL过长) + batch_size = 500 + for i in range(0, len(to_insert), batch_size): + batch = to_insert[i:i + batch_size] + try: + section_data_list = [ + SectionData( + section_id=str(item.get('section_id')), # 统一转换为字符串 + mileage=item.get('mileage'), + work_site=item.get('work_site'), + basic_types=item.get('basic_types'), + height=item.get('height'), + status=item.get('status'), + number=str(item.get('number')) if item.get('number') else None, # 统一转换为字符串 + transition_paragraph=item.get('transition_paragraph'), + design_fill_height=item.get('design_fill_height'), + compression_layer_thickness=item.get('compression_layer_thickness'), + treatment_depth=item.get('treatment_depth'), + foundation_treatment_method=item.get('foundation_treatment_method'), + rock_mass_classification=item.get('rock_mass_classification'), + account_id=str(item.get('account_id')) if item.get('account_id') else None # 统一转换为字符串 + ) + for item in batch + ] + db.add_all(section_data_list) + success_count += len(batch) + logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records") + except Exception as e: + failed_count += len(batch) + failed_items.extend([ + { + 'data': item, + 'error': f'插入失败: {str(e)}' + } + for item in batch + ]) + logger.error(f"Failed to insert batch: {str(e)}") + raise e + + # 如果有失败记录,不提交事务 + if failed_items: + db.rollback() + return { + 'success': False, + 'message': f'批量导入失败: {len(failed_items)}条记录处理失败', + 'total_count': total_count, + 'success_count': success_count, + 'failed_count': failed_count, + 'failed_items': failed_items + } db.commit() logger.info(f"Batch import sections completed. Success: {success_count}, Failed: {failed_count}")