From 0e782baed958ef3373b313036c0bb08f0a14fa91 Mon Sep 17 00:00:00 2001 From: lhx Date: Tue, 18 Nov 2025 09:54:15 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E5=8E=9F=E5=A7=8B=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BB=BA=E8=A1=A8=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/original_data.py | 193 +++++++++++++++++----------------- 1 file changed, 99 insertions(+), 94 deletions(-) diff --git a/app/services/original_data.py b/app/services/original_data.py index 8b2ef88..71f7fd2 100644 --- a/app/services/original_data.py +++ b/app/services/original_data.py @@ -337,101 +337,46 @@ class OriginalDataService(BaseService[OriginalData]): table_name = self._get_table_name(account_id) - for attempt in range(2): # 最多重试1次 + # 检查是否已在事务中(避免重复开始事务) + in_transaction = db.in_transaction() + + # 如果不在事务中,才需要手动管理事务 + if not in_transaction: + for attempt in range(2): # 最多重试1次 + try: + db.begin() + success_count = 0 + failed_count = 0 + failed_items = [] + + # 执行数据导入操作 + success_count = self._execute_import(db, table_name, data, account_id) + db.commit() + logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}") + break + + except Exception as e: + db.rollback() + logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}") + if attempt == 1: # 最后一次重试失败 + logger.error("Batch import original data failed after retries") + return { + 'success': False, + 'message': f'批量导入失败: {str(e)}', + 'total_count': total_count, + 'success_count': 0, + 'failed_count': total_count, + 'failed_items': failed_items + } + else: + # 如果已在事务中,直接执行操作(不管理事务) try: - db.begin() - success_count = 0 - failed_count = 0 - failed_items = [] - - nyid = str(data[0].get('NYID')) # 统一转换为字符串 - # 检查该期数数据是否已存在 - check_query = text(f"SELECT COUNT(*) as cnt FROM `{table_name}` WHERE NYID = :nyid") - is_exists = db.execute(check_query, {"nyid": nyid}).fetchone()[0] - - if is_exists > 0: - db.rollback() - return { - 'success': True, - 'message': '数据已存在', - 'total_count': 0, - 'success_count': success_count, - 'failed_count': failed_count, - 'failed_items': failed_items - } - - # ===== 性能优化:批量查询沉降数据 ===== - # 统一转换为字符串处理(数据库NYID字段是VARCHAR类型) - nyid_list = list(set(str(item.get('NYID')) for item in data if item.get('NYID'))) - settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all() - settlement_map = {s.NYID: s for s in settlements} - missing_nyids = set(nyid_list) - set(settlement_map.keys()) - - if missing_nyids: - logger.warning(f"[批量导入原始数据] 批量查询settlement数据失败, Nyid: {list(missing_nyids)}") - db.rollback() - return { - 'success': False, - 'message': f'以下期数在沉降表中不存在: {list(missing_nyids)}', - 'total_count': total_count, - 'success_count': 0, - 'failed_count': total_count, - 'failed_items': [] - } - - # ===== 性能优化:使用批量插入 ===== - # 将数据分组,每组1000条(MySQL默认支持) - batch_size = 1000 - for i in range(0, len(data), batch_size): - batch_data = data[i:i + batch_size] - - # 构建批量参数 - values_list = [] - params = {} - for idx, item_data in enumerate(batch_data): - values_list.append( - f"(:account_id_{idx}, :bfpcode_{idx}, :mtime_{idx}, :bffb_{idx}, " - f":bfpl_{idx}, :bfpvalue_{idx}, :NYID_{idx}, :sort_{idx})" - ) - params.update({ - f"account_id_{idx}": account_id, - f"bfpcode_{idx}": item_data.get('bfpcode'), - f"mtime_{idx}": item_data.get('mtime'), - f"bffb_{idx}": item_data.get('bffb'), - f"bfpl_{idx}": item_data.get('bfpl'), - f"bfpvalue_{idx}": item_data.get('bfpvalue'), - f"NYID_{idx}": item_data.get('NYID'), - f"sort_{idx}": item_data.get('sort') - }) - - # 批量插入SQL - 使用字符串拼接(修复TextClause拼接问题) - insert_sql = f""" - INSERT INTO `{table_name}` - (account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort) - VALUES {", ".join(values_list)} - """ - final_sql = text(insert_sql) - db.execute(final_sql, params) - success_count += len(batch_data) - logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch_data)} records") - - db.commit() - logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}") - break - + success_count = self._execute_import(db, table_name, data, account_id) + logger.info(f"Batch import original data completed in existing transaction. Success: {success_count}") except Exception as e: - db.rollback() - logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}") - if attempt == 1: # 最后一次重试失败 - logger.error("Batch import original data failed after retries") - return { - 'success': False, - 'message': f'批量导入失败: {str(e)}', - 'total_count': total_count, - 'success_count': 0, - 'failed_count': total_count, - 'failed_items': failed_items - } + logger.error(f"Batch import failed in existing transaction: {str(e)}") + # 抛出异常,让外部处理事务回滚 + raise return { 'success': True, @@ -440,4 +385,64 @@ class OriginalDataService(BaseService[OriginalData]): 'success_count': success_count, 'failed_count': failed_count, 'failed_items': failed_items - } \ No newline at end of file + } + + def _execute_import(self, db: Session, table_name: str, data: List, account_id: int) -> int: + """执行数据导入操作(抽取的公共逻辑)""" + nyid = str(data[0].get('NYID')) # 统一转换为字符串 + # 检查该期数数据是否已存在 + check_query = text(f"SELECT COUNT(*) as cnt FROM `{table_name}` WHERE NYID = :nyid") + is_exists = db.execute(check_query, {"nyid": nyid}).fetchone()[0] + + if is_exists > 0: + return 0 + + # ===== 性能优化:批量查询沉降数据 ===== + # 统一转换为字符串处理(数据库NYID字段是VARCHAR类型) + nyid_list = list(set(str(item.get('NYID')) for item in data if item.get('NYID'))) + from ..models.settlement_data import SettlementData + settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all() + settlement_map = {s.NYID: s for s in settlements} + missing_nyids = set(nyid_list) - set(settlement_map.keys()) + + if missing_nyids: + raise Exception(f'以下期数在沉降表中不存在: {list(missing_nyids)}') + + # ===== 性能优化:使用批量插入 ===== + # 将数据分组,每组1000条(MySQL默认支持) + batch_size = 1000 + total_inserted = 0 + for i in range(0, len(data), batch_size): + batch_data = data[i:i + batch_size] + + # 构建批量参数 + values_list = [] + params = {} + for idx, item_data in enumerate(batch_data): + values_list.append( + f"(:account_id_{idx}, :bfpcode_{idx}, :mtime_{idx}, :bffb_{idx}, " + f":bfpl_{idx}, :bfpvalue_{idx}, :NYID_{idx}, :sort_{idx})" + ) + params.update({ + f"account_id_{idx}": account_id, + f"bfpcode_{idx}": item_data.get('bfpcode'), + f"mtime_{idx}": item_data.get('mtime'), + f"bffb_{idx}": item_data.get('bffb'), + f"bfpl_{idx}": item_data.get('bfpl'), + f"bfpvalue_{idx}": item_data.get('bfpvalue'), + f"NYID_{idx}": item_data.get('NYID'), + f"sort_{idx}": item_data.get('sort') + }) + + # 批量插入SQL - 使用字符串拼接(修复TextClause拼接问题) + insert_sql = f""" + INSERT INTO `{table_name}` + (account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort) + VALUES {", ".join(values_list)} + """ + final_sql = text(insert_sql) + db.execute(final_sql, params) + total_inserted += len(batch_data) + logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch_data)} records") + + return total_inserted \ No newline at end of file From 61df55074a2788105ba04b4dc2901137e43fac8c Mon Sep 17 00:00:00 2001 From: lhx Date: Tue, 18 Nov 2025 10:08:49 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E5=8E=9F=E5=A7=8B=E5=AF=BC=E5=85=A5?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/original_data.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/app/services/original_data.py b/app/services/original_data.py index 71f7fd2..b057df8 100644 --- a/app/services/original_data.py +++ b/app/services/original_data.py @@ -395,6 +395,7 @@ class OriginalDataService(BaseService[OriginalData]): is_exists = db.execute(check_query, {"nyid": nyid}).fetchone()[0] if is_exists > 0: + logger.warning(f"Data for NYID {nyid} already exists in {table_name}, skipping import") return 0 # ===== 性能优化:批量查询沉降数据 ===== @@ -412,6 +413,7 @@ class OriginalDataService(BaseService[OriginalData]): # 将数据分组,每组1000条(MySQL默认支持) batch_size = 1000 total_inserted = 0 + logger.info(f"Starting batch insert into {table_name}, total records: {len(data)}") for i in range(0, len(data), batch_size): batch_data = data[i:i + batch_size] From b1cc24978f36d25a3e518acf86599d7b25a7adac Mon Sep 17 00:00:00 2001 From: lhx Date: Tue, 18 Nov 2025 10:28:43 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E6=97=A5=E5=BF=97=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/original_data.py | 55 +++++++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 12 deletions(-) diff --git a/app/services/original_data.py b/app/services/original_data.py index b057df8..5b23530 100644 --- a/app/services/original_data.py +++ b/app/services/original_data.py @@ -389,14 +389,7 @@ class OriginalDataService(BaseService[OriginalData]): def _execute_import(self, db: Session, table_name: str, data: List, account_id: int) -> int: """执行数据导入操作(抽取的公共逻辑)""" - nyid = str(data[0].get('NYID')) # 统一转换为字符串 - # 检查该期数数据是否已存在 - check_query = text(f"SELECT COUNT(*) as cnt FROM `{table_name}` WHERE NYID = :nyid") - is_exists = db.execute(check_query, {"nyid": nyid}).fetchone()[0] - - if is_exists > 0: - logger.warning(f"Data for NYID {nyid} already exists in {table_name}, skipping import") - return 0 + logger.info(f"Starting batch import into {table_name}, total records: {len(data)}") # ===== 性能优化:批量查询沉降数据 ===== # 统一转换为字符串处理(数据库NYID字段是VARCHAR类型) @@ -409,13 +402,50 @@ class OriginalDataService(BaseService[OriginalData]): if missing_nyids: raise Exception(f'以下期数在沉降表中不存在: {list(missing_nyids)}') - # ===== 性能优化:使用批量插入 ===== + # ===== 性能优化:批量查询现有原始数据(IN查询)===== + # 使用组合键 (NYID, sort) 查询现有数据,过滤重复数据 + existing_data = db.query(text("*")).from_statement( + text(f"SELECT * FROM `{table_name}` WHERE account_id = :account_id") + ).params(account_id=account_id).all() + + # 使用组合键创建查找表:key = f"{NYID}_{sort}" + existing_map = { + f"{item[7]}_{item[8]}": item # NYID是第8个字段(索引7),sort是第9个字段(索引8) + for item in existing_data + } + logger.info(f"Found {len(existing_data)} existing records in {table_name}") + + # ===== 批量处理插入和跳过 ===== + to_insert = [] + skipped_count = 0 + + for item_data in data: + nyid = str(item_data.get('NYID')) # 统一转换为字符串 + sort = item_data.get('sort') + + # 构建组合键 + key = f"{nyid}_{sort}" + + if key in existing_map: + # 数据已存在,跳过 + logger.info(f"Skip existing data: NYID {nyid}, sort {sort}") + skipped_count += 1 + else: + # 记录需要插入的数据 + to_insert.append(item_data) + + logger.info(f"Filtered {skipped_count} duplicate records, {len(to_insert)} new records to insert") + + # ===== 执行批量插入 ===== + if not to_insert: + logger.info("No new records to insert, all data already exists") + return 0 + # 将数据分组,每组1000条(MySQL默认支持) batch_size = 1000 total_inserted = 0 - logger.info(f"Starting batch insert into {table_name}, total records: {len(data)}") - for i in range(0, len(data), batch_size): - batch_data = data[i:i + batch_size] + for i in range(0, len(to_insert), batch_size): + batch_data = to_insert[i:i + batch_size] # 构建批量参数 values_list = [] @@ -447,4 +477,5 @@ class OriginalDataService(BaseService[OriginalData]): total_inserted += len(batch_data) logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch_data)} records") + logger.info(f"Batch import completed: {total_inserted} records inserted, {skipped_count} duplicates skipped") return total_inserted \ No newline at end of file From 6f0f3d94b2f21d4c2f8a34690f9b4fc2d9ee6b95 Mon Sep 17 00:00:00 2001 From: lhx Date: Tue, 18 Nov 2025 10:33:29 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E6=97=A5=E5=BF=97=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/original_data.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/app/services/original_data.py b/app/services/original_data.py index 5b23530..9d234d0 100644 --- a/app/services/original_data.py +++ b/app/services/original_data.py @@ -428,11 +428,12 @@ class OriginalDataService(BaseService[OriginalData]): if key in existing_map: # 数据已存在,跳过 - logger.info(f"Skip existing data: NYID {nyid}, sort {sort}") + # logger.info(f"Skip existing data: NYID {nyid}, sort {sort}") skipped_count += 1 else: # 记录需要插入的数据 to_insert.append(item_data) + logger.info(f"Skip existing data: {skipped_count} duplicates found so far") logger.info(f"Filtered {skipped_count} duplicate records, {len(to_insert)} new records to insert") From b2806cfd2778897bc9a07d69ca729f1ae1291bc4 Mon Sep 17 00:00:00 2001 From: lhx Date: Tue, 18 Nov 2025 10:35:22 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E6=97=A5=E5=BF=97=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/original_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/services/original_data.py b/app/services/original_data.py index 9d234d0..ec38d04 100644 --- a/app/services/original_data.py +++ b/app/services/original_data.py @@ -433,7 +433,7 @@ class OriginalDataService(BaseService[OriginalData]): else: # 记录需要插入的数据 to_insert.append(item_data) - logger.info(f"Skip existing data: {skipped_count} duplicates found so far") + # logger.info(f"Skip existing data: {skipped_count} duplicates found so far") logger.info(f"Filtered {skipped_count} duplicate records, {len(to_insert)} new records to insert")