优化原始数据导入

This commit is contained in:
lhx
2025-11-10 11:24:49 +08:00
parent 3bd5885dce
commit 155575f462

View File

@@ -245,8 +245,8 @@ class OriginalDataService(BaseService[OriginalData]):
def batch_import_original_data(self, db: Session, data: List) -> Dict[str, Any]: def batch_import_original_data(self, db: Session, data: List) -> Dict[str, Any]:
""" """
批量导入原始数据到指定账号的分表,直接新增,无需检查重复 批量导入原始数据到指定账号的分表 - 性能优化版
支持事务回滚,失败时重试一次 使用批量插入替代逐条插入,大幅提升导入速度
Args: Args:
db: 数据库会话 db: 数据库会话
@@ -318,7 +318,7 @@ class OriginalDataService(BaseService[OriginalData]):
failed_count = 0 failed_count = 0
failed_items = [] failed_items = []
nyid = data[0].get('NYID') nyid = str(data[0].get('NYID')) # 统一转换为字符串
# 检查该期数数据是否已存在 # 检查该期数数据是否已存在
check_query = text(f"SELECT COUNT(*) as cnt FROM `{table_name}` WHERE NYID = :nyid") check_query = text(f"SELECT COUNT(*) as cnt FROM `{table_name}` WHERE NYID = :nyid")
is_exists = db.execute(check_query, {"nyid": nyid}).fetchone()[0] is_exists = db.execute(check_query, {"nyid": nyid}).fetchone()[0]
@@ -334,42 +334,61 @@ class OriginalDataService(BaseService[OriginalData]):
'failed_items': failed_items 'failed_items': failed_items
} }
for item_data in data: # ===== 性能优化:批量查询沉降数据 =====
try: # 统一转换为字符串处理数据库NYID字段是VARCHAR类型
# 判断期数id是否存在 nyid_list = list(set(str(item.get('NYID')) for item in data if item.get('NYID')))
settlement = self._check_settlement_exists(db, item_data.get('NYID')) logger.info(f"Querying settlement data for nyid list: {nyid_list}")
if not settlement: settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all()
logger.error(f"Settlement {item_data.get('NYID')} not found") logger.info(f"Found {len(settlements)} settlement records")
raise Exception(f"Settlement {item_data.get('NYID')} not found") settlement_map = {s.NYID: s for s in settlements}
missing_nyids = set(nyid_list) - set(settlement_map.keys())
# 构建插入SQL if missing_nyids:
insert_sql = text(f""" db.rollback()
INSERT INTO `{table_name}` return {
(account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort) 'success': False,
VALUES 'message': f'以下期数在沉降表中不存在: {list(missing_nyids)}',
(:account_id, :bfpcode, :mtime, :bffb, :bfpl, :bfpvalue, :NYID, :sort) 'total_count': total_count,
""") 'success_count': 0,
'failed_count': total_count,
'failed_items': []
}
db.execute(insert_sql, { # ===== 性能优化:使用批量插入 =====
"account_id": account_id, # 将数据分组每组1000条MySQL默认支持
"bfpcode": item_data.get('bfpcode'), batch_size = 1000
"mtime": item_data.get('mtime'), for i in range(0, len(data), batch_size):
"bffb": item_data.get('bffb'), batch_data = data[i:i + batch_size]
"bfpl": item_data.get('bfpl'),
"bfpvalue": item_data.get('bfpvalue'), # 构建批量参数
"NYID": item_data.get('NYID'), values_list = []
"sort": item_data.get('sort') params = {}
for idx, item_data in enumerate(batch_data):
values_list.append(
f"(:account_id_{idx}, :bfpcode_{idx}, :mtime_{idx}, :bffb_{idx}, "
f":bfpl_{idx}, :bfpvalue_{idx}, :NYID_{idx}, :sort_{idx})"
)
params.update({
f"account_id_{idx}": account_id,
f"bfpcode_{idx}": item_data.get('bfpcode'),
f"mtime_{idx}": item_data.get('mtime'),
f"bffb_{idx}": item_data.get('bffb'),
f"bfpl_{idx}": item_data.get('bfpl'),
f"bfpvalue_{idx}": item_data.get('bfpvalue'),
f"NYID_{idx}": item_data.get('NYID'),
f"sort_{idx}": item_data.get('sort')
}) })
logger.info(f"Created original data: {item_data.get('bfpcode')}-{item_data.get('NYID')} in table {table_name}")
success_count += 1 # 批量插入SQL - 使用字符串拼接修复TextClause拼接问题
except Exception as e: insert_sql = f"""
failed_count += 1 INSERT INTO `{table_name}`
failed_items.append({ (account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort)
'data': item_data, VALUES {", ".join(values_list)}
'error': str(e) """
}) final_sql = text(insert_sql)
logger.error(f"Failed to process original data {item_data.get('bfpcode')}-{item_data.get('NYID')}: {str(e)}") db.execute(final_sql, params)
raise e success_count += len(batch_data)
logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch_data)} records")
db.commit() db.commit()
logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}") logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}")