Files
cjgc_upload/scheduler.py

228 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
import os
import re
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from main import DeviceAutomation
import globals.apis as apis
## --- 配置区 ---
API_URL = "http://your-api-server.com/api/tasks"
MAX_WORKERS = 3
TIME_FILE_PATH = r"D:\uploadInfo\time.txt"
POLLING_INTERVAL = 600 # 10分钟轮询一次
# 文件操作锁,防止多线程同时写入文件造成冲突
file_lock = threading.Lock()
def update_file_status(username, from_status, to_status):
"""
安全地更新 time.txt 中该用户的状态
例如: 将 'true' 改为 'running', 或将 'running' 改为 'done'
"""
if not os.path.exists(TIME_FILE_PATH):
return False
success = False
with file_lock:
try:
with open(TIME_FILE_PATH, 'r', encoding='utf-8') as f:
lines = f.readlines()
new_lines = []
for line in lines:
# 使用正则确保精准匹配用户名和结尾状态
# 匹配规则:行内包含该用户名,且该行以 from_status 结尾
if re.search(rf'\b{username}\b', line) and line.strip().endswith(from_status):
# 只替换行尾的那个状态词
line = re.sub(rf'{from_status}$', to_status, line.rstrip()) + '\n'
success = True
new_lines.append(line)
with open(TIME_FILE_PATH, 'w', encoding='utf-8') as f:
f.writelines(new_lines)
if success:
print(f"📝 [文件更新] 用户 {username}: {from_status} -> {to_status}")
return success
except Exception as e:
print(f"❌ 更新文件状态失败 ({username}): {e}")
return False
def parse_time_config():
"""
解析 time.txt只获取状态为 true 的任务
"""
time_map = {}
if not os.path.exists(TIME_FILE_PATH):
print(f"⚠️ 文件不存在: {TIME_FILE_PATH}")
return time_map
try:
with file_lock:
with open(TIME_FILE_PATH, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
# 匹配:用户名 时间 true (仅获取待处理任务)
match = re.search(r'(\w+)\s+(\d{4}-\d{1,2}-\d{1,2}\s+\d{1,2}:\d{2}:\d{2})\s+ok$', line)
if match:
username, scheduled_time = match.group(1), match.group(2)
time_map[username] = scheduled_time
except Exception as e:
print(f"❌ 解析 time.txt 失败: {e}")
return time_map
def normalize_datetime(time_str):
"""
将时间字符串格式化为标准格式YYYY-MM-DD HH:MM:SS
补全单数字的月、日、时
例如2024-1-15 9:52:20 -> 2024-01-15 09:52:20
"""
try:
# 分割日期和时间部分
if ' ' in time_str:
date_part, time_part = time_str.split(' ', 1)
# 补全日期部分的单数字
date_parts = date_part.split('-')
if len(date_parts) == 3:
year = date_parts[0]
month = date_parts[1].zfill(2) # 月补零
day = date_parts[2].zfill(2) # 日补零
date_part = f"{year}-{month}-{day}"
# 补全时间部分的单数字小时
time_parts = time_part.split(':')
if len(time_parts) >= 1:
hour = time_parts[0].zfill(2) # 小时补零
time_part = f"{hour}:{':'.join(time_parts[1:])}"
return f"{date_part} {time_part}"
return time_str
except Exception as e:
print(f"⚠️ 时间格式标准化失败 ({time_str}): {e}")
return time_str
def get_combined_tasks():
"""
结合接口(is_ok==1)和本地文件(ok)筛选任务
"""
try:
local_times = parse_time_config()
if not local_times:
return {}
# 调用你的 API 接口获取账号信息
accounts = apis.get_accounts_from_server("68c0dbfdb7cbcd616e7c5ab5")
if not accounts:
return {}
task_list = {}
# today = datetime.now().strftime("%Y-%m-%d")
for account in accounts:
if account.get('is_ok') == 1:
user = account.get('username')
ip = account.get('device_ip')
port = account.get('device_port')
# 只有在 time.txt 中是 ok 的账号才会被加入
if user in local_times and ip and port:
address = f"{ip}:{port}"
# full_time = f"{today} {local_times[user]}"
# 确保时间是两位数格式
raw_time = local_times[user]
# 将时间格式化为两位数9:52:20 -> 09:52:20
# if ':' in raw_time:
# parts = raw_time.split(':')
# if len(parts[0]) == 1:
# raw_time = f"0{raw_time}" # 补齐前导零
# full_time = f"{today} {raw_time}"
full_time = normalize_datetime(raw_time)
task_list[address] = {"time": full_time, "user": user}
return task_list
except Exception as e:
print(f"❌ 获取任务异常: {e}")
return {}
def run_task(address, target_time, username):
"""
单个执行线程:检查时间 -> 锁定状态 -> 执行 -> 完成
"""
print(f"📅 [任务检查] 设备: {address} | 用户: {username} | 计划时间: {target_time}")
try:
# 1. 检查当前时间是否到达计划时间
target_dt = datetime.strptime(target_time, "%Y-%m-%d %H:%M:%S")
current_dt = datetime.now()
# 如果当前时间还未到达计划时间,直接返回,等待下次轮询
if current_dt < target_dt:
time_diff = int((target_dt - current_dt).total_seconds())
print(f"{username} 计划时间未到,距离执行还有 {time_diff} 秒,等待下次轮询")
return f"{username} 计划时间未到,等待下次轮询"
# 2. 开始执行前,尝试将状态从 ok 改为 running (锁定任务)
# 如果此时文件状态已被其他逻辑修改,则放弃执行,防止重复
print(f"🔒 [准备锁定] 尝试锁定任务状态: {username}")
if not update_file_status(username, "ok", "running"):
return f"⏭️ {username} 状态已变更,跳过执行。"
print(f"🚀 [任务锁定] 设备: {address} | 用户: {username} | 计划时间: {target_time}")
# 3. 调用 main.py 中的自动化逻辑
print(f"▶️ [正在执行] {username} 开始自动化操作...")
automation = DeviceAutomation(address)
result = automation.handle_app_state()
# 4. 执行完成后,将状态从 running 改为 done
update_file_status(username, "running", "done")
return f"{username} 执行成功: {result}"
except Exception as e:
# 如果中间报错,将状态改为 error 方便排查
# 只有在状态已经改为 running 的情况下才需要改为 error
try:
update_file_status(username, "running", "error")
except:
pass
return f"{username} 执行异常: {str(e)}"
def monitor_center():
"""调度中心:每半小时检查一次"""
while True:
now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"\n{'='*20} 周期性检查开始 ({now_str}) {'='*20}")
tasks = get_combined_tasks()
if tasks:
print(f"📡 发现 {len(tasks)} 个符合条件且未跑过的任务,准备启动线程池...")
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# 提交任务,将 address, time, username 传入
future_to_user = {
executor.submit(run_task, addr, info['time'], info['user']): info['user']
for addr, info in tasks.items()
}
for f in as_completed(future_to_user):
print(f.result())
else:
print("📭 暂无待处理任务 (所有账号已完成或 is_ok != 1)")
print(f"💤 本轮轮询结束,等待 {POLLING_INTERVAL//60} 分钟进行下次检查...")
time.sleep(POLLING_INTERVAL)
if __name__ == "__main__":
print("🌟 自动化调度服务已启动")
print(f"📂 配置文件: {TIME_FILE_PATH}")
print(f"⏱️ 轮询频率: {POLLING_INTERVAL}")
try:
monitor_center()
except KeyboardInterrupt:
print("\n👋 用户手动停止程序。")