Files
railway_cloud/app/schemas/task.py

55 lines
1.3 KiB
Python

from pydantic import BaseModel
from typing import Optional, List, Dict, Any
from datetime import datetime
class JobResponse(BaseModel):
id: str
name: Optional[str] = None
func: str
trigger: str
next_run_time: Optional[datetime] = None
class AddCronJobRequest(BaseModel):
job_id: str
func_name: str
cron_expression: Optional[str] = None
year: Optional[int] = None
month: Optional[int] = None
day: Optional[int] = None
week: Optional[int] = None
day_of_week: Optional[int] = None
hour: Optional[int] = None
minute: Optional[int] = None
second: Optional[int] = None
class AddIntervalJobRequest(BaseModel):
job_id: str
func_name: str
seconds: Optional[int] = None
minutes: Optional[int] = None
hours: Optional[int] = None
days: Optional[int] = None
class AddDateJobRequest(BaseModel):
job_id: str
func_name: str
run_date: datetime
class TaskResponse(BaseModel):
"""任务API统一响应格式"""
code: int = 0
message: str
data: Optional[Any] = None
class TaskListResponse(BaseModel):
"""任务列表响应格式"""
code: int = 0
message: str
total: int
data: List[JobResponse] = []
class FunctionListResponse(BaseModel):
"""可用函数列表响应格式"""
code: int = 0
message: str
data: List[str] = []