import logging
from typing import List, Optional

from sqlalchemy.orm import Session

from ..models.account import Account
from ..schemas.account import AccountCreate, AccountUpdate, AccountResponse

logger = logging.getLogger(__name__)


class AccountService:
    """Static helpers for creating, querying, updating and deleting accounts.

    Every method takes the SQLAlchemy ``Session`` to operate on as its first
    argument; the class itself holds no state.
    """

    @staticmethod
    def create_account(db: Session, account_data: AccountCreate) -> AccountResponse:
        """Create an account and then try to create its original-data table.

        The account row is committed first. Table creation is best-effort:
        if it fails, the error is logged (with traceback) and the already
        committed account is still returned.

        Args:
            db: Session used for the insert and passed on to table creation.
            account_data: Fields for the new account.

        Returns:
            The newly created account converted to ``AccountResponse``.
        """
        # Function-scope import kept from the original
        # (presumably avoids a circular import -- TODO confirm).
        from .original_data import OriginalDataService

        db_account = Account(**account_data.dict())
        db.add(db_account)
        db.commit()
        db.refresh(db_account)

        # Create the matching original-data table.
        try:
            original_service = OriginalDataService()
            original_service.create_table_for_account(db, db_account.id)
            logger.info(f"Created original data table for account {db_account.id}")
        except Exception as e:
            # Deliberately do NOT roll back the account creation: a failure to
            # create the table must not affect the account itself. Table
            # creation can be retried manually later.
            logger.error(
                f"Failed to create original data table for account {db_account.id}: {str(e)}",
                exc_info=True,
            )

        return AccountResponse.from_orm_account(db_account)

    @staticmethod
    def search_accounts(
        db: Session,
        account_id: Optional[int] = None,
        username: Optional[str] = None,
        project_name: Optional[str] = None,
        status: Optional[int] = None,
        today_updated: Optional[int] = None,
        yh_id: Optional[str] = None,
        cl_name: Optional[str] = None,
    ) -> List[AccountResponse]:
        """Search accounts by any combination of optional criteria.

        A criterion is applied only when its argument is not ``None``; all
        applied criteria are combined with AND. ``username``,
        ``project_name`` and ``cl_name`` are substring matches (``LIKE
        '%value%'``); the remaining criteria are exact matches.

        Returns:
            All matching accounts converted to ``AccountResponse``.
        """
        query = db.query(Account)

        if account_id is not None:
            query = query.filter(Account.id == account_id)
        if username is not None:
            query = query.filter(Account.username.like(f"%{username}%"))
        if project_name is not None:
            query = query.filter(Account.project_name.like(f"%{project_name}%"))
        if status is not None:
            query = query.filter(Account.status == status)
        if today_updated is not None:
            query = query.filter(Account.today_updated == today_updated)
        if yh_id is not None:
            query = query.filter(Account.yh_id == yh_id)
        if cl_name is not None:
            query = query.filter(Account.cl_name.like(f"%{cl_name}%"))

        return [AccountResponse.from_orm_account(account) for account in query.all()]

    @staticmethod
    def get_account(db: Session, account_id: int) -> Optional[AccountResponse]:
        """Return the account with the given ID, or ``None`` if there is none."""
        account = db.query(Account).filter(Account.id == account_id).first()
        if account:
            return AccountResponse.from_orm_account(account)
        return None

    @staticmethod
    def get_accounts_batch(db: Session, account_ids: List[int]) -> List[Account]:
        """Fetch accounts for a list of IDs in a single query.

        Returns ``Account`` model objects (not response schemas) so callers
        can use them for batch association. An empty ``account_ids`` returns
        ``[]`` without querying.
        """
        if not account_ids:
            return []
        return db.query(Account).filter(Account.id.in_(account_ids)).all()

    @staticmethod
    def get_account_by_username(db: Session, username: str) -> Optional[Account]:
        """Return the first account whose username equals ``username``, or ``None``."""
        return db.query(Account).filter(Account.username == username).first()

    @staticmethod
    def get_accounts(db: Session, skip: int = 0, limit: int = 100) -> List[AccountResponse]:
        """Return one page of accounts.

        Args:
            db: Session to query with.
            skip: Number of rows to skip (OFFSET).
            limit: Maximum number of rows to return (LIMIT).
        """
        # NOTE(review): no ORDER BY is applied, so which rows land on a page
        # depends on the database's default row order.
        accounts = db.query(Account).offset(skip).limit(limit).all()
        return [AccountResponse.from_orm_account(account) for account in accounts]

    @staticmethod
    def update_account(
        db: Session, account_id: int, account_data: AccountUpdate
    ) -> Optional[AccountResponse]:
        """Update an account with the fields explicitly set in ``account_data``.

        Returns:
            The updated account as ``AccountResponse``, or ``None`` when no
            account has the given ID.
        """
        db_account = db.query(Account).filter(Account.id == account_id).first()
        if not db_account:
            return None

        # exclude_unset=True: only fields the caller actually provided are written.
        update_data = account_data.dict(exclude_unset=True)
        for field, value in update_data.items():
            setattr(db_account, field, value)
        db.commit()
        db.refresh(db_account)
        return AccountResponse.from_orm_account(db_account)

    @staticmethod
    def delete_account(db: Session, account_id: int) -> bool:
        """Delete the account with the given ID.

        Returns:
            ``True`` if an account was found and deleted, ``False`` otherwise.
        """
        db_account = db.query(Account).filter(Account.id == account_id).first()
        if not db_account:
            return False

        db.delete(db_account)
        db.commit()
        return True

    @staticmethod
    def get_accounts_by_linecode(db: Session, linecode: str) -> List[Account]:
        """Look up accounts via a levelling line code.

        Steps (each one is a single batched query, no per-row loops):
            1. Find the level-data row for ``linecode`` with the highest NYID.
            2. Collect the distinct ``point_id`` values of settlement data
               with that NYID.
            3. Collect the distinct ``section_id`` values of checkpoints with
               those point IDs.
            4. Collect the distinct ``account_id`` values of sections with
               those section IDs.
            5. Fetch the accounts with those account IDs.

        Falsy IDs (e.g. ``None``) are dropped at every step. As soon as a
        step yields no usable IDs, ``[]`` is returned without issuing the
        remaining queries.

        Returns:
            The matching ``Account`` model objects, or ``[]`` if the chain
            breaks at any step.

        Raises:
            Exception: any error raised while querying is logged and
                re-raised unchanged.
        """
        from ..models.level_data import LevelData
        from ..models.settlement_data import SettlementData
        from ..models.checkpoint import Checkpoint
        from ..models.section_data import SectionData

        try:
            logger.info(f"开始通过linecode={linecode}查询账号信息")

            # 1. Latest level data for the line code (NYID descending, first row).
            level_data = (
                db.query(LevelData)
                .filter(LevelData.linecode == linecode)
                .order_by(LevelData.NYID.desc())
                .first()
            )
            if not level_data:
                logger.warning(f"未找到linecode={linecode}对应的水准数据")
                return []

            nyid = level_data.NYID
            logger.info(f"找到最新期数NYID={nyid}")

            # 2. Distinct point IDs of the settlement data for that NYID.
            settlement_list = (
                db.query(SettlementData.point_id)
                .filter(SettlementData.NYID == nyid)
                .distinct()
                .all()
            )
            point_ids = [s.point_id for s in settlement_list if s.point_id]
            # Check the filtered list, not the raw rows, so an empty list is
            # never handed to in_() below.
            if not point_ids:
                logger.warning(f"未找到NYID={nyid}对应的沉降数据")
                return []
            logger.info(f"找到{len(point_ids)}个观测点ID")

            # 3. Distinct section IDs of the checkpoints for those points.
            checkpoint_list = (
                db.query(Checkpoint.section_id)
                .filter(Checkpoint.point_id.in_(point_ids))
                .distinct()
                .all()
            )
            section_ids = [c.section_id for c in checkpoint_list if c.section_id]
            if not section_ids:
                logger.warning("未找到对应的观测点数据")
                return []
            logger.info(f"找到{len(section_ids)}个断面ID")

            # 4. Distinct account IDs of the sections.
            section_list = (
                db.query(SectionData.account_id)
                .filter(SectionData.section_id.in_(section_ids))
                .distinct()
                .all()
            )
            account_ids = [s.account_id for s in section_list if s.account_id]
            if not account_ids:
                logger.warning("未找到对应的断面数据")
                return []
            logger.info(f"找到{len(account_ids)}个账号ID")

            # 5. Fetch the accounts themselves.
            accounts = db.query(Account).filter(Account.id.in_(account_ids)).all()
            logger.info(f"查询完成,共找到{len(accounts)}个账号")
            return accounts

        except Exception as e:
            logger.error(f"通过linecode={linecode}查询账号失败: {str(e)}", exc_info=True)
            # Bare raise re-raises the active exception as-is.
            raise