from sqlalchemy.orm import Session from sqlalchemy import text, inspect from typing import List, Optional, Dict, Any from ..models.original_data import OriginalData, get_original_data_model, get_table_name from .base import BaseService from ..models.settlement_data import SettlementData from ..models.account import Account from ..core.database import engine import logging logger = logging.getLogger(__name__) class OriginalDataService(BaseService[OriginalData]): def __init__(self): super().__init__(OriginalData) def _get_table_name(self, account_id: int) -> str: """获取原始数据表名""" return get_table_name(account_id) def _ensure_table_exists(self, db: Session, account_id: int) -> bool: """ 确保指定账号的原始数据表存在,不存在则创建 Args: db: 数据库会话 account_id: 账号ID Returns: bool: 表是否存在或创建成功 """ table_name = self._get_table_name(account_id) inspector = inspect(engine) # 检查表是否存在 if table_name in inspector.get_table_names(): logger.info(f"Table {table_name} already exists") return True # 表不存在,创建表 try: create_table_sql = f""" CREATE TABLE `{table_name}` ( `id` INT AUTO_INCREMENT PRIMARY KEY, `account_id` INT NOT NULL COMMENT '账号ID', `bfpcode` VARCHAR(1000) NOT NULL COMMENT '前(后)视点名称', `mtime` DATETIME NOT NULL COMMENT '测点观测时间', `bffb` VARCHAR(1000) NOT NULL COMMENT '前(后)视标记符', `bfpl` VARCHAR(1000) NOT NULL COMMENT '前(后)视距离(m)', `bfpvalue` VARCHAR(1000) NOT NULL COMMENT '前(后)视尺读数(m)', `NYID` VARCHAR(100) NOT NULL COMMENT '期数id', `sort` INT COMMENT '序号', INDEX `idx_nyid` (`NYID`), INDEX `idx_account_id` (`account_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='原始数据表_账号{account_id}' """ db.execute(text(create_table_sql)) db.commit() logger.info(f"Table {table_name} created successfully") return True except Exception as e: db.rollback() logger.error(f"Failed to create table {table_name}: {str(e)}") raise Exception(f"创建原始数据表失败: {str(e)}") def create_table_for_account(self, db: Session, account_id: int) -> Dict[str, Any]: """ 为指定账号创建原始数据表 Args: db: 数据库会话 account_id: 账号ID Returns: 操作结果 """ try: # 验证账号是否存在 account = db.query(Account).filter(Account.id == account_id).first() if not account: return { 'success': False, 'message': f'账号ID {account_id} 不存在' } # 创建表 self._ensure_table_exists(db, account_id) return { 'success': True, 'message': f'原始数据表 {self._get_table_name(account_id)} 创建成功' } except Exception as e: logger.error(f"Failed to create table for account {account_id}: {str(e)}") return { 'success': False, 'message': f'创建原始数据表失败: {str(e)}' } def get_by_nyid(self, db: Session, account_id: int, nyid: str) -> List[OriginalData]: """ 根据期数ID获取原始数据 Args: db: 数据库会话 account_id: 账号ID nyid: 期数ID Returns: 原始数据列表 """ try: table_name = self._get_table_name(account_id) # 使用原生SQL查询 query = text(f"SELECT * FROM `{table_name}` WHERE NYID = :nyid") result = db.execute(query, {"nyid": nyid}) return result.fetchall() except Exception as e: logger.error(f"Failed to query data from {self._get_table_name(account_id)}: {str(e)}") return [] def get_by_bfpcode(self, db: Session, account_id: int, bfpcode: str) -> List[OriginalData]: """ 根据前(后)视点名称获取原始数据 Args: db: 数据库会话 account_id: 账号ID bfpcode: 前(后)视点名称 Returns: 原始数据列表 """ try: table_name = self._get_table_name(account_id) query = text(f"SELECT * FROM `{table_name}` WHERE bfpcode = :bfpcode") result = db.execute(query, {"bfpcode": bfpcode}) return result.fetchall() except Exception as e: logger.error(f"Failed to query data from {self._get_table_name(account_id)}: {str(e)}") return [] def get_all_original_data_tables(self, db: Session) -> List[str]: """ 获取所有原始数据分表的表名 Args: db: 数据库会话 Returns: 原始数据表名列表 """ try: inspector = inspect(engine) all_tables = inspector.get_table_names() # 筛选出所有原始数据分表 original_tables = [table for table in all_tables if table.startswith('original_data_')] logger.info(f"Found {len(original_tables)} original data tables: {original_tables}") return original_tables except Exception as e: logger.error(f"Failed to get original data tables: {str(e)}") return [] def search_original_data(self, db: Session, account_id: Optional[int] = None, id: Optional[int] = None, bfpcode: Optional[str] = None, bffb: Optional[str] = None, nyid: Optional[str] = None, bfpl: Optional[str] = None) -> List[Any]: """ 根据多个条件搜索原始数据 如果未提供account_id,则遍历所有分表查询 Args: db: 数据库会话 account_id: 账号ID,可选。不填则查询所有分表 其他查询条件... Returns: 原始数据列表 """ print("搜索原始数据,参数:", { "account_id": account_id, "id": id, "bfpcode": bfpcode, "bffb": bffb, "nyid": nyid, "bfpl": bfpl}) try: # 构建查询条件 conditions = [] params = {} if id is not None: conditions.append("id = :id") params["id"] = id if bfpcode is not None: conditions.append("bfpcode = :bfpcode") params["bfpcode"] = bfpcode if bffb is not None: conditions.append("bffb = :bffb") params["bffb"] = bffb if nyid is not None: conditions.append("NYID = :nyid") params["nyid"] = nyid if bfpl is not None: conditions.append("bfpl = :bfpl") params["bfpl"] = bfpl where_clause = " AND ".join(conditions) if conditions else "1=1" # 如果指定了account_id,只查询该账号的分表 if account_id is not None: table_name = self._get_table_name(account_id) query = text(f"SELECT * FROM `{table_name}` WHERE {where_clause}") result = db.execute(query, params) return result.fetchall() # 未指定account_id,遍历所有分表 all_results = [] tables = self.get_all_original_data_tables(db) for table_name in tables: try: query = text(f"SELECT * FROM `{table_name}` WHERE {where_clause}") result = db.execute(query, params) rows = result.fetchall() all_results.extend(rows) logger.info(f"Found {len(rows)} records in table {table_name}") except Exception as e: logger.warning(f"Failed to query table {table_name}: {str(e)}") continue logger.info(f"Total found {len(all_results)} records from {len(tables)} tables") return all_results except Exception as e: logger.error(f"Failed to search original data: {str(e)}") return [] def _check_settlement_exists(self, db: Session, nyid: str) -> bool: """检查期数id沉降数据是否存在""" settlement = db.query(SettlementData).filter(SettlementData.NYID == nyid).first() return settlement is not None def batch_import_original_data(self, db: Session, data: List) -> Dict[str, Any]: """ 批量导入原始数据到指定账号的分表 - 性能优化版 使用批量插入替代逐条插入,大幅提升导入速度 Args: db: 数据库会话 data: 数据列表,每条数据必须包含account_id字段 Returns: 操作结果 """ logger = logging.getLogger(__name__) total_count = len(data) success_count = 0 failed_count = 0 failed_items = [] if total_count == 0: return { 'success': False, 'message': '导入数据不能为空', 'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': [] } # 获取第一条数据的account_id account_id = data[0].get('account_id') if not account_id: return { 'success': False, 'message': '数据中缺少account_id字段', 'total_count': total_count, 'success_count': 0, 'failed_count': total_count, 'failed_items': [] } # 验证账号是否存在 account = db.query(Account).filter(Account.id == account_id).first() if not account: return { 'success': False, 'message': f'账号ID {account_id} 不存在', 'total_count': total_count, 'success_count': 0, 'failed_count': total_count, 'failed_items': [] } # 确保表存在 try: self._ensure_table_exists(db, account_id) except Exception as e: return { 'success': False, 'message': f'创建原始数据表失败: {str(e)}', 'total_count': total_count, 'success_count': 0, 'failed_count': total_count, 'failed_items': [] } table_name = self._get_table_name(account_id) for attempt in range(2): # 最多重试1次 try: db.begin() success_count = 0 failed_count = 0 failed_items = [] nyid = str(data[0].get('NYID')) # 统一转换为字符串 # 检查该期数数据是否已存在 check_query = text(f"SELECT COUNT(*) as cnt FROM `{table_name}` WHERE NYID = :nyid") is_exists = db.execute(check_query, {"nyid": nyid}).fetchone()[0] if is_exists > 0: db.rollback() return { 'success': True, 'message': '数据已存在', 'total_count': 0, 'success_count': success_count, 'failed_count': failed_count, 'failed_items': failed_items } # ===== 性能优化:批量查询沉降数据 ===== # 统一转换为字符串处理(数据库NYID字段是VARCHAR类型) nyid_list = list(set(str(item.get('NYID')) for item in data if item.get('NYID'))) logger.info(f"Querying settlement data for nyid list: {nyid_list}") settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all() logger.info(f"Found {len(settlements)} settlement records") settlement_map = {s.NYID: s for s in settlements} missing_nyids = set(nyid_list) - set(settlement_map.keys()) if missing_nyids: db.rollback() return { 'success': False, 'message': f'以下期数在沉降表中不存在: {list(missing_nyids)}', 'total_count': total_count, 'success_count': 0, 'failed_count': total_count, 'failed_items': [] } # ===== 性能优化:使用批量插入 ===== # 将数据分组,每组1000条(MySQL默认支持) batch_size = 1000 for i in range(0, len(data), batch_size): batch_data = data[i:i + batch_size] # 构建批量参数 values_list = [] params = {} for idx, item_data in enumerate(batch_data): values_list.append( f"(:account_id_{idx}, :bfpcode_{idx}, :mtime_{idx}, :bffb_{idx}, " f":bfpl_{idx}, :bfpvalue_{idx}, :NYID_{idx}, :sort_{idx})" ) params.update({ f"account_id_{idx}": account_id, f"bfpcode_{idx}": item_data.get('bfpcode'), f"mtime_{idx}": item_data.get('mtime'), f"bffb_{idx}": item_data.get('bffb'), f"bfpl_{idx}": item_data.get('bfpl'), f"bfpvalue_{idx}": item_data.get('bfpvalue'), f"NYID_{idx}": item_data.get('NYID'), f"sort_{idx}": item_data.get('sort') }) # 批量插入SQL - 使用字符串拼接(修复TextClause拼接问题) insert_sql = f""" INSERT INTO `{table_name}` (account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort) VALUES {", ".join(values_list)} """ final_sql = text(insert_sql) db.execute(final_sql, params) success_count += len(batch_data) logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch_data)} records") db.commit() logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}") break except Exception as e: db.rollback() logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}") if attempt == 1: # 最后一次重试失败 logger.error("Batch import original data failed after retries") return { 'success': False, 'message': f'批量导入失败: {str(e)}', 'total_count': total_count, 'success_count': 0, 'failed_count': total_count, 'failed_items': failed_items } return { 'success': True, 'message': '批量导入完成' if failed_count == 0 else f'部分导入失败', 'total_count': total_count, 'success_count': success_count, 'failed_count': failed_count, 'failed_items': failed_items }