import logging
from typing import List, Optional

from sqlalchemy.orm import Session

from ..models.account import Account
from ..schemas.account import AccountCreate, AccountUpdate, AccountResponse

logger = logging.getLogger(__name__)


class AccountService:
    """Create, read, update, delete and search operations for ``Account`` rows.

    Every method is a ``@staticmethod`` that receives the SQLAlchemy session
    to work with as its first argument; the class itself holds no state.
    """

    @staticmethod
    def _commit(db: Session) -> None:
        """Commit ``db``; on failure roll the session back and re-raise.

        Rolling back keeps the session usable by the caller after a failed
        commit. The original exception is propagated unchanged.
        """
        try:
            db.commit()
        except Exception:
            db.rollback()
            raise

    @staticmethod
    def create_account(db: Session, account_data: AccountCreate) -> AccountResponse:
        """Create an account and then try to create its original-data table.

        The account row is committed first. Table creation is best-effort:
        if it fails, the error is logged with its traceback and the account
        is kept, so the table can be created manually later.

        Args:
            db: Session used for the insert and handed to the table creation.
            account_data: Fields for the new account.

        Returns:
            The response schema built from the persisted account.

        Raises:
            Exception: Whatever the commit raises; the session is rolled
                back before the exception propagates.
        """
        # Function-scope import kept from the original
        # (presumably avoids a circular import; TODO confirm).
        from .original_data import OriginalDataService

        db_account = Account(**account_data.dict())
        db.add(db_account)
        AccountService._commit(db)
        db.refresh(db_account)

        try:
            OriginalDataService().create_table_for_account(db, db_account.id)
        except Exception as e:
            # Deliberately not undoing the account creation: a table failure
            # must not affect the account itself.
            logger.error(
                "Failed to create original data table for account %s: %s",
                db_account.id,
                e,
                exc_info=True,
            )
            # Build the response from the attributes that are already loaded
            # *before* rolling back: rollback expires them and would force a
            # reload that could itself fail.
            response = AccountResponse.from_orm_account(db_account)
            # Clear any failed transaction state so the session stays usable.
            db.rollback()
            return response

        logger.info("Created original data table for account %s", db_account.id)
        return AccountResponse.from_orm_account(db_account)

    @staticmethod
    def search_accounts(
        db: Session,
        account_id: Optional[int] = None,
        username: Optional[str] = None,
        project_name: Optional[str] = None,
        status: Optional[int] = None,
        today_updated: Optional[int] = None,
        yh_id: Optional[str] = None,
        cl_name: Optional[str] = None,
    ) -> List[AccountResponse]:
        """Search accounts by any combination of optional criteria.

        A criterion left as ``None`` is ignored; all supplied criteria must
        match. ``username``, ``project_name`` and ``cl_name`` are substring
        matches (``LIKE '%value%'``); the others are exact matches.

        Returns:
            Response schemas for every matching account.
        """
        # NOTE(review): '%' and '_' inside the substring arguments are passed
        # to LIKE unescaped and therefore act as wildcards. Left as-is because
        # callers may rely on it.
        query = db.query(Account)
        if account_id is not None:
            query = query.filter(Account.id == account_id)
        if username is not None:
            query = query.filter(Account.username.like(f"%{username}%"))
        if project_name is not None:
            query = query.filter(Account.project_name.like(f"%{project_name}%"))
        if status is not None:
            query = query.filter(Account.status == status)
        if today_updated is not None:
            query = query.filter(Account.today_updated == today_updated)
        if yh_id is not None:
            query = query.filter(Account.yh_id == yh_id)
        if cl_name is not None:
            query = query.filter(Account.cl_name.like(f"%{cl_name}%"))

        return [AccountResponse.from_orm_account(account) for account in query.all()]

    @staticmethod
    def get_account(db: Session, account_id: int) -> Optional[AccountResponse]:
        """Return the account with the given ID, or ``None`` if there is none."""
        account = db.query(Account).filter(Account.id == account_id).first()
        if account is None:
            return None
        return AccountResponse.from_orm_account(account)

    @staticmethod
    def get_accounts_batch(db: Session, account_ids: List[int]) -> List[Account]:
        """Fetch the accounts whose IDs are in ``account_ids``.

        Returns ORM model objects (not response schemas) so callers can use
        them for batch association. An empty ID list returns ``[]`` without
        querying the database.
        """
        if not account_ids:
            return []
        return db.query(Account).filter(Account.id.in_(account_ids)).all()

    @staticmethod
    def get_account_by_username(db: Session, username: str) -> Optional[Account]:
        """Return the first account whose username equals ``username``, or ``None``."""
        return db.query(Account).filter(Account.username == username).first()

    @staticmethod
    def get_accounts(db: Session, skip: int = 0, limit: int = 100) -> List[AccountResponse]:
        """Return one page of accounts ordered by ID.

        Args:
            db: Session to query with.
            skip: Number of rows to skip from the start of the ordering.
            limit: Maximum number of rows to return.

        Returns:
            Response schemas for the selected page.
        """
        # OFFSET/LIMIT without ORDER BY has no defined row order, so pages
        # could overlap or miss rows; order by the primary key to make
        # pagination deterministic.
        accounts = (
            db.query(Account)
            .order_by(Account.id)
            .offset(skip)
            .limit(limit)
            .all()
        )
        return [AccountResponse.from_orm_account(account) for account in accounts]

    @staticmethod
    def update_account(
        db: Session, account_id: int, account_data: AccountUpdate
    ) -> Optional[AccountResponse]:
        """Apply the explicitly-set fields of ``account_data`` to an account.

        Only fields that were set on ``account_data`` are written
        (``exclude_unset=True``).

        Returns:
            The updated account as a response schema, or ``None`` when no
            account has the given ID.

        Raises:
            Exception: Whatever the commit raises; the session is rolled
                back before the exception propagates.
        """
        db_account = db.query(Account).filter(Account.id == account_id).first()
        if db_account is None:
            return None

        for field, value in account_data.dict(exclude_unset=True).items():
            setattr(db_account, field, value)
        AccountService._commit(db)
        db.refresh(db_account)
        return AccountResponse.from_orm_account(db_account)

    @staticmethod
    def delete_account(db: Session, account_id: int) -> bool:
        """Delete the account with the given ID.

        Returns:
            ``True`` if an account was found and deleted, ``False`` if no
            account has that ID.

        Raises:
            Exception: Whatever the commit raises; the session is rolled
                back before the exception propagates.
        """
        db_account = db.query(Account).filter(Account.id == account_id).first()
        if db_account is None:
            return False

        db.delete(db_account)
        AccountService._commit(db)
        return True