first commit

This commit is contained in:
2026-02-09 15:50:41 +08:00
commit 4e49793416
84 changed files with 255593 additions and 0 deletions

193
scheduler.py Normal file
View File

@@ -0,0 +1,193 @@
import time
import os
import re
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from main import DeviceAutomation
import globals.apis as apis
## --- 配置区 ---
API_URL = "http://your-api-server.com/api/tasks"
MAX_WORKERS = 3
TIME_FILE_PATH = r"D:\uploadInfo\time.txt"
POLLING_INTERVAL = 600 # 10分钟轮询一次
# 文件操作锁,防止多线程同时写入文件造成冲突
file_lock = threading.Lock()
def update_file_status(username, from_status, to_status):
"""
安全地更新 time.txt 中该用户的状态
例如: 将 'true' 改为 'running', 或将 'running' 改为 'done'
"""
if not os.path.exists(TIME_FILE_PATH):
return False
success = False
with file_lock:
try:
with open(TIME_FILE_PATH, 'r', encoding='utf-8') as f:
lines = f.readlines()
# new_lines = []
# for line in lines:
# clean_line = line.strip()
# # 匹配逻辑:包含用户名 且 以 from_status 结尾
# if f" {username} " in line and clean_line.endswith(from_status):
# line = line.replace(from_status, to_status)
# success = True
# new_lines.append(line)
new_lines = []
for line in lines:
# 使用正则确保精准匹配用户名和结尾状态
# 匹配规则:行内包含该用户名,且该行以 from_status 结尾
if re.search(rf'\b{username}\b', line) and line.strip().endswith(from_status):
# 只替换行尾的那个状态词
line = re.sub(rf'{from_status}$', to_status, line.rstrip()) + '\n'
success = True
new_lines.append(line)
with open(TIME_FILE_PATH, 'w', encoding='utf-8') as f:
f.writelines(new_lines)
if success:
print(f"📝 [文件更新] 用户 {username}: {from_status} -> {to_status}")
return success
except Exception as e:
print(f"❌ 更新文件状态失败 ({username}): {e}")
return False
def parse_time_config():
"""
解析 time.txt只获取状态为 true 的任务
"""
time_map = {}
if not os.path.exists(TIME_FILE_PATH):
print(f"⚠️ 文件不存在: {TIME_FILE_PATH}")
return time_map
try:
with file_lock:
with open(TIME_FILE_PATH, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
# 匹配:用户名 时间 true (仅获取待处理任务)
match = re.search(r'(\w+)\s+(\d{1,2}:\d{2}:\d{2})\s+true$', line)
if match:
username, scheduled_time = match.group(1), match.group(2)
time_map[username] = scheduled_time
except Exception as e:
print(f"❌ 解析 time.txt 失败: {e}")
return time_map
def get_combined_tasks():
"""
结合接口(is_ok==1)和本地文件(true)筛选任务
"""
try:
local_times = parse_time_config()
if not local_times:
return {}
# 调用你的 API 接口获取账号信息
accounts = apis.get_accounts_from_server("68c0dbfdb7cbcd616e7c5ab5")
if not accounts:
return {}
task_list = {}
today = datetime.now().strftime("%Y-%m-%d")
for account in accounts:
if account.get('is_ok') == 1 or account.get('username') == "czyuzongwen":
user = account.get('username')
ip = account.get('device_ip')
port = account.get('device_port')
# 只有在 time.txt 中是 true 的账号才会被加入
if user in local_times and ip and port:
address = f"{ip}:{port}"
# full_time = f"{today} {local_times[user]}"
# 确保时间是两位数格式
raw_time = local_times[user]
# 将时间格式化为两位数9:52:20 -> 09:52:20
if ':' in raw_time:
parts = raw_time.split(':')
if len(parts[0]) == 1:
raw_time = f"0{raw_time}" # 补齐前导零
full_time = f"{today} {raw_time}"
task_list[address] = {"time": full_time, "user": user}
return task_list
except Exception as e:
print(f"❌ 获取任务异常: {e}")
return {}
def run_task(address, target_time, username):
"""
单个执行线程:锁定状态 -> 等待 -> 执行 -> 完成
"""
# 1. 尝试将状态从 true 改为 running (锁定任务)
# 如果此时文件状态已被其他逻辑修改,则放弃执行,防止重复
if not update_file_status(username, "true", "running"):
return f"⏭️ {username} 状态已变更,跳过执行。"
print(f"🚀 [任务锁定] 设备: {address} | 用户: {username} | 计划时间: {target_time}")
try:
# 2. 计算并执行等待逻辑
target_dt = datetime.strptime(target_time, "%Y-%m-%d %H:%M:%S")
wait_secs = (target_dt - datetime.now()).total_seconds()
if wait_secs > 0:
print(f"{username} 距离执行还有 {int(wait_secs)} 秒...")
time.sleep(wait_secs)
# 3. 调用 main.py 中的自动化逻辑
print(f"▶️ [正在执行] {username} 开始自动化操作...")
automation = DeviceAutomation(address)
result = automation.handle_app_state()
# 4. 执行完成后,将状态从 running 改为 done
# update_file_status(username, "running", "done")
return f"{username} 执行成功: {result}"
except Exception as e:
# 如果中间报错,将状态改为 error 方便排查
update_file_status(username, "running", "error")
return f"{username} 执行异常: {str(e)}"
def monitor_center():
"""调度中心:每半小时检查一次"""
while True:
now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"\n{'='*20} 周期性检查开始 ({now_str}) {'='*20}")
tasks = get_combined_tasks()
if tasks:
print(f"📡 发现 {len(tasks)} 个符合条件且未跑过的任务,准备启动线程池...")
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# 提交任务,将 address, time, username 传入
future_to_user = {
executor.submit(run_task, addr, info['time'], info['user']): info['user']
for addr, info in tasks.items()
}
for f in as_completed(future_to_user):
print(f.result())
else:
print("📭 暂无待处理任务 (所有账号已完成或 is_ok != 1)")
print(f"💤 本轮轮询结束,等待 {POLLING_INTERVAL//60} 分钟进行下次检查...")
time.sleep(POLLING_INTERVAL)
if __name__ == "__main__":
print("🌟 自动化调度服务已启动")
print(f"📂 配置文件: {TIME_FILE_PATH}")
print(f"⏱️ 轮询频率: {POLLING_INTERVAL}")
try:
monitor_center()
except KeyboardInterrupt:
print("\n👋 用户手动停止程序。")