from fastapi import APIRouter, HTTPException, status from typing import List from ..schemas.task import ( JobResponse, AddCronJobRequest, AddIntervalJobRequest, AddDateJobRequest, TaskResponse ) from ..utils.scheduler import task_scheduler, example_task, database_cleanup_task router = APIRouter(prefix="/tasks", tags=["定时任务管理"]) # 可用的任务函数映射 AVAILABLE_FUNCTIONS = { "example_task": example_task, "database_cleanup_task": database_cleanup_task, } @router.post("/cron", response_model=TaskResponse) def add_cron_job(request: AddCronJobRequest): """添加cron定时任务""" try: if request.func_name not in AVAILABLE_FUNCTIONS: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"函数 {request.func_name} 不可用" ) func = AVAILABLE_FUNCTIONS[request.func_name] # 构建cron参数 cron_kwargs = {} if request.year is not None: cron_kwargs['year'] = request.year if request.month is not None: cron_kwargs['month'] = request.month if request.day is not None: cron_kwargs['day'] = request.day if request.week is not None: cron_kwargs['week'] = request.week if request.day_of_week is not None: cron_kwargs['day_of_week'] = request.day_of_week if request.hour is not None: cron_kwargs['hour'] = request.hour if request.minute is not None: cron_kwargs['minute'] = request.minute if request.second is not None: cron_kwargs['second'] = request.second job = task_scheduler.add_cron_job(func, request.job_id, **cron_kwargs) return TaskResponse( success=True, message=f"Cron任务 {request.job_id} 添加成功", data={"job_id": job.id, "next_run": str(job.next_run_time)} ) except Exception as e: return TaskResponse( success=False, message=f"添加Cron任务失败: {str(e)}" ) @router.post("/interval", response_model=TaskResponse) def add_interval_job(request: AddIntervalJobRequest): """添加间隔执行任务""" try: if request.func_name not in AVAILABLE_FUNCTIONS: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"函数 {request.func_name} 不可用" ) func = AVAILABLE_FUNCTIONS[request.func_name] # 构建interval参数 interval_kwargs = {} if request.seconds is not None: interval_kwargs['seconds'] = request.seconds if request.minutes is not None: interval_kwargs['minutes'] = request.minutes if request.hours is not None: interval_kwargs['hours'] = request.hours if request.days is not None: interval_kwargs['days'] = request.days job = task_scheduler.add_interval_job(func, request.job_id, **interval_kwargs) return TaskResponse( success=True, message=f"间隔任务 {request.job_id} 添加成功", data={"job_id": job.id, "next_run": str(job.next_run_time)} ) except Exception as e: return TaskResponse( success=False, message=f"添加间隔任务失败: {str(e)}" ) @router.post("/date", response_model=TaskResponse) def add_date_job(request: AddDateJobRequest): """添加指定时间执行任务""" try: if request.func_name not in AVAILABLE_FUNCTIONS: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"函数 {request.func_name} 不可用" ) func = AVAILABLE_FUNCTIONS[request.func_name] job = task_scheduler.add_date_job(func, request.job_id, run_date=request.run_date) return TaskResponse( success=True, message=f"定时任务 {request.job_id} 添加成功", data={"job_id": job.id, "run_date": str(job.next_run_time)} ) except Exception as e: return TaskResponse( success=False, message=f"添加定时任务失败: {str(e)}" ) @router.get("/", response_model=List[JobResponse]) def get_jobs(): """获取所有任务""" jobs = task_scheduler.get_jobs() result = [] for job in jobs: result.append(JobResponse( id=job.id, name=job.name, func=str(job.func), trigger=str(job.trigger), next_run_time=job.next_run_time )) return result @router.delete("/{job_id}", response_model=TaskResponse) def remove_job(job_id: str): """删除任务""" success = task_scheduler.remove_job(job_id) if success: return TaskResponse( success=True, message=f"任务 {job_id} 删除成功" ) else: return TaskResponse( success=False, message=f"删除任务 {job_id} 失败" ) @router.put("/{job_id}/pause", response_model=TaskResponse) def pause_job(job_id: str): """暂停任务""" success = task_scheduler.pause_job(job_id) if success: return TaskResponse( success=True, message=f"任务 {job_id} 已暂停" ) else: return TaskResponse( success=False, message=f"暂停任务 {job_id} 失败" ) @router.put("/{job_id}/resume", response_model=TaskResponse) def resume_job(job_id: str): """恢复任务""" success = task_scheduler.resume_job(job_id) if success: return TaskResponse( success=True, message=f"任务 {job_id} 已恢复" ) else: return TaskResponse( success=False, message=f"恢复任务 {job_id} 失败" ) @router.get("/functions", response_model=List[str]) def get_available_functions(): """获取可用的任务函数列表""" return list(AVAILABLE_FUNCTIONS.keys())