From b1cc24978f36d25a3e518acf86599d7b25a7adac Mon Sep 17 00:00:00 2001 From: lhx Date: Tue, 18 Nov 2025 10:28:43 +0800 Subject: [PATCH] =?UTF-8?q?=E6=97=A5=E5=BF=97=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/original_data.py | 55 +++++++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 12 deletions(-) diff --git a/app/services/original_data.py b/app/services/original_data.py index b057df8..5b23530 100644 --- a/app/services/original_data.py +++ b/app/services/original_data.py @@ -389,14 +389,7 @@ class OriginalDataService(BaseService[OriginalData]): def _execute_import(self, db: Session, table_name: str, data: List, account_id: int) -> int: """执行数据导入操作(抽取的公共逻辑)""" - nyid = str(data[0].get('NYID')) # 统一转换为字符串 - # 检查该期数数据是否已存在 - check_query = text(f"SELECT COUNT(*) as cnt FROM `{table_name}` WHERE NYID = :nyid") - is_exists = db.execute(check_query, {"nyid": nyid}).fetchone()[0] - - if is_exists > 0: - logger.warning(f"Data for NYID {nyid} already exists in {table_name}, skipping import") - return 0 + logger.info(f"Starting batch import into {table_name}, total records: {len(data)}") # ===== 性能优化:批量查询沉降数据 ===== # 统一转换为字符串处理(数据库NYID字段是VARCHAR类型) @@ -409,13 +402,50 @@ class OriginalDataService(BaseService[OriginalData]): if missing_nyids: raise Exception(f'以下期数在沉降表中不存在: {list(missing_nyids)}') - # ===== 性能优化:使用批量插入 ===== + # ===== 性能优化:批量查询现有原始数据(IN查询)===== + # 使用组合键 (NYID, sort) 查询现有数据,过滤重复数据 + existing_data = db.query(text("*")).from_statement( + text(f"SELECT * FROM `{table_name}` WHERE account_id = :account_id") + ).params(account_id=account_id).all() + + # 使用组合键创建查找表:key = f"{NYID}_{sort}" + existing_map = { + f"{item[7]}_{item[8]}": item # NYID是第8个字段(索引7),sort是第9个字段(索引8) + for item in existing_data + } + logger.info(f"Found {len(existing_data)} existing records in {table_name}") + + # ===== 批量处理插入和跳过 ===== + to_insert = [] + skipped_count = 0 + + for item_data in data: + nyid = str(item_data.get('NYID')) # 统一转换为字符串 + sort = item_data.get('sort') + + # 构建组合键 + key = f"{nyid}_{sort}" + + if key in existing_map: + # 数据已存在,跳过 + logger.info(f"Skip existing data: NYID {nyid}, sort {sort}") + skipped_count += 1 + else: + # 记录需要插入的数据 + to_insert.append(item_data) + + logger.info(f"Filtered {skipped_count} duplicate records, {len(to_insert)} new records to insert") + + # ===== 执行批量插入 ===== + if not to_insert: + logger.info("No new records to insert, all data already exists") + return 0 + # 将数据分组,每组1000条(MySQL默认支持) batch_size = 1000 total_inserted = 0 - logger.info(f"Starting batch insert into {table_name}, total records: {len(data)}") - for i in range(0, len(data), batch_size): - batch_data = data[i:i + batch_size] + for i in range(0, len(to_insert), batch_size): + batch_data = to_insert[i:i + batch_size] # 构建批量参数 values_list = [] @@ -447,4 +477,5 @@ class OriginalDataService(BaseService[OriginalData]): total_inserted += len(batch_data) logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch_data)} records") + logger.info(f"Batch import completed: {total_inserted} records inserted, {skipped_count} duplicates skipped") return total_inserted \ No newline at end of file