Files
railway_cloud/app/api/account.py
2026-02-03 14:47:13 +08:00

191 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from ..core.database import get_db
from ..core.response_code import ResponseCode, ResponseMessage
from ..schemas.account import (
AccountCreate, AccountUpdate, AccountResponse,
AccountListRequest, AccountGetRequest, AccountUpdateRequest, AccountDeleteRequest,
AccountApiResponse, AccountListResponse,AccountGetRequestYH
)
from ..services.account import AccountService
import requests
# Router that collects the account-management endpoints defined in this
# module; every route registered on it is served under the /accounts prefix.
router = APIRouter(prefix="/accounts", tags=["账号管理"])
@router.post("/create", response_model=AccountApiResponse, status_code=status.HTTP_201_CREATED)
def create_account(account: AccountCreate, db: Session = Depends(get_db)):
    """Create an account unless its username is already registered.

    Args:
        account: Payload describing the account to create.
        db: Database session injected by FastAPI.

    Returns:
        An ``AccountApiResponse`` carrying ``ACCOUNT_EXISTS`` and no data when
        a record with the same username is found, otherwise ``SUCCESS`` with
        the created account serialised through ``.dict()``.
    """
    # Reject duplicates before attempting the insert.
    # NOTE(review): the route's default 201 status also applies to this
    # "already exists" reply - confirm that clients only look at ``code``.
    duplicate = AccountService.get_account_by_username(db, account.username)
    if duplicate:
        return AccountApiResponse(
            code=ResponseCode.ACCOUNT_EXISTS,
            message=ResponseMessage.ACCOUNT_EXISTS,
            data=None,
        )

    created = AccountService.create_account(db, account)
    created_payload = created.dict()
    return AccountApiResponse(
        code=ResponseCode.SUCCESS,
        message="账号创建成功",
        data=created_payload,
    )
@router.post("/list", response_model=AccountListResponse)
def get_accounts(request: AccountListRequest, db: Session = Depends(get_db)):
    """Return a slice of accounts selected by ``request.skip`` and ``request.limit``.

    Args:
        request: Carries the ``skip`` and ``limit`` values forwarded to the service.
        db: Database session injected by FastAPI.

    Returns:
        An ``AccountListResponse`` with ``SUCCESS``, the fetched accounts and
        their count.
    """
    page = AccountService.get_accounts(db, skip=request.skip, limit=request.limit)
    # NOTE(review): ``total`` is the number of rows in this reply, which may
    # differ from the overall number of stored accounts - confirm intent.
    row_count = len(page)
    return AccountListResponse(
        code=ResponseCode.SUCCESS,
        message="查询成功",
        total=row_count,
        data=page,
    )
@router.post("/get", response_model=AccountListResponse)
def get_account(request: AccountGetRequest, db: Session = Depends(get_db)):
    """Search accounts using the filter fields carried by the request.

    Args:
        request: Supplies ``account_id``, ``username``, ``project_name``,
            ``status``, ``today_updated``, ``yh_id`` and ``cl_name``, all of
            which are forwarded unchanged to ``AccountService.search_accounts``.
        db: Database session injected by FastAPI.

    Returns:
        An ``AccountListResponse``: ``SUCCESS`` with the matches and their
        count, or ``ACCOUNT_NOT_FOUND`` with an empty list when the search
        yields nothing.
    """
    criteria = {
        "account_id": request.account_id,
        "username": request.username,
        "project_name": request.project_name,
        "status": request.status,
        "today_updated": request.today_updated,
        "yh_id": request.yh_id,
        "cl_name": request.cl_name,
    }
    matches = AccountService.search_accounts(db, **criteria)

    if matches:
        return AccountListResponse(
            code=ResponseCode.SUCCESS,
            message="查询成功",
            total=len(matches),
            data=matches,
        )

    return AccountListResponse(
        code=ResponseCode.ACCOUNT_NOT_FOUND,
        message=ResponseMessage.ACCOUNT_NOT_FOUND,
        total=0,
        data=[],
    )
# Query endpoint dedicated to the "Yuheng No.1" (宇恒一号) integration.
# NOTE(review): this handler reuses the function name ``get_account`` already
# bound by the /get handler in this module; the route is still registered by
# the decorator, but the module-level name is rebound.
@router.post("/get/yh", response_model=AccountListResponse)
def get_account(request: AccountGetRequestYH, db: Session = Depends(get_db)):
    """Search accounts by ``project_name`` and ``yh_id`` only.

    Args:
        request: Supplies the ``project_name`` and ``yh_id`` filters.
        db: Database session injected by FastAPI.

    Returns:
        An ``AccountListResponse``: ``SUCCESS`` with the matches and their
        count, or ``ACCOUNT_NOT_FOUND`` with an empty list when the search
        yields nothing.
    """
    matches = AccountService.search_accounts(
        db,
        project_name=request.project_name,
        yh_id=request.yh_id,
    )

    if matches:
        return AccountListResponse(
            code=ResponseCode.SUCCESS,
            message="查询成功",
            total=len(matches),
            data=matches,
        )

    return AccountListResponse(
        code=ResponseCode.ACCOUNT_NOT_FOUND,
        message=ResponseMessage.ACCOUNT_NOT_FOUND,
        total=0,
        data=[],
    )
@router.post("/update", response_model=AccountApiResponse)
def update_account(request: AccountUpdateRequest, db: Session = Depends(get_db)):
    """Apply ``request.account_data`` to the account identified by ``request.account_id``.

    Args:
        request: Carries the target ``account_id`` and the ``account_data`` to apply.
        db: Database session injected by FastAPI.

    Returns:
        An ``AccountApiResponse``: ``SUCCESS`` with the updated account
        serialised through ``.dict()``, or ``ACCOUNT_NOT_FOUND`` with no data
        when the service returns a falsy result.
    """
    updated = AccountService.update_account(db, request.account_id, request.account_data)

    if updated:
        return AccountApiResponse(
            code=ResponseCode.SUCCESS,
            message="账号更新成功",
            data=updated.dict(),
        )

    return AccountApiResponse(
        code=ResponseCode.ACCOUNT_NOT_FOUND,
        message=ResponseMessage.ACCOUNT_NOT_FOUND,
        data=None,
    )
@router.post("/delete", response_model=AccountApiResponse)
def delete_account(request: AccountDeleteRequest, db: Session = Depends(get_db)):
    """Delete the account identified by ``request.account_id``.

    Args:
        request: Carries the ``account_id`` of the account to remove.
        db: Database session injected by FastAPI.

    Returns:
        An ``AccountApiResponse`` without data: ``SUCCESS`` when the service
        reports a truthy result, ``ACCOUNT_NOT_FOUND`` otherwise.
    """
    was_deleted = AccountService.delete_account(db, request.account_id)

    if was_deleted:
        return AccountApiResponse(
            code=ResponseCode.SUCCESS,
            message="账号删除成功",
            data=None,
        )

    return AccountApiResponse(
        code=ResponseCode.ACCOUNT_NOT_FOUND,
        message=ResponseMessage.ACCOUNT_NOT_FOUND,
        data=None,
    )
# Endpoint returning accounts enriched with the upload status reported by an
# external service.
# NOTE(review): the URL and user_id below are hard-coded; consider moving them
# to configuration.
_OVER_DATA_URL = "https://engineering.yuxindazhineng.com/index/index/get_over_data"
_OVER_DATA_USER_ID = "68c0dbfdb7cbcd616e7c5ab5"
# Upper bound (seconds) for the outbound call so a stalled upstream cannot
# block the request indefinitely.
_OVER_DATA_TIMEOUT_SECONDS = 10


def _fetch_is_ok_map(timeout=_OVER_DATA_TIMEOUT_SECONDS):
    """Build a ``user_name -> is_ok`` mapping from the external service.

    The call is best-effort: any network, decoding or shape error is printed
    and whatever was collected so far (possibly nothing) is returned, so the
    caller can still answer from the database alone.

    Args:
        timeout: Seconds to wait for the external service before giving up.

    Returns:
        A dict keyed by ``user_name`` holding the reported ``is_ok`` value.
    """
    is_ok_by_user = {}
    try:
        reply = requests.post(
            _OVER_DATA_URL,
            data={"user_id": _OVER_DATA_USER_ID},
            timeout=timeout,
        ).json()
        if reply['code'] == 0:
            for item in reply['data']:
                # Skip entries lacking either field instead of failing the batch.
                if 'user_name' in item and 'is_ok' in item:
                    is_ok_by_user[item['user_name']] = item['is_ok']
    except Exception as e:
        # Deliberately broad: the enrichment must never break the core query.
        print(f"外部接口调用失败或数据解析异常:{str(e)}")
    return is_ok_by_user


# NOTE(review): the path spells "uplaod"; it is kept as-is because clients may
# already depend on it. This handler also reuses the name ``get_account``
# bound by earlier handlers in this module.
@router.post("/get_uplaod_data", response_model=AccountListResponse)
def get_account(request: AccountGetRequest, db: Session = Depends(get_db)):
    """Search accounts and attach the external ``is_ok`` flag to each result.

    Args:
        request: Supplies ``account_id``, ``username``, ``project_name``,
            ``status``, ``today_updated``, ``yh_id`` and ``cl_name``, all of
            which are forwarded to ``AccountService.search_accounts``.
        db: Database session injected by FastAPI.

    Returns:
        An ``AccountListResponse``: ``ACCOUNT_NOT_FOUND`` with an empty list
        when the search yields nothing, otherwise ``SUCCESS`` with the
        accounts, each carrying an ``is_ok`` attribute (``0`` when the
        external service reported nothing for that username).
    """
    accounts = AccountService.search_accounts(
        db,
        account_id=request.account_id,
        username=request.username,
        project_name=request.project_name,
        status=request.status,
        today_updated=request.today_updated,
        yh_id=request.yh_id,
        cl_name=request.cl_name,
    )

    # Nothing to enrich: answer immediately and skip the outbound HTTP call.
    if not accounts:
        return AccountListResponse(
            code=ResponseCode.ACCOUNT_NOT_FOUND,
            message=ResponseMessage.ACCOUNT_NOT_FOUND,
            total=0,
            data=[],
        )

    is_ok_by_user = _fetch_is_ok_map()

    # Match the database ``username`` against the external ``user_name``.
    # NOTE(review): assumes the account objects accept assignment of an
    # ``is_ok`` attribute and that the response schema exposes it - confirm.
    for account in accounts:
        account.is_ok = is_ok_by_user.get(account.username, 0)

    return AccountListResponse(
        code=ResponseCode.SUCCESS,
        message="查询成功",
        total=len(accounts),
        data=accounts,
    )