Log optimization

commit b1cc24978f
parent 61df55074a
Author: lhx
Date:   2025-11-18 10:28:43 +08:00


@@ -389,14 +389,7 @@ class OriginalDataService(BaseService[OriginalData]):
     def _execute_import(self, db: Session, table_name: str, data: List, account_id: int) -> int:
         """Execute the data import (shared logic extracted for reuse)."""
         nyid = str(data[0].get('NYID'))  # Convert to string for consistency
-        # Check whether data for this period already exists
-        check_query = text(f"SELECT COUNT(*) as cnt FROM `{table_name}` WHERE NYID = :nyid")
-        is_exists = db.execute(check_query, {"nyid": nyid}).fetchone()[0]
-        if is_exists > 0:
-            logger.warning(f"Data for NYID {nyid} already exists in {table_name}, skipping import")
-            return 0
-        logger.info(f"Starting batch import into {table_name}, total records: {len(data)}")
         # ===== Performance optimization: batch-query settlement data =====
         # Convert uniformly to strings (the NYID column in the database is VARCHAR)
@@ -409,13 +402,50 @@ class OriginalDataService(BaseService[OriginalData]):
         if missing_nyids:
             raise Exception(f'The following periods do not exist in the settlement table: {list(missing_nyids)}')
         # ===== Performance optimization: use batch inserts =====
+        # ===== Performance optimization: batch-query existing original data (IN query) =====
+        # Query existing data by the composite key (NYID, sort) to filter out duplicates
+        existing_data = db.query(text("*")).from_statement(
+            text(f"SELECT * FROM `{table_name}` WHERE account_id = :account_id")
+        ).params(account_id=account_id).all()
+        # Build a lookup map keyed by the composite key: key = f"{NYID}_{sort}"
+        existing_map = {
+            f"{item[7]}_{item[8]}": item  # NYID is the 8th column (index 7); sort is the 9th (index 8)
+            for item in existing_data
+        }
+        logger.info(f"Found {len(existing_data)} existing records in {table_name}")
+        # ===== Split the batch into inserts and skips =====
+        to_insert = []
+        skipped_count = 0
+        for item_data in data:
+            nyid = str(item_data.get('NYID'))  # Convert to string for consistency
+            sort = item_data.get('sort')
+            # Build the composite key
+            key = f"{nyid}_{sort}"
+            if key in existing_map:
+                # Record already exists; skip it
+                logger.info(f"Skip existing data: NYID {nyid}, sort {sort}")
+                skipped_count += 1
+            else:
+                # Queue the record for insertion
+                to_insert.append(item_data)
+        logger.info(f"Filtered {skipped_count} duplicate records, {len(to_insert)} new records to insert")
+        # ===== Execute the batch insert =====
+        if not to_insert:
+            logger.info("No new records to insert, all data already exists")
+            return 0
         # Group the data into batches of 1,000 rows each (within MySQL's defaults)
         batch_size = 1000
         total_inserted = 0
+        logger.info(f"Starting batch insert into {table_name}, total records: {len(to_insert)}")
-        for i in range(0, len(data), batch_size):
-            batch_data = data[i:i + batch_size]
+        for i in range(0, len(to_insert), batch_size):
+            batch_data = to_insert[i:i + batch_size]
             # Build the batch parameters
             values_list = []
@@ -447,4 +477,5 @@ class OriginalDataService(BaseService[OriginalData]):
             total_inserted += len(batch_data)
             logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch_data)} records")
+        logger.info(f"Batch import completed: {total_inserted} records inserted, {skipped_count} duplicates skipped")
         return total_inserted
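
Taken together, the change converges on a familiar bulk-import pattern: one up-front SELECT of existing keys, an in-memory composite-key filter, then chunked multi-row inserts, with per-row detail kept out of the INFO log. Below is a minimal standalone sketch of that pattern, not the project's code: the column set (NYID, sort, value), the import_rows name, and the DEBUG-level skip message are illustrative assumptions; the SQLAlchemy calls themselves (text() with bound parameters, Session.execute() with a list of parameter dicts for executemany-style inserts) are standard API.

import logging
from typing import Dict, List

from sqlalchemy import text
from sqlalchemy.orm import Session

logger = logging.getLogger(__name__)

BATCH_SIZE = 1000  # rows per INSERT round-trip; comfortably within MySQL defaults


def import_rows(db: Session, table_name: str, rows: List[Dict]) -> int:
    """Insert rows, skipping any whose (NYID, sort) pair already exists."""
    # 1) One SELECT up front replaces per-period COUNT(*) round-trips;
    #    fetching only the key columns keeps the snapshot small.
    existing = {
        (str(row.NYID), row.sort)
        for row in db.execute(text(f"SELECT NYID, sort FROM `{table_name}`"))
    }

    # 2) Filter in memory via the composite key; per-row detail goes to
    #    DEBUG so a large import does not flood the INFO log.
    to_insert = []
    for item in rows:
        key = (str(item.get("NYID")), item.get("sort"))
        if key in existing:
            logger.debug("skip existing row NYID=%s sort=%s", *key)
        else:
            to_insert.append(item)
    logger.info("%d duplicates skipped, %d new rows to insert",
                len(rows) - len(to_insert), len(to_insert))
    if not to_insert:
        return 0

    # 3) Chunked insert: passing a list of parameter dicts to execute()
    #    yields executemany semantics for each batch.
    stmt = text(
        f"INSERT INTO `{table_name}` (NYID, sort, value) "
        "VALUES (:NYID, :sort, :value)"
    )
    inserted = 0
    for start in range(0, len(to_insert), BATCH_SIZE):
        batch = to_insert[start:start + BATCH_SIZE]
        db.execute(stmt, batch)
        inserted += len(batch)
    db.commit()
    logger.info("batch import completed: %d rows inserted", inserted)
    return inserted

The set lookup makes each duplicate check O(1) in memory instead of a database round-trip, which is what the existence check deleted in the first hunk used to cost once per period; the trade-off is holding one (NYID, sort) tuple per existing row, typically far cheaper than the SELECT * snapshot the committed code loads.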