Files
railway_cloud/app/services/account.py

187 lines
8.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy.orm import Session
from ..models.account import Account
from ..schemas.account import AccountCreate, AccountUpdate, AccountResponse
from typing import List, Optional
import logging
logger = logging.getLogger(__name__)
class AccountService:
@staticmethod
def create_account(db: Session, account_data: AccountCreate) -> AccountResponse:
"""创建账号并自动创建对应的原始数据表"""
from .original_data import OriginalDataService
db_account = Account(**account_data.dict())
db.add(db_account)
db.commit()
db.refresh(db_account)
# 创建对应的原始数据表
try:
original_service = OriginalDataService()
original_service.create_table_for_account(db, db_account.id)
logger.info(f"Created original data table for account {db_account.id}")
except Exception as e:
logger.error(f"Failed to create original data table for account {db_account.id}: {str(e)}")
# 注意:这里不回滚账号创建,因为表创建失败不应该影响账号创建
# 可以稍后手动重试创建表
return AccountResponse.from_orm_account(db_account)
@staticmethod
def search_accounts(db: Session, account_id: Optional[int] = None,
username: Optional[str] = None, project_name: Optional[str] = None,
status: Optional[int] = None, today_updated: Optional[int] = None,
yh_id: Optional[str] = None, cl_name: Optional[str] = None) -> List[AccountResponse]:
"""根据多种条件搜索账号"""
query = db.query(Account)
if account_id is not None:
query = query.filter(Account.id == account_id)
if username is not None:
query = query.filter(Account.username.like(f"%{username}%"))
if project_name is not None:
query = query.filter(Account.project_name.like(f"%{project_name}%"))
if status is not None:
query = query.filter(Account.status == status)
if today_updated is not None:
query = query.filter(Account.today_updated == today_updated)
if yh_id is not None:
query = query.filter(Account.yh_id == yh_id)
if cl_name is not None:
query = query.filter(Account.cl_name.like(f"%{cl_name}%"))
accounts = query.all()
return [AccountResponse.from_orm_account(account) for account in accounts]
@staticmethod
def get_account(db: Session, account_id: int) -> Optional[AccountResponse]:
"""根据ID获取账号"""
account = db.query(Account).filter(Account.id == account_id).first()
if account:
return AccountResponse.from_orm_account(account)
return None
@staticmethod
def get_accounts_batch(db: Session, account_ids: List[int]) -> List[Account]:
"""批量根据ID列表获取账号数据返回模型对象用于批量关联"""
if not account_ids:
return []
return db.query(Account).filter(Account.id.in_(account_ids)).all()
@staticmethod
def get_account_by_username(db: Session, username: str) -> Optional[Account]:
"""根据用户名获取账号"""
return db.query(Account).filter(Account.username == username).first()
@staticmethod
def get_accounts(db: Session, skip: int = 0, limit: int = 100) -> List[AccountResponse]:
"""获取账号列表"""
accounts = db.query(Account).offset(skip).limit(limit).all()
return [AccountResponse.from_orm_account(account) for account in accounts]
@staticmethod
def update_account(db: Session, account_id: int, account_data: AccountUpdate) -> Optional[AccountResponse]:
"""更新账号"""
db_account = db.query(Account).filter(Account.id == account_id).first()
if db_account:
update_data = account_data.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_account, field, value)
db.commit()
db.refresh(db_account)
return AccountResponse.from_orm_account(db_account)
return None
@staticmethod
def delete_account(db: Session, account_id: int) -> bool:
"""删除账号"""
db_account = db.query(Account).filter(Account.id == account_id).first()
if db_account:
db.delete(db_account)
db.commit()
return True
return False
@staticmethod
def get_accounts_by_linecode(db: Session, linecode: str) -> List[Account]:
"""
通过水准线路编码查询账号信息(优化版,减少数据库查询次数)
业务逻辑:
1. 根据linecode在水准数据表查询最新的NYID
2. 根据NYID在沉降数据表批量查询所有point_id去重
3. 根据point_id列表在观测点表批量查询所有section_id去重
4. 根据section_id列表在断面表批量查询所有account_id去重
5. 根据account_id列表在账号表批量查询账号信息
使用IN查询避免循环大幅提升性能
"""
from ..models.level_data import LevelData
from ..models.settlement_data import SettlementData
from ..models.checkpoint import Checkpoint
from ..models.section_data import SectionData
try:
logger.info(f"开始通过linecode={linecode}查询账号信息")
# 1. 根据linecode查询最新的水准数据按NYID降序取第一条
level_data = db.query(LevelData).filter(
LevelData.linecode == linecode
).order_by(LevelData.NYID.desc()).first()
if not level_data:
logger.warning(f"未找到linecode={linecode}对应的水准数据")
return []
nyid = level_data.NYID
logger.info(f"找到最新期数NYID={nyid}")
# 2. 根据NYID批量查询沉降数据提取所有point_id去重
settlement_list = db.query(SettlementData.point_id).filter(
SettlementData.NYID == nyid
).distinct().all()
if not settlement_list:
logger.warning(f"未找到NYID={nyid}对应的沉降数据")
return []
point_ids = [s.point_id for s in settlement_list if s.point_id]
logger.info(f"找到{len(point_ids)}个观测点ID")
# 3. 根据point_id列表批量查询观测点数据提取所有section_id去重
checkpoint_list = db.query(Checkpoint.section_id).filter(
Checkpoint.point_id.in_(point_ids)
).distinct().all()
if not checkpoint_list:
logger.warning(f"未找到对应的观测点数据")
return []
section_ids = [c.section_id for c in checkpoint_list if c.section_id]
logger.info(f"找到{len(section_ids)}个断面ID")
# 4. 根据section_id列表批量查询断面数据提取所有account_id去重
section_list = db.query(SectionData.account_id).filter(
SectionData.section_id.in_(section_ids)
).distinct().all()
if not section_list:
logger.warning(f"未找到对应的断面数据")
return []
account_ids = [s.account_id for s in section_list if s.account_id]
logger.info(f"找到{len(account_ids)}个账号ID")
# 5. 根据account_id列表批量查询账号信息
accounts = db.query(Account).filter(
Account.id.in_(account_ids)
).all()
logger.info(f"查询完成,共找到{len(accounts)}个账号")
return accounts
except Exception as e:
logger.error(f"通过linecode={linecode}查询账号失败: {str(e)}", exc_info=True)
raise e