Files
railway_cloud/upload_app/direct_import/database_service.py
2025-11-20 17:20:00 +08:00

832 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
数据库操作服务层
直接操作数据库跳过HTTP请求环节
复用现有的业务逻辑和模型
"""
import os
import sys
import logging
from pathlib import Path
from typing import List, Optional, Dict, Any
# 添加项目根目录到Python路径
PROJECT_ROOT = Path(__file__).parent.parent.parent.absolute()
sys.path.insert(0, str(PROJECT_ROOT))
# 导入数据库操作相关模块
from sqlalchemy import create_engine, text, inspect
from sqlalchemy.orm import sessionmaker, Session
from dotenv import load_dotenv
# 导入数据模型
from app.models.section_data import SectionData
from app.models.checkpoint import Checkpoint
from app.models.settlement_data import SettlementData
from app.models.level_data import LevelData
from app.models.account import Account
# 导入原始数据动态表支持
from app.models.original_data import get_original_data_model, get_table_name
# 加载环境变量
load_dotenv(PROJECT_ROOT / ".env")
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class DatabaseService:
    """Database access service that writes directly to the DB (no HTTP layer)."""

    def __init__(self):
        """Build the SQLAlchemy engine and session factory from environment variables."""
        # Prefer a complete DATABASE_URL; otherwise assemble one from the
        # discrete DB_* settings (defaults target a local MySQL instance).
        database_url = os.getenv("DATABASE_URL")
        if not database_url:
            db_host = os.getenv("DB_HOST", "localhost")
            db_port = os.getenv("DB_PORT", "3306")
            db_user = os.getenv("DB_USER", "root")
            db_password = os.getenv("DB_PASSWORD", "root")
            db_name = os.getenv("DB_NAME", "railway")
            database_url = f"mysql+pymysql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
        # Mask the password so credentials never end up in log files.
        logger.info(f"数据库连接: {self._mask_db_url(database_url)}")
        self.engine = create_engine(
            database_url,
            pool_pre_ping=True,   # validate pooled connections before each checkout
            pool_recycle=3600,    # recycle hourly to dodge MySQL wait_timeout drops
            echo=False            # set True to log every emitted SQL statement
        )
        self.SessionLocal = sessionmaker(bind=self.engine)

    @staticmethod
    def _mask_db_url(url: str) -> str:
        """Return *url* with the password component replaced by '***'.

        Handles URLs of the form ``scheme://user:password@host[:port]/db``;
        URLs without credentials are returned unchanged.
        """
        scheme_end = url.find('://')
        start = scheme_end + 3 if scheme_end != -1 else 0
        at = url.rfind('@')
        if at == -1:
            # No credentials section at all.
            return url
        colon = url.find(':', start, at)
        if colon == -1:
            # User without password.
            return url
        return url[:colon + 1] + '***' + url[at:]

    def get_db_session(self) -> "Session":
        """Create and return a new session; the caller is responsible for closing it."""
        return self.SessionLocal()
# ==================== 断面数据服务 ====================
def batch_import_sections(self, db: Session, data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Bulk-import section rows, skipping section_ids that already exist.

    Runs as a single transaction: any insert error aborts and rolls back the
    whole import. Duplicates are reported in ``failed_items`` (with an
    explanatory error text) rather than raising.

    Args:
        db: Open SQLAlchemy session; begun/committed/rolled back here.
        data: List of dicts, one per section; each should carry 'section_id'.

    Returns:
        Dict with keys: success, message, total_count, success_count,
        failed_count, failed_items.
    """
    total_count = len(data)
    success_count = 0
    failed_count = 0
    failed_items = []
    # Reject empty input before touching the database.
    if total_count == 0:
        return {
            'success': False,
            'message': '导入数据不能为空',
            'total_count': 0,
            'success_count': 0,
            'failed_count': 0,
            'failed_items': []
        }
    try:
        # NOTE(review): assumes no transaction is already active on this
        # session — Session.begin() raises otherwise; verify call sites.
        db.begin()
        # Single IN query for all incoming section_ids (avoids per-row lookups).
        section_id_list = list(set(str(item.get('section_id')) for item in data if item.get('section_id')))
        logger.info(f"Checking {len(section_id_list)} unique section_ids")
        existing_sections = db.query(SectionData).filter(SectionData.section_id.in_(section_id_list)).all()
        # O(1) duplicate lookup keyed by section_id.
        existing_map = {
            section.section_id: section
            for section in existing_sections
        }
        logger.info(f"Found {len(existing_sections)} existing sections")
        # Partition input: new rows go to to_insert, duplicates are skipped.
        to_insert = []
        for item_data in data:
            section_id = str(item_data.get('section_id'))
            if section_id in existing_map:
                # Row already in DB: count as failed with an explanatory entry.
                logger.info(f"Continue section data: {section_id}")
                failed_count += 1
                failed_items.append({
                    'data': item_data,
                    'error': '数据已存在,跳过插入操作'
                })
            else:
                # Queue the row for insertion.
                to_insert.append(item_data)
        # Bulk insert in chunks of 500 rows.
        if to_insert:
            logger.info(f"Inserting {len(to_insert)} new records")
            batch_size = 500
            for i in range(0, len(to_insert), batch_size):
                batch = to_insert[i:i + batch_size]
                try:
                    section_data_list = [
                        SectionData(
                            section_id=str(item.get('section_id')),
                            mileage=item.get('mileage'),
                            work_site=item.get('work_site'),
                            basic_types=item.get('basic_types'),
                            height=item.get('height'),
                            status=item.get('status'),
                            # number/account_id are stringified only when truthy.
                            number=str(item.get('number')) if item.get('number') else None,
                            transition_paragraph=item.get('transition_paragraph'),
                            design_fill_height=item.get('design_fill_height'),
                            compression_layer_thickness=item.get('compression_layer_thickness'),
                            treatment_depth=item.get('treatment_depth'),
                            foundation_treatment_method=item.get('foundation_treatment_method'),
                            rock_mass_classification=item.get('rock_mass_classification'),
                            account_id=str(item.get('account_id')) if item.get('account_id') else None
                        )
                        for item in batch
                    ]
                    db.add_all(section_data_list)
                    success_count += len(batch)
                    logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records")
                except Exception as e:
                    failed_count += len(batch)
                    failed_items.extend([
                        {
                            'data': item,
                            'error': f'插入失败: {str(e)}'
                        }
                        for item in batch
                    ])
                    logger.error(f"Failed to insert batch: {str(e)}")
                    # Re-raise: one failed batch aborts the whole import
                    # (outer handler rolls the transaction back).
                    raise e
        db.commit()
        logger.info(f"Batch import sections completed. Success: {success_count}, Failed: {failed_count}")
        return {
            'success': True,
            'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
            'total_count': total_count,
            'success_count': success_count,
            'failed_count': failed_count,
            'failed_items': failed_items
        }
    except Exception as e:
        # Any error above lands here: undo everything and report total failure.
        db.rollback()
        logger.error(f"Batch import sections failed: {str(e)}")
        return {
            'success': False,
            'message': f'批量导入失败: {str(e)}',
            'total_count': total_count,
            'success_count': 0,
            'failed_count': total_count,
            'failed_items': failed_items
        }
# ==================== 观测点数据服务 ====================
def batch_import_checkpoints(self, db: Session, data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Bulk-import checkpoint rows after validating their parent sections.

    Rows whose section_id is absent from SectionData are rejected; rows
    whose point_id already exists are skipped. Runs as one transaction.

    Args:
        db: Open SQLAlchemy session; begun/committed/rolled back here.
        data: List of dicts, each with 'point_id' and 'section_id'.

    Returns:
        Dict with keys: success, message, total_count, success_count,
        failed_count, failed_items.
    """
    total_count = len(data)
    success_count = 0
    failed_count = 0
    failed_items = []
    # Reject empty input before touching the database.
    if total_count == 0:
        return {
            'success': False,
            'message': '导入数据不能为空',
            'total_count': 0,
            'success_count': 0,
            'failed_count': 0,
            'failed_items': []
        }
    try:
        # NOTE(review): assumes no transaction is already active on this
        # session — Session.begin() raises otherwise; verify call sites.
        db.begin()
        # One IN query to find which parent sections exist.
        section_id_list = list(set(str(item.get('section_id')) for item in data if item.get('section_id')))
        logger.info(f"Checking {len(section_id_list)} unique section_ids in section data")
        sections = db.query(SectionData).filter(SectionData.section_id.in_(section_id_list)).all()
        section_map = {s.section_id: s for s in sections}
        missing_section_ids = set(section_id_list) - set(section_map.keys())
        # Reject every row that points at a missing section.
        for item_data in data:
            section_id = str(item_data.get('section_id'))
            if section_id in missing_section_ids:
                failed_count += 1
                failed_items.append({
                    'data': item_data,
                    'error': '断面ID不存在跳过插入操作'
                })
        # Nothing importable at all: bail out early.
        if failed_count == total_count:
            db.rollback()
            return {
                'success': False,
                'message': '所有断面ID都不存在',
                'total_count': total_count,
                'success_count': 0,
                'failed_count': total_count,
                'failed_items': failed_items
            }
        # Work only with rows whose parent section exists.
        valid_items = [item for item in data if str(item.get('section_id')) not in missing_section_ids]
        if valid_items:
            point_id_list = list(set(str(item.get('point_id')) for item in valid_items if item.get('point_id')))
            existing_checkpoints = db.query(Checkpoint).filter(Checkpoint.point_id.in_(point_id_list)).all()
            existing_map = {
                checkpoint.point_id: checkpoint
                for checkpoint in existing_checkpoints
            }
            logger.info(f"Found {len(existing_checkpoints)} existing checkpoints")
            to_insert = []
            for item_data in valid_items:
                point_id = str(item_data.get('point_id'))
                if point_id in existing_map:
                    # Duplicate point_id: record as skipped.
                    logger.info(f"Continue checkpoint data: {point_id}")
                    failed_count += 1
                    failed_items.append({
                        'data': item_data,
                        'error': '数据已存在,跳过插入操作'
                    })
                else:
                    to_insert.append(item_data)
            # Bulk insert in chunks of 500 rows.
            if to_insert:
                logger.info(f"Inserting {len(to_insert)} new records")
                batch_size = 500
                for i in range(0, len(to_insert), batch_size):
                    batch = to_insert[i:i + batch_size]
                    try:
                        checkpoint_list = [
                            Checkpoint(
                                point_id=str(item.get('point_id')),
                                aname=item.get('aname'),
                                section_id=str(item.get('section_id')),
                                burial_date=item.get('burial_date')
                            )
                            for item in batch
                        ]
                        db.add_all(checkpoint_list)
                        success_count += len(batch)
                        logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records")
                    except Exception as e:
                        failed_count += len(batch)
                        failed_items.extend([
                            {
                                'data': item,
                                'error': f'插入失败: {str(e)}'
                            }
                            for item in batch
                        ])
                        logger.error(f"Failed to insert batch: {str(e)}")
                        # Re-raise: one failed batch aborts the whole import.
                        raise e
        db.commit()
        logger.info(f"Batch import checkpoints completed. Success: {success_count}, Failed: {failed_count}")
        return {
            'success': True,
            'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
            'total_count': total_count,
            'success_count': success_count,
            'failed_count': failed_count,
            'failed_items': failed_items
        }
    except Exception as e:
        # Any error above lands here: undo everything and report total failure.
        db.rollback()
        logger.error(f"Batch import checkpoints failed: {str(e)}")
        return {
            'success': False,
            'message': f'批量导入失败: {str(e)}',
            'total_count': total_count,
            'success_count': 0,
            'failed_count': total_count,
            'failed_items': failed_items
        }
# ==================== 沉降数据服务 ====================
def batch_import_settlement_data(self, db: Session, data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Bulk-import settlement rows after validating their parent checkpoints.

    Rows whose point_id is absent from Checkpoint are rejected; rows whose
    (point_id, NYID) pair already exists are skipped. Runs as one transaction.

    Args:
        db: Open SQLAlchemy session; begun/committed/rolled back here.
        data: List of dicts, each with 'point_id' and 'NYID' plus measurement fields.

    Returns:
        Dict with keys: success, message, total_count, success_count,
        failed_count, failed_items.
    """
    total_count = len(data)
    success_count = 0
    failed_count = 0
    failed_items = []
    # Reject empty input before touching the database.
    if total_count == 0:
        return {
            'success': False,
            'message': '导入数据不能为空',
            'total_count': 0,
            'success_count': 0,
            'failed_count': 0,
            'failed_items': []
        }
    try:
        # NOTE(review): assumes no transaction is already active on this
        # session — Session.begin() raises otherwise; verify call sites.
        db.begin()
        # One IN query to find which parent checkpoints exist.
        point_id_list = list(set(str(item.get('point_id')) for item in data if item.get('point_id')))
        logger.info(f"Checking {len(point_id_list)} unique point_ids in checkpoint data")
        checkpoints = db.query(Checkpoint).filter(Checkpoint.point_id.in_(point_id_list)).all()
        checkpoint_map = {c.point_id: c for c in checkpoints}
        missing_point_ids = set(point_id_list) - set(checkpoint_map.keys())
        # Reject every row that points at a missing checkpoint.
        for item_data in data:
            point_id = str(item_data.get('point_id'))
            if point_id in missing_point_ids:
                failed_count += 1
                failed_items.append({
                    'data': item_data,
                    'error': '测点id不存在跳过插入操作'
                })
        # Nothing importable at all: bail out early.
        if failed_count == total_count:
            db.rollback()
            return {
                'success': False,
                'message': '所有观测点ID都不存在',
                'total_count': total_count,
                'success_count': 0,
                'failed_count': total_count,
                'failed_items': failed_items
            }
        # Work only with rows whose parent checkpoint exists.
        valid_items = [item for item in data if str(item.get('point_id')) not in missing_point_ids]
        if valid_items:
            # NOTE(review): this filter uses the full point_id_list (including
            # missing points) — harmless, but slightly broader than needed.
            existing_data = db.query(SettlementData).filter(
                SettlementData.point_id.in_(point_id_list)
            ).all()
            # Duplicate key is the composite "point_id_NYID".
            existing_map = {
                f"{item.point_id}_{item.NYID}": item
                for item in existing_data
            }
            logger.info(f"Found {len(existing_data)} existing settlement records")
            to_insert = []
            for item_data in valid_items:
                point_id = str(item_data.get('point_id'))
                nyid = str(item_data.get('NYID'))
                key = f"{point_id}_{nyid}"
                if key in existing_map:
                    # Duplicate (point_id, NYID): record as skipped.
                    logger.info(f"Continue settlement data: {point_id}-{nyid}")
                    failed_count += 1
                    failed_items.append({
                        'data': item_data,
                        'error': '数据已存在,跳过插入操作'
                    })
                else:
                    to_insert.append(item_data)
            # Bulk insert in chunks of 500 rows.
            if to_insert:
                logger.info(f"Inserting {len(to_insert)} new records")
                batch_size = 500
                for i in range(0, len(to_insert), batch_size):
                    batch = to_insert[i:i + batch_size]
                    try:
                        settlement_data_list = [
                            SettlementData(
                                point_id=str(item.get('point_id')),
                                CVALUE=item.get('CVALUE'),
                                MAVALUE=item.get('MAVALUE'),
                                MTIME_W=item.get('MTIME_W'),
                                NYID=str(item.get('NYID')),
                                PRELOADH=item.get('PRELOADH'),
                                PSTATE=item.get('PSTATE'),
                                REMARK=item.get('REMARK'),
                                WORKINFO=item.get('WORKINFO'),
                                createdate=item.get('createdate'),
                                day=item.get('day'),
                                day_jg=item.get('day_jg'),
                                isgzjdxz=item.get('isgzjdxz'),
                                mavalue_bc=item.get('mavalue_bc'),
                                mavalue_lj=item.get('mavalue_lj'),
                                sjName=item.get('sjName'),
                                useflag=item.get('useflag'),
                                workinfoname=item.get('workinfoname'),
                                upd_remark=item.get('upd_remark')
                            )
                            for item in batch
                        ]
                        db.add_all(settlement_data_list)
                        success_count += len(batch)
                        logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records")
                    except Exception as e:
                        failed_count += len(batch)
                        failed_items.extend([
                            {
                                'data': item,
                                'error': f'插入失败: {str(e)}'
                            }
                            for item in batch
                        ])
                        logger.error(f"Failed to insert batch: {str(e)}")
                        # Re-raise: one failed batch aborts the whole import.
                        raise e
        db.commit()
        logger.info(f"Batch import settlement data completed. Success: {success_count}, Failed: {failed_count}")
        return {
            'success': True,
            'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
            'total_count': total_count,
            'success_count': success_count,
            'failed_count': failed_count,
            'failed_items': failed_items
        }
    except Exception as e:
        # Any error above lands here: undo everything and report total failure.
        db.rollback()
        logger.error(f"Batch import settlement data failed: {str(e)}")
        return {
            'success': False,
            'message': f'批量导入失败: {str(e)}',
            'total_count': total_count,
            'success_count': 0,
            'failed_count': total_count,
            'failed_items': failed_items
        }
# ==================== 水准数据服务 ====================
def batch_import_level_data(self, db: Session, data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Bulk-import leveling rows after validating their NYID in settlement data.

    Rows whose NYID has no settlement record are rejected; rows whose
    (NYID, linecode) pair already exists are skipped. Runs as one transaction.

    Args:
        db: Open SQLAlchemy session; begun/committed/rolled back here.
        data: List of dicts, each with 'NYID' and 'linecode' plus survey fields.

    Returns:
        Dict with keys: success, message, total_count, success_count,
        failed_count, failed_items.
    """
    total_count = len(data)
    success_count = 0
    failed_count = 0
    failed_items = []
    # Reject empty input before touching the database.
    if total_count == 0:
        return {
            'success': False,
            'message': '导入数据不能为空',
            'total_count': 0,
            'success_count': 0,
            'failed_count': 0,
            'failed_items': []
        }
    try:
        # NOTE(review): assumes no transaction is already active on this
        # session — Session.begin() raises otherwise; verify call sites.
        db.begin()
        # One IN query to confirm each NYID already exists in settlement data.
        nyid_list = list(set(str(item.get('NYID')) for item in data if item.get('NYID')))
        logger.info(f"Checking {len(nyid_list)} unique NYIDs in settlement data")
        settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all()
        settlement_map = {s.NYID: s for s in settlements}
        missing_nyids = set(nyid_list) - set(settlement_map.keys())
        # Reject every row whose NYID has no settlement record.
        for item_data in data:
            nyid = str(item_data.get('NYID'))
            if nyid in missing_nyids:
                failed_count += 1
                failed_items.append({
                    'data': item_data,
                    'error': '期数ID在沉降表中不存在跳过插入操作'
                })
        # Nothing importable at all: bail out early.
        if failed_count == total_count:
            db.rollback()
            return {
                'success': False,
                'message': '所有期数ID在沉降表中都不存在',
                'total_count': total_count,
                'success_count': 0,
                'failed_count': total_count,
                'failed_items': failed_items
            }
        # Work only with rows whose NYID is known.
        valid_items = [item for item in data if str(item.get('NYID')) not in missing_nyids]
        if valid_items:
            existing_data = db.query(LevelData).filter(
                LevelData.NYID.in_(nyid_list)
            ).all()
            # Duplicate key is the composite "NYID_linecode".
            existing_map = {
                f"{item.NYID}_{item.linecode}": item
                for item in existing_data
            }
            logger.info(f"Found {len(existing_data)} existing level records")
            to_insert = []
            for item_data in valid_items:
                nyid = str(item_data.get('NYID'))
                linecode = item_data.get('linecode')
                key = f"{nyid}_{linecode}"
                if key in existing_map:
                    # Duplicate (NYID, linecode): record as skipped.
                    logger.info(f"Continue level data: {nyid}-{linecode}")
                    failed_count += 1
                    failed_items.append({
                        'data': item_data,
                        'error': '数据已存在,跳过插入操作'
                    })
                else:
                    to_insert.append(item_data)
            # Bulk insert in chunks of 500 rows.
            if to_insert:
                logger.info(f"Inserting {len(to_insert)} new records")
                batch_size = 500
                for i in range(0, len(to_insert), batch_size):
                    batch = to_insert[i:i + batch_size]
                    try:
                        level_data_list = [
                            LevelData(
                                linecode=str(item.get('linecode')),
                                benchmarkids=item.get('benchmarkids'),
                                wsphigh=item.get('wsphigh'),
                                mtype=item.get('mtype'),
                                NYID=str(item.get('NYID')),
                                createDate=item.get('createDate'),
                                wspversion=item.get('wspversion'),
                                # Environmental fields are stringified only when present.
                                barometric=str(item.get('barometric')) if item.get('barometric') is not None else None,
                                equipbrand=item.get('equipbrand'),
                                instrumodel=item.get('instrumodel'),
                                serialnum=item.get('serialnum'),
                                sjname=item.get('sjname'),
                                temperature=str(item.get('temperature')) if item.get('temperature') is not None else None,
                                weather=str(item.get('weather')) if item.get('weather') is not None else None
                            )
                            for item in batch
                        ]
                        db.add_all(level_data_list)
                        success_count += len(batch)
                        logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records")
                    except Exception as e:
                        failed_count += len(batch)
                        failed_items.extend([
                            {
                                'data': item,
                                'error': f'插入失败: {str(e)}'
                            }
                            for item in batch
                        ])
                        logger.error(f"Failed to insert batch: {str(e)}")
                        # Re-raise: one failed batch aborts the whole import.
                        raise e
        db.commit()
        logger.info(f"Batch import level data completed. Success: {success_count}, Failed: {failed_count}")
        return {
            'success': True,
            'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
            'total_count': total_count,
            'success_count': success_count,
            'failed_count': failed_count,
            'failed_items': failed_items
        }
    except Exception as e:
        # Any error above lands here: undo everything and report total failure.
        db.rollback()
        logger.error(f"Batch import level data failed: {str(e)}")
        return {
            'success': False,
            'message': f'批量导入失败: {str(e)}',
            'total_count': total_count,
            'success_count': 0,
            'failed_count': total_count,
            'failed_items': failed_items
        }
# ==================== 原始数据服务 ====================
def _ensure_table_exists(self, account_id: int) -> bool:
    """Ensure the per-account raw-data table exists, creating it if missing.

    Uses ``CREATE TABLE IF NOT EXISTS`` so that a concurrent creation of the
    same table (another worker racing between our existence check and the
    DDL) can no longer make every retry fail spuriously. Transient errors
    are retried with a short linear backoff.

    Args:
        account_id: Account whose sharded original-data table must exist.

    Returns:
        True if the table exists or was created; False if creation still
        failed after all retries.
    """
    import time

    table_name = get_table_name(account_id)
    inspector = inspect(self.engine)
    # Fast path: table already present.
    if table_name in inspector.get_table_names():
        logger.info(f"Table {table_name} already exists")
        return True
    # DDL is loop-invariant — build it once. IF NOT EXISTS makes the
    # statement idempotent under concurrent creation.
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS `{table_name}` (
        `id` INT AUTO_INCREMENT PRIMARY KEY,
        `account_id` INT NOT NULL COMMENT '账号ID',
        `bfpcode` VARCHAR(1000) NOT NULL COMMENT '前(后)视点名称',
        `mtime` DATETIME NOT NULL COMMENT '测点观测时间',
        `bffb` VARCHAR(1000) NOT NULL COMMENT '前(后)视标记符',
        `bfpl` VARCHAR(1000) NOT NULL COMMENT '前(后)视距离(m)',
        `bfpvalue` VARCHAR(1000) NOT NULL COMMENT '前(后)视尺读数(m)',
        `NYID` VARCHAR(100) NOT NULL COMMENT '期数id',
        `sort` INT COMMENT '序号',
        INDEX `idx_nyid` (`NYID`),
        INDEX `idx_account_id` (`account_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='原始数据表_账号{account_id}'
    """
    max_retries = 3
    for attempt in range(max_retries):
        try:
            # DDL autocommits in MySQL; engine.begin() just scopes the connection.
            with self.engine.begin() as conn:
                conn.execute(text(create_table_sql))
            logger.info(f"Table {table_name} created successfully")
            return True
        except Exception as e:
            logger.warning(f"Attempt {attempt + 1} to create table {table_name} failed: {str(e)}")
            if attempt == max_retries - 1:
                logger.error(f"Failed to create table {table_name} after {max_retries} attempts")
                return False
            time.sleep(0.1 * (attempt + 1))  # linear backoff: 0.1s, 0.2s
    return False
def batch_import_original_data(self, db: Session, data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Bulk-import raw observation rows into the per-account sharded table.

    Unlike the other importers, this method neither begins nor commits the
    transaction: the caller owns the session's transaction lifecycle.

    Args:
        db: Open SQLAlchemy session (transaction managed by the caller).
        data: List of dicts; all rows are assumed to share one 'account_id'
              (only data[0] is inspected to pick the shard).

    Returns:
        Dict with keys: success, message, total_count, success_count,
        failed_count, failed_items.
    """
    total_count = len(data)
    success_count = 0
    failed_count = 0
    failed_items = []
    # Reject empty input before doing any work.
    if total_count == 0:
        return {
            'success': False,
            'message': '导入数据不能为空',
            'total_count': 0,
            'success_count': 0,
            'failed_count': 0,
            'failed_items': []
        }
    # Shard key: account_id of the first row — assumes a homogeneous batch.
    account_id = data[0].get('account_id')
    if not account_id:
        return {
            'success': False,
            'message': '数据中缺少account_id字段',
            'total_count': total_count,
            'success_count': 0,
            'failed_count': total_count,
            'failed_items': []
        }
    # The account must exist before we touch its shard table.
    account = db.query(Account).filter(Account.id == account_id).first()
    if not account:
        return {
            'success': False,
            'message': f'账号ID {account_id} 不存在',
            'total_count': total_count,
            'success_count': 0,
            'failed_count': total_count,
            'failed_items': []
        }
    # Create the per-account table on first use.
    table_created = self._ensure_table_exists(account_id)
    if not table_created:
        return {
            'success': False,
            'message': '创建原始数据表失败',
            'total_count': total_count,
            'success_count': 0,
            'failed_count': total_count,
            'failed_items': []
        }
    table_name = get_table_name(account_id)
    try:
        # No db.begin() here on purpose: the caller manages the transaction.
        # Every NYID must already have a settlement record.
        nyid_list = list(set(str(item.get('NYID')) for item in data if item.get('NYID')))
        from app.models.settlement_data import SettlementData
        settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all()
        settlement_map = {s.NYID: s for s in settlements}
        missing_nyids = set(nyid_list) - set(settlement_map.keys())
        if missing_nyids:
            raise Exception(f'以下期数在沉降表中不存在: {list(missing_nyids)}')
        # Load existing rows for this account to detect duplicates.
        # NOTE(review): this pulls the whole shard into memory — confirm
        # expected per-account table sizes before relying on it at scale.
        existing_data = db.query(text("*")).from_statement(
            text(f"SELECT * FROM `{table_name}` WHERE account_id = :account_id")
        ).params(account_id=account_id).all()
        existing_map = {
            # NOTE(review): positional indices assume the column order of the
            # DDL in _ensure_table_exists (NYID at index 7, sort at index 8);
            # fragile if the table schema ever changes.
            f"{item[7]}_{item[8]}": item
            for item in existing_data
        }
        logger.info(f"Found {len(existing_data)} existing records in {table_name}")
        # Partition input: new rows vs. duplicates (duplicates are silently
        # skipped here, not added to failed_items).
        to_insert = []
        skipped_count = 0
        for item_data in data:
            nyid = str(item_data.get('NYID'))
            sort = item_data.get('sort')
            key = f"{nyid}_{sort}"
            if key in existing_map:
                # Row already present: skip.
                skipped_count += 1
            else:
                to_insert.append(item_data)
        logger.info(f"Filtered {skipped_count} duplicate records, {len(to_insert)} new records to insert")
        # Bulk insert in chunks of 1000 rows via a multi-row INSERT.
        if to_insert:
            logger.info(f"Inserting {len(to_insert)} new records")
            batch_size = 1000
            for i in range(0, len(to_insert), batch_size):
                batch = to_insert[i:i + batch_size]
                # Build uniquely-named bind parameters for each row so the
                # whole chunk goes in as one parameterized statement.
                values_list = []
                params = {}
                for idx, item_data in enumerate(batch):
                    values_list.append(
                        f"(:account_id_{idx}, :bfpcode_{idx}, :mtime_{idx}, :bffb_{idx}, "
                        f":bfpl_{idx}, :bfpvalue_{idx}, :NYID_{idx}, :sort_{idx})"
                    )
                    params.update({
                        f"account_id_{idx}": account_id,
                        f"bfpcode_{idx}": item_data.get('bfpcode'),
                        f"mtime_{idx}": item_data.get('mtime'),
                        f"bffb_{idx}": item_data.get('bffb'),
                        f"bfpl_{idx}": item_data.get('bfpl'),
                        f"bfpvalue_{idx}": item_data.get('bfpvalue'),
                        f"NYID_{idx}": item_data.get('NYID'),
                        f"sort_{idx}": item_data.get('sort')
                    })
                # Table name is interpolated (identifiers cannot be bound);
                # it comes from get_table_name() with a validated account_id.
                # All values are bound parameters.
                insert_sql = f"""
                INSERT INTO `{table_name}`
                (account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort)
                VALUES {", ".join(values_list)}
                """
                final_sql = text(insert_sql)
                db.execute(final_sql, params)
                success_count += len(batch)
                logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records")
        # No commit here on purpose: the caller manages the transaction.
        logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}")
        return {
            'success': True,
            'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
            'total_count': total_count,
            'success_count': success_count,
            'failed_count': failed_count,
            'failed_items': failed_items
        }
    except Exception as e:
        # No rollback here on purpose: the caller manages the transaction.
        logger.error(f"Batch import original data failed: {str(e)}")
        return {
            'success': False,
            'message': f'批量导入失败: {str(e)}',
            'total_count': total_count,
            'success_count': 0,
            'failed_count': total_count,
            'failed_items': failed_items
        }