from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from typing import List from ..core.database import get_db from ..core.response_code import ResponseCode, ResponseMessage from ..schemas.account import ( AccountCreate, AccountUpdate, AccountResponse, AccountListRequest, AccountGetRequest, AccountUpdateRequest, AccountDeleteRequest, AccountApiResponse, AccountListResponse,AccountGetRequestYH ) from ..services.account import AccountService import json from typing import List, Optional from urllib.error import HTTPError, URLError from urllib import request as urllib_request from urllib.parse import urlencode from socket import timeout router = APIRouter(prefix="/accounts", tags=["账号管理"]) @router.post("/create", response_model=AccountApiResponse, status_code=status.HTTP_201_CREATED) def create_account(account: AccountCreate, db: Session = Depends(get_db)): """创建账号""" # 检查账号是否已存在 existing_account = AccountService.get_account_by_username(db, account.username) if existing_account: return AccountApiResponse( code=ResponseCode.ACCOUNT_EXISTS, message=ResponseMessage.ACCOUNT_EXISTS, data=None ) account_response = AccountService.create_account(db, account) return AccountApiResponse( code=ResponseCode.SUCCESS, message="账号创建成功", data=account_response.dict() ) @router.post("/list", response_model=AccountListResponse) def get_accounts(request: AccountListRequest, db: Session = Depends(get_db)): """获取账号列表""" accounts = AccountService.get_accounts(db, skip=request.skip, limit=request.limit) return AccountListResponse( code=ResponseCode.SUCCESS, message="查询成功", total=len(accounts), data=accounts ) @router.post("/get", response_model=AccountListResponse) def get_account(request: AccountGetRequest, db: Session = Depends(get_db)): """根据多种条件查询账号""" accounts = AccountService.search_accounts( db, account_id=request.account_id, username=request.username, project_name=request.project_name, status=request.status, today_updated=request.today_updated, yh_id=request.yh_id, cl_name=request.cl_name ) if not accounts: return AccountListResponse( code=ResponseCode.ACCOUNT_NOT_FOUND, message=ResponseMessage.ACCOUNT_NOT_FOUND, total=0, data=[] ) return AccountListResponse( code=ResponseCode.SUCCESS, message="查询成功", total=len(accounts), data=accounts ) # 宇恒一号特定查询接口 @router.post("/get/yh", response_model=AccountListResponse) def get_account(request: AccountGetRequestYH, db: Session = Depends(get_db)): """根据多种条件查询账号""" accounts = AccountService.search_accounts( db, project_name=request.project_name, yh_id=request.yh_id ) if not accounts: return AccountListResponse( code=ResponseCode.ACCOUNT_NOT_FOUND, message=ResponseMessage.ACCOUNT_NOT_FOUND, total=0, data=[] ) return AccountListResponse( code=ResponseCode.SUCCESS, message="查询成功", total=len(accounts), data=accounts ) @router.post("/update", response_model=AccountApiResponse) def update_account(request: AccountUpdateRequest, db: Session = Depends(get_db)): """更新账号""" account = AccountService.update_account(db, request.account_id, request.account_data) if not account: return AccountApiResponse( code=ResponseCode.ACCOUNT_NOT_FOUND, message=ResponseMessage.ACCOUNT_NOT_FOUND, data=None ) return AccountApiResponse( code=ResponseCode.SUCCESS, message="账号更新成功", data=account.dict() ) @router.post("/delete", response_model=AccountApiResponse) def delete_account(request: AccountDeleteRequest, db: Session = Depends(get_db)): """删除账号""" if not AccountService.delete_account(db, request.account_id): return AccountApiResponse( code=ResponseCode.ACCOUNT_NOT_FOUND, message=ResponseMessage.ACCOUNT_NOT_FOUND, data=None ) return AccountApiResponse( code=ResponseCode.SUCCESS, message="账号删除成功", data=None ) # 获取今日上传的数据接口 @router.post("/get_uplaod_data", response_model=AccountListResponse) def get_uplaod_data(request: AccountGetRequest, db: Session = Depends(get_db)): """根据多种条件查询账号,并合并外部接口的 is_ok 字段(仅返回 today_data 中存在的账号)""" # 1. 从数据库查询账号列表(原有逻辑不变) accounts = AccountService.search_accounts( db, account_id=request.account_id, username=request.username, project_name=request.project_name, status=request.status, today_updated=request.today_updated, yh_id=request.yh_id, cl_name=request.cl_name ) # 2. 调用外部接口构建 user_is_ok_map(替换为 urllib 实现,修复命名冲突和超时异常) user_is_ok_map = {} api_url = "https://engineering.yuxindazhineng.com/index/index/get_over_data" payload = {"user_id": "68c0dbfdb7cbcd616e7c5ab5"} try: # 步骤1:将表单 payload 转为 URL 编码的字节流(urllib POST 要求数据为字节流) payload_bytes = urlencode(payload).encode("utf-8") # 步骤2:构造 Request 对象(使用重命名后的 urllib_request,避免冲突) req = urllib_request.Request( url=api_url, data=payload_bytes, method="POST" # 显式指定 POST 方法 ) # 步骤3:发送请求并读取响应(设置 10 秒超时,避免接口挂起) with urllib_request.urlopen(req, timeout=10) as resp: # 读取响应字节流并解码为 UTF-8 字符串 response_str = resp.read().decode("utf-8") # 步骤4:将 JSON 字符串解析为字典 api_response = json.loads(response_str) # 步骤5:验证接口返回格式并构建 user_is_ok_map 映射 if api_response.get("code") == 0: today_data = api_response.get("data", []) # 给默认值,避免 KeyError for item in today_data: # 安全获取字段并校验类型 if 'user_name' in item and 'is_ok' in item: user_is_ok_map[item['user_name']] = item['is_ok'] except HTTPError as e: # 捕获 HTTP 状态码错误(4xx/5xx) print(f"外部接口 HTTP 错误:{e.code} - {e.reason}") except TimeoutError: # 修复:正确的超时异常名称(首字母大写,原 timeout 会未定义报错) # 捕获请求超时异常 print(f"外部接口调用超时(超过 10 秒)") except URLError as e: # 捕获 URL 解析错误、网络连接错误等 print(f"外部接口网络错误:{e.reason}") except json.JSONDecodeError: # 捕获非合法 JSON 格式响应 print(f"外部接口返回数据格式错误,非合法 JSON 字符串") except Exception as e: # 捕获其他未知异常 print(f"外部接口处理未知异常:{str(e)}") # 3. 关键修改:仅保留 today_data 中存在的账号(核心过滤逻辑) accounts_with_is_ok = [] for account in accounts: # 步骤1:将 AccountResponse 对象转为字典(Pydantic 模型自带 dict() 方法) account_dict = account.dict() # 步骤2:获取当前账号 username,判断是否在 user_is_ok_map 中(不存在则跳过) current_username = account_dict.get("username", "") if current_username not in user_is_ok_map: continue # 核心:过滤掉 today_data 中没有的账号 # 步骤3:给字典添加 is_ok 字段(仅处理存在的账号,无需默认值 0) account_dict['is_ok'] = user_is_ok_map[current_username] # 步骤4:将处理后的字典加入新列表 accounts_with_is_ok.append(account_dict) # 4. 处理空结果返回(原有逻辑不变) if not accounts_with_is_ok: return AccountListResponse( code=ResponseCode.ACCOUNT_NOT_FOUND, message=ResponseMessage.ACCOUNT_NOT_FOUND, total=0, data=[] ) # 5. 正常返回:数据传入新列表 accounts_with_is_ok(仅包含 today_data 中的账号) # print(accounts_with_is_ok) return AccountListResponse( code=ResponseCode.SUCCESS, message="查询成功", total=len(accounts_with_is_ok), data=accounts_with_is_ok )