From 34b698386a8dbcd4fd8116c764dc19ea12fc0f47 Mon Sep 17 00:00:00 2001 From: lhx Date: Thu, 23 Oct 2025 11:32:10 +0800 Subject: [PATCH] =?UTF-8?q?=E5=93=8D=E5=BA=94=E6=A0=BC=E5=BC=8F=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=EF=BC=8C=E5=8E=9F=E5=A7=8B=E6=95=B0=E6=8D=AE=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/account.py | 78 +++++++---- app/api/comprehensive_data.py | 208 ++++++++++++++++-------------- app/api/database.py | 89 +++++++++---- app/api/task.py | 193 ++++++++++++++++++--------- app/core/response_code.py | 49 +++++++ app/models/original_data.py | 10 +- app/schemas/account.py | 18 ++- app/schemas/comprehensive_data.py | 37 ++++-- app/schemas/database.py | 7 +- app/schemas/task.py | 18 ++- app/services/comprehensive.py | 6 +- app/services/original_data.py | 61 +++++++-- 12 files changed, 542 insertions(+), 232 deletions(-) create mode 100644 app/core/response_code.py diff --git a/app/api/account.py b/app/api/account.py index 0ef0c10..e0b1f38 100644 --- a/app/api/account.py +++ b/app/api/account.py @@ -2,33 +2,47 @@ from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from typing import List from ..core.database import get_db +from ..core.response_code import ResponseCode, ResponseMessage from ..schemas.account import ( AccountCreate, AccountUpdate, AccountResponse, - AccountListRequest, AccountGetRequest, AccountUpdateRequest, AccountDeleteRequest + AccountListRequest, AccountGetRequest, AccountUpdateRequest, AccountDeleteRequest, + AccountApiResponse, AccountListResponse ) from ..services.account import AccountService router = APIRouter(prefix="/accounts", tags=["账号管理"]) -@router.post("/create", response_model=AccountResponse, status_code=status.HTTP_201_CREATED) +@router.post("/create", response_model=AccountApiResponse, status_code=status.HTTP_201_CREATED) def create_account(account: AccountCreate, db: Session = Depends(get_db)): """创建账号""" # 检查账号是否已存在 existing_account = AccountService.get_account_by_username(db, account.username) if existing_account: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="用户名已存在" + return AccountApiResponse( + code=ResponseCode.ACCOUNT_EXISTS, + message=ResponseMessage.ACCOUNT_EXISTS, + data=None ) - return AccountService.create_account(db, account) + account_response = AccountService.create_account(db, account) + return AccountApiResponse( + code=ResponseCode.SUCCESS, + message="账号创建成功", + data=account_response.dict() + ) -@router.post("/list", response_model=List[AccountResponse]) +@router.post("/list", response_model=AccountListResponse) def get_accounts(request: AccountListRequest, db: Session = Depends(get_db)): """获取账号列表""" - return AccountService.get_accounts(db, skip=request.skip, limit=request.limit) + accounts = AccountService.get_accounts(db, skip=request.skip, limit=request.limit) + return AccountListResponse( + code=ResponseCode.SUCCESS, + message="查询成功", + total=len(accounts), + data=accounts + ) -@router.post("/get", response_model=List[AccountResponse]) +@router.post("/get", response_model=AccountListResponse) def get_account(request: AccountGetRequest, db: Session = Depends(get_db)): """根据多种条件查询账号""" accounts = AccountService.search_accounts( @@ -40,28 +54,46 @@ def get_account(request: AccountGetRequest, db: Session = Depends(get_db)): today_updated=request.today_updated ) if not accounts: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="未找到符合条件的账号" + return AccountListResponse( + code=ResponseCode.ACCOUNT_NOT_FOUND, + message=ResponseMessage.ACCOUNT_NOT_FOUND, + total=0, + data=[] ) - return accounts + return AccountListResponse( + code=ResponseCode.SUCCESS, + message="查询成功", + total=len(accounts), + data=accounts + ) -@router.post("/update", response_model=AccountResponse) +@router.post("/update", response_model=AccountApiResponse) def update_account(request: AccountUpdateRequest, db: Session = Depends(get_db)): """更新账号""" account = AccountService.update_account(db, request.account_id, request.account_data) if not account: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="账号不存在" + return AccountApiResponse( + code=ResponseCode.ACCOUNT_NOT_FOUND, + message=ResponseMessage.ACCOUNT_NOT_FOUND, + data=None ) - return account + return AccountApiResponse( + code=ResponseCode.SUCCESS, + message="账号更新成功", + data=account.dict() + ) -@router.post("/delete", status_code=status.HTTP_204_NO_CONTENT) +@router.post("/delete", response_model=AccountApiResponse) def delete_account(request: AccountDeleteRequest, db: Session = Depends(get_db)): """删除账号""" if not AccountService.delete_account(db, request.account_id): - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="账号不存在" - ) \ No newline at end of file + return AccountApiResponse( + code=ResponseCode.ACCOUNT_NOT_FOUND, + message=ResponseMessage.ACCOUNT_NOT_FOUND, + data=None + ) + return AccountApiResponse( + code=ResponseCode.SUCCESS, + message="账号删除成功", + data=None + ) diff --git a/app/api/comprehensive_data.py b/app/api/comprehensive_data.py index 4344263..d7faa14 100644 --- a/app/api/comprehensive_data.py +++ b/app/api/comprehensive_data.py @@ -2,6 +2,7 @@ from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from typing import List, Optional from ..core.database import get_db +from ..core.response_code import ResponseCode, ResponseMessage from ..schemas.comprehensive_data import ( BatchSectionDataImportRequest, BatchCheckpointDataImportRequest, @@ -40,20 +41,27 @@ def batch_import_sections(request: BatchSectionDataImportRequest, db: Session = """批量导入断面数据""" try: logger.info(f"Starting batch import sections, count: {len(request.data)}") - - # 直接使用字典列表,不需要转换 data_list = request.data - result = section_service.batch_import_sections(db, data_list) - logger.info(f"Batch import sections completed: {result['message']}") - return DataImportResponse(**result) + # 统一响应格式 + return DataImportResponse( + code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED, + message=result['message'], + data={ + 'total_count': result.get('total_count', 0), + 'success_count': result.get('success_count', 0), + 'failed_count': result.get('failed_count', 0), + 'failed_items': result.get('failed_items', []) + } + ) except Exception as e: logger.error(f"Batch import sections failed: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"批量导入断面数据失败: {str(e)}" + return DataImportResponse( + code=ResponseCode.IMPORT_FAILED, + message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}", + data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []} ) @router.post("/batch_import_checkpoints", response_model=DataImportResponse) @@ -61,20 +69,26 @@ def batch_import_checkpoints(request: BatchCheckpointDataImportRequest, db: Sess """批量导入观测点数据""" try: logger.info(f"Starting batch import checkpoints, count: {len(request.data)}") - - # 直接使用字典列表,不需要转换 data_list = request.data - result = checkpoint_service.batch_import_checkpoints(db, data_list) - logger.info(f"Batch import checkpoints completed: {result['message']}") - return DataImportResponse(**result) + return DataImportResponse( + code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED, + message=result['message'], + data={ + 'total_count': result.get('total_count', 0), + 'success_count': result.get('success_count', 0), + 'failed_count': result.get('failed_count', 0), + 'failed_items': result.get('failed_items', []) + } + ) except Exception as e: logger.error(f"Batch import checkpoints failed: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"批量导入观测点数据失败: {str(e)}" + return DataImportResponse( + code=ResponseCode.IMPORT_FAILED, + message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}", + data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []} ) @router.post("/batch_import_settlement_data", response_model=DataImportResponse) @@ -82,20 +96,26 @@ def batch_import_settlement_data(request: BatchSettlementDataImportRequest, db: """批量导入沉降数据""" try: logger.info(f"Starting batch import settlement data, count: {len(request.data)}") - - # 直接使用字典列表,不需要转换 data_list = request.data - result = settlement_service.batch_import_settlement_data(db, data_list) - logger.info(f"Batch import settlement data completed: {result['message']}") - return DataImportResponse(**result) + return DataImportResponse( + code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED, + message=result['message'], + data={ + 'total_count': result.get('total_count', 0), + 'success_count': result.get('success_count', 0), + 'failed_count': result.get('failed_count', 0), + 'failed_items': result.get('failed_items', []) + } + ) except Exception as e: logger.error(f"Batch import settlement data failed: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"批量导入沉降数据失败: {str(e)}" + return DataImportResponse( + code=ResponseCode.IMPORT_FAILED, + message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}", + data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []} ) @router.post("/batch_import_level_data", response_model=DataImportResponse) @@ -103,20 +123,26 @@ def batch_import_level_data(request: BatchLevelDataImportRequest, db: Session = """批量导入水准数据""" try: logger.info(f"Starting batch import level data, count: {len(request.data)}") - - # 直接使用字典列表,不需要转换 data_list = request.data - result = level_service.batch_import_level_data(db, data_list) - logger.info(f"Batch import level data completed: {result['message']}") - return DataImportResponse(**result) + return DataImportResponse( + code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED, + message=result['message'], + data={ + 'total_count': result.get('total_count', 0), + 'success_count': result.get('success_count', 0), + 'failed_count': result.get('failed_count', 0), + 'failed_items': result.get('failed_items', []) + } + ) except Exception as e: logger.error(f"Batch import level data failed: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"批量导入水准数据失败: {str(e)}" + return DataImportResponse( + code=ResponseCode.IMPORT_FAILED, + message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}", + data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []} ) @router.post("/batch_import_original_data", response_model=DataImportResponse) @@ -127,33 +153,40 @@ def batch_import_original_data(request: BatchOriginalDataImportRequest, db: Sess # 验证数据中是否包含account_id if not request.data or len(request.data) == 0: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="导入数据不能为空" + return DataImportResponse( + code=ResponseCode.BAD_REQUEST, + message="导入数据不能为空", + data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []} ) # 检查第一条数据是否包含account_id if 'account_id' not in request.data[0]: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="数据中必须包含account_id字段" + return DataImportResponse( + code=ResponseCode.BAD_REQUEST, + message="数据中必须包含account_id字段", + data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []} ) - # 直接使用字典列表,不需要转换 data_list = request.data - result = original_service.batch_import_original_data(db, data_list) - logger.info(f"Batch import original data completed: {result['message']}") - return DataImportResponse(**result) - except HTTPException: - raise + return DataImportResponse( + code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED, + message=result['message'], + data={ + 'total_count': result.get('total_count', 0), + 'success_count': result.get('success_count', 0), + 'failed_count': result.get('failed_count', 0), + 'failed_items': result.get('failed_items', []) + } + ) except Exception as e: logger.error(f"Batch import original data failed: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"批量导入原始数据失败: {str(e)}" + return DataImportResponse( + code=ResponseCode.IMPORT_FAILED, + message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}", + data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []} ) # 查询断面数据对应观察点数据 @@ -162,8 +195,6 @@ def get_section(request: SectionDataQueryRequest, db: Session = Depends(get_db)) """获取断面数据 + 观测点""" try: logger.info(f"Querying section data with params: {request.dict()}") - - # 调用服务层的业务方法 result_data = section_service.search_sections_with_checkpoints( db, id=request.id, @@ -174,31 +205,29 @@ def get_section(request: SectionDataQueryRequest, db: Session = Depends(get_db)) status=request.status, account_id=request.account_id ) - logger.info(f"Found {len(result_data)} sections with checkpoints") + return DataResponse( - success=True, + code=ResponseCode.SUCCESS, message="查询成功", - count=len(result_data), + total=len(result_data), data=result_data ) - except Exception as e: logger.error(f"Query section data failed: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"查询断面数据失败: {str(e)}" + return DataResponse( + code=ResponseCode.QUERY_FAILED, + message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}", + total=0, + data=[] ) - # 根据观测点id查询沉降数据 @router.post("/get_settlement", response_model=DataResponse) def get_settlement(request: SettlementDataQueryRequest, db: Session = Depends(get_db)): """获取沉降数据,按上传时间倒序排序,支持limit参数限制返回数量""" try: logger.info(f"Querying settlement data with params: {request.dict()}") - - # 调用服务层的业务方法 result_data = settlement_service.search_settlement_data_formatted( db, id=request.id, @@ -208,20 +237,21 @@ def get_settlement(request: SettlementDataQueryRequest, db: Session = Depends(ge workinfoname=request.workinfoname, limit=request.limit ) - logger.info(f"Found {len(result_data)} settlement records") + return DataResponse( - success=True, + code=ResponseCode.SUCCESS, message="查询成功", - count=len(result_data), + total=len(result_data), data=result_data ) - except Exception as e: logger.error(f"Query settlement data failed: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"查询沉降数据失败: {str(e)}" + return DataResponse( + code=ResponseCode.QUERY_FAILED, + message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}", + total=0, + data=[] ) # 查询沉降数据+观测点数据 @@ -230,8 +260,6 @@ def get_settlement_checkpoint(request: SettlementDataCheckpointQueryRequest, db: """获取沉降数据+观测点数据,按上传时间倒序排序,支持limit参数限制返回数量""" try: logger.info(f"Querying settlement data with params: {request.dict()}") - - # 调用服务层的业务方法 result_data = settlement_service.search_settlement_checkpoint_data_formatted( db, id=request.id, @@ -242,41 +270,32 @@ def get_settlement_checkpoint(request: SettlementDataCheckpointQueryRequest, db: linecode=request.linecode, limit=request.limit ) - logger.info(f"Found {len(result_data)} settlement records") + return DataResponse( - success=True, + code=ResponseCode.SUCCESS, message="查询成功", - count=len(result_data), + total=len(result_data), data=result_data ) - except Exception as e: logger.error(f"Query settlement data failed: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"查询沉降数据失败: {str(e)}" + return DataResponse( + code=ResponseCode.QUERY_FAILED, + message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}", + total=0, + data=[] ) - # 根据期数id获取原始数据 @router.post("/get_original", response_model=DataResponse) def get_original(request: OriginalDataQueryRequest, db: Session = Depends(get_db)): - """获取水准数据+原始数据 - 必须提供account_id""" + """获取水准数据+原始数据 - account_id可选,不填则查询所有分表""" try: logger.info(f"Querying original data with params: {request.dict()}") - - # 验证account_id - if not request.account_id: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="必须提供account_id参数" - ) - - # 调用综合服务的业务方法 result = comprehensive_service.get_level_and_original_data( db, - account_id=request.account_id, + account_id=request.account_id, # 可选 id=request.id, bfpcode=request.bfpcode, bffb=request.bffb, @@ -286,17 +305,16 @@ def get_original(request: OriginalDataQueryRequest, db: Session = Depends(get_db ) return DataResponse( - success=result["success"], + code=ResponseCode.SUCCESS, message=result["message"], - count=result["count"], + total=result["count"], data=result["data"] ) - - except HTTPException: - raise except Exception as e: logger.error(f"Query original data failed: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"查询原始数据失败: {str(e)}" + return DataResponse( + code=ResponseCode.QUERY_FAILED, + message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}", + total=0, + data=[] ) diff --git a/app/api/database.py b/app/api/database.py index 9400923..317306b 100644 --- a/app/api/database.py +++ b/app/api/database.py @@ -3,6 +3,7 @@ from sqlalchemy.orm import Session from typing import List, Optional import base64 from ..core.database import get_db +from ..core.response_code import ResponseCode, ResponseMessage from ..schemas.database import ( SQLExecuteRequest, SQLExecuteResponse, TableDataRequest, TableDataResponse, CreateTableRequest, ImportDataRequest, FileImportFormData @@ -15,7 +16,11 @@ router = APIRouter(prefix="/database", tags=["数据库管理"]) def execute_sql(request: SQLExecuteRequest, db: Session = Depends(get_db)): """执行SQL语句""" result = DatabaseService.execute_sql(db, request.sql) - return SQLExecuteResponse(**result) + return SQLExecuteResponse( + code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.DATABASE_ERROR, + message=result['message'], + data=result.get('data') + ) @router.post("/table-data", response_model=TableDataResponse) def get_table_data(request: TableDataRequest, db: Session = Depends(get_db)): @@ -26,7 +31,12 @@ def get_table_data(request: TableDataRequest, db: Session = Depends(get_db)): request.limit or 100, request.offset or 0 ) - return TableDataResponse(**result) + return TableDataResponse( + code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.DATABASE_ERROR, + message=result['message'], + total=result.get('total_count'), + data=result.get('data') + ) @router.post("/create-table", response_model=SQLExecuteResponse) def create_table(request: CreateTableRequest, db: Session = Depends(get_db)): @@ -37,28 +47,48 @@ def create_table(request: CreateTableRequest, db: Session = Depends(get_db)): request.columns, request.primary_key ) - return SQLExecuteResponse(**result) + return SQLExecuteResponse( + code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.DATABASE_ERROR, + message=result['message'], + data=None + ) -@router.post("/drop-table") +@router.post("/drop-table", response_model=SQLExecuteResponse) def drop_table(request: dict, db: Session = Depends(get_db)): """删除表""" table_name = request.get("table_name") if not table_name: - raise HTTPException(status_code=400, detail="table_name is required") + return SQLExecuteResponse( + code=ResponseCode.BAD_REQUEST, + message="table_name is required", + data=None + ) result = DatabaseService.drop_table(db, table_name) - return SQLExecuteResponse(**result) + return SQLExecuteResponse( + code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.DATABASE_ERROR, + message=result['message'], + data=None + ) @router.post("/import-data", response_model=SQLExecuteResponse) def import_data(request: ImportDataRequest, db: Session = Depends(get_db)): """导入数据""" result = DatabaseService.import_data(db, request.table_name, request.data) - return SQLExecuteResponse(**result) + return SQLExecuteResponse( + code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.DATABASE_ERROR, + message=result['message'], + data=None + ) @router.post("/tables", response_model=SQLExecuteResponse) def get_table_list(): """获取所有表名""" result = DatabaseService.get_table_list() - return SQLExecuteResponse(**result) + return SQLExecuteResponse( + code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.DATABASE_ERROR, + message=result['message'], + data=result.get('data') + ) @router.post("/import-file", response_model=SQLExecuteResponse) async def import_file( @@ -71,34 +101,46 @@ async def import_file( try: # 检查文件类型 if not file.filename: - raise HTTPException(status_code=400, detail="文件名不能为空") + return SQLExecuteResponse( + code=ResponseCode.BAD_REQUEST, + message="文件名不能为空", + data=None + ) - # 处理中文文件名和后缀名 - 更可靠的方法 + # 处理中文文件名和后缀名 try: - # FastAPI的UploadFile已经正确处理了文件名编码 filename = file.filename except: filename = "unknown_file" if not filename or filename == "": - raise HTTPException(status_code=400, detail="文件名不能为空") + return SQLExecuteResponse( + code=ResponseCode.BAD_REQUEST, + message="文件名不能为空", + data=None + ) # 提取文件扩展名 if '.' not in filename: - raise HTTPException(status_code=400, detail="文件必须有扩展名") + return SQLExecuteResponse( + code=ResponseCode.BAD_REQUEST, + message="文件必须有扩展名", + data=None + ) file_ext = filename.lower().split('.')[-1] if file_ext not in ['csv', 'xlsx', 'xls']: - raise HTTPException( - status_code=400, - detail=f"不支持的文件类型: .{file_ext},仅支持 .csv, .xlsx, .xls" + return SQLExecuteResponse( + code=ResponseCode.BAD_REQUEST, + message=f"不支持的文件类型: .{file_ext},仅支持 .csv, .xlsx, .xls", + data=None ) # 读取文件内容 file_content = await file.read() - # 将文件内容转换为base64(保持与现有FileImportUtils兼容) + # 将文件内容转换为base64 file_content_base64 = base64.b64encode(file_content).decode('utf-8') # 调用服务层方法 @@ -106,12 +148,15 @@ async def import_file( db, filename, file_content_base64, table_name, force_overwrite ) - return SQLExecuteResponse(**result) + return SQLExecuteResponse( + code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED, + message=result['message'], + data=result.get('data') if result.get('success') else None + ) - except HTTPException: - raise except Exception as e: return SQLExecuteResponse( - success=False, - message=f"文件导入失败: {str(e)}" + code=ResponseCode.IMPORT_FAILED, + message=f"文件导入失败: {str(e)}", + data=None ) \ No newline at end of file diff --git a/app/api/task.py b/app/api/task.py index 80e75f9..695928b 100644 --- a/app/api/task.py +++ b/app/api/task.py @@ -1,10 +1,11 @@ -from fastapi import APIRouter, HTTPException, status +from fastapi import APIRouter, Depends from typing import List from ..schemas.task import ( JobResponse, AddCronJobRequest, AddIntervalJobRequest, - AddDateJobRequest, TaskResponse + AddDateJobRequest, TaskResponse, TaskListResponse, FunctionListResponse ) from ..utils.scheduler import task_scheduler, example_task, database_cleanup_task, reset_today_updated_task +from ..core.response_code import ResponseCode, ResponseMessage router = APIRouter(prefix="/tasks", tags=["定时任务管理"]) @@ -20,9 +21,10 @@ def add_cron_job(request: AddCronJobRequest): """添加cron定时任务""" try: if request.func_name not in AVAILABLE_FUNCTIONS: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"函数 {request.func_name} 不可用" + return TaskResponse( + code=ResponseCode.BAD_REQUEST, + message=f"函数 {request.func_name} 不可用", + data=None ) func = AVAILABLE_FUNCTIONS[request.func_name] @@ -49,15 +51,16 @@ def add_cron_job(request: AddCronJobRequest): job = task_scheduler.add_cron_job(func, request.job_id, **cron_kwargs) return TaskResponse( - success=True, + code=ResponseCode.SUCCESS, message=f"Cron任务 {request.job_id} 添加成功", data={"job_id": job.id, "next_run": str(job.next_run_time)} ) except Exception as e: return TaskResponse( - success=False, - message=f"添加Cron任务失败: {str(e)}" + code=ResponseCode.INTERNAL_ERROR, + message=f"添加Cron任务失败: {str(e)}", + data=None ) @router.post("/interval", response_model=TaskResponse) @@ -65,9 +68,10 @@ def add_interval_job(request: AddIntervalJobRequest): """添加间隔执行任务""" try: if request.func_name not in AVAILABLE_FUNCTIONS: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"函数 {request.func_name} 不可用" + return TaskResponse( + code=ResponseCode.BAD_REQUEST, + message=f"函数 {request.func_name} 不可用", + data=None ) func = AVAILABLE_FUNCTIONS[request.func_name] @@ -86,15 +90,16 @@ def add_interval_job(request: AddIntervalJobRequest): job = task_scheduler.add_interval_job(func, request.job_id, **interval_kwargs) return TaskResponse( - success=True, + code=ResponseCode.SUCCESS, message=f"间隔任务 {request.job_id} 添加成功", data={"job_id": job.id, "next_run": str(job.next_run_time)} ) except Exception as e: return TaskResponse( - success=False, - message=f"添加间隔任务失败: {str(e)}" + code=ResponseCode.INTERNAL_ERROR, + message=f"添加间隔任务失败: {str(e)}", + data=None ) @router.post("/date", response_model=TaskResponse) @@ -102,57 +107,86 @@ def add_date_job(request: AddDateJobRequest): """添加指定时间执行任务""" try: if request.func_name not in AVAILABLE_FUNCTIONS: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"函数 {request.func_name} 不可用" + return TaskResponse( + code=ResponseCode.BAD_REQUEST, + message=f"函数 {request.func_name} 不可用", + data=None ) func = AVAILABLE_FUNCTIONS[request.func_name] job = task_scheduler.add_date_job(func, request.job_id, run_date=request.run_date) return TaskResponse( - success=True, + code=ResponseCode.SUCCESS, message=f"定时任务 {request.job_id} 添加成功", data={"job_id": job.id, "run_date": str(job.next_run_time)} ) except Exception as e: return TaskResponse( - success=False, - message=f"添加定时任务失败: {str(e)}" + code=ResponseCode.INTERNAL_ERROR, + message=f"添加定时任务失败: {str(e)}", + data=None ) -@router.post("/list", response_model=List[JobResponse]) +@router.post("/list", response_model=TaskListResponse) def get_jobs(): """获取所有任务""" - jobs = task_scheduler.get_jobs() - result = [] - for job in jobs: - result.append(JobResponse( - id=job.id, - name=job.name, - func=str(job.func), - trigger=str(job.trigger), - next_run_time=job.next_run_time - )) - return result + try: + jobs = task_scheduler.get_jobs() + result = [] + for job in jobs: + result.append(JobResponse( + id=job.id, + name=job.name, + func=str(job.func), + trigger=str(job.trigger), + next_run_time=job.next_run_time + )) + return TaskListResponse( + code=ResponseCode.SUCCESS, + message="查询成功", + total=len(result), + data=result + ) + except Exception as e: + return TaskListResponse( + code=ResponseCode.QUERY_FAILED, + message=f"查询任务列表失败: {str(e)}", + total=0, + data=[] + ) @router.post("/remove", response_model=TaskResponse) def remove_job(request: dict): """删除任务""" job_id = request.get("job_id") if not job_id: - raise HTTPException(status_code=400, detail="job_id is required") - success = task_scheduler.remove_job(job_id) - if success: return TaskResponse( - success=True, - message=f"任务 {job_id} 删除成功" + code=ResponseCode.BAD_REQUEST, + message="job_id is required", + data=None ) - else: + + try: + success = task_scheduler.remove_job(job_id) + if success: + return TaskResponse( + code=ResponseCode.SUCCESS, + message=f"任务 {job_id} 删除成功", + data=None + ) + else: + return TaskResponse( + code=ResponseCode.NOT_FOUND, + message=f"任务 {job_id} 不存在", + data=None + ) + except Exception as e: return TaskResponse( - success=False, - message=f"删除任务 {job_id} 失败" + code=ResponseCode.INTERNAL_ERROR, + message=f"删除任务 {job_id} 失败: {str(e)}", + data=None ) @router.post("/pause", response_model=TaskResponse) @@ -160,17 +194,31 @@ def pause_job(request: dict): """暂停任务""" job_id = request.get("job_id") if not job_id: - raise HTTPException(status_code=400, detail="job_id is required") - success = task_scheduler.pause_job(job_id) - if success: return TaskResponse( - success=True, - message=f"任务 {job_id} 已暂停" + code=ResponseCode.BAD_REQUEST, + message="job_id is required", + data=None ) - else: + + try: + success = task_scheduler.pause_job(job_id) + if success: + return TaskResponse( + code=ResponseCode.SUCCESS, + message=f"任务 {job_id} 已暂停", + data=None + ) + else: + return TaskResponse( + code=ResponseCode.NOT_FOUND, + message=f"任务 {job_id} 不存在", + data=None + ) + except Exception as e: return TaskResponse( - success=False, - message=f"暂停任务 {job_id} 失败" + code=ResponseCode.INTERNAL_ERROR, + message=f"暂停任务 {job_id} 失败: {str(e)}", + data=None ) @router.post("/resume", response_model=TaskResponse) @@ -178,20 +226,45 @@ def resume_job(request: dict): """恢复任务""" job_id = request.get("job_id") if not job_id: - raise HTTPException(status_code=400, detail="job_id is required") - success = task_scheduler.resume_job(job_id) - if success: return TaskResponse( - success=True, - message=f"任务 {job_id} 已恢复" - ) - else: - return TaskResponse( - success=False, - message=f"恢复任务 {job_id} 失败" + code=ResponseCode.BAD_REQUEST, + message="job_id is required", + data=None ) -@router.post("/functions", response_model=List[str]) + try: + success = task_scheduler.resume_job(job_id) + if success: + return TaskResponse( + code=ResponseCode.SUCCESS, + message=f"任务 {job_id} 已恢复", + data=None + ) + else: + return TaskResponse( + code=ResponseCode.NOT_FOUND, + message=f"任务 {job_id} 不存在", + data=None + ) + except Exception as e: + return TaskResponse( + code=ResponseCode.INTERNAL_ERROR, + message=f"恢复任务 {job_id} 失败: {str(e)}", + data=None + ) + +@router.post("/functions", response_model=FunctionListResponse) def get_available_functions(): """获取可用的任务函数列表""" - return list(AVAILABLE_FUNCTIONS.keys()) \ No newline at end of file + try: + return FunctionListResponse( + code=ResponseCode.SUCCESS, + message="查询成功", + data=list(AVAILABLE_FUNCTIONS.keys()) + ) + except Exception as e: + return FunctionListResponse( + code=ResponseCode.QUERY_FAILED, + message=f"查询可用函数失败: {str(e)}", + data=[] + ) \ No newline at end of file diff --git a/app/core/response_code.py b/app/core/response_code.py new file mode 100644 index 0000000..196c394 --- /dev/null +++ b/app/core/response_code.py @@ -0,0 +1,49 @@ +""" +响应状态码常量管理 +统一管理API响应状态码 +""" + +class ResponseCode: + """响应状态码常量""" + + # 成功 + SUCCESS = 0 + + # 客户端错误 (1000-1999) + BAD_REQUEST = 1000 # 请求参数错误 + UNAUTHORIZED = 1001 # 未授权 + FORBIDDEN = 1003 # 禁止访问 + NOT_FOUND = 1004 # 资源不存在 + CONFLICT = 1009 # 资源冲突 + VALIDATION_ERROR = 1010 # 数据验证错误 + + # 服务器错误 (2000-2999) + INTERNAL_ERROR = 2000 # 内部服务器错误 + DATABASE_ERROR = 2001 # 数据库错误 + + # 业务错误 (3000-3999) + ACCOUNT_NOT_FOUND = 3001 # 账号不存在 + ACCOUNT_EXISTS = 3002 # 账号已存在 + DATA_EXISTS = 3003 # 数据已存在 + DATA_NOT_FOUND = 3004 # 数据不存在 + IMPORT_FAILED = 3005 # 导入失败 + QUERY_FAILED = 3006 # 查询失败 + + +class ResponseMessage: + """响应消息常量""" + + SUCCESS = "操作成功" + BAD_REQUEST = "请求参数错误" + UNAUTHORIZED = "未授权访问" + FORBIDDEN = "禁止访问" + NOT_FOUND = "资源不存在" + INTERNAL_ERROR = "服务器内部错误" + DATABASE_ERROR = "数据库操作失败" + + ACCOUNT_NOT_FOUND = "账号不存在" + ACCOUNT_EXISTS = "账号已存在" + DATA_EXISTS = "数据已存在" + DATA_NOT_FOUND = "数据不存在" + IMPORT_FAILED = "数据导入失败" + QUERY_FAILED = "数据查询失败" diff --git a/app/models/original_data.py b/app/models/original_data.py index a4e3f10..6c7c471 100644 --- a/app/models/original_data.py +++ b/app/models/original_data.py @@ -38,11 +38,11 @@ def get_original_data_model(account_id: int): '__table_args__': {'extend_existing': True}, 'id': Column(Integer, primary_key=True, index=True, autoincrement=True), 'account_id': Column(Integer, nullable=False, comment="账号ID", index=True), - 'bfpcode': Column(String(1000), nullable=False, comment="前(后)视点名称"), - 'mtime': Column(DateTime, nullable=False, comment="测点观测时间"), - 'bffb': Column(String(1000), nullable=False, comment="前(后)视标记符"), - 'bfpl': Column(String(1000), nullable=False, comment="前(后)视距离(m)"), - 'bfpvalue': Column(String(1000), nullable=False, comment="前(后)视尺读数(m)"), + 'bfpcode': Column(String(1000), comment="前(后)视点名称"), + 'mtime': Column(DateTime, comment="测点观测时间"), + 'bffb': Column(String(1000), comment="前(后)视标记符"), + 'bfpl': Column(String(1000), comment="前(后)视距离(m)"), + 'bfpvalue': Column(String(1000), comment="前(后)视尺读数(m)"), 'NYID': Column(String(100), nullable=False, comment="期数id", index=True), 'sort': Column(Integer, comment="序号") } diff --git a/app/schemas/account.py b/app/schemas/account.py index e2b1151..7e052fd 100644 --- a/app/schemas/account.py +++ b/app/schemas/account.py @@ -1,5 +1,5 @@ from pydantic import BaseModel, Field, ConfigDict, field_serializer -from typing import Optional +from typing import Optional, List, Any from datetime import datetime class AccountBase(BaseModel): @@ -62,4 +62,18 @@ class AccountUpdateRequest(BaseModel): account_data: AccountUpdate class AccountDeleteRequest(BaseModel): - account_id: int \ No newline at end of file + account_id: int + +# 统一响应格式 +class AccountApiResponse(BaseModel): + """账号API统一响应格式""" + code: int = 0 + message: str + data: Optional[Any] = None + +class AccountListResponse(BaseModel): + """账号列表响应格式""" + code: int = 0 + message: str + total: int + data: List[AccountResponse] = [] \ No newline at end of file diff --git a/app/schemas/comprehensive_data.py b/app/schemas/comprehensive_data.py index 5069299..2bff7e3 100644 --- a/app/schemas/comprehensive_data.py +++ b/app/schemas/comprehensive_data.py @@ -70,7 +70,7 @@ class SectionDataImportRequest(BaseModel): # 原始数据查询请求 class OriginalDataQueryRequest(BaseModel): - account_id: int # 账号ID,必填 + account_id: Optional[int] = None # 账号ID,可选。不填则查询所有分表 linecode: Optional[str] = None id: Optional[int] = None bfpcode: Optional[str] = None @@ -190,19 +190,40 @@ class BatchOriginalDataImportRequest(BaseModel): # 新增响应模型 class DataImportResponse(BaseModel): - success: bool + code: int = 0 # 响应状态码,0表示成功 message: str - total_count: int - success_count: int - failed_count: int - failed_items: List[Dict[str, Any]] = [] + data: Optional[Dict[str, Any]] = None # 将详细信息放入data中 + + class Config: + schema_extra = { + "example": { + "code": 0, + "message": "批量导入完成", + "data": { + "total_count": 100, + "success_count": 100, + "failed_count": 0, + "failed_items": [] + } + } + } # 查询响应模型 class DataResponse(BaseModel): - success: bool + code: int = 0 # 响应状态码,0表示成功 message: str - count: int data: List[Dict[str, Any]] = [] + total: Optional[int] = None # 总数 + + class Config: + schema_extra = { + "example": { + "code": 0, + "message": "查询成功", + "total": 10, + "data": [] + } + } # 兼容旧接口的模型 class ComprehensiveDataImportRequest(BaseModel): diff --git a/app/schemas/database.py b/app/schemas/database.py index 174a704..9736217 100644 --- a/app/schemas/database.py +++ b/app/schemas/database.py @@ -5,10 +5,9 @@ class SQLExecuteRequest(BaseModel): sql: str class SQLExecuteResponse(BaseModel): - success: bool + code: int = 0 # 响应状态码 message: str data: Optional[Any] = None - rows_affected: Optional[int] = None class TableDataRequest(BaseModel): table_name: str @@ -16,10 +15,10 @@ class TableDataRequest(BaseModel): offset: Optional[int] = 0 class TableDataResponse(BaseModel): - success: bool + code: int = 0 # 响应状态码 message: str + total: Optional[int] = None data: Optional[List[Dict[str, Any]]] = None - total_count: Optional[int] = None class CreateTableRequest(BaseModel): table_name: str diff --git a/app/schemas/task.py b/app/schemas/task.py index c03d098..87b64d6 100644 --- a/app/schemas/task.py +++ b/app/schemas/task.py @@ -36,6 +36,20 @@ class AddDateJobRequest(BaseModel): run_date: datetime class TaskResponse(BaseModel): - success: bool + """任务API统一响应格式""" + code: int = 0 message: str - data: Optional[Any] = None \ No newline at end of file + data: Optional[Any] = None + +class TaskListResponse(BaseModel): + """任务列表响应格式""" + code: int = 0 + message: str + total: int + data: List[JobResponse] = [] + +class FunctionListResponse(BaseModel): + """可用函数列表响应格式""" + code: int = 0 + message: str + data: List[str] = [] \ No newline at end of file diff --git a/app/services/comprehensive.py b/app/services/comprehensive.py index 31c9cd1..ed1e694 100644 --- a/app/services/comprehensive.py +++ b/app/services/comprehensive.py @@ -168,7 +168,7 @@ class ComprehensiveDataService: } def get_level_and_original_data(self, db: Session, - account_id: int, + account_id: Optional[int] = None, id: Optional[int] = None, bfpcode: Optional[str] = None, bffb: Optional[str] = None, @@ -180,7 +180,7 @@ class ComprehensiveDataService: Args: db: 数据库会话 - account_id: 账号ID,必填 + account_id: 账号ID,可选。不填则查询所有分表 其他查询条件... Returns: @@ -193,7 +193,7 @@ class ComprehensiveDataService: linecode=linecode ) - # 查询原始数据 - 传递account_id + # 查询原始数据 - account_id可选 original_data = self.original_service.search_original_data( db, account_id=account_id, diff --git a/app/services/original_data.py b/app/services/original_data.py index 5de1499..b9e456c 100644 --- a/app/services/original_data.py +++ b/app/services/original_data.py @@ -139,26 +139,48 @@ class OriginalDataService(BaseService[OriginalData]): logger.error(f"Failed to query data from {self._get_table_name(account_id)}: {str(e)}") return [] + def get_all_original_data_tables(self, db: Session) -> List[str]: + """ + 获取所有原始数据分表的表名 + + Args: + db: 数据库会话 + + Returns: + 原始数据表名列表 + """ + try: + inspector = inspect(engine) + all_tables = inspector.get_table_names() + # 筛选出所有原始数据分表 + original_tables = [table for table in all_tables if table.startswith('original_data_')] + logger.info(f"Found {len(original_tables)} original data tables: {original_tables}") + return original_tables + except Exception as e: + logger.error(f"Failed to get original data tables: {str(e)}") + return [] + def search_original_data(self, db: Session, - account_id: int, + account_id: Optional[int] = None, id: Optional[int] = None, bfpcode: Optional[str] = None, bffb: Optional[str] = None, nyid: Optional[str] = None, - bfpl: Optional[str] = None) -> List[OriginalData]: + bfpl: Optional[str] = None) -> List[Any]: """ 根据多个条件搜索原始数据 + 如果未提供account_id,则遍历所有分表查询 Args: db: 数据库会话 - account_id: 账号ID + account_id: 账号ID,可选。不填则查询所有分表 其他查询条件... Returns: 原始数据列表 """ try: - table_name = self._get_table_name(account_id) + # 构建查询条件 conditions = [] params = {} @@ -179,11 +201,34 @@ class OriginalDataService(BaseService[OriginalData]): params["bfpl"] = bfpl where_clause = " AND ".join(conditions) if conditions else "1=1" - query = text(f"SELECT * FROM `{table_name}` WHERE {where_clause}") - result = db.execute(query, params) - return result.fetchall() + + # 如果指定了account_id,只查询该账号的分表 + if account_id is not None: + table_name = self._get_table_name(account_id) + query = text(f"SELECT * FROM `{table_name}` WHERE {where_clause}") + result = db.execute(query, params) + return result.fetchall() + + # 未指定account_id,遍历所有分表 + all_results = [] + tables = self.get_all_original_data_tables(db) + + for table_name in tables: + try: + query = text(f"SELECT * FROM `{table_name}` WHERE {where_clause}") + result = db.execute(query, params) + rows = result.fetchall() + all_results.extend(rows) + logger.info(f"Found {len(rows)} records in table {table_name}") + except Exception as e: + logger.warning(f"Failed to query table {table_name}: {str(e)}") + continue + + logger.info(f"Total found {len(all_results)} records from {len(tables)} tables") + return all_results + except Exception as e: - logger.error(f"Failed to search data from {self._get_table_name(account_id)}: {str(e)}") + logger.error(f"Failed to search original data: {str(e)}") return [] def _check_settlement_exists(self, db: Session, nyid: str) -> bool: