Files
railway_cloud/app/services/account.py

104 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy.orm import Session
from ..models.account import Account
from ..schemas.account import AccountCreate, AccountUpdate, AccountResponse
from typing import List, Optional
import logging
# Module-level logger named after this module, so its records can be
# filtered or configured per module through the standard logging hierarchy.
logger = logging.getLogger(__name__)
class AccountService:
    """Static service-layer operations for ``Account`` rows.

    Every method receives the SQLAlchemy ``Session`` explicitly; the class
    keeps no state of its own.
    """

    @staticmethod
    def _commit_or_rollback(db: Session) -> None:
        """Commit the session, rolling it back before re-raising on failure.

        Without the rollback, a failed commit leaves the session in a failed
        transaction that rejects further work until ``rollback()`` is called.

        Raises:
            Exception: whatever ``db.commit()`` raised, unchanged.
        """
        try:
            db.commit()
        except Exception:
            db.rollback()
            raise

    @staticmethod
    def create_account(db: Session, account_data: AccountCreate) -> AccountResponse:
        """Create an account and automatically create its original-data table.

        The account row is committed first. Creating the table is
        best-effort: a failure there is logged (with traceback) and the
        account is still returned.

        Args:
            db: Active SQLAlchemy session.
            account_data: Validated payload; its fields are passed to the
                ``Account`` constructor as keyword arguments.

        Returns:
            The newly created account as an ``AccountResponse``.

        Raises:
            Exception: if committing the new account row fails (the session
                is rolled back before the error propagates).
        """
        # Imported inside the function rather than at module level --
        # presumably to avoid a circular import; confirm before moving it.
        from .original_data import OriginalDataService

        db_account = Account(**account_data.dict())
        db.add(db_account)
        AccountService._commit_or_rollback(db)
        db.refresh(db_account)

        # Create the matching original-data table.
        try:
            original_service = OriginalDataService()
            original_service.create_table_for_account(db, db_account.id)
            logger.info(f"Created original data table for account {db_account.id}")
        except Exception as e:
            # exc_info=True keeps the traceback; the message text is unchanged.
            logger.error(
                f"Failed to create original data table for account {db_account.id}: {str(e)}",
                exc_info=True,
            )
            # Note: the account creation is deliberately NOT undone here,
            # because a table-creation failure should not affect account
            # creation. The table can be created later by retrying manually.
            #
            # Build the response before rolling back: rollback expires the
            # loaded attributes and would force a reload from the database.
            response = AccountResponse.from_orm_account(db_account)
            # Clear whatever the failed step left pending so the caller gets
            # a usable session. The account row is already committed and is
            # not affected. The rollback itself is best-effort.
            try:
                db.rollback()
            except Exception:
                logger.exception("Failed to roll back session after table creation error")
            return response
        return AccountResponse.from_orm_account(db_account)

    @staticmethod
    def search_accounts(db: Session, account_id: Optional[int] = None,
                        username: Optional[str] = None, project_name: Optional[str] = None,
                        status: Optional[int] = None, today_updated: Optional[int] = None,
                        yh_id: Optional[str] = None) -> List[AccountResponse]:
        """Search accounts by any combination of criteria.

        Only criteria that are not ``None`` are applied, and all applied
        criteria must match. ``username`` and ``project_name`` are substring
        matches (``LIKE '%term%'``); the remaining criteria are equality
        matches.

        Note: ``%`` and ``_`` inside ``username`` / ``project_name`` are not
        escaped, so they act as LIKE wildcards.

        Returns:
            Matching accounts as ``AccountResponse`` objects (possibly empty).
        """
        query = db.query(Account)
        if account_id is not None:
            query = query.filter(Account.id == account_id)
        if username is not None:
            query = query.filter(Account.username.like(f"%{username}%"))
        if project_name is not None:
            query = query.filter(Account.project_name.like(f"%{project_name}%"))
        if status is not None:
            query = query.filter(Account.status == status)
        if today_updated is not None:
            query = query.filter(Account.today_updated == today_updated)
        if yh_id is not None:
            query = query.filter(Account.yh_id == yh_id)
        return [AccountResponse.from_orm_account(account) for account in query.all()]

    @staticmethod
    def get_account(db: Session, account_id: int) -> Optional[AccountResponse]:
        """Fetch one account by ID.

        Returns:
            The account as an ``AccountResponse``, or ``None`` if no row has
            that ID.
        """
        account = db.query(Account).filter(Account.id == account_id).first()
        if account:
            return AccountResponse.from_orm_account(account)
        return None

    @staticmethod
    def get_accounts_batch(db: Session, account_ids: List[int]) -> List[Account]:
        """Fetch accounts for a list of IDs in a single query.

        Returns the ``Account`` model objects themselves (not response
        schemas) so callers can use them for batch association.

        Returns:
            The matching ``Account`` rows; an empty list when ``account_ids``
            is empty (no query is issued in that case).
        """
        if not account_ids:
            return []
        return db.query(Account).filter(Account.id.in_(account_ids)).all()

    @staticmethod
    def get_account_by_username(db: Session, username: str) -> Optional[Account]:
        """Fetch the first account whose username equals ``username`` exactly.

        Returns:
            The ``Account`` model object, or ``None`` if there is no match.
        """
        return db.query(Account).filter(Account.username == username).first()

    @staticmethod
    def get_accounts(db: Session, skip: int = 0, limit: int = 100) -> List[AccountResponse]:
        """Return one page of accounts.

        Args:
            db: Active SQLAlchemy session.
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            Up to ``limit`` accounts as ``AccountResponse`` objects, ordered
            by ID.
        """
        # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so
        # consecutive pages could overlap or skip rows; order by ID to make
        # paging stable.
        accounts = (
            db.query(Account)
            .order_by(Account.id)
            .offset(skip)
            .limit(limit)
            .all()
        )
        return [AccountResponse.from_orm_account(account) for account in accounts]

    @staticmethod
    def update_account(db: Session, account_id: int, account_data: AccountUpdate) -> Optional[AccountResponse]:
        """Update an existing account.

        Only the fields explicitly set on ``account_data`` are written
        (``exclude_unset=True``); all other columns keep their values.

        Returns:
            The updated account as an ``AccountResponse``, or ``None`` if no
            row has that ID.

        Raises:
            Exception: if the commit fails (the session is rolled back before
                the error propagates).
        """
        db_account = db.query(Account).filter(Account.id == account_id).first()
        if not db_account:
            return None
        update_data = account_data.dict(exclude_unset=True)
        for field, value in update_data.items():
            setattr(db_account, field, value)
        AccountService._commit_or_rollback(db)
        db.refresh(db_account)
        return AccountResponse.from_orm_account(db_account)

    @staticmethod
    def delete_account(db: Session, account_id: int) -> bool:
        """Delete an account by ID.

        Returns:
            ``True`` if a row was found and deleted, ``False`` if no row has
            that ID.

        Raises:
            Exception: if the commit fails (the session is rolled back before
                the error propagates).
        """
        db_account = db.query(Account).filter(Account.id == account_id).first()
        if not db_account:
            return False
        db.delete(db_account)
        AccountService._commit_or_rollback(db)
        return True