修改上传人员信息表为D盘固定位置;新增上传弹窗检查
This commit is contained in:
288
scheduler.py
288
scheduler.py
@@ -1,185 +1,193 @@
|
||||
import requests
|
||||
import time
|
||||
import os
|
||||
import re
|
||||
import threading
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
from datetime import datetime
|
||||
from main import DeviceAutomation
|
||||
import globals.apis as apis
|
||||
import random
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
## --- 配置区 ---
|
||||
API_URL = "http://your-api-server.com/api/tasks" # 替换为真实的接口地址
|
||||
MAX_WORKERS = 3 # 最大并发线程数,建议根据网络带宽调整
|
||||
TIME_FILE_PATH = r"D:\time.txt" # 电脑D盘下的路径
|
||||
API_URL = "http://your-api-server.com/api/tasks"
|
||||
MAX_WORKERS = 3
|
||||
TIME_FILE_PATH = r"D:\uploadInfo\time.txt"
|
||||
POLLING_INTERVAL = 600 # 10分钟轮询一次
|
||||
|
||||
# 文件操作锁,防止多线程同时写入文件造成冲突
|
||||
file_lock = threading.Lock()
|
||||
|
||||
def update_file_status(username, from_status, to_status):
|
||||
"""
|
||||
安全地更新 time.txt 中该用户的状态
|
||||
例如: 将 'true' 改为 'running', 或将 'running' 改为 'done'
|
||||
"""
|
||||
if not os.path.exists(TIME_FILE_PATH):
|
||||
return False
|
||||
|
||||
success = False
|
||||
with file_lock:
|
||||
try:
|
||||
with open(TIME_FILE_PATH, 'r', encoding='utf-8') as f:
|
||||
lines = f.readlines()
|
||||
|
||||
# new_lines = []
|
||||
# for line in lines:
|
||||
# clean_line = line.strip()
|
||||
# # 匹配逻辑:包含用户名 且 以 from_status 结尾
|
||||
# if f" {username} " in line and clean_line.endswith(from_status):
|
||||
# line = line.replace(from_status, to_status)
|
||||
# success = True
|
||||
# new_lines.append(line)
|
||||
new_lines = []
|
||||
for line in lines:
|
||||
# 使用正则确保精准匹配用户名和结尾状态
|
||||
# 匹配规则:行内包含该用户名,且该行以 from_status 结尾
|
||||
if re.search(rf'\b{username}\b', line) and line.strip().endswith(from_status):
|
||||
# 只替换行尾的那个状态词
|
||||
line = re.sub(rf'{from_status}$', to_status, line.rstrip()) + '\n'
|
||||
success = True
|
||||
new_lines.append(line)
|
||||
|
||||
with open(TIME_FILE_PATH, 'w', encoding='utf-8') as f:
|
||||
f.writelines(new_lines)
|
||||
|
||||
if success:
|
||||
print(f"📝 [文件更新] 用户 {username}: {from_status} -> {to_status}")
|
||||
return success
|
||||
except Exception as e:
|
||||
print(f"❌ 更新文件状态失败 ({username}): {e}")
|
||||
return False
|
||||
|
||||
def parse_time_config():
|
||||
"""
|
||||
解析 D:\time.txt 文件
|
||||
返回格式: { "用户名": "16:40:20", ... }
|
||||
解析 time.txt,只获取状态为 true 的任务
|
||||
"""
|
||||
time_map = {}
|
||||
if not os.path.exists(TIME_FILE_PATH):
|
||||
print(f"⚠️ 未找到配置文件: {TIME_FILE_PATH}")
|
||||
print(f"⚠️ 文件不存在: {TIME_FILE_PATH}")
|
||||
return time_map
|
||||
|
||||
try:
|
||||
with open(TIME_FILE_PATH, 'r', encoding='utf-8') as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
# 使用正则匹配用户名、时间、状态
|
||||
# 兼容 wangshun 16:40:20 true 和 cdwzq3liangchaoyong 15:06:35 true
|
||||
match = re.search(r'(\w+)\s+(\d{2}:\d{2}:\d{2})\s+true', line)
|
||||
if match:
|
||||
username = match.group(1)
|
||||
scheduled_time = match.group(2)
|
||||
time_map[username] = scheduled_time
|
||||
with file_lock:
|
||||
with open(TIME_FILE_PATH, 'r', encoding='utf-8') as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
# 匹配:用户名 时间 true (仅获取待处理任务)
|
||||
match = re.search(r'(\w+)\s+(\d{1,2}:\d{2}:\d{2})\s+true$', line)
|
||||
if match:
|
||||
username, scheduled_time = match.group(1), match.group(2)
|
||||
time_map[username] = scheduled_time
|
||||
except Exception as e:
|
||||
print(f"❌ 解析 time.txt 失败: {e}")
|
||||
|
||||
return time_map
|
||||
|
||||
def get_remote_tasks():
|
||||
def get_combined_tasks():
|
||||
"""
|
||||
从接口获取数据
|
||||
结合接口(is_ok==1)和本地文件(true)筛选任务
|
||||
"""
|
||||
try:
|
||||
# 1. 先获取本地文件中的配置
|
||||
local_times = parse_time_config()
|
||||
if not local_times:
|
||||
print("❌ time.txt 中没有有效的 true 任务或文件为空")
|
||||
return {}
|
||||
|
||||
# 2. 从服务器获取账户
|
||||
|
||||
# 调用你的 API 接口获取账号信息
|
||||
accounts = apis.get_accounts_from_server("68c0dbfdb7cbcd616e7c5ab5")
|
||||
if not accounts:
|
||||
print("❌ 未从服务器获取到账户信息,终止流程")
|
||||
return {}
|
||||
|
||||
# filtered_accounts = [account for account in accounts if account.get('is_ok') == 1]
|
||||
# # print("✅ 获取账户信息成功", filtered_accounts)
|
||||
# # current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
# # 从原始 accounts 数据中筛选有 device_ip 和 device_port 且不为 None 的账户
|
||||
# device_dict = {}
|
||||
|
||||
# for account in filtered_accounts:
|
||||
# device_ip = account.get('device_ip')
|
||||
# device_port = account.get('device_port')
|
||||
|
||||
# # 检查 device_ip 和 device_port 是否都存在且不为 None
|
||||
# if device_ip and device_port:
|
||||
# # 拼接为 ip:port 格式
|
||||
# key = f"{device_ip}:{device_port}"
|
||||
# # 添加当前时间
|
||||
# # device_dict[key] = current_time
|
||||
# # 获取当前时间的 datetime 对象
|
||||
# current_datetime = datetime.now()
|
||||
|
||||
# # 生成1-2小时的随机时间
|
||||
# random_hours = random.uniform(1, 2)
|
||||
|
||||
# # 计算未来时间(datetime 对象可以加 timedelta)
|
||||
# future_datetime = current_datetime + timedelta(hours=random_hours)
|
||||
|
||||
# # 将 datetime 对象格式化为字符串
|
||||
# time_str = future_datetime.strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
# # 添加时间字符串到字典
|
||||
# device_dict[key] = time_str
|
||||
|
||||
|
||||
# # 结果
|
||||
# print(device_dict)
|
||||
# return device_dict
|
||||
device_dict = {}
|
||||
today_str = datetime.now().strftime("%Y-%m-%d")
|
||||
task_list = {}
|
||||
today = datetime.now().strftime("%Y-%m-%d")
|
||||
|
||||
for account in accounts:
|
||||
# 条件1: 接口返回 is_ok == 1
|
||||
if account.get('is_ok') != 1:
|
||||
continue
|
||||
|
||||
username = account.get('username') # 假设接口中用户名的 key 是 username
|
||||
device_ip = account.get('device_ip')
|
||||
device_port = account.get('device_port')
|
||||
|
||||
# 条件2: 用户名在 time.txt 中且状态为 true (已经在 local_times 过滤过)
|
||||
if username in local_times and device_ip and device_port:
|
||||
key = f"{device_ip}:{device_port}"
|
||||
# 拼接成完整的 YYYY-MM-DD HH:MM:SS
|
||||
full_time_str = f"{today_str} {local_times[username]}"
|
||||
device_dict[key] = full_time_str
|
||||
|
||||
print(f"✅ 匹配成功,待执行任务详情: {device_dict}")
|
||||
return device_dict
|
||||
# # 模拟数据供测试
|
||||
# return {
|
||||
# "192.168.1.45:5555": "2026-02-03 10:20:00",
|
||||
# "192.168.1.100:5556": "2026-02-03 10:20:20",
|
||||
# "192.168.31.12:5557": "2026-02-03 10:21:00"
|
||||
# }
|
||||
if account.get('is_ok') == 1:
|
||||
user = account.get('username')
|
||||
ip = account.get('device_ip')
|
||||
port = account.get('device_port')
|
||||
|
||||
# 只有在 time.txt 中是 true 的账号才会被加入
|
||||
if user in local_times and ip and port:
|
||||
address = f"{ip}:{port}"
|
||||
# full_time = f"{today} {local_times[user]}"
|
||||
# 确保时间是两位数格式
|
||||
raw_time = local_times[user]
|
||||
# 将时间格式化为两位数:9:52:20 -> 09:52:20
|
||||
if ':' in raw_time:
|
||||
parts = raw_time.split(':')
|
||||
if len(parts[0]) == 1:
|
||||
raw_time = f"0{raw_time}" # 补齐前导零
|
||||
|
||||
full_time = f"{today} {raw_time}"
|
||||
task_list[address] = {"time": full_time, "user": user}
|
||||
|
||||
return task_list
|
||||
except Exception as e:
|
||||
print(f"❌ 获取接口任务失败: {e}")
|
||||
print(f"❌ 获取任务异常: {e}")
|
||||
return {}
|
||||
|
||||
def run_task(address, target_time):
|
||||
def run_task(address, target_time, username):
|
||||
"""
|
||||
单个线程的任务包装器
|
||||
单个执行线程:锁定状态 -> 等待 -> 执行 -> 完成
|
||||
"""
|
||||
# 格式化账号/设备名:确保带上端口号
|
||||
device_address = address
|
||||
|
||||
print(f"🕒 [等待/启动] 设备: {device_address} | 预定时间: {target_time}")
|
||||
# 解析目标时间
|
||||
target_datetime = datetime.strptime(target_time, "%Y-%m-%d %H:%M:%S")
|
||||
current_datetime = datetime.now()
|
||||
|
||||
# 计算等待时间
|
||||
wait_seconds = (target_datetime - current_datetime).total_seconds()
|
||||
|
||||
# 如果需要等待
|
||||
if wait_seconds > 0:
|
||||
print(f"⏳ [等待中] 设备: {device_address} | 等待时间: {wait_seconds:.2f}秒")
|
||||
time.sleep(wait_seconds)
|
||||
print(f"▶️ [开始执行] 设备: {device_address} | 当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
else:
|
||||
print(f"▶️ [开始执行] 设备: {device_address} | 当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
|
||||
# 1. 尝试将状态从 true 改为 running (锁定任务)
|
||||
# 如果此时文件状态已被其他逻辑修改,则放弃执行,防止重复
|
||||
if not update_file_status(username, "true", "running"):
|
||||
return f"⏭️ {username} 状态已变更,跳过执行。"
|
||||
|
||||
print(f"🚀 [任务锁定] 设备: {address} | 用户: {username} | 计划时间: {target_time}")
|
||||
|
||||
try:
|
||||
# 创建DeviceAutomation实例并执行上传逻辑
|
||||
automation = DeviceAutomation(device_address)
|
||||
# 2. 计算并执行等待逻辑
|
||||
target_dt = datetime.strptime(target_time, "%Y-%m-%d %H:%M:%S")
|
||||
wait_secs = (target_dt - datetime.now()).total_seconds()
|
||||
|
||||
if wait_secs > 0:
|
||||
print(f"⏳ {username} 距离执行还有 {int(wait_secs)} 秒...")
|
||||
time.sleep(wait_secs)
|
||||
|
||||
# 3. 调用 main.py 中的自动化逻辑
|
||||
print(f"▶️ [正在执行] {username} 开始自动化操作...")
|
||||
automation = DeviceAutomation(address)
|
||||
result = automation.handle_app_state()
|
||||
return f"✅ {device_address} 完成: {result}"
|
||||
|
||||
# 4. 执行完成后,将状态从 running 改为 done
|
||||
update_file_status(username, "running", "done")
|
||||
return f"✅ {username} 执行成功: {result}"
|
||||
|
||||
except Exception as e:
|
||||
return f"❌ {device_address} 报错: {str(e)}"
|
||||
# 如果中间报错,将状态改为 error 方便排查
|
||||
update_file_status(username, "running", "error")
|
||||
return f"❌ {username} 执行异常: {str(e)}"
|
||||
|
||||
def monitor_center():
|
||||
"""调度中心"""
|
||||
tasks_data = get_remote_tasks()
|
||||
|
||||
if not tasks_data:
|
||||
print("📭 接口未返回任何任务,程序退出。")
|
||||
return
|
||||
"""调度中心:每半小时检查一次"""
|
||||
while True:
|
||||
now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
print(f"\n{'='*20} 周期性检查开始 ({now_str}) {'='*20}")
|
||||
|
||||
tasks = get_combined_tasks()
|
||||
|
||||
if tasks:
|
||||
print(f"📡 发现 {len(tasks)} 个符合条件且未跑过的任务,准备启动线程池...")
|
||||
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
|
||||
# 提交任务,将 address, time, username 传入
|
||||
future_to_user = {
|
||||
executor.submit(run_task, addr, info['time'], info['user']): info['user']
|
||||
for addr, info in tasks.items()
|
||||
}
|
||||
|
||||
for f in as_completed(future_to_user):
|
||||
print(f.result())
|
||||
else:
|
||||
print("📭 暂无待处理任务 (所有账号已完成或 is_ok != 1)")
|
||||
|
||||
print(f"🗂️ 发现 {len(tasks_data)} 个待处理账号,开始建立线程池...")
|
||||
|
||||
# 使用线程池并发执行
|
||||
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
|
||||
# 提交所有任务
|
||||
future_to_device = {
|
||||
executor.submit(run_task, acc, t): acc
|
||||
for acc, t in tasks_data.items()
|
||||
}
|
||||
|
||||
# 实时打印完成情况
|
||||
for future in as_completed(future_to_device):
|
||||
print(future.result())
|
||||
print(f"💤 本轮轮询结束,等待 {POLLING_INTERVAL//60} 分钟进行下次检查...")
|
||||
time.sleep(POLLING_INTERVAL)
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("🚀 自动化调度程序启动...")
|
||||
monitor_center()
|
||||
print("🏁 所有并发任务处理序列结束。")
|
||||
print("🌟 自动化调度服务已启动")
|
||||
print(f"📂 配置文件: {TIME_FILE_PATH}")
|
||||
print(f"⏱️ 轮询频率: {POLLING_INTERVAL}秒")
|
||||
try:
|
||||
monitor_center()
|
||||
except KeyboardInterrupt:
|
||||
print("\n👋 用户手动停止程序。")
|
||||
Reference in New Issue
Block a user