Files
cjgc_upload/scheduler.py
2026-02-10 09:20:12 +08:00

193 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
import os
import re
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from main import DeviceAutomation
import globals.apis as apis
## --- 配置区 ---
API_URL = "http://your-api-server.com/api/tasks"
MAX_WORKERS = 3
TIME_FILE_PATH = r"D:\uploadInfo\time.txt"
POLLING_INTERVAL = 600 # 10分钟轮询一次
# 文件操作锁,防止多线程同时写入文件造成冲突
file_lock = threading.Lock()
def update_file_status(username, from_status, to_status):
"""
安全地更新 time.txt 中该用户的状态
例如: 将 'true' 改为 'running', 或将 'running' 改为 'done'
"""
if not os.path.exists(TIME_FILE_PATH):
return False
success = False
with file_lock:
try:
with open(TIME_FILE_PATH, 'r', encoding='utf-8') as f:
lines = f.readlines()
# new_lines = []
# for line in lines:
# clean_line = line.strip()
# # 匹配逻辑:包含用户名 且 以 from_status 结尾
# if f" {username} " in line and clean_line.endswith(from_status):
# line = line.replace(from_status, to_status)
# success = True
# new_lines.append(line)
new_lines = []
for line in lines:
# 使用正则确保精准匹配用户名和结尾状态
# 匹配规则:行内包含该用户名,且该行以 from_status 结尾
if re.search(rf'\b{username}\b', line) and line.strip().endswith(from_status):
# 只替换行尾的那个状态词
line = re.sub(rf'{from_status}$', to_status, line.rstrip()) + '\n'
success = True
new_lines.append(line)
with open(TIME_FILE_PATH, 'w', encoding='utf-8') as f:
f.writelines(new_lines)
if success:
print(f"📝 [文件更新] 用户 {username}: {from_status} -> {to_status}")
return success
except Exception as e:
print(f"❌ 更新文件状态失败 ({username}): {e}")
return False
def parse_time_config():
"""
解析 time.txt只获取状态为 true 的任务
"""
time_map = {}
if not os.path.exists(TIME_FILE_PATH):
print(f"⚠️ 文件不存在: {TIME_FILE_PATH}")
return time_map
try:
with file_lock:
with open(TIME_FILE_PATH, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
# 匹配:用户名 时间 true (仅获取待处理任务)
match = re.search(r'(\w+)\s+(\d{1,2}:\d{2}:\d{2})\s+ok$', line)
if match:
username, scheduled_time = match.group(1), match.group(2)
time_map[username] = scheduled_time
except Exception as e:
print(f"❌ 解析 time.txt 失败: {e}")
return time_map
def get_combined_tasks():
"""
结合接口(is_ok==1)和本地文件(ok)筛选任务
"""
try:
local_times = parse_time_config()
if not local_times:
return {}
# 调用你的 API 接口获取账号信息
accounts = apis.get_accounts_from_server("68c0dbfdb7cbcd616e7c5ab5")
if not accounts:
return {}
task_list = {}
today = datetime.now().strftime("%Y-%m-%d")
for account in accounts:
if account.get('is_ok') == 1 or account.get('username') == "CZSCZQ13A1xuliguo":
user = account.get('username')
ip = account.get('device_ip')
port = account.get('device_port')
# 只有在 time.txt 中是 ok 的账号才会被加入
if user in local_times and ip and port:
address = f"{ip}:{port}"
# full_time = f"{today} {local_times[user]}"
# 确保时间是两位数格式
raw_time = local_times[user]
# 将时间格式化为两位数9:52:20 -> 09:52:20
if ':' in raw_time:
parts = raw_time.split(':')
if len(parts[0]) == 1:
raw_time = f"0{raw_time}" # 补齐前导零
full_time = f"{today} {raw_time}"
task_list[address] = {"time": full_time, "user": user}
return task_list
except Exception as e:
print(f"❌ 获取任务异常: {e}")
return {}
def run_task(address, target_time, username):
"""
单个执行线程:锁定状态 -> 等待 -> 执行 -> 完成
"""
# 1. 尝试将状态从 ok 改为 running (锁定任务)
# 如果此时文件状态已被其他逻辑修改,则放弃执行,防止重复
if not update_file_status(username, "ok", "running"):
return f"⏭️ {username} 状态已变更,跳过执行。"
print(f"🚀 [任务锁定] 设备: {address} | 用户: {username} | 计划时间: {target_time}")
try:
# 2. 计算并执行等待逻辑
target_dt = datetime.strptime(target_time, "%Y-%m-%d %H:%M:%S")
wait_secs = (target_dt - datetime.now()).total_seconds()
if wait_secs > 0:
print(f"{username} 距离执行还有 {int(wait_secs)} 秒...")
time.sleep(wait_secs)
# 3. 调用 main.py 中的自动化逻辑
print(f"▶️ [正在执行] {username} 开始自动化操作...")
automation = DeviceAutomation(address)
result = automation.handle_app_state()
# 4. 执行完成后,将状态从 running 改为 done
update_file_status(username, "running", "done")
return f"{username} 执行成功: {result}"
except Exception as e:
# 如果中间报错,将状态改为 error 方便排查
update_file_status(username, "running", "error")
return f"{username} 执行异常: {str(e)}"
def monitor_center():
"""调度中心:每半小时检查一次"""
while True:
now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"\n{'='*20} 周期性检查开始 ({now_str}) {'='*20}")
tasks = get_combined_tasks()
if tasks:
print(f"📡 发现 {len(tasks)} 个符合条件且未跑过的任务,准备启动线程池...")
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# 提交任务,将 address, time, username 传入
future_to_user = {
executor.submit(run_task, addr, info['time'], info['user']): info['user']
for addr, info in tasks.items()
}
for f in as_completed(future_to_user):
print(f.result())
else:
print("📭 暂无待处理任务 (所有账号已完成或 is_ok != 1)")
print(f"💤 本轮轮询结束,等待 {POLLING_INTERVAL//60} 分钟进行下次检查...")
time.sleep(POLLING_INTERVAL)
if __name__ == "__main__":
print("🌟 自动化调度服务已启动")
print(f"📂 配置文件: {TIME_FILE_PATH}")
print(f"⏱️ 轮询频率: {POLLING_INTERVAL}")
try:
monitor_center()
except KeyboardInterrupt:
print("\n👋 用户手动停止程序。")