from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from typing import List from ..core.database import get_db from ..core.response_code import ResponseCode, ResponseMessage from ..schemas.account import ( AccountCreate, AccountUpdate, AccountResponse, AccountListRequest, AccountGetRequest, AccountUpdateRequest, AccountDeleteRequest, AccountApiResponse, AccountListResponse,AccountGetRequestYH ) from ..services.account import AccountService import requests router = APIRouter(prefix="/accounts", tags=["账号管理"]) @router.post("/create", response_model=AccountApiResponse, status_code=status.HTTP_201_CREATED) def create_account(account: AccountCreate, db: Session = Depends(get_db)): """创建账号""" # 检查账号是否已存在 existing_account = AccountService.get_account_by_username(db, account.username) if existing_account: return AccountApiResponse( code=ResponseCode.ACCOUNT_EXISTS, message=ResponseMessage.ACCOUNT_EXISTS, data=None ) account_response = AccountService.create_account(db, account) return AccountApiResponse( code=ResponseCode.SUCCESS, message="账号创建成功", data=account_response.dict() ) @router.post("/list", response_model=AccountListResponse) def get_accounts(request: AccountListRequest, db: Session = Depends(get_db)): """获取账号列表""" accounts = AccountService.get_accounts(db, skip=request.skip, limit=request.limit) return AccountListResponse( code=ResponseCode.SUCCESS, message="查询成功", total=len(accounts), data=accounts ) @router.post("/get", response_model=AccountListResponse) def get_account(request: AccountGetRequest, db: Session = Depends(get_db)): """根据多种条件查询账号""" accounts = AccountService.search_accounts( db, account_id=request.account_id, username=request.username, project_name=request.project_name, status=request.status, today_updated=request.today_updated, yh_id=request.yh_id, cl_name=request.cl_name ) if not accounts: return AccountListResponse( code=ResponseCode.ACCOUNT_NOT_FOUND, message=ResponseMessage.ACCOUNT_NOT_FOUND, total=0, data=[] ) return AccountListResponse( code=ResponseCode.SUCCESS, message="查询成功", total=len(accounts), data=accounts ) # 宇恒一号特定查询接口 @router.post("/get/yh", response_model=AccountListResponse) def get_account(request: AccountGetRequestYH, db: Session = Depends(get_db)): """根据多种条件查询账号""" accounts = AccountService.search_accounts( db, project_name=request.project_name, yh_id=request.yh_id ) if not accounts: return AccountListResponse( code=ResponseCode.ACCOUNT_NOT_FOUND, message=ResponseMessage.ACCOUNT_NOT_FOUND, total=0, data=[] ) return AccountListResponse( code=ResponseCode.SUCCESS, message="查询成功", total=len(accounts), data=accounts ) @router.post("/update", response_model=AccountApiResponse) def update_account(request: AccountUpdateRequest, db: Session = Depends(get_db)): """更新账号""" account = AccountService.update_account(db, request.account_id, request.account_data) if not account: return AccountApiResponse( code=ResponseCode.ACCOUNT_NOT_FOUND, message=ResponseMessage.ACCOUNT_NOT_FOUND, data=None ) return AccountApiResponse( code=ResponseCode.SUCCESS, message="账号更新成功", data=account.dict() ) @router.post("/delete", response_model=AccountApiResponse) def delete_account(request: AccountDeleteRequest, db: Session = Depends(get_db)): """删除账号""" if not AccountService.delete_account(db, request.account_id): return AccountApiResponse( code=ResponseCode.ACCOUNT_NOT_FOUND, message=ResponseMessage.ACCOUNT_NOT_FOUND, data=None ) return AccountApiResponse( code=ResponseCode.SUCCESS, message="账号删除成功", data=None ) # # 获取今日上传的数据接口 # @router.post("/get_uplaod_data", response_model=AccountListResponse) # def get_account(request: AccountGetRequest, db: Session = Depends(get_db)): # """根据多种条件查询账号,并合并外部接口的 is_ok 字段""" # # 1. 从数据库查询账号列表 # accounts = AccountService.search_accounts( # db, # account_id=request.account_id, # username=request.username, # project_name=request.project_name, # status=request.status, # today_updated=request.today_updated, # yh_id=request.yh_id, # cl_name=request.cl_name # ) # # 2. 调用外部接口获取 today_data,构建 user_name -> is_ok 映射 # user_is_ok_map = {} # 存储映射关系,提升匹配效率 # url = "https://engineering.yuxindazhineng.com/index/index/get_over_data" # payload = {"user_id": "68c0dbfdb7cbcd616e7c5ab5"} # try: # # 发送POST请求并解析JSON响应 # response = requests.request("POST", url, data=payload).json() # if response['code'] == 0: # today_data = response['data'] # # 遍历 today_data,构建映射字典 # for item in today_data: # # 校验字段是否存在,避免 KeyError 异常 # if 'user_name' in item and 'is_ok' in item: # user_name = item['user_name'] # is_ok = item['is_ok'] # user_is_ok_map[user_name] = is_ok # 键:user_name,值:is_ok # except Exception as e: # # 捕获接口调用/解析异常,不阻断核心业务(数据库查询结果正常返回) # print(f"外部接口调用失败或数据解析异常:{str(e)}") # # 3. 遍历 accounts,给每个账号对象添加 is_ok 字段(关键步骤) # for account in accounts: # # 匹配 username(数据库)和 user_name(外部接口) # current_username = account.username # # 从映射字典中获取 is_ok,无匹配则默认赋值 0(可根据业务调整默认值) # is_ok_value = user_is_ok_map.get(current_username, 0) # # 给 account 对象添加 is_ok 字段(兼容 Pydantic 模型,需满足对应配置) # account.is_ok = is_ok_value # # 4. 处理空结果返回 # if not accounts: # return AccountListResponse( # code=ResponseCode.ACCOUNT_NOT_FOUND, # message=ResponseMessage.ACCOUNT_NOT_FOUND, # total=0, # data=[] # ) # # 5. 正常返回结果(包含 is_ok 字段的 accounts) # return AccountListResponse( # code=ResponseCode.SUCCESS, # message="查询成功", # total=len(accounts), # data=accounts # )