832 lines
35 KiB
Python
832 lines
35 KiB
Python
"""
|
||
数据库操作服务层
|
||
直接操作数据库,跳过HTTP请求环节
|
||
复用现有的业务逻辑和模型
|
||
"""
|
||
import os
|
||
import sys
|
||
import logging
|
||
from pathlib import Path
|
||
from typing import List, Optional, Dict, Any
|
||
|
||
# 添加项目根目录到Python路径
|
||
PROJECT_ROOT = Path(__file__).parent.parent.parent.absolute()
|
||
sys.path.insert(0, str(PROJECT_ROOT))
|
||
|
||
# 导入数据库操作相关模块
|
||
from sqlalchemy import create_engine, text, inspect
|
||
from sqlalchemy.orm import sessionmaker, Session
|
||
from dotenv import load_dotenv
|
||
|
||
# 导入数据模型
|
||
from app.models.section_data import SectionData
|
||
from app.models.checkpoint import Checkpoint
|
||
from app.models.settlement_data import SettlementData
|
||
from app.models.level_data import LevelData
|
||
from app.models.account import Account
|
||
|
||
# 导入原始数据动态表支持
|
||
from app.models.original_data import get_original_data_model, get_table_name
|
||
|
||
# 加载环境变量
|
||
load_dotenv(PROJECT_ROOT / ".env")
|
||
|
||
# 配置日志
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class DatabaseService:
|
||
"""数据库操作服务类 - 直接操作数据库"""
|
||
|
||
def __init__(self):
|
||
"""初始化数据库连接"""
|
||
# 构建数据库URL
|
||
database_url = os.getenv("DATABASE_URL")
|
||
if not database_url:
|
||
# 从单独的数据库配置构建URL
|
||
db_host = os.getenv("DB_HOST", "localhost")
|
||
db_port = os.getenv("DB_PORT", "3306")
|
||
db_user = os.getenv("DB_USER", "root")
|
||
db_password = os.getenv("DB_PASSWORD", "root")
|
||
db_name = os.getenv("DB_NAME", "railway")
|
||
database_url = f"mysql+pymysql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
|
||
|
||
logger.info(f"数据库连接: {database_url}")
|
||
|
||
# 创建数据库引擎和会话工厂
|
||
self.engine = create_engine(
|
||
database_url,
|
||
pool_pre_ping=True,
|
||
pool_recycle=3600,
|
||
echo=False # 设置为True可以看到SQL语句
|
||
)
|
||
self.SessionLocal = sessionmaker(bind=self.engine)
|
||
|
||
def get_db_session(self) -> Session:
|
||
"""获取数据库会话"""
|
||
return self.SessionLocal()
|
||
|
||
# ==================== 断面数据服务 ====================
|
||
|
||
def batch_import_sections(self, db: Session, data: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||
"""批量导入断面数据 - 复用现有业务逻辑"""
|
||
total_count = len(data)
|
||
success_count = 0
|
||
failed_count = 0
|
||
failed_items = []
|
||
|
||
if total_count == 0:
|
||
return {
|
||
'success': False,
|
||
'message': '导入数据不能为空',
|
||
'total_count': 0,
|
||
'success_count': 0,
|
||
'failed_count': 0,
|
||
'failed_items': []
|
||
}
|
||
|
||
try:
|
||
db.begin()
|
||
|
||
# 批量查询现有断面数据(IN查询)
|
||
section_id_list = list(set(str(item.get('section_id')) for item in data if item.get('section_id')))
|
||
logger.info(f"Checking {len(section_id_list)} unique section_ids")
|
||
|
||
existing_sections = db.query(SectionData).filter(SectionData.section_id.in_(section_id_list)).all()
|
||
|
||
# 使用section_id创建查找表
|
||
existing_map = {
|
||
section.section_id: section
|
||
for section in existing_sections
|
||
}
|
||
logger.info(f"Found {len(existing_sections)} existing sections")
|
||
|
||
# 批量处理插入和跳过
|
||
to_insert = []
|
||
|
||
for item_data in data:
|
||
section_id = str(item_data.get('section_id'))
|
||
|
||
if section_id in existing_map:
|
||
# 数据已存在,跳过
|
||
logger.info(f"Continue section data: {section_id}")
|
||
failed_count += 1
|
||
failed_items.append({
|
||
'data': item_data,
|
||
'error': '数据已存在,跳过插入操作'
|
||
})
|
||
else:
|
||
# 记录需要插入的数据
|
||
to_insert.append(item_data)
|
||
|
||
# 执行批量插入
|
||
if to_insert:
|
||
logger.info(f"Inserting {len(to_insert)} new records")
|
||
batch_size = 500
|
||
for i in range(0, len(to_insert), batch_size):
|
||
batch = to_insert[i:i + batch_size]
|
||
try:
|
||
section_data_list = [
|
||
SectionData(
|
||
section_id=str(item.get('section_id')),
|
||
mileage=item.get('mileage'),
|
||
work_site=item.get('work_site'),
|
||
basic_types=item.get('basic_types'),
|
||
height=item.get('height'),
|
||
status=item.get('status'),
|
||
number=str(item.get('number')) if item.get('number') else None,
|
||
transition_paragraph=item.get('transition_paragraph'),
|
||
design_fill_height=item.get('design_fill_height'),
|
||
compression_layer_thickness=item.get('compression_layer_thickness'),
|
||
treatment_depth=item.get('treatment_depth'),
|
||
foundation_treatment_method=item.get('foundation_treatment_method'),
|
||
rock_mass_classification=item.get('rock_mass_classification'),
|
||
account_id=str(item.get('account_id')) if item.get('account_id') else None
|
||
)
|
||
for item in batch
|
||
]
|
||
db.add_all(section_data_list)
|
||
success_count += len(batch)
|
||
logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records")
|
||
except Exception as e:
|
||
failed_count += len(batch)
|
||
failed_items.extend([
|
||
{
|
||
'data': item,
|
||
'error': f'插入失败: {str(e)}'
|
||
}
|
||
for item in batch
|
||
])
|
||
logger.error(f"Failed to insert batch: {str(e)}")
|
||
raise e
|
||
|
||
db.commit()
|
||
logger.info(f"Batch import sections completed. Success: {success_count}, Failed: {failed_count}")
|
||
|
||
return {
|
||
'success': True,
|
||
'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
|
||
'total_count': total_count,
|
||
'success_count': success_count,
|
||
'failed_count': failed_count,
|
||
'failed_items': failed_items
|
||
}
|
||
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error(f"Batch import sections failed: {str(e)}")
|
||
return {
|
||
'success': False,
|
||
'message': f'批量导入失败: {str(e)}',
|
||
'total_count': total_count,
|
||
'success_count': 0,
|
||
'failed_count': total_count,
|
||
'failed_items': failed_items
|
||
}
|
||
|
||
# ==================== 观测点数据服务 ====================
|
||
|
||
def batch_import_checkpoints(self, db: Session, data: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||
"""批量导入观测点数据 - 复用现有业务逻辑"""
|
||
total_count = len(data)
|
||
success_count = 0
|
||
failed_count = 0
|
||
failed_items = []
|
||
|
||
if total_count == 0:
|
||
return {
|
||
'success': False,
|
||
'message': '导入数据不能为空',
|
||
'total_count': 0,
|
||
'success_count': 0,
|
||
'failed_count': 0,
|
||
'failed_items': []
|
||
}
|
||
|
||
try:
|
||
db.begin()
|
||
|
||
# 批量查询断面数据(IN查询)
|
||
section_id_list = list(set(str(item.get('section_id')) for item in data if item.get('section_id')))
|
||
logger.info(f"Checking {len(section_id_list)} unique section_ids in section data")
|
||
|
||
sections = db.query(SectionData).filter(SectionData.section_id.in_(section_id_list)).all()
|
||
section_map = {s.section_id: s for s in sections}
|
||
missing_section_ids = set(section_id_list) - set(section_map.keys())
|
||
|
||
# 记录缺失的断面
|
||
for item_data in data:
|
||
section_id = str(item_data.get('section_id'))
|
||
if section_id in missing_section_ids:
|
||
failed_count += 1
|
||
failed_items.append({
|
||
'data': item_data,
|
||
'error': '断面ID不存在,跳过插入操作'
|
||
})
|
||
|
||
# 如果所有数据都失败,直接返回
|
||
if failed_count == total_count:
|
||
db.rollback()
|
||
return {
|
||
'success': False,
|
||
'message': '所有断面ID都不存在',
|
||
'total_count': total_count,
|
||
'success_count': 0,
|
||
'failed_count': total_count,
|
||
'failed_items': failed_items
|
||
}
|
||
|
||
# 批量查询现有观测点数据
|
||
valid_items = [item for item in data if str(item.get('section_id')) not in missing_section_ids]
|
||
if valid_items:
|
||
point_id_list = list(set(str(item.get('point_id')) for item in valid_items if item.get('point_id')))
|
||
existing_checkpoints = db.query(Checkpoint).filter(Checkpoint.point_id.in_(point_id_list)).all()
|
||
|
||
existing_map = {
|
||
checkpoint.point_id: checkpoint
|
||
for checkpoint in existing_checkpoints
|
||
}
|
||
logger.info(f"Found {len(existing_checkpoints)} existing checkpoints")
|
||
|
||
to_insert = []
|
||
|
||
for item_data in valid_items:
|
||
point_id = str(item_data.get('point_id'))
|
||
|
||
if point_id in existing_map:
|
||
# 数据已存在,跳过
|
||
logger.info(f"Continue checkpoint data: {point_id}")
|
||
failed_count += 1
|
||
failed_items.append({
|
||
'data': item_data,
|
||
'error': '数据已存在,跳过插入操作'
|
||
})
|
||
else:
|
||
to_insert.append(item_data)
|
||
|
||
# 执行批量插入
|
||
if to_insert:
|
||
logger.info(f"Inserting {len(to_insert)} new records")
|
||
batch_size = 500
|
||
for i in range(0, len(to_insert), batch_size):
|
||
batch = to_insert[i:i + batch_size]
|
||
try:
|
||
checkpoint_list = [
|
||
Checkpoint(
|
||
point_id=str(item.get('point_id')),
|
||
aname=item.get('aname'),
|
||
section_id=str(item.get('section_id')),
|
||
burial_date=item.get('burial_date')
|
||
)
|
||
for item in batch
|
||
]
|
||
db.add_all(checkpoint_list)
|
||
success_count += len(batch)
|
||
logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records")
|
||
except Exception as e:
|
||
failed_count += len(batch)
|
||
failed_items.extend([
|
||
{
|
||
'data': item,
|
||
'error': f'插入失败: {str(e)}'
|
||
}
|
||
for item in batch
|
||
])
|
||
logger.error(f"Failed to insert batch: {str(e)}")
|
||
raise e
|
||
|
||
db.commit()
|
||
logger.info(f"Batch import checkpoints completed. Success: {success_count}, Failed: {failed_count}")
|
||
|
||
return {
|
||
'success': True,
|
||
'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
|
||
'total_count': total_count,
|
||
'success_count': success_count,
|
||
'failed_count': failed_count,
|
||
'failed_items': failed_items
|
||
}
|
||
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error(f"Batch import checkpoints failed: {str(e)}")
|
||
return {
|
||
'success': False,
|
||
'message': f'批量导入失败: {str(e)}',
|
||
'total_count': total_count,
|
||
'success_count': 0,
|
||
'failed_count': total_count,
|
||
'failed_items': failed_items
|
||
}
|
||
|
||
# ==================== 沉降数据服务 ====================
|
||
|
||
def batch_import_settlement_data(self, db: Session, data: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||
"""批量导入沉降数据 - 复用现有业务逻辑"""
|
||
total_count = len(data)
|
||
success_count = 0
|
||
failed_count = 0
|
||
failed_items = []
|
||
|
||
if total_count == 0:
|
||
return {
|
||
'success': False,
|
||
'message': '导入数据不能为空',
|
||
'total_count': 0,
|
||
'success_count': 0,
|
||
'failed_count': 0,
|
||
'failed_items': []
|
||
}
|
||
|
||
try:
|
||
db.begin()
|
||
|
||
# 批量查询观测点数据
|
||
point_id_list = list(set(str(item.get('point_id')) for item in data if item.get('point_id')))
|
||
logger.info(f"Checking {len(point_id_list)} unique point_ids in checkpoint data")
|
||
|
||
checkpoints = db.query(Checkpoint).filter(Checkpoint.point_id.in_(point_id_list)).all()
|
||
checkpoint_map = {c.point_id: c for c in checkpoints}
|
||
missing_point_ids = set(point_id_list) - set(checkpoint_map.keys())
|
||
|
||
# 记录缺失的观测点
|
||
for item_data in data:
|
||
point_id = str(item_data.get('point_id'))
|
||
if point_id in missing_point_ids:
|
||
failed_count += 1
|
||
failed_items.append({
|
||
'data': item_data,
|
||
'error': '测点id不存在,跳过插入操作'
|
||
})
|
||
|
||
if failed_count == total_count:
|
||
db.rollback()
|
||
return {
|
||
'success': False,
|
||
'message': '所有观测点ID都不存在',
|
||
'total_count': total_count,
|
||
'success_count': 0,
|
||
'failed_count': total_count,
|
||
'failed_items': failed_items
|
||
}
|
||
|
||
# 批量查询现有沉降数据
|
||
valid_items = [item for item in data if str(item.get('point_id')) not in missing_point_ids]
|
||
if valid_items:
|
||
existing_data = db.query(SettlementData).filter(
|
||
SettlementData.point_id.in_(point_id_list)
|
||
).all()
|
||
|
||
existing_map = {
|
||
f"{item.point_id}_{item.NYID}": item
|
||
for item in existing_data
|
||
}
|
||
logger.info(f"Found {len(existing_data)} existing settlement records")
|
||
|
||
to_insert = []
|
||
|
||
for item_data in valid_items:
|
||
point_id = str(item_data.get('point_id'))
|
||
nyid = str(item_data.get('NYID'))
|
||
|
||
key = f"{point_id}_{nyid}"
|
||
|
||
if key in existing_map:
|
||
# 数据已存在,跳过
|
||
logger.info(f"Continue settlement data: {point_id}-{nyid}")
|
||
failed_count += 1
|
||
failed_items.append({
|
||
'data': item_data,
|
||
'error': '数据已存在,跳过插入操作'
|
||
})
|
||
else:
|
||
to_insert.append(item_data)
|
||
|
||
# 执行批量插入
|
||
if to_insert:
|
||
logger.info(f"Inserting {len(to_insert)} new records")
|
||
batch_size = 500
|
||
for i in range(0, len(to_insert), batch_size):
|
||
batch = to_insert[i:i + batch_size]
|
||
try:
|
||
settlement_data_list = [
|
||
SettlementData(
|
||
point_id=str(item.get('point_id')),
|
||
CVALUE=item.get('CVALUE'),
|
||
MAVALUE=item.get('MAVALUE'),
|
||
MTIME_W=item.get('MTIME_W'),
|
||
NYID=str(item.get('NYID')),
|
||
PRELOADH=item.get('PRELOADH'),
|
||
PSTATE=item.get('PSTATE'),
|
||
REMARK=item.get('REMARK'),
|
||
WORKINFO=item.get('WORKINFO'),
|
||
createdate=item.get('createdate'),
|
||
day=item.get('day'),
|
||
day_jg=item.get('day_jg'),
|
||
isgzjdxz=item.get('isgzjdxz'),
|
||
mavalue_bc=item.get('mavalue_bc'),
|
||
mavalue_lj=item.get('mavalue_lj'),
|
||
sjName=item.get('sjName'),
|
||
useflag=item.get('useflag'),
|
||
workinfoname=item.get('workinfoname'),
|
||
upd_remark=item.get('upd_remark')
|
||
)
|
||
for item in batch
|
||
]
|
||
db.add_all(settlement_data_list)
|
||
success_count += len(batch)
|
||
logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records")
|
||
except Exception as e:
|
||
failed_count += len(batch)
|
||
failed_items.extend([
|
||
{
|
||
'data': item,
|
||
'error': f'插入失败: {str(e)}'
|
||
}
|
||
for item in batch
|
||
])
|
||
logger.error(f"Failed to insert batch: {str(e)}")
|
||
raise e
|
||
|
||
db.commit()
|
||
logger.info(f"Batch import settlement data completed. Success: {success_count}, Failed: {failed_count}")
|
||
|
||
return {
|
||
'success': True,
|
||
'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
|
||
'total_count': total_count,
|
||
'success_count': success_count,
|
||
'failed_count': failed_count,
|
||
'failed_items': failed_items
|
||
}
|
||
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error(f"Batch import settlement data failed: {str(e)}")
|
||
return {
|
||
'success': False,
|
||
'message': f'批量导入失败: {str(e)}',
|
||
'total_count': total_count,
|
||
'success_count': 0,
|
||
'failed_count': total_count,
|
||
'failed_items': failed_items
|
||
}
|
||
|
||
# ==================== 水准数据服务 ====================
|
||
|
||
def batch_import_level_data(self, db: Session, data: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||
"""批量导入水准数据 - 复用现有业务逻辑"""
|
||
total_count = len(data)
|
||
success_count = 0
|
||
failed_count = 0
|
||
failed_items = []
|
||
|
||
if total_count == 0:
|
||
return {
|
||
'success': False,
|
||
'message': '导入数据不能为空',
|
||
'total_count': 0,
|
||
'success_count': 0,
|
||
'failed_count': 0,
|
||
'failed_items': []
|
||
}
|
||
|
||
try:
|
||
db.begin()
|
||
|
||
# 批量查询沉降数据
|
||
nyid_list = list(set(str(item.get('NYID')) for item in data if item.get('NYID')))
|
||
logger.info(f"Checking {len(nyid_list)} unique NYIDs in settlement data")
|
||
|
||
settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all()
|
||
settlement_map = {s.NYID: s for s in settlements}
|
||
missing_nyids = set(nyid_list) - set(settlement_map.keys())
|
||
|
||
# 记录缺失的NYID
|
||
for item_data in data:
|
||
nyid = str(item_data.get('NYID'))
|
||
if nyid in missing_nyids:
|
||
failed_count += 1
|
||
failed_items.append({
|
||
'data': item_data,
|
||
'error': '期数ID在沉降表中不存在,跳过插入操作'
|
||
})
|
||
|
||
if failed_count == total_count:
|
||
db.rollback()
|
||
return {
|
||
'success': False,
|
||
'message': '所有期数ID在沉降表中都不存在',
|
||
'total_count': total_count,
|
||
'success_count': 0,
|
||
'failed_count': total_count,
|
||
'failed_items': failed_items
|
||
}
|
||
|
||
# 批量查询现有水准数据
|
||
valid_items = [item for item in data if str(item.get('NYID')) not in missing_nyids]
|
||
if valid_items:
|
||
existing_data = db.query(LevelData).filter(
|
||
LevelData.NYID.in_(nyid_list)
|
||
).all()
|
||
|
||
existing_map = {
|
||
f"{item.NYID}_{item.linecode}": item
|
||
for item in existing_data
|
||
}
|
||
logger.info(f"Found {len(existing_data)} existing level records")
|
||
|
||
to_insert = []
|
||
|
||
for item_data in valid_items:
|
||
nyid = str(item_data.get('NYID'))
|
||
linecode = item_data.get('linecode')
|
||
|
||
key = f"{nyid}_{linecode}"
|
||
|
||
if key in existing_map:
|
||
# 数据已存在,跳过
|
||
logger.info(f"Continue level data: {nyid}-{linecode}")
|
||
failed_count += 1
|
||
failed_items.append({
|
||
'data': item_data,
|
||
'error': '数据已存在,跳过插入操作'
|
||
})
|
||
else:
|
||
to_insert.append(item_data)
|
||
|
||
# 执行批量插入
|
||
if to_insert:
|
||
logger.info(f"Inserting {len(to_insert)} new records")
|
||
batch_size = 500
|
||
for i in range(0, len(to_insert), batch_size):
|
||
batch = to_insert[i:i + batch_size]
|
||
try:
|
||
level_data_list = [
|
||
LevelData(
|
||
linecode=str(item.get('linecode')),
|
||
benchmarkids=item.get('benchmarkids'),
|
||
wsphigh=item.get('wsphigh'),
|
||
mtype=item.get('mtype'),
|
||
NYID=str(item.get('NYID')),
|
||
createDate=item.get('createDate'),
|
||
wspversion=item.get('wspversion'),
|
||
barometric=str(item.get('barometric')) if item.get('barometric') is not None else None,
|
||
equipbrand=item.get('equipbrand'),
|
||
instrumodel=item.get('instrumodel'),
|
||
serialnum=item.get('serialnum'),
|
||
sjname=item.get('sjname'),
|
||
temperature=str(item.get('temperature')) if item.get('temperature') is not None else None,
|
||
weather=str(item.get('weather')) if item.get('weather') is not None else None
|
||
)
|
||
for item in batch
|
||
]
|
||
db.add_all(level_data_list)
|
||
success_count += len(batch)
|
||
logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records")
|
||
except Exception as e:
|
||
failed_count += len(batch)
|
||
failed_items.extend([
|
||
{
|
||
'data': item,
|
||
'error': f'插入失败: {str(e)}'
|
||
}
|
||
for item in batch
|
||
])
|
||
logger.error(f"Failed to insert batch: {str(e)}")
|
||
raise e
|
||
|
||
db.commit()
|
||
logger.info(f"Batch import level data completed. Success: {success_count}, Failed: {failed_count}")
|
||
|
||
return {
|
||
'success': True,
|
||
'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
|
||
'total_count': total_count,
|
||
'success_count': success_count,
|
||
'failed_count': failed_count,
|
||
'failed_items': failed_items
|
||
}
|
||
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error(f"Batch import level data failed: {str(e)}")
|
||
return {
|
||
'success': False,
|
||
'message': f'批量导入失败: {str(e)}',
|
||
'total_count': total_count,
|
||
'success_count': 0,
|
||
'failed_count': total_count,
|
||
'failed_items': failed_items
|
||
}
|
||
|
||
# ==================== 原始数据服务 ====================
|
||
|
||
def _ensure_table_exists(self, account_id: int) -> bool:
|
||
"""确保指定账号的原始数据表存在,不存在则创建"""
|
||
table_name = get_table_name(account_id)
|
||
inspector = inspect(self.engine)
|
||
|
||
# 检查表是否存在
|
||
if table_name in inspector.get_table_names():
|
||
logger.info(f"Table {table_name} already exists")
|
||
return True
|
||
|
||
# 表不存在,创建表
|
||
max_retries = 3
|
||
for attempt in range(max_retries):
|
||
try:
|
||
create_table_sql = f"""
|
||
CREATE TABLE `{table_name}` (
|
||
`id` INT AUTO_INCREMENT PRIMARY KEY,
|
||
`account_id` INT NOT NULL COMMENT '账号ID',
|
||
`bfpcode` VARCHAR(1000) NOT NULL COMMENT '前(后)视点名称',
|
||
`mtime` DATETIME NOT NULL COMMENT '测点观测时间',
|
||
`bffb` VARCHAR(1000) NOT NULL COMMENT '前(后)视标记符',
|
||
`bfpl` VARCHAR(1000) NOT NULL COMMENT '前(后)视距离(m)',
|
||
`bfpvalue` VARCHAR(1000) NOT NULL COMMENT '前(后)视尺读数(m)',
|
||
`NYID` VARCHAR(100) NOT NULL COMMENT '期数id',
|
||
`sort` INT COMMENT '序号',
|
||
INDEX `idx_nyid` (`NYID`),
|
||
INDEX `idx_account_id` (`account_id`)
|
||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='原始数据表_账号{account_id}'
|
||
"""
|
||
# 使用引擎直接执行,不需要事务(CREATE TABLE是自动提交的)
|
||
with self.engine.begin() as conn:
|
||
conn.execute(text(create_table_sql))
|
||
|
||
logger.info(f"Table {table_name} created successfully")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.warning(f"Attempt {attempt + 1} to create table {table_name} failed: {str(e)}")
|
||
if attempt == max_retries - 1:
|
||
logger.error(f"Failed to create table {table_name} after {max_retries} attempts")
|
||
return False
|
||
|
||
import time
|
||
time.sleep(0.1 * (attempt + 1))
|
||
|
||
return False
|
||
|
||
def batch_import_original_data(self, db: Session, data: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||
"""批量导入原始数据 - 复用现有业务逻辑,支持分表"""
|
||
total_count = len(data)
|
||
success_count = 0
|
||
failed_count = 0
|
||
failed_items = []
|
||
|
||
if total_count == 0:
|
||
return {
|
||
'success': False,
|
||
'message': '导入数据不能为空',
|
||
'total_count': 0,
|
||
'success_count': 0,
|
||
'failed_count': 0,
|
||
'failed_items': []
|
||
}
|
||
|
||
# 获取数据的account_id
|
||
account_id = data[0].get('account_id')
|
||
if not account_id:
|
||
return {
|
||
'success': False,
|
||
'message': '数据中缺少account_id字段',
|
||
'total_count': total_count,
|
||
'success_count': 0,
|
||
'failed_count': total_count,
|
||
'failed_items': []
|
||
}
|
||
|
||
# 验证账号是否存在
|
||
account = db.query(Account).filter(Account.id == account_id).first()
|
||
if not account:
|
||
return {
|
||
'success': False,
|
||
'message': f'账号ID {account_id} 不存在',
|
||
'total_count': total_count,
|
||
'success_count': 0,
|
||
'failed_count': total_count,
|
||
'failed_items': []
|
||
}
|
||
|
||
# 确保表存在
|
||
table_created = self._ensure_table_exists(account_id)
|
||
if not table_created:
|
||
return {
|
||
'success': False,
|
||
'message': '创建原始数据表失败',
|
||
'total_count': total_count,
|
||
'success_count': 0,
|
||
'failed_count': total_count,
|
||
'failed_items': []
|
||
}
|
||
|
||
table_name = get_table_name(account_id)
|
||
|
||
try:
|
||
# 注意:不开启db.begin(),由上层方法管理事务
|
||
|
||
# 批量查询沉降数据是否存在
|
||
nyid_list = list(set(str(item.get('NYID')) for item in data if item.get('NYID')))
|
||
from app.models.settlement_data import SettlementData
|
||
settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all()
|
||
settlement_map = {s.NYID: s for s in settlements}
|
||
missing_nyids = set(nyid_list) - set(settlement_map.keys())
|
||
|
||
if missing_nyids:
|
||
raise Exception(f'以下期数在沉降表中不存在: {list(missing_nyids)}')
|
||
|
||
# 查询现有原始数据
|
||
existing_data = db.query(text("*")).from_statement(
|
||
text(f"SELECT * FROM `{table_name}` WHERE account_id = :account_id")
|
||
).params(account_id=account_id).all()
|
||
|
||
existing_map = {
|
||
f"{item[7]}_{item[8]}": item # NYID是第8个字段(索引7),sort是第9个字段(索引8)
|
||
for item in existing_data
|
||
}
|
||
logger.info(f"Found {len(existing_data)} existing records in {table_name}")
|
||
|
||
# 批量处理插入和跳过
|
||
to_insert = []
|
||
skipped_count = 0
|
||
|
||
for item_data in data:
|
||
nyid = str(item_data.get('NYID'))
|
||
sort = item_data.get('sort')
|
||
|
||
key = f"{nyid}_{sort}"
|
||
|
||
if key in existing_map:
|
||
# 数据已存在,跳过
|
||
skipped_count += 1
|
||
else:
|
||
to_insert.append(item_data)
|
||
|
||
logger.info(f"Filtered {skipped_count} duplicate records, {len(to_insert)} new records to insert")
|
||
|
||
# 执行批量插入
|
||
if to_insert:
|
||
logger.info(f"Inserting {len(to_insert)} new records")
|
||
batch_size = 1000
|
||
for i in range(0, len(to_insert), batch_size):
|
||
batch = to_insert[i:i + batch_size]
|
||
|
||
# 构建批量参数
|
||
values_list = []
|
||
params = {}
|
||
for idx, item_data in enumerate(batch):
|
||
values_list.append(
|
||
f"(:account_id_{idx}, :bfpcode_{idx}, :mtime_{idx}, :bffb_{idx}, "
|
||
f":bfpl_{idx}, :bfpvalue_{idx}, :NYID_{idx}, :sort_{idx})"
|
||
)
|
||
params.update({
|
||
f"account_id_{idx}": account_id,
|
||
f"bfpcode_{idx}": item_data.get('bfpcode'),
|
||
f"mtime_{idx}": item_data.get('mtime'),
|
||
f"bffb_{idx}": item_data.get('bffb'),
|
||
f"bfpl_{idx}": item_data.get('bfpl'),
|
||
f"bfpvalue_{idx}": item_data.get('bfpvalue'),
|
||
f"NYID_{idx}": item_data.get('NYID'),
|
||
f"sort_{idx}": item_data.get('sort')
|
||
})
|
||
|
||
# 批量插入SQL
|
||
insert_sql = f"""
|
||
INSERT INTO `{table_name}`
|
||
(account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort)
|
||
VALUES {", ".join(values_list)}
|
||
"""
|
||
final_sql = text(insert_sql)
|
||
db.execute(final_sql, params)
|
||
success_count += len(batch)
|
||
logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records")
|
||
|
||
# 注意:不在这里提交事务,由上层方法管理事务
|
||
logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}")
|
||
|
||
return {
|
||
'success': True,
|
||
'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
|
||
'total_count': total_count,
|
||
'success_count': success_count,
|
||
'failed_count': failed_count,
|
||
'failed_items': failed_items
|
||
}
|
||
|
||
except Exception as e:
|
||
# 注意:不在这里回滚,由上层方法管理事务
|
||
logger.error(f"Batch import original data failed: {str(e)}")
|
||
return {
|
||
'success': False,
|
||
'message': f'批量导入失败: {str(e)}',
|
||
'total_count': total_count,
|
||
'success_count': 0,
|
||
'failed_count': total_count,
|
||
'failed_items': failed_items
|
||
}
|