原始数据建表优化

This commit is contained in:
lhx
2025-11-18 09:54:15 +08:00
parent d8b6247094
commit 0e782baed9

View File

@@ -337,6 +337,11 @@ class OriginalDataService(BaseService[OriginalData]):
table_name = self._get_table_name(account_id)
# 检查是否已在事务中(避免重复开始事务)
in_transaction = db.in_transaction()
# 如果不在事务中,才需要手动管理事务
if not in_transaction:
for attempt in range(2): # 最多重试1次
try:
db.begin()
@@ -344,44 +349,69 @@ class OriginalDataService(BaseService[OriginalData]):
failed_count = 0
failed_items = []
# 执行数据导入操作
success_count = self._execute_import(db, table_name, data, account_id)
db.commit()
logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}")
break
except Exception as e:
db.rollback()
logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}")
if attempt == 1: # 最后一次重试失败
logger.error("Batch import original data failed after retries")
return {
'success': False,
'message': f'批量导入失败: {str(e)}',
'total_count': total_count,
'success_count': 0,
'failed_count': total_count,
'failed_items': failed_items
}
else:
# 如果已在事务中,直接执行操作(不管理事务)
try:
success_count = self._execute_import(db, table_name, data, account_id)
logger.info(f"Batch import original data completed in existing transaction. Success: {success_count}")
except Exception as e:
logger.error(f"Batch import failed in existing transaction: {str(e)}")
# 抛出异常,让外部处理事务回滚
raise
return {
'success': True,
'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
'total_count': total_count,
'success_count': success_count,
'failed_count': failed_count,
'failed_items': failed_items
}
def _execute_import(self, db: Session, table_name: str, data: List, account_id: int) -> int:
"""执行数据导入操作(抽取的公共逻辑)"""
nyid = str(data[0].get('NYID')) # 统一转换为字符串
# 检查该期数数据是否已存在
check_query = text(f"SELECT COUNT(*) as cnt FROM `{table_name}` WHERE NYID = :nyid")
is_exists = db.execute(check_query, {"nyid": nyid}).fetchone()[0]
if is_exists > 0:
db.rollback()
return {
'success': True,
'message': '数据已存在',
'total_count': 0,
'success_count': success_count,
'failed_count': failed_count,
'failed_items': failed_items
}
return 0
# ===== 性能优化:批量查询沉降数据 =====
# 统一转换为字符串处理数据库NYID字段是VARCHAR类型
nyid_list = list(set(str(item.get('NYID')) for item in data if item.get('NYID')))
from ..models.settlement_data import SettlementData
settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all()
settlement_map = {s.NYID: s for s in settlements}
missing_nyids = set(nyid_list) - set(settlement_map.keys())
if missing_nyids:
logger.warning(f"[批量导入原始数据] 批量查询settlement数据失败 Nyid: {list(missing_nyids)}")
db.rollback()
return {
'success': False,
'message': f'以下期数在沉降表中不存在: {list(missing_nyids)}',
'total_count': total_count,
'success_count': 0,
'failed_count': total_count,
'failed_items': []
}
raise Exception(f'以下期数在沉降表中不存在: {list(missing_nyids)}')
# ===== 性能优化:使用批量插入 =====
# 将数据分组每组1000条MySQL默认支持
batch_size = 1000
total_inserted = 0
for i in range(0, len(data), batch_size):
batch_data = data[i:i + batch_size]
@@ -412,32 +442,7 @@ class OriginalDataService(BaseService[OriginalData]):
"""
final_sql = text(insert_sql)
db.execute(final_sql, params)
success_count += len(batch_data)
total_inserted += len(batch_data)
logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch_data)} records")
db.commit()
logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}")
break
except Exception as e:
db.rollback()
logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}")
if attempt == 1: # 最后一次重试失败
logger.error("Batch import original data failed after retries")
return {
'success': False,
'message': f'批量导入失败: {str(e)}',
'total_count': total_count,
'success_count': 0,
'failed_count': total_count,
'failed_items': failed_items
}
return {
'success': True,
'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
'total_count': total_count,
'success_count': success_count,
'failed_count': failed_count,
'failed_items': failed_items
}
return total_inserted