Files
railway_cloud/app/services/original_data.py
2025-11-18 09:27:06 +08:00

443 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy.orm import Session
from sqlalchemy import text, inspect
from typing import List, Optional, Dict, Any
from ..models.original_data import OriginalData, get_original_data_model, get_table_name
from .base import BaseService
from ..models.settlement_data import SettlementData
from ..models.account import Account
from ..core.database import engine
import logging
logger = logging.getLogger(__name__)
class OriginalDataService(BaseService[OriginalData]):
def __init__(self) -> None:
    # Register the shared OriginalData model with the generic base service.
    super().__init__(OriginalData)
def _get_table_name(self, account_id: int) -> str:
    """Resolve the per-account raw-data shard table name.

    Args:
        account_id: Account ID whose shard table is wanted.

    Returns:
        str: Table name as produced by ``get_table_name``.
    """
    return get_table_name(account_id)
def _ensure_table_exists(self, db: Session, account_id: int) -> bool:
    """
    Ensure the raw-data table for the given account exists, creating it if not.

    Args:
        db: Database session (only used to roll back on final failure).
        account_id: Account ID that selects the shard table.

    Returns:
        bool: True if the table already existed or was created successfully.

    Raises:
        Exception: If creation still fails after all retry attempts.
    """
    import time  # hoisted out of the retry loop; kept function-scoped

    table_name = self._get_table_name(account_id)
    inspector = inspect(engine)
    # Fast path: table is already present.
    if table_name in inspector.get_table_names():
        logger.info(f"Table {table_name} already exists")
        return True
    # Table missing: create it, with a small retry loop for transient errors.
    max_retries = 3
    create_table_sql = f"""
        CREATE TABLE `{table_name}` (
            `id` INT AUTO_INCREMENT PRIMARY KEY,
            `account_id` INT NOT NULL COMMENT '账号ID',
            `bfpcode` VARCHAR(1000) NOT NULL COMMENT '前(后)视点名称',
            `mtime` DATETIME NOT NULL COMMENT '测点观测时间',
            `bffb` VARCHAR(1000) NOT NULL COMMENT '前(后)视标记符',
            `bfpl` VARCHAR(1000) NOT NULL COMMENT '前(后)视距离(m)',
            `bfpvalue` VARCHAR(1000) NOT NULL COMMENT '前(后)视尺读数(m)',
            `NYID` VARCHAR(100) NOT NULL COMMENT '期数id',
            `sort` INT COMMENT '序号',
            INDEX `idx_nyid` (`NYID`),
            INDEX `idx_account_id` (`account_id`)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='原始数据表_账号{account_id}'
    """
    for attempt in range(max_retries):
        try:
            # Run the DDL on an independent connection/transaction so it does
            # not interfere with the caller's session.
            with engine.begin() as conn:
                conn.execute(text(create_table_sql))
            logger.info(f"Table {table_name} created successfully on attempt {attempt + 1}")
            return True
        except Exception as e:
            logger.warning(f"Attempt {attempt + 1} to create table {table_name} failed: {str(e)}")
            # Concurrent creation by another process is fine — but match only
            # the specific "already exists" message. The previous check also
            # matched any message containing "exist" (e.g. "database doesn't
            # exist"), silently treating unrelated failures as success.
            if "already exists" in str(e).lower():
                logger.info(f"Table {table_name} was created by another process")
                return True
            if attempt == max_retries - 1:
                db.rollback()  # leave the caller's transaction in a clean state
                logger.error(f"Failed to create table {table_name} after {max_retries} attempts: {str(e)}")
                raise Exception(f"创建原始数据表失败: {str(e)}")
            # Brief linear backoff before the next attempt.
            time.sleep(0.1 * (attempt + 1))
    return False  # defensive; the loop always returns or raises
def create_table_for_account(self, db: Session, account_id: int) -> Dict[str, Any]:
    """
    Provision the raw-data shard table for one account.

    Args:
        db: Database session.
        account_id: Account ID.

    Returns:
        Dict with ``success`` flag and a human-readable ``message``.
    """
    try:
        # The account must exist before a shard table is provisioned for it.
        if db.query(Account).filter(Account.id == account_id).first() is None:
            return {
                'success': False,
                'message': f'账号ID {account_id} 不存在'
            }
        # Delegate to the creation helper (handles retries internally).
        if self._ensure_table_exists(db, account_id):
            return {
                'success': True,
                'message': f'原始数据表 {self._get_table_name(account_id)} 创建成功'
            }
        return {
            'success': False,
            'message': f'原始数据表 {self._get_table_name(account_id)} 创建失败'
        }
    except Exception as e:
        logger.error(f"Failed to create table for account {account_id}: {str(e)}")
        return {
            'success': False,
            'message': f'创建原始数据表失败: {str(e)}'
        }
def get_by_nyid(self, db: Session, account_id: int, nyid: str) -> List[OriginalData]:
    """
    Fetch raw-data rows for one period (NYID) from the account's shard table.

    Args:
        db: Database session.
        account_id: Account ID selecting the shard table.
        nyid: Period ID.

    Returns:
        Matching rows; an empty list if the query fails.
    """
    try:
        table_name = self._get_table_name(account_id)
        # Raw SQL because each account has its own physical table.
        rows = db.execute(
            text(f"SELECT * FROM `{table_name}` WHERE NYID = :nyid"),
            {"nyid": nyid},
        )
        return rows.fetchall()
    except Exception as e:
        logger.error(f"Failed to query data from {self._get_table_name(account_id)}: {str(e)}")
        return []
def get_by_bfpcode(self, db: Session, account_id: int, bfpcode: str) -> List[OriginalData]:
    """
    Fetch raw-data rows by point name from the account's shard table.

    Args:
        db: Database session.
        account_id: Account ID selecting the shard table.
        bfpcode: Fore/back sight point name.

    Returns:
        Matching rows; an empty list if the query fails.
    """
    try:
        table_name = self._get_table_name(account_id)
        # Raw SQL because each account has its own physical table.
        rows = db.execute(
            text(f"SELECT * FROM `{table_name}` WHERE bfpcode = :bfpcode"),
            {"bfpcode": bfpcode},
        )
        return rows.fetchall()
    except Exception as e:
        logger.error(f"Failed to query data from {self._get_table_name(account_id)}: {str(e)}")
        return []
def get_all_original_data_tables(self, db: Session) -> List[str]:
    """
    List the names of every per-account raw-data shard table.

    Args:
        db: Database session (unused; inspection goes through the engine).

    Returns:
        List of table names prefixed with ``original_data_``.
    """
    try:
        # Shard tables are recognized purely by their name prefix.
        names = inspect(engine).get_table_names()
        shard_tables = [name for name in names if name.startswith('original_data_')]
        logger.info(f"Found {len(shard_tables)} original data tables: {shard_tables}")
        return shard_tables
    except Exception as e:
        logger.error(f"Failed to get original data tables: {str(e)}")
        return []
def search_original_data(self, db: Session,
                         account_id: Optional[int] = None,
                         id: Optional[int] = None,
                         bfpcode: Optional[str] = None,
                         bffb: Optional[str] = None,
                         nyid: Optional[str] = None,
                         bfpl: Optional[str] = None) -> List[Any]:
    """
    Search raw data by any combination of filters.

    When ``account_id`` is omitted, every shard table is scanned and the
    results are concatenated.

    Args:
        db: Database session.
        account_id: Optional account ID; limits the search to one shard table.
        id: Optional row ID filter.
        bfpcode: Optional point-name filter.
        bffb: Optional fore/back marker filter.
        nyid: Optional period-ID filter (SQL column ``NYID``).
        bfpl: Optional distance filter.

    Returns:
        Matching rows from one or all shard tables; empty list on failure.
    """
    # Fixed: use the module logger instead of print() so debug output goes
    # through the application's logging configuration like the rest of the
    # file; lazy %-style args avoid formatting when DEBUG is disabled.
    logger.debug(
        "search_original_data params: account_id=%s id=%s bfpcode=%s bffb=%s nyid=%s bfpl=%s",
        account_id, id, bfpcode, bffb, nyid, bfpl,
    )
    try:
        # (SQL column, bind-parameter name, value) — only non-None values
        # contribute a WHERE condition.
        filter_specs = [
            ("id", "id", id),
            ("bfpcode", "bfpcode", bfpcode),
            ("bffb", "bffb", bffb),
            ("NYID", "nyid", nyid),
            ("bfpl", "bfpl", bfpl),
        ]
        conditions = []
        params = {}
        for column, param, value in filter_specs:
            if value is not None:
                conditions.append(f"{column} = :{param}")
                params[param] = value
        where_clause = " AND ".join(conditions) if conditions else "1=1"
        # Single-shard search when the account is known.
        if account_id is not None:
            table_name = self._get_table_name(account_id)
            result = db.execute(text(f"SELECT * FROM `{table_name}` WHERE {where_clause}"), params)
            return result.fetchall()
        # Otherwise scan every shard table, tolerating per-table failures.
        all_results = []
        tables = self.get_all_original_data_tables(db)
        for table_name in tables:
            try:
                rows = db.execute(
                    text(f"SELECT * FROM `{table_name}` WHERE {where_clause}"), params
                ).fetchall()
                all_results.extend(rows)
                logger.info(f"Found {len(rows)} records in table {table_name}")
            except Exception as e:
                logger.warning(f"Failed to query table {table_name}: {str(e)}")
                continue
        logger.info(f"Total found {len(all_results)} records from {len(tables)} tables")
        return all_results
    except Exception as e:
        logger.error(f"Failed to search original data: {str(e)}")
        return []
def _check_settlement_exists(self, db: Session, nyid: str) -> bool:
    """Return True when settlement data exists for the given period ID."""
    return db.query(SettlementData).filter(SettlementData.NYID == nyid).first() is not None
def batch_import_original_data(self, db: Session, data: List) -> Dict[str, Any]:
    """
    Bulk-import raw data into one account's shard table (optimized).

    Builds multi-row INSERT statements (1000 rows per statement) instead of
    inserting row by row. The whole import runs in a single transaction and
    is retried once on failure.

    Args:
        db: Database session.
        data: List of dicts; every item must carry ``account_id``. The
              ``account_id`` of the first item selects the target shard table.

    Returns:
        Result dict: ``success``, ``message``, ``total_count``,
        ``success_count``, ``failed_count``, ``failed_items``.
    """
    # Fixed: removed a redundant `logger = logging.getLogger(__name__)` that
    # shadowed the module-level logger with an identical object.
    total_count = len(data)
    success_count = 0
    failed_count = 0
    failed_items = []
    # Reject an empty payload up front.
    if total_count == 0:
        return {
            'success': False,
            'message': '导入数据不能为空',
            'total_count': 0,
            'success_count': 0,
            'failed_count': 0,
            'failed_items': []
        }
    # account_id of the first record decides the shard table.
    account_id = data[0].get('account_id')
    if not account_id:
        return {
            'success': False,
            'message': '数据中缺少account_id字段',
            'total_count': total_count,
            'success_count': 0,
            'failed_count': total_count,
            'failed_items': []
        }
    # The referenced account must exist.
    account = db.query(Account).filter(Account.id == account_id).first()
    if not account:
        return {
            'success': False,
            'message': f'账号ID {account_id} 不存在',
            'total_count': total_count,
            'success_count': 0,
            'failed_count': total_count,
            'failed_items': []
        }
    # Make sure the shard table exists (creation retries internally).
    table_created = self._ensure_table_exists(db, account_id)
    if not table_created:
        return {
            'success': False,
            'message': '创建原始数据表失败',
            'total_count': total_count,
            'success_count': 0,
            'failed_count': total_count,
            'failed_items': []
        }
    table_name = self._get_table_name(account_id)
    for attempt in range(2):  # at most one retry after a failure
        try:
            # NOTE(review): Session.begin() assumes no transaction is already
            # active; with SQLAlchemy 1.4+ "autobegin" the queries above may
            # have opened one — confirm against the app's session settings.
            db.begin()
            success_count = 0
            failed_count = 0
            failed_items = []
            nyid = str(data[0].get('NYID'))  # normalize: NYID column is VARCHAR
            # Skip the whole import if this period was already loaded.
            check_query = text(f"SELECT COUNT(*) as cnt FROM `{table_name}` WHERE NYID = :nyid")
            is_exists = db.execute(check_query, {"nyid": nyid}).fetchone()[0]
            if is_exists > 0:
                db.rollback()
                return {
                    'success': True,
                    'message': '数据已存在',
                    'total_count': 0,
                    'success_count': success_count,
                    'failed_count': failed_count,
                    'failed_items': failed_items
                }
            # Performance: one query for all referenced settlement periods
            # instead of one lookup per imported row. Values are stringified
            # because the database NYID column is VARCHAR.
            nyid_list = list(set(str(item.get('NYID')) for item in data if item.get('NYID')))
            settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all()
            settlement_map = {s.NYID: s for s in settlements}
            missing_nyids = set(nyid_list) - set(settlement_map.keys())
            if missing_nyids:
                logger.warning(f"[批量导入原始数据] 批量查询settlement数据失败 Nyid: {list(missing_nyids)}")
                db.rollback()
                return {
                    'success': False,
                    'message': f'以下期数在沉降表中不存在: {list(missing_nyids)}',
                    'total_count': total_count,
                    'success_count': 0,
                    'failed_count': total_count,
                    'failed_items': []
                }
            # Performance: multi-row INSERTs in chunks of 1000 rows (well
            # within MySQL's default limits).
            batch_size = 1000
            for i in range(0, len(data), batch_size):
                batch_data = data[i:i + batch_size]
                # One VALUES tuple plus uniquely-named bind params per row.
                values_list = []
                params = {}
                for idx, item_data in enumerate(batch_data):
                    values_list.append(
                        f"(:account_id_{idx}, :bfpcode_{idx}, :mtime_{idx}, :bffb_{idx}, "
                        f":bfpl_{idx}, :bfpvalue_{idx}, :NYID_{idx}, :sort_{idx})"
                    )
                    params.update({
                        f"account_id_{idx}": account_id,
                        f"bfpcode_{idx}": item_data.get('bfpcode'),
                        f"mtime_{idx}": item_data.get('mtime'),
                        f"bffb_{idx}": item_data.get('bffb'),
                        f"bfpl_{idx}": item_data.get('bfpl'),
                        f"bfpvalue_{idx}": item_data.get('bfpvalue'),
                        f"NYID_{idx}": item_data.get('NYID'),
                        f"sort_{idx}": item_data.get('sort')
                    })
                # Assemble the statement as a plain string before wrapping in
                # text() — TextClause objects cannot be concatenated.
                insert_sql = f"""
                    INSERT INTO `{table_name}`
                    (account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort)
                    VALUES {", ".join(values_list)}
                """
                final_sql = text(insert_sql)
                db.execute(final_sql, params)
                success_count += len(batch_data)
                logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch_data)} records")
            db.commit()
            logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}")
            break
        except Exception as e:
            db.rollback()
            logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}")
            if attempt == 1:  # the final retry also failed
                logger.error("Batch import original data failed after retries")
                return {
                    'success': False,
                    'message': f'批量导入失败: {str(e)}',
                    'total_count': total_count,
                    'success_count': 0,
                    'failed_count': total_count,
                    'failed_items': failed_items
                }
    return {
        'success': True,
        # Fixed: was an f-string with no placeholders.
        'message': '批量导入完成' if failed_count == 0 else '部分导入失败',
        'total_count': total_count,
        'success_count': success_count,
        'failed_count': failed_count,
        'failed_items': failed_items
    }