Files
railway_cloud/app/services/account.py
2025-09-27 10:23:00 +08:00

71 lines
2.8 KiB
Python

from sqlalchemy.orm import Session
from ..models.account import Account
from ..schemas.account import AccountCreate, AccountUpdate
from typing import List, Optional
class AccountService:
    """CRUD and search operations for ``Account`` rows.

    All methods are static and take the SQLAlchemy ``Session`` to operate on
    as their first argument; the class itself holds no state.
    """

    @staticmethod
    def _commit(db: Session) -> None:
        """Commit the pending transaction, rolling back if the commit fails.

        The original exception is re-raised unchanged so callers see the same
        error types as before; the rollback only ensures the session is left
        in a usable state instead of a failed transaction.
        """
        try:
            db.commit()
        except Exception:
            db.rollback()
            raise

    @staticmethod
    def create_account(db: Session, account_data: AccountCreate) -> Account:
        """Create and persist a new account.

        Args:
            db: Session used to persist the row.
            account_data: Schema whose fields are passed to ``Account`` as
                keyword arguments.

        Returns:
            The newly created ``Account``, refreshed from the database after
            the commit.
        """
        db_account = Account(**account_data.dict())
        db.add(db_account)
        AccountService._commit(db)
        db.refresh(db_account)
        return db_account

    @staticmethod
    def search_accounts(
        db: Session,
        account_id: Optional[int] = None,
        username: Optional[str] = None,
        project_name: Optional[str] = None,
        status: Optional[int] = None,
        today_updated: Optional[int] = None,
    ) -> List[Account]:
        """Search accounts by any combination of optional criteria.

        A criterion left as ``None`` is ignored; the supplied ones are
        combined with AND. ``username`` and ``project_name`` are substring
        matches (``LIKE '%value%'``); the others are exact matches.

        Returns:
            All matching accounts (every account when no criterion is given).
        """
        query = db.query(Account)
        if account_id is not None:
            query = query.filter(Account.id == account_id)
        # NOTE: `%` and `_` inside the search text are not escaped, so they
        # act as LIKE wildcards. Kept as-is to preserve existing behavior.
        if username is not None:
            query = query.filter(Account.username.like(f"%{username}%"))
        if project_name is not None:
            query = query.filter(Account.project_name.like(f"%{project_name}%"))
        if status is not None:
            query = query.filter(Account.status == status)
        if today_updated is not None:
            query = query.filter(Account.today_updated == today_updated)
        return query.all()

    @staticmethod
    def get_account(db: Session, account_id: int) -> Optional[Account]:
        """Return the account with the given id, or ``None`` if there is none."""
        return db.query(Account).filter(Account.id == account_id).first()

    @staticmethod
    def get_account_by_username(db: Session, username: str) -> Optional[Account]:
        """Return the first account whose username equals ``username`` exactly.

        Returns ``None`` when no account matches.
        """
        return db.query(Account).filter(Account.username == username).first()

    @staticmethod
    def get_accounts(db: Session, skip: int = 0, limit: int = 100) -> List[Account]:
        """Return one page of accounts.

        Args:
            db: Session used for the query.
            skip: Number of rows to skip (OFFSET).
            limit: Maximum number of rows to return (LIMIT).

        Returns:
            Up to ``limit`` accounts, ordered by id.
        """
        # OFFSET/LIMIT without ORDER BY gives no guarantee about which rows
        # land on which page; ordering by id makes pagination deterministic.
        return (
            db.query(Account)
            .order_by(Account.id)
            .offset(skip)
            .limit(limit)
            .all()
        )

    @staticmethod
    def update_account(
        db: Session, account_id: int, account_data: AccountUpdate
    ) -> Optional[Account]:
        """Apply a partial update to an existing account.

        Only the fields explicitly set on ``account_data`` are written
        (``exclude_unset=True``); all other columns are left untouched.

        Returns:
            The updated ``Account`` refreshed from the database, or ``None``
            if no account has the given id.
        """
        db_account = AccountService.get_account(db, account_id)
        if db_account is None:
            return None

        update_data = account_data.dict(exclude_unset=True)
        for field, value in update_data.items():
            setattr(db_account, field, value)
        AccountService._commit(db)
        db.refresh(db_account)
        return db_account

    @staticmethod
    def delete_account(db: Session, account_id: int) -> bool:
        """Delete the account with the given id.

        Returns:
            ``True`` if an account was found and deleted, ``False`` if no
            account has the given id.
        """
        db_account = AccountService.get_account(db, account_id)
        if db_account is None:
            return False

        db.delete(db_account)
        AccountService._commit(db)
        return True