import logging
import time
from typing import List, Optional, Dict, Any

from sqlalchemy.orm import Session
from sqlalchemy import text, inspect

from ..models.original_data import OriginalData, get_original_data_model, get_table_name
from .base import BaseService
from ..models.settlement_data import SettlementData
from ..models.account import Account
from ..core.database import engine

logger = logging.getLogger(__name__)


class OriginalDataService(BaseService[OriginalData]):
    """Service for per-account "original data" shard tables.

    Each account stores its raw survey readings in a dedicated MySQL table
    (name derived via ``get_table_name(account_id)``). This service creates
    those tables on demand, queries them with raw SQL, and performs batched
    imports with duplicate filtering.
    """

    def __init__(self):
        super().__init__(OriginalData)

    def _get_table_name(self, account_id: int) -> str:
        """Return the shard table name for the given account."""
        return get_table_name(account_id)

    def _ensure_table_exists(self, db: Session, account_id: int) -> bool:
        """Ensure the account's original-data table exists, creating it if needed.

        Args:
            db: database session (used only to roll back on terminal failure)
            account_id: account ID

        Returns:
            bool: True if the table already exists or was created successfully.

        Raises:
            Exception: if creation still fails after all retries.
        """
        table_name = self._get_table_name(account_id)
        inspector = inspect(engine)

        if table_name in inspector.get_table_names():
            logger.info(f"Table {table_name} already exists")
            return True

        # Table is missing: create it with a small retry loop to tolerate
        # transient errors and concurrent creation by another process.
        max_retries = 3
        for attempt in range(max_retries):
            try:
                # NOTE: table name is derived from an int account_id, so the
                # f-string interpolation here is not injectable.
                create_table_sql = f"""
                CREATE TABLE `{table_name}` (
                    `id` INT AUTO_INCREMENT PRIMARY KEY,
                    `account_id` INT NOT NULL COMMENT '账号ID',
                    `bfpcode` VARCHAR(1000) NOT NULL COMMENT '前(后)视点名称',
                    `mtime` DATETIME NOT NULL COMMENT '测点观测时间',
                    `bffb` VARCHAR(1000) NOT NULL COMMENT '前(后)视标记符',
                    `bfpl` VARCHAR(1000) NOT NULL COMMENT '前(后)视距离(m)',
                    `bfpvalue` VARCHAR(1000) NOT NULL COMMENT '前(后)视尺读数(m)',
                    `NYID` VARCHAR(100) NOT NULL COMMENT '期数id',
                    `sort` INT COMMENT '序号',
                    INDEX `idx_nyid` (`NYID`),
                    INDEX `idx_account_id` (`account_id`)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='原始数据表_账号{account_id}'
                """
                # Run DDL on a dedicated connection/transaction so we do not
                # depend on the state of the caller's session transaction.
                with engine.begin() as conn:
                    conn.execute(text(create_table_sql))
                logger.info(f"Table {table_name} created successfully on attempt {attempt + 1}")
                return True
            except Exception as e:
                logger.warning(f"Attempt {attempt + 1} to create table {table_name} failed: {str(e)}")
                # Under concurrency another process may have won the race;
                # treat "already exists" style errors as success.
                if "already exists" in str(e).lower() or "exist" in str(e).lower():
                    logger.info(f"Table {table_name} was created by another process")
                    return True
                if attempt == max_retries - 1:
                    db.rollback()  # make sure the caller's transaction is rolled back
                    logger.error(f"Failed to create table {table_name} after {max_retries} attempts: {str(e)}")
                    raise Exception(f"创建原始数据表失败: {str(e)}")
                # Linear back-off before the next attempt.
                time.sleep(0.1 * (attempt + 1))
        return False

    def create_table_for_account(self, db: Session, account_id: int) -> Dict[str, Any]:
        """Create the original-data table for a specific account.

        Args:
            db: database session
            account_id: account ID

        Returns:
            A result dict with ``success`` and a human-readable ``message``.
        """
        try:
            # Verify the account exists before creating its shard table.
            account = db.query(Account).filter(Account.id == account_id).first()
            if not account:
                return {
                    'success': False,
                    'message': f'账号ID {account_id} 不存在'
                }

            table_created = self._ensure_table_exists(db, account_id)
            if table_created:
                return {
                    'success': True,
                    'message': f'原始数据表 {self._get_table_name(account_id)} 创建成功'
                }
            return {
                'success': False,
                'message': f'原始数据表 {self._get_table_name(account_id)} 创建失败'
            }
        except Exception as e:
            logger.error(f"Failed to create table for account {account_id}: {str(e)}")
            return {
                'success': False,
                'message': f'创建原始数据表失败: {str(e)}'
            }

    def get_by_nyid(self, db: Session, account_id: int, nyid: str) -> List[OriginalData]:
        """Fetch original data rows for one period (NYID) of an account.

        Args:
            db: database session
            account_id: account ID (selects the shard table)
            nyid: period ID

        Returns:
            List of raw result rows; empty list on error or no match.
        """
        try:
            table_name = self._get_table_name(account_id)
            # Raw SQL against the shard table; NYID is bound, not interpolated.
            query = text(f"SELECT * FROM `{table_name}` WHERE NYID = :nyid")
            result = db.execute(query, {"nyid": nyid})
            return result.fetchall()
        except Exception as e:
            logger.error(f"Failed to query data from {self._get_table_name(account_id)}: {str(e)}")
            return []

    def get_by_bfpcode(self, db: Session, account_id: int, bfpcode: str) -> List[OriginalData]:
        """Fetch original data rows by fore/back sight point name.

        Args:
            db: database session
            account_id: account ID (selects the shard table)
            bfpcode: fore/back sight point name

        Returns:
            List of raw result rows; empty list on error or no match.
        """
        try:
            table_name = self._get_table_name(account_id)
            query = text(f"SELECT * FROM `{table_name}` WHERE bfpcode = :bfpcode")
            result = db.execute(query, {"bfpcode": bfpcode})
            return result.fetchall()
        except Exception as e:
            logger.error(f"Failed to query data from {self._get_table_name(account_id)}: {str(e)}")
            return []

    def get_all_original_data_tables(self, db: Session) -> List[str]:
        """List the names of all original-data shard tables.

        Args:
            db: database session (unused; inspection runs on the engine)

        Returns:
            Table names starting with ``original_data_``; empty list on error.
        """
        try:
            inspector = inspect(engine)
            all_tables = inspector.get_table_names()
            original_tables = [table for table in all_tables if table.startswith('original_data_')]
            logger.info(f"Found {len(original_tables)} original data tables: {original_tables}")
            return original_tables
        except Exception as e:
            logger.error(f"Failed to get original data tables: {str(e)}")
            return []

    def search_original_data(self, db: Session,
                             account_id: Optional[int] = None,
                             id: Optional[int] = None,
                             bfpcode: Optional[str] = None,
                             bffb: Optional[str] = None,
                             nyid: Optional[str] = None,
                             bfpl: Optional[str] = None) -> List[Any]:
        """Search original data by any combination of filters.

        If ``account_id`` is omitted, every shard table is scanned in turn.

        Args:
            db: database session
            account_id: optional account ID; restricts the search to one shard
            id, bfpcode, bffb, nyid, bfpl: optional equality filters

        Returns:
            List of matching rows (possibly from multiple tables); empty on error.
        """
        # Fix: use the module logger instead of a leftover debug print().
        logger.info("Searching original data with params: %s", {
            "account_id": account_id, "id": id, "bfpcode": bfpcode,
            "bffb": bffb, "nyid": nyid, "bfpl": bfpl})
        try:
            # Build the WHERE clause from fixed fragments; values are bound.
            conditions = []
            params = {}
            if id is not None:
                conditions.append("id = :id")
                params["id"] = id
            if bfpcode is not None:
                conditions.append("bfpcode = :bfpcode")
                params["bfpcode"] = bfpcode
            if bffb is not None:
                conditions.append("bffb = :bffb")
                params["bffb"] = bffb
            if nyid is not None:
                conditions.append("NYID = :nyid")
                params["nyid"] = nyid
            if bfpl is not None:
                conditions.append("bfpl = :bfpl")
                params["bfpl"] = bfpl

            where_clause = " AND ".join(conditions) if conditions else "1=1"

            # With an explicit account_id, query only that shard.
            if account_id is not None:
                table_name = self._get_table_name(account_id)
                query = text(f"SELECT * FROM `{table_name}` WHERE {where_clause}")
                result = db.execute(query, params)
                return result.fetchall()

            # Otherwise sweep every shard table, skipping any that fail.
            all_results = []
            tables = self.get_all_original_data_tables(db)
            for table_name in tables:
                try:
                    query = text(f"SELECT * FROM `{table_name}` WHERE {where_clause}")
                    result = db.execute(query, params)
                    rows = result.fetchall()
                    all_results.extend(rows)
                    logger.info(f"Found {len(rows)} records in table {table_name}")
                except Exception as e:
                    logger.warning(f"Failed to query table {table_name}: {str(e)}")
                    continue
            logger.info(f"Total found {len(all_results)} records from {len(tables)} tables")
            return all_results
        except Exception as e:
            logger.error(f"Failed to search original data: {str(e)}")
            return []

    def _check_settlement_exists(self, db: Session, nyid: str) -> bool:
        """Return True if settlement data exists for the given period ID."""
        settlement = db.query(SettlementData).filter(SettlementData.NYID == nyid).first()
        return settlement is not None

    def batch_import_original_data(self, db: Session, data: List) -> Dict[str, Any]:
        """Bulk-import original data rows into one account's shard table.

        Uses batched multi-row INSERTs instead of per-row inserts. All rows
        must belong to the same account (taken from the first item).

        Args:
            db: database session
            data: list of dicts; each must contain an ``account_id`` field

        Returns:
            Result dict with success flag, message, and per-item counters.
        """
        total_count = len(data)
        success_count = 0
        failed_count = 0
        failed_items = []

        if total_count == 0:
            return {
                'success': False,
                'message': '导入数据不能为空',
                'total_count': 0,
                'success_count': 0,
                'failed_count': 0,
                'failed_items': []
            }

        # The whole batch is routed by the first item's account_id.
        account_id = data[0].get('account_id')
        if not account_id:
            return {
                'success': False,
                'message': '数据中缺少account_id字段',
                'total_count': total_count,
                'success_count': 0,
                'failed_count': total_count,
                'failed_items': []
            }

        # Verify the account exists.
        account = db.query(Account).filter(Account.id == account_id).first()
        if not account:
            return {
                'success': False,
                'message': f'账号ID {account_id} 不存在',
                'total_count': total_count,
                'success_count': 0,
                'failed_count': total_count,
                'failed_items': []
            }

        # Make sure the shard table exists (with retry handling inside).
        table_created = self._ensure_table_exists(db, account_id)
        if not table_created:
            return {
                'success': False,
                'message': '创建原始数据表失败',
                'total_count': total_count,
                'success_count': 0,
                'failed_count': total_count,
                'failed_items': []
            }

        table_name = self._get_table_name(account_id)

        # Avoid starting a nested transaction if the caller already opened one.
        in_transaction = db.in_transaction()

        if not in_transaction:
            # We own the transaction: begin/commit with one retry on failure.
            for attempt in range(2):
                try:
                    db.begin()
                    success_count = 0
                    failed_count = 0
                    failed_items = []
                    success_count = self._execute_import(db, table_name, data, account_id)
                    db.commit()
                    logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}")
                    break
                except Exception as e:
                    db.rollback()
                    logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}")
                    if attempt == 1:  # final retry failed
                        logger.error("Batch import original data failed after retries")
                        return {
                            'success': False,
                            'message': f'批量导入失败: {str(e)}',
                            'total_count': total_count,
                            'success_count': 0,
                            'failed_count': total_count,
                            'failed_items': failed_items
                        }
        else:
            # Caller owns the transaction: just run the import and let any
            # exception propagate so the caller can roll back.
            try:
                success_count = self._execute_import(db, table_name, data, account_id)
                logger.info(f"Batch import original data completed in existing transaction. Success: {success_count}")
            except Exception as e:
                logger.error(f"Batch import failed in existing transaction: {str(e)}")
                raise

        return {
            'success': True,
            'message': '批量导入完成' if failed_count == 0 else '部分导入失败',
            'total_count': total_count,
            'success_count': success_count,
            'failed_count': failed_count,
            'failed_items': failed_items
        }

    def _execute_import(self, db: Session, table_name: str, data: List, account_id: int) -> int:
        """Run the actual import: validate periods, dedupe, batch-insert.

        Args:
            db: database session (transaction managed by the caller)
            table_name: target shard table
            data: list of row dicts
            account_id: account owning the shard table

        Returns:
            Number of rows actually inserted (duplicates are skipped).

        Raises:
            Exception: if any referenced NYID has no settlement record.
        """
        logger.info(f"Starting batch import into {table_name}, total records: {len(data)}")

        # Validate all referenced periods in one query. NYID values are
        # normalized to str on both sides (the column is VARCHAR).
        nyid_list = list(set(str(item.get('NYID')) for item in data if item.get('NYID')))
        settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all()
        settlement_map = {str(s.NYID): s for s in settlements}

        missing_nyids = set(nyid_list) - set(settlement_map.keys())
        if missing_nyids:
            raise Exception(f'以下期数在沉降表中不存在: {list(missing_nyids)}')

        # Load existing rows once and dedupe on the (NYID, sort) composite key.
        # Fix: use named column access instead of fragile positional indexing.
        existing_rows = db.execute(
            text(f"SELECT * FROM `{table_name}` WHERE account_id = :account_id"),
            {"account_id": account_id},
        ).fetchall()
        existing_keys = {
            f"{row._mapping['NYID']}_{row._mapping['sort']}"
            for row in existing_rows
        }
        logger.info(f"Found {len(existing_rows)} existing records in {table_name}")

        to_insert = []
        skipped_count = 0
        for item_data in data:
            nyid = str(item_data.get('NYID'))
            sort = item_data.get('sort')
            key = f"{nyid}_{sort}"
            if key in existing_keys:
                skipped_count += 1
            else:
                to_insert.append(item_data)

        logger.info(f"Filtered {skipped_count} duplicate records, {len(to_insert)} new records to insert")

        if not to_insert:
            logger.info("No new records to insert, all data already exists")
            return 0

        # Insert in chunks of 1000 rows per statement (safe for MySQL defaults).
        batch_size = 1000
        total_inserted = 0
        for i in range(0, len(to_insert), batch_size):
            batch_data = to_insert[i:i + batch_size]

            # Build one multi-row VALUES clause with uniquely named bind params.
            values_list = []
            params = {}
            for idx, item_data in enumerate(batch_data):
                values_list.append(
                    f"(:account_id_{idx}, :bfpcode_{idx}, :mtime_{idx}, :bffb_{idx}, "
                    f":bfpl_{idx}, :bfpvalue_{idx}, :NYID_{idx}, :sort_{idx})"
                )
                params.update({
                    f"account_id_{idx}": account_id,
                    f"bfpcode_{idx}": item_data.get('bfpcode'),
                    f"mtime_{idx}": item_data.get('mtime'),
                    f"bffb_{idx}": item_data.get('bffb'),
                    f"bfpl_{idx}": item_data.get('bfpl'),
                    f"bfpvalue_{idx}": item_data.get('bfpvalue'),
                    f"NYID_{idx}": item_data.get('NYID'),
                    f"sort_{idx}": item_data.get('sort')
                })

            insert_sql = f"""
            INSERT INTO `{table_name}`
            (account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort)
            VALUES {", ".join(values_list)}
            """
            db.execute(text(insert_sql), params)
            total_inserted += len(batch_data)
            logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch_data)} records")

        logger.info(f"Batch import completed: {total_inserted} records inserted, {skipped_count} duplicates skipped")
        return total_inserted