Files
railway_cloud/app/api/account.py

231 lines
8.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from ..core.database import get_db
from ..core.response_code import ResponseCode, ResponseMessage
from ..schemas.account import (
AccountCreate, AccountUpdate, AccountResponse,
AccountListRequest, AccountGetRequest, AccountUpdateRequest, AccountDeleteRequest,
AccountApiResponse, AccountListResponse,AccountGetRequestYH
)
from ..services.account import AccountService
import json
from typing import List, Optional
from urllib.error import HTTPError, URLError
from urllib import request as urllib_request
from urllib.parse import urlencode
from socket import timeout
router = APIRouter(prefix="/accounts", tags=["账号管理"])
@router.post("/create", response_model=AccountApiResponse, status_code=status.HTTP_201_CREATED)
def create_account(account: AccountCreate, db: Session = Depends(get_db)):
"""创建账号"""
# 检查账号是否已存在
existing_account = AccountService.get_account_by_username(db, account.username)
if existing_account:
return AccountApiResponse(
code=ResponseCode.ACCOUNT_EXISTS,
message=ResponseMessage.ACCOUNT_EXISTS,
data=None
)
account_response = AccountService.create_account(db, account)
return AccountApiResponse(
code=ResponseCode.SUCCESS,
message="账号创建成功",
data=account_response.dict()
)
@router.post("/list", response_model=AccountListResponse)
def get_accounts(request: AccountListRequest, db: Session = Depends(get_db)):
"""获取账号列表"""
accounts = AccountService.get_accounts(db, skip=request.skip, limit=request.limit)
return AccountListResponse(
code=ResponseCode.SUCCESS,
message="查询成功",
total=len(accounts),
data=accounts
)
@router.post("/get", response_model=AccountListResponse)
def get_account(request: AccountGetRequest, db: Session = Depends(get_db)):
"""根据多种条件查询账号"""
accounts = AccountService.search_accounts(
db,
account_id=request.account_id,
username=request.username,
project_name=request.project_name,
status=request.status,
today_updated=request.today_updated,
yh_id=request.yh_id,
cl_name=request.cl_name
)
if not accounts:
return AccountListResponse(
code=ResponseCode.ACCOUNT_NOT_FOUND,
message=ResponseMessage.ACCOUNT_NOT_FOUND,
total=0,
data=[]
)
return AccountListResponse(
code=ResponseCode.SUCCESS,
message="查询成功",
total=len(accounts),
data=accounts
)
# 宇恒一号特定查询接口
@router.post("/get/yh", response_model=AccountListResponse)
def get_account(request: AccountGetRequestYH, db: Session = Depends(get_db)):
"""根据多种条件查询账号"""
accounts = AccountService.search_accounts(
db,
project_name=request.project_name,
yh_id=request.yh_id
)
if not accounts:
return AccountListResponse(
code=ResponseCode.ACCOUNT_NOT_FOUND,
message=ResponseMessage.ACCOUNT_NOT_FOUND,
total=0,
data=[]
)
return AccountListResponse(
code=ResponseCode.SUCCESS,
message="查询成功",
total=len(accounts),
data=accounts
)
@router.post("/update", response_model=AccountApiResponse)
def update_account(request: AccountUpdateRequest, db: Session = Depends(get_db)):
"""更新账号"""
account = AccountService.update_account(db, request.account_id, request.account_data)
if not account:
return AccountApiResponse(
code=ResponseCode.ACCOUNT_NOT_FOUND,
message=ResponseMessage.ACCOUNT_NOT_FOUND,
data=None
)
return AccountApiResponse(
code=ResponseCode.SUCCESS,
message="账号更新成功",
data=account.dict()
)
@router.post("/delete", response_model=AccountApiResponse)
def delete_account(request: AccountDeleteRequest, db: Session = Depends(get_db)):
"""删除账号"""
if not AccountService.delete_account(db, request.account_id):
return AccountApiResponse(
code=ResponseCode.ACCOUNT_NOT_FOUND,
message=ResponseMessage.ACCOUNT_NOT_FOUND,
data=None
)
return AccountApiResponse(
code=ResponseCode.SUCCESS,
message="账号删除成功",
data=None
)
# 获取今日上传的数据接口
@router.post("/get_uplaod_data", response_model=AccountListResponse)
def get_uplaod_data(request: AccountGetRequest, db: Session = Depends(get_db)):
"""根据多种条件查询账号,并合并外部接口的 is_ok 字段(仅返回 today_data 中存在的账号)"""
# 1. 从数据库查询账号列表(原有逻辑不变)
accounts = AccountService.search_accounts(
db,
account_id=request.account_id,
username=request.username,
project_name=request.project_name,
status=request.status,
today_updated=request.today_updated,
yh_id=request.yh_id,
cl_name=request.cl_name
)
# 2. 调用外部接口构建 user_is_ok_map替换为 urllib 实现,修复命名冲突和超时异常)
user_is_ok_map = {}
api_url = "https://engineering.yuxindazhineng.com/index/index/get_over_data"
payload = {"user_id": "68c0dbfdb7cbcd616e7c5ab5"}
try:
# 步骤1将表单 payload 转为 URL 编码的字节流urllib POST 要求数据为字节流)
payload_bytes = urlencode(payload).encode("utf-8")
# 步骤2构造 Request 对象(使用重命名后的 urllib_request避免冲突
req = urllib_request.Request(
url=api_url,
data=payload_bytes,
method="POST" # 显式指定 POST 方法
)
# 步骤3发送请求并读取响应设置 10 秒超时,避免接口挂起)
with urllib_request.urlopen(req, timeout=10) as resp:
# 读取响应字节流并解码为 UTF-8 字符串
response_str = resp.read().decode("utf-8")
# 步骤4将 JSON 字符串解析为字典
api_response = json.loads(response_str)
# 步骤5验证接口返回格式并构建 user_is_ok_map 映射
if api_response.get("code") == 0:
today_data = api_response.get("data", []) # 给默认值,避免 KeyError
for item in today_data:
# 安全获取字段并校验类型
if 'user_name' in item and 'is_ok' in item:
user_is_ok_map[item['user_name']] = item['is_ok']
except HTTPError as e:
# 捕获 HTTP 状态码错误4xx/5xx
print(f"外部接口 HTTP 错误:{e.code} - {e.reason}")
except TimeoutError: # 修复:正确的超时异常名称(首字母大写,原 timeout 会未定义报错)
# 捕获请求超时异常
print(f"外部接口调用超时(超过 10 秒)")
except URLError as e:
# 捕获 URL 解析错误、网络连接错误等
print(f"外部接口网络错误:{e.reason}")
except json.JSONDecodeError:
# 捕获非合法 JSON 格式响应
print(f"外部接口返回数据格式错误,非合法 JSON 字符串")
except Exception as e:
# 捕获其他未知异常
print(f"外部接口处理未知异常:{str(e)}")
# 3. 关键修改:仅保留 today_data 中存在的账号(核心过滤逻辑)
accounts_with_is_ok = []
for account in accounts:
# 步骤1将 AccountResponse 对象转为字典Pydantic 模型自带 dict() 方法)
account_dict = account.dict()
# 步骤2获取当前账号 username判断是否在 user_is_ok_map 中(不存在则跳过)
current_username = account_dict.get("username", "")
if current_username not in user_is_ok_map:
continue # 核心:过滤掉 today_data 中没有的账号
# 步骤3给字典添加 is_ok 字段(仅处理存在的账号,无需默认值 0
account_dict['is_ok'] = user_is_ok_map[current_username]
# 步骤4将处理后的字典加入新列表
accounts_with_is_ok.append(account_dict)
# 4. 处理空结果返回(原有逻辑不变)
if not accounts_with_is_ok:
return AccountListResponse(
code=ResponseCode.ACCOUNT_NOT_FOUND,
message=ResponseMessage.ACCOUNT_NOT_FOUND,
total=0,
data=[]
)
# 5. 正常返回:数据传入新列表 accounts_with_is_ok仅包含 today_data 中的账号)
# print(accounts_with_is_ok)
return AccountListResponse(
code=ResponseCode.SUCCESS,
message="查询成功",
total=len(accounts_with_is_ok),
data=accounts_with_is_ok
)