import time import os import re import threading from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from main import DeviceAutomation import globals.apis as apis ## --- 配置区 --- API_URL = "http://your-api-server.com/api/tasks" MAX_WORKERS = 3 TIME_FILE_PATH = r"D:\uploadInfo\time.txt" POLLING_INTERVAL = 600 # 10分钟轮询一次 # 文件操作锁,防止多线程同时写入文件造成冲突 file_lock = threading.Lock() def update_file_status(username, from_status, to_status): """ 安全地更新 time.txt 中该用户的状态 例如: 将 'true' 改为 'running', 或将 'running' 改为 'done' """ if not os.path.exists(TIME_FILE_PATH): return False success = False with file_lock: try: with open(TIME_FILE_PATH, 'r', encoding='utf-8') as f: lines = f.readlines() # new_lines = [] # for line in lines: # clean_line = line.strip() # # 匹配逻辑:包含用户名 且 以 from_status 结尾 # if f" {username} " in line and clean_line.endswith(from_status): # line = line.replace(from_status, to_status) # success = True # new_lines.append(line) new_lines = [] for line in lines: # 使用正则确保精准匹配用户名和结尾状态 # 匹配规则:行内包含该用户名,且该行以 from_status 结尾 if re.search(rf'\b{username}\b', line) and line.strip().endswith(from_status): # 只替换行尾的那个状态词 line = re.sub(rf'{from_status}$', to_status, line.rstrip()) + '\n' success = True new_lines.append(line) with open(TIME_FILE_PATH, 'w', encoding='utf-8') as f: f.writelines(new_lines) if success: print(f"📝 [文件更新] 用户 {username}: {from_status} -> {to_status}") return success except Exception as e: print(f"❌ 更新文件状态失败 ({username}): {e}") return False def parse_time_config(): """ 解析 time.txt,只获取状态为 true 的任务 """ time_map = {} if not os.path.exists(TIME_FILE_PATH): print(f"⚠️ 文件不存在: {TIME_FILE_PATH}") return time_map try: with file_lock: with open(TIME_FILE_PATH, 'r', encoding='utf-8') as f: for line in f: line = line.strip() # 匹配:用户名 时间 true (仅获取待处理任务) match = re.search(r'(\w+)\s+(\d{4}-\d{1,2}-\d{1,2}\s+\d{1,2}:\d{2}:\d{2})\s+true$', line) if match: username, scheduled_time = match.group(1), match.group(2) time_map[username] = scheduled_time except Exception as e: print(f"❌ 解析 time.txt 失败: {e}") return time_map def normalize_datetime(time_str): """ 将时间字符串格式化为标准格式:YYYY-MM-DD HH:MM:SS 补全单数字的月、日、时 例如:2024-1-15 9:52:20 -> 2024-01-15 09:52:20 """ try: # 分割日期和时间部分 if ' ' in time_str: date_part, time_part = time_str.split(' ', 1) # 补全日期部分的单数字 date_parts = date_part.split('-') if len(date_parts) == 3: year = date_parts[0] month = date_parts[1].zfill(2) # 月补零 day = date_parts[2].zfill(2) # 日补零 date_part = f"{year}-{month}-{day}" # 补全时间部分的单数字小时 time_parts = time_part.split(':') if len(time_parts) >= 1: hour = time_parts[0].zfill(2) # 小时补零 time_part = f"{hour}:{':'.join(time_parts[1:])}" return f"{date_part} {time_part}" return time_str except Exception as e: print(f"⚠️ 时间格式标准化失败 ({time_str}): {e}") return time_str def get_combined_tasks(): """ # 结合接口(is_ok==1)和本地文件(true)筛选任务 结合接口(is_ok==1)和获取解析云端轨迹文件筛选任务 """ try: # 调用你的 API 接口获取账号信息 accounts = apis.get_accounts_from_server("68c0dbfdb7cbcd616e7c5ab5") if not accounts: return {} task_list = {} today = datetime.now().strftime("%Y%m%d") for account in accounts: if account.get('is_ok') == 1: user = account.get('username') ip = account.get('device_ip') port = account.get('device_port') project_name = account.get('project_name') # 获取轨迹文件 save_path = apis.fetch_and_save_track_file(f"https://database.yuxindazhineng.com/team-bucket/69378c5b4f42d83d9504560d/track/{today}/{today}_1_{user}.txt") if save_path: # 1. 解析原始文件 raw_records = apis.parse_track_file(save_path) # 2. 按线路ID去重(重复则保留后面的记录) deduplicated_records = apis.deduplicate_by_line_id(raw_records) # 3. 从去重后的记录中找最晚结束时间 latest_time = apis.get_latest_end_time(deduplicated_records) # 4. 写入end.txt # write_success = apis.write_to_file(latest_time) write_success = apis.writer_file_status(user, latest_time, "true") time.sleep(2) # 获取状态为true的账号 local_times = parse_time_config() if not local_times: return {} # 只有在写入成功且 time.txt 中是 true 的账号才会被加入 if write_success and user in local_times and ip and port: address = f"{ip}:{port}" # 确保时间是两位数格式 raw_time = local_times[user] full_time = normalize_datetime(raw_time) task_list[address] = {"time": full_time, "user": user, "project_name": project_name} elif not write_success: print(f"❌ 写入end.txt失败,跳过任务: {user}") return task_list except Exception as e: print(f"❌ 获取任务异常: {e}") return {} def run_task(address, target_time, username, project_name): """ 单个执行线程:锁定状态 -> 等待 -> 执行 -> 完成 """ # 1. 尝试将状态从 true 改为 running (锁定任务) # 如果此时文件状态已被其他逻辑修改,则放弃执行,防止重复 if not update_file_status(username, "true", "running"): return f"⏭️ {username} 状态已变更,跳过执行。" print(f"🚀 [任务锁定] 设备: {address} | 用户: {username} | 计划时间: {target_time} | 项目: {project_name}") try: # 2. 计算并执行等待逻辑 # target_dt = datetime.strptime(target_time, "%Y-%m-%d %H:%M:%S") # wait_secs = (target_dt - datetime.now()).total_seconds() # if wait_secs > 0: # print(f"⏳ {username} 距离执行还有 {int(wait_secs)} 秒...") # time.sleep(wait_secs) # 3. 调用 main.py 中的自动化逻辑 print(f"▶️ [正在执行] {username} 开始自动化操作...") automation = DeviceAutomation(address, project_name) result = automation.handle_app_state() # 4. 执行完成后,将状态从 running 改为 done # update_file_status(username, "running", "done") return f"✅ {username} 执行成功: {result}" except Exception as e: # 如果中间报错,将状态改为 error 方便排查 update_file_status(username, "running", "error") return f"❌ {username} 执行异常: {str(e)}" def monitor_center(): """调度中心:每半小时检查一次""" while True: now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"\n{'='*20} 周期性检查开始 ({now_str}) {'='*20}") tasks = get_combined_tasks() if tasks: print(f"📡 发现 {len(tasks)} 个符合条件且未跑过的任务,准备启动线程池...") with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: # 提交任务,将 address, time, username, project_name 传入 future_to_user = { executor.submit(run_task, addr, info['time'], info['user'], info.get('project_name', '')): info['user'] for addr, info in tasks.items() } for f in as_completed(future_to_user): print(f.result()) else: print("📭 暂无待处理任务 (所有账号已完成或 is_ok != 1)") print(f"💤 本轮轮询结束,等待 {POLLING_INTERVAL//60} 分钟进行下次检查...") time.sleep(POLLING_INTERVAL) if __name__ == "__main__": print("🌟 自动化调度服务已启动") print(f"📂 配置文件: {TIME_FILE_PATH}") print(f"⏱️ 轮询频率: {POLLING_INTERVAL}秒") try: monitor_center() except KeyboardInterrupt: print("\n👋 用户手动停止程序。")