# File: railway_cloud/app/utils/scheduler.py
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from sqlalchemy import text
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from ..core.config import settings
from sqlalchemy.orm import Session
from ..core.database import SessionLocal
from ..core.logging_config import get_logger
from ..models.account import Account
# from ..services.daily import get_nyid_by_point_id
from ..services.daily import DailyDataService
from ..services.level_data import LevelDataService
from ..services.checkpoint import CheckpointService
from ..services.section_data import SectionDataService
from ..services.account import AccountService
from ..models.daily import DailyData
from ..models.settlement_data import SettlementData
from ..models.level_data import LevelData
from ..services.settlement_data import SettlementDataService
from typing import List, Tuple, Any
from ..utils.construction_monitor import ConstructionMonitorUtils
import time
import json
# 获取日志记录器
logger = get_logger(__name__)
class TaskScheduler:
def __init__(self):
# 配置作业存储
jobstores = {
'default': SQLAlchemyJobStore(url=settings.DATABASE_URL)
}
# 配置执行器
executors = {
'default': ThreadPoolExecutor(20)
}
# 作业默认配置
job_defaults = {
'coalesce': False,
'max_instances': 3
}
# 创建调度器
self.scheduler = BackgroundScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone='Asia/Shanghai'
)
# 添加事件监听器
self.scheduler.add_listener(self._job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
def _job_listener(self, event):
"""作业执行监听器"""
if event.exception:
logger.error(f"Job {event.job_id} crashed: {event.exception}")
else:
logger.info(f"Job {event.job_id} executed successfully")
def start(self):
"""启动调度器"""
if not self.scheduler.running:
self.scheduler.start()
logger.info("定时任务调度器已启动")
# 启动时自动添加系统定时任务
self._setup_system_tasks()
def shutdown(self):
"""关闭调度器"""
if self.scheduler.running:
self.scheduler.shutdown()
logger.info("定时任务调度器已关闭")
def _setup_system_tasks(self):
"""设置系统定时任务"""
try:
logger.info("无定时任务")
# 检查是否已存在每日重置任务
# existing_job = self.scheduler.get_job("daily_reset_today_updated")
# # existing_job = self.scheduler.get_job("get_max_nyid")
# if not existing_job:
# # 添加每天午夜12点重置today_updated字段的任务
# self.scheduler.add_job(
# reset_today_updated_task,
# 'cron',
# id='daily_reset_today_updated',
# hour=0,
# minute=0,
# second=0,
# name='每日重置账号更新状态'
# )
# logger.info("系统定时任务:每日重置账号更新状态已添加")
# existing_job = self.scheduler.get_job("scheduled_get_max_nyid_by_point_id")
# if existing_job is None:
# self.scheduler.add_job(
# scheduled_get_max_nyid_by_point_id,
# 'cron',
# id='scheduled_get_max_nyid_by_point_id',
# hour=1,
# minute=0,
# second=0,
# name='每日获取max NYID关联数据并创建DailyData记录'
# )
# logger.info("系统定时任务每日获取max NYID关联数据任务已添加")
except Exception as e:
logger.error(f"设置系统定时任务失败: {e}")
# 设置测试任务
# try:
# existing_job = self.scheduler.get_job("test_task")
# if not existing_job:
# # 添加每天每小时重置today_updated字段的任务
# self.scheduler.add_job(
# reset_today_updated_task,
# 'cron',
# id='test_task',
# hour='*',
# minute=0,
# second=0,
# name='测试任务'
# )
# logger.info("系统定时任务:测试任务已添加")
# except Exception as e:
# logger.error(f"设置测试任务失败: {e}")
def add_cron_job(self, func, job_id: str, **kwargs):
"""添加cron定时任务"""
return self.scheduler.add_job(func, 'cron', id=job_id, **kwargs)
def add_interval_job(self, func, job_id: str, **kwargs):
"""添加间隔执行任务"""
return self.scheduler.add_job(func, 'interval', id=job_id, **kwargs)
def add_date_job(self, func, job_id: str, **kwargs):
"""添加指定时间执行任务"""
return self.scheduler.add_job(func, 'date', id=job_id, **kwargs)
def remove_job(self, job_id: str):
"""移除任务"""
try:
self.scheduler.remove_job(job_id)
logger.info(f"任务 {job_id} 已移除")
return True
except Exception as e:
logger.error(f"移除任务失败: {e}")
return False
def get_job(self, job_id: str):
"""获取任务"""
return self.scheduler.get_job(job_id)
def get_jobs(self):
"""获取所有任务"""
return self.scheduler.get_jobs()
def pause_job(self, job_id: str):
"""暂停任务"""
try:
self.scheduler.pause_job(job_id)
logger.info(f"任务 {job_id} 已暂停")
return True
except Exception as e:
logger.error(f"暂停任务失败: {e}")
return False
def resume_job(self, job_id: str):
"""恢复任务"""
try:
self.scheduler.resume_job(job_id)
logger.info(f"任务 {job_id} 已恢复")
return True
except Exception as e:
logger.error(f"恢复任务失败: {e}")
return False
# 全局调度器实例
task_scheduler = TaskScheduler()
# System scheduled-task functions
def reset_today_updated_task():
"""每日重置账号today_updated字段为0的任务"""
# db = SessionLocal()
# try:
# logger.info("开始执行每日重置账号更新状态任务")
# # 更新所有账号的today_updated字段为0
# updated_need_count = db.query(Account).filter(Account.today_updated == 1).count()
# updated_count = db.query(Account).update({Account.today_updated: 0})
# db.commit()
# logger.info(f"每日重置任务完成,已重置 {updated_count} 个账号的today_updated字段")
# return f"成功重置 {updated_count} 个账号的更新状态"
# except Exception as e:
# db.rollback()
# logger.error(f"每日重置任务执行失败: {e}")
# raise e
# finally:
# db.close()
# Example scheduled-task functions
def example_task():
"""示例定时任务"""
logger.info("执行示例定时任务")
# 这里可以添加具体的业务逻辑
return "任务执行完成"
def database_cleanup_task():
"""数据库清理任务示例"""
logger.info("执行数据库清理任务")
# 这里可以添加数据库清理逻辑
return "数据库清理完成"
# Daily job: fetch the latest working-condition data and write it back.
def scheduled_get_max_nyid_by_point_id(start: int = 0,end: int = 0):
"""定时任务获取max NYID关联数据并批量创建DailyData记录"""
db: Session = None
try:
logger.info("定时任务触发开始获取max NYID关联数据并处理")
# 初始化数据库会话替代接口的Depends依赖
db = SessionLocal()
logger.info("定时任务开始执行获取max NYID关联数据并处理")
# 核心新增清空DailyData表所有数据
# delete_count = db.query(DailyData).delete()
# db.commit()
# logger.info(f"DailyData表清空完成共删除{delete_count}条历史记录")
# 1. 以 level_data 为来源:每个 linecode 取最新一期NYID 最大),再按该 NYID 从 settlement 取一条
level_service = LevelDataService()
settlement_service = SettlementDataService()
daily_service = DailyDataService()
checkpoint_db = CheckpointService()
section_db = SectionDataService()
account_service = AccountService()
monitor = ConstructionMonitorUtils()
linecodes = [r[0] for r in db.query(LevelData.linecode).distinct().all()]
linecode_level_settlement: List[Tuple[str, Any, dict]] = []
for linecode in linecodes:
level_instance = level_service.get_last_by_linecode(db, linecode)
if not level_instance:
continue
nyid = level_instance.NYID
settlement_dict = settlement_service.get_one_dict_by_nyid(db, nyid)
if not settlement_dict:
continue
settlement_dict['__linecode'] = linecode
settlement_dict['__level_data'] = level_instance.to_dict()
linecode_level_settlement.append((linecode, level_instance, settlement_dict))
input_data = [[s] for (_, _, s) in linecode_level_settlement]
if not input_data:
logger.warning("未找到任何 linecode 对应的最新期沉降数据,跳过写 daily")
db.execute(text(f"TRUNCATE TABLE {DailyData.__tablename__}"))
db.commit()
db.close()
return
# 2. 计算到期数据remaining / 冬休等)
daily_data = monitor.get_due_data(input_data, start=start, end=end)
data = daily_data['data']
logger.info(f"按 level_data 最新期获取数据完成,共{len(data)}条有效记录")
# 3. 关联 level / checkpoint / section / accountlevel 已带在 __level_data
for d in data:
d['level_data'] = d.pop('__level_data', None)
d.pop('__linecode', None)
checkpoint_instance = checkpoint_db.get_by_point_id(db, d['point_id'])
d['checkpoint_data'] = checkpoint_instance.to_dict() if checkpoint_instance else None
if d['checkpoint_data']:
section_instance = section_db.get_by_section_id(db, d['checkpoint_data']['section_id'])
d['section_data'] = section_instance.to_dict() if section_instance else None
else:
d['section_data'] = None
if d.get('section_data') and d['section_data'].get('account_id'):
account_response = account_service.get_account(db, account_id=d['section_data']['account_id'])
d['account_data'] = account_response.__dict__ if account_response else None
else:
d['account_data'] = None
# 4. 构造 DailyData每条已是「每 linecode 最新一期」)
daily_create_data = []
for d in data:
if not all(key in d for key in ['NYID', 'point_id', 'remaining']) or not d.get('level_data') or not d.get('account_data') or not d.get('section_data'):
continue
tem = {
'NYID': d['NYID'],
'point_id': d['point_id'],
'linecode': d['level_data']['linecode'],
'account_id': d['account_data']['account_id'],
'section_id': d['section_data']['section_id'],
'remaining': (0 - int(d['overdue'])) if 'overdue' in d else d['remaining'],
}
daily_create_data.append(tem)
# 批量创建记录
print(daily_create_data)
if daily_create_data:
db.execute(text(f"TRUNCATE TABLE {DailyData.__tablename__}"))
db.commit() # 必须提交事务
# with open('data.json', 'w', encoding='utf-8') as f:
# json.dump(daily_create_data, f, ensure_ascii=False, indent=4)
created_records = daily_service.batch_create_by_account_nyid(db, daily_create_data)
logger.info(f"定时任务完成:成功创建{len(created_records)}条DailyData记录共处理{len(data)}个point_id数据")
else:
db.execute(text(f"TRUNCATE TABLE {DailyData.__tablename__}"))
db.commit() # 必须提交事务
logger.warning("定时任务完成无有效数据可创建DailyData记录")
except Exception as e:
# 异常时回滚事务
if db:
db.rollback()
logger.error(f"定时任务执行失败:{str(e)}", exc_info=True)
raise e # 抛出异常便于定时框架捕获告警
finally:
# 确保数据库会话关闭
if db:
db.close()