From 155575f4624947f13f6e47e1b9c6a5260b610c6a Mon Sep 17 00:00:00 2001 From: lhx Date: Mon, 10 Nov 2025 11:24:49 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=8E=9F=E5=A7=8B=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=AF=BC=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/original_data.py | 91 +++++++++++++++++++++-------------- 1 file changed, 55 insertions(+), 36 deletions(-) diff --git a/app/services/original_data.py b/app/services/original_data.py index e541802..c698c64 100644 --- a/app/services/original_data.py +++ b/app/services/original_data.py @@ -245,8 +245,8 @@ class OriginalDataService(BaseService[OriginalData]): def batch_import_original_data(self, db: Session, data: List) -> Dict[str, Any]: """ - 批量导入原始数据到指定账号的分表,直接新增,无需检查重复 - 支持事务回滚,失败时重试一次 + 批量导入原始数据到指定账号的分表 - 性能优化版 + 使用批量插入替代逐条插入,大幅提升导入速度 Args: db: 数据库会话 @@ -318,7 +318,7 @@ class OriginalDataService(BaseService[OriginalData]): failed_count = 0 failed_items = [] - nyid = data[0].get('NYID') + nyid = str(data[0].get('NYID')) # 统一转换为字符串 # 检查该期数数据是否已存在 check_query = text(f"SELECT COUNT(*) as cnt FROM `{table_name}` WHERE NYID = :nyid") is_exists = db.execute(check_query, {"nyid": nyid}).fetchone()[0] @@ -334,42 +334,61 @@ class OriginalDataService(BaseService[OriginalData]): 'failed_items': failed_items } - for item_data in data: - try: - # 判断期数id是否存在 - settlement = self._check_settlement_exists(db, item_data.get('NYID')) - if not settlement: - logger.error(f"Settlement {item_data.get('NYID')} not found") - raise Exception(f"Settlement {item_data.get('NYID')} not found") + # ===== 性能优化:批量查询沉降数据 ===== + # 统一转换为字符串处理(数据库NYID字段是VARCHAR类型) + nyid_list = list(set(str(item.get('NYID')) for item in data if item.get('NYID'))) + logger.info(f"Querying settlement data for nyid list: {nyid_list}") + settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all() + logger.info(f"Found {len(settlements)} settlement records") + settlement_map = {s.NYID: s for s in settlements} + missing_nyids = set(nyid_list) - set(settlement_map.keys()) - # 构建插入SQL - insert_sql = text(f""" - INSERT INTO `{table_name}` - (account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort) - VALUES - (:account_id, :bfpcode, :mtime, :bffb, :bfpl, :bfpvalue, :NYID, :sort) - """) + if missing_nyids: + db.rollback() + return { + 'success': False, + 'message': f'以下期数在沉降表中不存在: {list(missing_nyids)}', + 'total_count': total_count, + 'success_count': 0, + 'failed_count': total_count, + 'failed_items': [] + } - db.execute(insert_sql, { - "account_id": account_id, - "bfpcode": item_data.get('bfpcode'), - "mtime": item_data.get('mtime'), - "bffb": item_data.get('bffb'), - "bfpl": item_data.get('bfpl'), - "bfpvalue": item_data.get('bfpvalue'), - "NYID": item_data.get('NYID'), - "sort": item_data.get('sort') + # ===== 性能优化:使用批量插入 ===== + # 将数据分组,每组1000条(MySQL默认支持) + batch_size = 1000 + for i in range(0, len(data), batch_size): + batch_data = data[i:i + batch_size] + + # 构建批量参数 + values_list = [] + params = {} + for idx, item_data in enumerate(batch_data): + values_list.append( + f"(:account_id_{idx}, :bfpcode_{idx}, :mtime_{idx}, :bffb_{idx}, " + f":bfpl_{idx}, :bfpvalue_{idx}, :NYID_{idx}, :sort_{idx})" + ) + params.update({ + f"account_id_{idx}": account_id, + f"bfpcode_{idx}": item_data.get('bfpcode'), + f"mtime_{idx}": item_data.get('mtime'), + f"bffb_{idx}": item_data.get('bffb'), + f"bfpl_{idx}": item_data.get('bfpl'), + f"bfpvalue_{idx}": item_data.get('bfpvalue'), + f"NYID_{idx}": item_data.get('NYID'), + f"sort_{idx}": item_data.get('sort') }) - logger.info(f"Created original data: {item_data.get('bfpcode')}-{item_data.get('NYID')} in table {table_name}") - success_count += 1 - except Exception as e: - failed_count += 1 - failed_items.append({ - 'data': item_data, - 'error': str(e) - }) - logger.error(f"Failed to process original data {item_data.get('bfpcode')}-{item_data.get('NYID')}: {str(e)}") - raise e + + # 批量插入SQL - 使用字符串拼接(修复TextClause拼接问题) + insert_sql = f""" + INSERT INTO `{table_name}` + (account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort) + VALUES {", ".join(values_list)} + """ + final_sql = text(insert_sql) + db.execute(final_sql, params) + success_count += len(batch_data) + logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch_data)} records") db.commit() logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}")