from sqlalchemy.orm import Session
from sqlalchemy import text, inspect
from typing import List, Optional, Dict, Any
from ..models.original_data import OriginalData, get_original_data_model, get_table_name
from .base import BaseService
from ..models.settlement_data import SettlementData
from ..models.account import Account
from ..core.database import engine
import logging

logger = logging.getLogger(__name__)


class OriginalDataService(BaseService[OriginalData]):
    """Service for original (raw observation) data stored in per-account tables.

    Each account has its own physical table whose name is produced by
    ``get_table_name(account_id)``; queries against those tables are issued
    as raw SQL with bound parameters for all values.
    """

    def __init__(self):
        super().__init__(OriginalData)

    def _get_table_name(self, account_id: int) -> str:
        """Return the name of the original-data table for ``account_id``."""
        return get_table_name(account_id)

    def _ensure_table_exists(self, db: Session, account_id: int) -> bool:
        """Create the account's original-data table if it does not exist yet.

        Args:
            db: Database session used to run and commit the DDL.
            account_id: Account whose table should exist.

        Returns:
            True when the table already existed or was created.

        Raises:
            Exception: If the CREATE TABLE statement fails; the session is
                rolled back first and the original error is chained.
        """
        table_name = self._get_table_name(account_id)
        inspector = inspect(engine)

        if table_name in inspector.get_table_names():
            logger.info(f"Table {table_name} already exists")
            return True

        try:
            create_table_sql = f"""
            CREATE TABLE `{table_name}` (
                `id` INT AUTO_INCREMENT PRIMARY KEY,
                `account_id` INT NOT NULL COMMENT '账号ID',
                `bfpcode` VARCHAR(1000) NOT NULL COMMENT '前(后)视点名称',
                `mtime` DATETIME NOT NULL COMMENT '测点观测时间',
                `bffb` VARCHAR(1000) NOT NULL COMMENT '前(后)视标记符',
                `bfpl` VARCHAR(1000) NOT NULL COMMENT '前(后)视距离(m)',
                `bfpvalue` VARCHAR(1000) NOT NULL COMMENT '前(后)视尺读数(m)',
                `NYID` VARCHAR(100) NOT NULL COMMENT '期数id',
                `sort` INT COMMENT '序号',
                INDEX `idx_nyid` (`NYID`),
                INDEX `idx_account_id` (`account_id`)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='原始数据表_账号{account_id}'
            """
            db.execute(text(create_table_sql))
            db.commit()
            logger.info(f"Table {table_name} created successfully")
            return True
        except Exception as e:
            db.rollback()
            logger.error(f"Failed to create table {table_name}: {str(e)}")
            # Chain the cause so the underlying DB error stays in the traceback.
            raise Exception(f"创建原始数据表失败: {str(e)}") from e

    def create_table_for_account(self, db: Session, account_id: int) -> Dict[str, Any]:
        """Create the original-data table for an existing account.

        Args:
            db: Database session.
            account_id: Account ID; must match a row in ``Account``.

        Returns:
            A dict with ``success`` (bool) and ``message`` (str). Failures
            are reported through the dict rather than raised.
        """
        try:
            account = db.query(Account).filter(Account.id == account_id).first()
            if not account:
                return {
                    'success': False,
                    'message': f'账号ID {account_id} 不存在'
                }

            self._ensure_table_exists(db, account_id)
            return {
                'success': True,
                'message': f'原始数据表 {self._get_table_name(account_id)} 创建成功'
            }
        except Exception as e:
            logger.error(f"Failed to create table for account {account_id}: {str(e)}")
            return {
                'success': False,
                'message': f'创建原始数据表失败: {str(e)}'
            }

    def get_by_nyid(self, db: Session, account_id: int, nyid: str) -> List[OriginalData]:
        """Fetch all rows of the account's table with the given period ID.

        Args:
            db: Database session.
            account_id: Account whose table is queried.
            nyid: Period ID matched against the ``NYID`` column.

        Returns:
            The rows returned by ``fetchall()`` for the raw query, or an
            empty list if the query fails (the error is logged).
        """
        table_name = self._get_table_name(account_id)
        try:
            query = text(f"SELECT * FROM `{table_name}` WHERE NYID = :nyid")
            return db.execute(query, {"nyid": nyid}).fetchall()
        except Exception as e:
            logger.error(f"Failed to query data from {table_name}: {str(e)}")
            return []

    def get_by_bfpcode(self, db: Session, account_id: int, bfpcode: str) -> List[OriginalData]:
        """Fetch all rows of the account's table with the given point name.

        Args:
            db: Database session.
            account_id: Account whose table is queried.
            bfpcode: Value matched against the ``bfpcode`` column.

        Returns:
            The rows returned by ``fetchall()`` for the raw query, or an
            empty list if the query fails (the error is logged).
        """
        table_name = self._get_table_name(account_id)
        try:
            query = text(f"SELECT * FROM `{table_name}` WHERE bfpcode = :bfpcode")
            return db.execute(query, {"bfpcode": bfpcode}).fetchall()
        except Exception as e:
            logger.error(f"Failed to query data from {table_name}: {str(e)}")
            return []

    def get_all_original_data_tables(self, db: Session) -> List[str]:
        """List every table whose name starts with ``original_data_``.

        Args:
            db: Database session (unused; inspection goes through the
                module-level ``engine``).

        Returns:
            Matching table names, or an empty list if inspection fails.
        """
        try:
            inspector = inspect(engine)
            # NOTE(review): assumes get_table_name() produces names with this
            # prefix -- confirm against models.original_data.
            original_tables = [
                table for table in inspector.get_table_names()
                if table.startswith('original_data_')
            ]
            logger.info(f"Found {len(original_tables)} original data tables: {original_tables}")
            return original_tables
        except Exception as e:
            logger.error(f"Failed to get original data tables: {str(e)}")
            return []

    def search_original_data(self, db: Session,
                             account_id: Optional[int] = None,
                             id: Optional[int] = None,
                             bfpcode: Optional[str] = None,
                             bffb: Optional[str] = None,
                             nyid: Optional[str] = None,
                             bfpl: Optional[str] = None) -> List[Any]:
        """Search original data by any combination of equality filters.

        Filters that are ``None`` are ignored; with no filters every row is
        returned. When ``account_id`` is omitted, every table reported by
        ``get_all_original_data_tables`` is queried and the rows are
        concatenated; a table that fails to query is skipped with a warning.

        Args:
            db: Database session.
            account_id: Restrict the search to this account's table.
            id: Match on the ``id`` column.
            bfpcode: Match on the ``bfpcode`` column.
            bffb: Match on the ``bffb`` column.
            nyid: Match on the ``NYID`` column.
            bfpl: Match on the ``bfpl`` column.

        Returns:
            List of result rows, or an empty list if the search fails.
        """
        logger.debug("搜索原始数据,参数: %s", {
            "account_id": account_id,
            "id": id,
            "bfpcode": bfpcode,
            "bffb": bffb,
            "nyid": nyid,
            "bfpl": bfpl})
        try:
            # (column, bind-parameter name, value); column names are fixed
            # literals, only the values come from the caller.
            filters = (
                ("id", "id", id),
                ("bfpcode", "bfpcode", bfpcode),
                ("bffb", "bffb", bffb),
                ("NYID", "nyid", nyid),
                ("bfpl", "bfpl", bfpl),
            )
            conditions = []
            params = {}
            for column, param_name, value in filters:
                if value is not None:
                    conditions.append(f"{column} = :{param_name}")
                    params[param_name] = value
            where_clause = " AND ".join(conditions) if conditions else "1=1"

            # A specific account: query only that account's table.
            if account_id is not None:
                table_name = self._get_table_name(account_id)
                query = text(f"SELECT * FROM `{table_name}` WHERE {where_clause}")
                return db.execute(query, params).fetchall()

            # No account given: scan every original-data table.
            all_results = []
            tables = self.get_all_original_data_tables(db)
            for table_name in tables:
                try:
                    query = text(f"SELECT * FROM `{table_name}` WHERE {where_clause}")
                    rows = db.execute(query, params).fetchall()
                    all_results.extend(rows)
                    logger.info(f"Found {len(rows)} records in table {table_name}")
                except Exception as e:
                    logger.warning(f"Failed to query table {table_name}: {str(e)}")
                    continue

            logger.info(f"Total found {len(all_results)} records from {len(tables)} tables")
            return all_results
        except Exception as e:
            logger.error(f"Failed to search original data: {str(e)}")
            return []

    def _check_settlement_exists(self, db: Session, nyid: str) -> bool:
        """Return True if a ``SettlementData`` row with this NYID exists."""
        settlement = db.query(SettlementData).filter(SettlementData.NYID == nyid).first()
        return settlement is not None

    @staticmethod
    def _import_result(success: bool, message: str, total_count: int,
                       success_count: int = 0, failed_count: int = 0,
                       failed_items: Optional[List[Any]] = None) -> Dict[str, Any]:
        """Build the result dict returned by ``batch_import_original_data``."""
        return {
            'success': success,
            'message': message,
            'total_count': total_count,
            'success_count': success_count,
            'failed_count': failed_count,
            'failed_items': [] if failed_items is None else failed_items
        }

    def batch_import_original_data(self, db: Session, data: List) -> Dict[str, Any]:
        """Bulk-insert original data into the table of one account.

        The account is taken from the first item's ``account_id``. The import
        is all-or-nothing: any row failure rolls back the whole batch, and
        the batch is retried once before giving up. If rows with the first
        item's NYID already exist in the table, nothing is inserted and a
        successful "already exists" result is returned.

        Args:
            db: Database session.
            data: List of dict-like items providing ``account_id``,
                ``bfpcode``, ``mtime``, ``bffb``, ``bfpl``, ``bfpvalue``,
                ``NYID`` and ``sort``.

        Returns:
            A dict with ``success``, ``message``, ``total_count``,
            ``success_count``, ``failed_count`` and ``failed_items``.
        """
        total_count = len(data)
        if total_count == 0:
            return self._import_result(False, '导入数据不能为空', 0)

        raw_account_id = data[0].get('account_id')
        if not raw_account_id:
            return self._import_result(
                False, '数据中缺少account_id字段', total_count,
                failed_count=total_count)

        # The account id ends up inside a table name in string-built SQL, so
        # only accept values that are genuinely integers.
        try:
            account_id = int(raw_account_id)
        except (TypeError, ValueError):
            return self._import_result(
                False, f'账号ID {raw_account_id} 不存在', total_count,
                failed_count=total_count)

        account = db.query(Account).filter(Account.id == account_id).first()
        if not account:
            return self._import_result(
                False, f'账号ID {account_id} 不存在', total_count,
                failed_count=total_count)

        try:
            self._ensure_table_exists(db, account_id)
        except Exception as e:
            return self._import_result(
                False, f'创建原始数据表失败: {str(e)}', total_count,
                failed_count=total_count)

        table_name = self._get_table_name(account_id)
        check_query = text(f"SELECT COUNT(*) as cnt FROM `{table_name}` WHERE NYID = :nyid")
        insert_sql = text(f"""
            INSERT INTO `{table_name}`
            (account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort)
            VALUES
            (:account_id, :bfpcode, :mtime, :bffb, :bfpl, :bfpvalue, :NYID, :sort)
        """)

        success_count = 0
        failed_items = []
        for attempt in range(2):  # one initial try plus one retry
            success_count = 0
            failed_items = []
            try:
                # The account lookup above normally auto-begins a transaction;
                # calling begin() again on such a session raises, so only
                # begin explicitly when no transaction is active.
                if not db.in_transaction():
                    db.begin()

                # Skip the import if this period's data is already present.
                nyid = data[0].get('NYID')
                existing = db.execute(check_query, {"nyid": nyid}).fetchone()[0]
                if existing > 0:
                    db.rollback()
                    return self._import_result(True, '数据已存在', 0)

                # Settlement lookups are cached per distinct NYID so a batch
                # sharing one period issues a single existence query.
                verified_nyids = set()
                for item_data in data:
                    try:
                        item_nyid = item_data.get('NYID')
                        if item_nyid not in verified_nyids:
                            if not self._check_settlement_exists(db, item_nyid):
                                logger.error(f"Settlement {item_data.get('NYID')} not found")
                                raise Exception(f"Settlement {item_data.get('NYID')} not found")
                            verified_nyids.add(item_nyid)

                        db.execute(insert_sql, {
                            "account_id": account_id,
                            "bfpcode": item_data.get('bfpcode'),
                            "mtime": item_data.get('mtime'),
                            "bffb": item_data.get('bffb'),
                            "bfpl": item_data.get('bfpl'),
                            "bfpvalue": item_data.get('bfpvalue'),
                            "NYID": item_data.get('NYID'),
                            "sort": item_data.get('sort')
                        })
                        logger.info(f"Created original data: {item_data.get('bfpcode')}-{item_data.get('NYID')} in table {table_name}")
                        success_count += 1
                    except Exception as e:
                        failed_items.append({
                            'data': item_data,
                            'error': str(e)
                        })
                        logger.error(f"Failed to process original data {item_data.get('bfpcode')}-{item_data.get('NYID')}: {str(e)}")
                        # Abort the whole batch: the outer handler rolls back.
                        raise

                db.commit()
                logger.info(f"Batch import original data completed. Success: {success_count}, Failed: 0")
                break
            except Exception as e:
                db.rollback()
                logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}")
                if attempt == 1:  # the retry failed as well
                    logger.error("Batch import original data failed after retries")
                    return self._import_result(
                        False, f'批量导入失败: {str(e)}', total_count,
                        failed_count=total_count, failed_items=failed_items)

        # Reached only after a successful commit, so no row has failed.
        return self._import_result(
            True, '批量导入完成', total_count, success_count=success_count)