Files
railway_cloud/app/services/original_data.py
2025-10-24 16:45:23 +08:00

399 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy.orm import Session
from sqlalchemy import text, inspect
from typing import List, Optional, Dict, Any
from ..models.original_data import OriginalData, get_original_data_model, get_table_name
from .base import BaseService
from ..models.settlement_data import SettlementData
from ..models.account import Account
from ..core.database import engine
import logging
logger = logging.getLogger(__name__)
class OriginalDataService(BaseService[OriginalData]):
def __init__(self):
super().__init__(OriginalData)
def _get_table_name(self, account_id: int) -> str:
"""获取原始数据表名"""
return get_table_name(account_id)
def _ensure_table_exists(self, db: Session, account_id: int) -> bool:
"""
确保指定账号的原始数据表存在,不存在则创建
Args:
db: 数据库会话
account_id: 账号ID
Returns:
bool: 表是否存在或创建成功
"""
table_name = self._get_table_name(account_id)
inspector = inspect(engine)
# 检查表是否存在
if table_name in inspector.get_table_names():
logger.info(f"Table {table_name} already exists")
return True
# 表不存在,创建表
try:
create_table_sql = f"""
CREATE TABLE `{table_name}` (
`id` INT AUTO_INCREMENT PRIMARY KEY,
`account_id` INT NOT NULL COMMENT '账号ID',
`bfpcode` VARCHAR(1000) NOT NULL COMMENT '前(后)视点名称',
`mtime` DATETIME NOT NULL COMMENT '测点观测时间',
`bffb` VARCHAR(1000) NOT NULL COMMENT '前(后)视标记符',
`bfpl` VARCHAR(1000) NOT NULL COMMENT '前(后)视距离(m)',
`bfpvalue` VARCHAR(1000) NOT NULL COMMENT '前(后)视尺读数(m)',
`NYID` VARCHAR(100) NOT NULL COMMENT '期数id',
`sort` INT COMMENT '序号',
INDEX `idx_nyid` (`NYID`),
INDEX `idx_account_id` (`account_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='原始数据表_账号{account_id}'
"""
db.execute(text(create_table_sql))
db.commit()
logger.info(f"Table {table_name} created successfully")
return True
except Exception as e:
db.rollback()
logger.error(f"Failed to create table {table_name}: {str(e)}")
raise Exception(f"创建原始数据表失败: {str(e)}")
def create_table_for_account(self, db: Session, account_id: int) -> Dict[str, Any]:
"""
为指定账号创建原始数据表
Args:
db: 数据库会话
account_id: 账号ID
Returns:
操作结果
"""
try:
# 验证账号是否存在
account = db.query(Account).filter(Account.id == account_id).first()
if not account:
return {
'success': False,
'message': f'账号ID {account_id} 不存在'
}
# 创建表
self._ensure_table_exists(db, account_id)
return {
'success': True,
'message': f'原始数据表 {self._get_table_name(account_id)} 创建成功'
}
except Exception as e:
logger.error(f"Failed to create table for account {account_id}: {str(e)}")
return {
'success': False,
'message': f'创建原始数据表失败: {str(e)}'
}
def get_by_nyid(self, db: Session, account_id: int, nyid: str) -> List[OriginalData]:
"""
根据期数ID获取原始数据
Args:
db: 数据库会话
account_id: 账号ID
nyid: 期数ID
Returns:
原始数据列表
"""
try:
table_name = self._get_table_name(account_id)
# 使用原生SQL查询
query = text(f"SELECT * FROM `{table_name}` WHERE NYID = :nyid")
result = db.execute(query, {"nyid": nyid})
return result.fetchall()
except Exception as e:
logger.error(f"Failed to query data from {self._get_table_name(account_id)}: {str(e)}")
return []
def get_by_bfpcode(self, db: Session, account_id: int, bfpcode: str) -> List[OriginalData]:
"""
根据前(后)视点名称获取原始数据
Args:
db: 数据库会话
account_id: 账号ID
bfpcode: 前(后)视点名称
Returns:
原始数据列表
"""
try:
table_name = self._get_table_name(account_id)
query = text(f"SELECT * FROM `{table_name}` WHERE bfpcode = :bfpcode")
result = db.execute(query, {"bfpcode": bfpcode})
return result.fetchall()
except Exception as e:
logger.error(f"Failed to query data from {self._get_table_name(account_id)}: {str(e)}")
return []
def get_all_original_data_tables(self, db: Session) -> List[str]:
"""
获取所有原始数据分表的表名
Args:
db: 数据库会话
Returns:
原始数据表名列表
"""
try:
inspector = inspect(engine)
all_tables = inspector.get_table_names()
# 筛选出所有原始数据分表
original_tables = [table for table in all_tables if table.startswith('original_data_')]
logger.info(f"Found {len(original_tables)} original data tables: {original_tables}")
return original_tables
except Exception as e:
logger.error(f"Failed to get original data tables: {str(e)}")
return []
def search_original_data(self, db: Session,
account_id: Optional[int] = None,
id: Optional[int] = None,
bfpcode: Optional[str] = None,
bffb: Optional[str] = None,
nyid: Optional[str] = None,
bfpl: Optional[str] = None) -> List[Any]:
"""
根据多个条件搜索原始数据
如果未提供account_id则遍历所有分表查询
Args:
db: 数据库会话
account_id: 账号ID可选。不填则查询所有分表
其他查询条件...
Returns:
原始数据列表
"""
print("搜索原始数据,参数:", {
"account_id": account_id,
"id": id,
"bfpcode": bfpcode,
"bffb": bffb,
"nyid": nyid,
"bfpl": bfpl})
try:
# 构建查询条件
conditions = []
params = {}
if id is not None:
conditions.append("id = :id")
params["id"] = id
if bfpcode is not None:
conditions.append("bfpcode = :bfpcode")
params["bfpcode"] = bfpcode
if bffb is not None:
conditions.append("bffb = :bffb")
params["bffb"] = bffb
if nyid is not None:
conditions.append("NYID = :nyid")
params["nyid"] = nyid
if bfpl is not None:
conditions.append("bfpl = :bfpl")
params["bfpl"] = bfpl
where_clause = " AND ".join(conditions) if conditions else "1=1"
# 如果指定了account_id只查询该账号的分表
if account_id is not None:
table_name = self._get_table_name(account_id)
query = text(f"SELECT * FROM `{table_name}` WHERE {where_clause}")
result = db.execute(query, params)
return result.fetchall()
# 未指定account_id遍历所有分表
all_results = []
tables = self.get_all_original_data_tables(db)
for table_name in tables:
try:
query = text(f"SELECT * FROM `{table_name}` WHERE {where_clause}")
result = db.execute(query, params)
rows = result.fetchall()
all_results.extend(rows)
logger.info(f"Found {len(rows)} records in table {table_name}")
except Exception as e:
logger.warning(f"Failed to query table {table_name}: {str(e)}")
continue
logger.info(f"Total found {len(all_results)} records from {len(tables)} tables")
return all_results
except Exception as e:
logger.error(f"Failed to search original data: {str(e)}")
return []
def _check_settlement_exists(self, db: Session, nyid: str) -> bool:
"""检查期数id沉降数据是否存在"""
settlement = db.query(SettlementData).filter(SettlementData.NYID == nyid).first()
return settlement is not None
def batch_import_original_data(self, db: Session, data: List) -> Dict[str, Any]:
"""
批量导入原始数据到指定账号的分表,直接新增,无需检查重复
支持事务回滚,失败时重试一次
Args:
db: 数据库会话
data: 数据列表,每条数据必须包含account_id字段
Returns:
操作结果
"""
logger = logging.getLogger(__name__)
total_count = len(data)
success_count = 0
failed_count = 0
failed_items = []
if total_count == 0:
return {
'success': False,
'message': '导入数据不能为空',
'total_count': 0,
'success_count': 0,
'failed_count': 0,
'failed_items': []
}
# 获取第一条数据的account_id
account_id = data[0].get('account_id')
if not account_id:
return {
'success': False,
'message': '数据中缺少account_id字段',
'total_count': total_count,
'success_count': 0,
'failed_count': total_count,
'failed_items': []
}
# 验证账号是否存在
account = db.query(Account).filter(Account.id == account_id).first()
if not account:
return {
'success': False,
'message': f'账号ID {account_id} 不存在',
'total_count': total_count,
'success_count': 0,
'failed_count': total_count,
'failed_items': []
}
# 确保表存在
try:
self._ensure_table_exists(db, account_id)
except Exception as e:
return {
'success': False,
'message': f'创建原始数据表失败: {str(e)}',
'total_count': total_count,
'success_count': 0,
'failed_count': total_count,
'failed_items': []
}
table_name = self._get_table_name(account_id)
for attempt in range(2): # 最多重试1次
try:
db.begin()
success_count = 0
failed_count = 0
failed_items = []
nyid = data[0].get('NYID')
# 检查该期数数据是否已存在
check_query = text(f"SELECT COUNT(*) as cnt FROM `{table_name}` WHERE NYID = :nyid")
is_exists = db.execute(check_query, {"nyid": nyid}).fetchone()[0]
if is_exists > 0:
db.rollback()
return {
'success': True,
'message': '数据已存在',
'total_count': 0,
'success_count': success_count,
'failed_count': failed_count,
'failed_items': failed_items
}
for item_data in data:
try:
# 判断期数id是否存在
settlement = self._check_settlement_exists(db, item_data.get('NYID'))
if not settlement:
logger.error(f"Settlement {item_data.get('NYID')} not found")
raise Exception(f"Settlement {item_data.get('NYID')} not found")
# 构建插入SQL
insert_sql = text(f"""
INSERT INTO `{table_name}`
(account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort)
VALUES
(:account_id, :bfpcode, :mtime, :bffb, :bfpl, :bfpvalue, :NYID, :sort)
""")
db.execute(insert_sql, {
"account_id": account_id,
"bfpcode": item_data.get('bfpcode'),
"mtime": item_data.get('mtime'),
"bffb": item_data.get('bffb'),
"bfpl": item_data.get('bfpl'),
"bfpvalue": item_data.get('bfpvalue'),
"NYID": item_data.get('NYID'),
"sort": item_data.get('sort')
})
logger.info(f"Created original data: {item_data.get('bfpcode')}-{item_data.get('NYID')} in table {table_name}")
success_count += 1
except Exception as e:
failed_count += 1
failed_items.append({
'data': item_data,
'error': str(e)
})
logger.error(f"Failed to process original data {item_data.get('bfpcode')}-{item_data.get('NYID')}: {str(e)}")
raise e
db.commit()
logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}")
break
except Exception as e:
db.rollback()
logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}")
if attempt == 1: # 最后一次重试失败
logger.error("Batch import original data failed after retries")
return {
'success': False,
'message': f'批量导入失败: {str(e)}',
'total_count': total_count,
'success_count': 0,
'failed_count': total_count,
'failed_items': failed_items
}
return {
'success': True,
'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
'total_count': total_count,
'success_count': success_count,
'failed_count': failed_count,
'failed_items': failed_items
}