diff --git a/__pycache__/main.cpython-312.pyc b/__pycache__/main.cpython-312.pyc new file mode 100644 index 0000000..2240957 Binary files /dev/null and b/__pycache__/main.cpython-312.pyc differ diff --git a/__pycache__/permissions.cpython-312.pyc b/__pycache__/permissions.cpython-312.pyc index 23118b8..13e09c3 100644 Binary files a/__pycache__/permissions.cpython-312.pyc and b/__pycache__/permissions.cpython-312.pyc differ diff --git a/create_a_link.py b/create_a_link.py new file mode 100644 index 0000000..5bff38a --- /dev/null +++ b/create_a_link.py @@ -0,0 +1,315 @@ +import subprocess +import re +import time +import requests +import json +from appium import webdriver +from appium.webdriver.common.appiumby import AppiumBy +from appium.options.android import UiAutomator2Options +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC + + +# ======================= +# 基础工具函数 +# ======================= + +def run_command(command): + """执行系统命令并返回输出""" + result = subprocess.run(command, shell=True, capture_output=True, text=True) + return result.stdout.strip() + + +# ======================= +# API请求函数 +# ======================= + +def get_accounts_from_server(yh_id): + """从服务器获取账户信息""" + url = "http://www.yuxindazhineng.com:3002/api/accounts/get_uplaod_data" + headers = { + "Content-Type": "application/json" + } + data = { + "yh_id": yh_id + } + + try: + print(f"🔍 查询服务器账户信息,用户ID: {yh_id}") + response = requests.post(url, headers=headers, json=data, timeout=10) + + if response.status_code == 200: + result = response.json() + if result.get("code") == 0: + print(f"✅ 查询成功,找到 {result.get('total', 0)} 个账户") + return result.get("data", []) + else: + print(f"❌ 查询失败: {result.get('message', '未知错误')}") + return [] + else: + print(f"❌ 服务器响应错误: {response.status_code}") + return [] + except requests.exceptions.RequestException as e: + print(f"❌ 网络请求失败: {e}") + return [] + except json.JSONDecodeError as e: + print(f"❌ JSON解析失败: {e}") + return [] + + +def update_device_info(account_id, device_name, device_port, device_ip): + """更新设备信息到服务器""" + url = "http://www.yuxindazhineng.com:3002/api/accounts/update" + headers = { + "Content-Type": "application/json" + } + data = { + "account_id": str(account_id), + "account_data": { + "device_name": device_name, + "device_port": device_port, + "device_ip": device_ip + } + } + + try: + print(f"🔄 更新设备信息,账户ID: {account_id}") + print(f" 设备信息: 名称={device_name}, 端口={device_port}, IP={device_ip}") + + response = requests.post(url, headers=headers, json=data, timeout=10) + + if response.status_code == 200: + result = response.json() + if result.get("code") == 0: + print(f"✅ 更新成功: {result.get('message', '未知信息')}") + return True + else: + print(f"❌ 更新失败: {result.get('message', '未知错误')}") + return False + else: + print(f"❌ 服务器响应错误: {response.status_code}") + return False + except requests.exceptions.RequestException as e: + print(f"❌ 网络请求失败: {e}") + return False + + +# ======================= +# Appium 启动 +# ======================= + +def start_appium(): + print("🚀 启动 Appium Server ...") + subprocess.Popen( + ["appium.cmd", "-a", "127.0.0.1", "-p", "4723"], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ) + time.sleep(5) # 给 Appium 启动时间 + print("✅ Appium Server 已启动") + + +# ======================= +# 启动沉降观测 App +# ======================= + +def start_settlement_app(device_id, device_ip, device_port): + print(f"📱 使用 Appium 连接设备: {device_id}") + + options = UiAutomator2Options() + options.platform_name = "Android" + options.device_name = device_id + options.udid = device_id + + # ⚠️ TODO:替换为你的真实信息 + options.app_package = "com.bjjw.cjgc" + options.app_activity = ".activity.LoginActivity" + + options.automation_name = "UiAutomator2" + options.no_reset = True + options.auto_grant_permissions = True + options.new_command_timeout = 28800 + + # 超时增强(无线 ADB 必须) + options.set_capability("uiautomator2ServerLaunchTimeout", 60000) + options.set_capability("adbExecTimeout", 120000) + + driver = webdriver.Remote( + "http://127.0.0.1:4723", + options=options + ) + + # 使用ADB命令启动Activity + try: + adb_command = f"adb -s {device_id} shell am start -n com.bjjw.cjgc/.activity.LoginActivity" + result = subprocess.run(adb_command, shell=True, capture_output=True, text=True) + if result.returncode == 0: + time.sleep(1) # 等待Activity启动 + except Exception: + return False + + print("✅ 沉降观测 App 已成功启动") + + # ======================= + # 获取用户名文本框内容 + # ======================= + app_username = None + try: + print("🔍 尝试获取用户名文本框内容...") + + # 创建显式等待对象 + wait = WebDriverWait(driver, 15) # 最多等待15秒 + + # 等待用户名文本框可点击 + username_field = wait.until( + EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/et_user_name")) + ) + + # 获取文本框中的文本内容 + app_username = username_field.text + + # 如果文本框是空的,尝试使用get_attribute获取文本 + if not app_username: + app_username = username_field.get_attribute("text") + + # 如果还是空的,尝试获取其他属性 + if not app_username: + app_username = username_field.get_attribute("content-desc") or username_field.get_attribute("label") + + if app_username: + print(f"✅ 成功获取到用户名: {app_username}") + else: + print("⚠️ 用户名文本框为空") + + except Exception as e: + print(f"❌ 获取用户名失败: {e}") + + return driver, app_username + + +# ======================= +# 无线 ADB 建链主流程 +# ======================= + +def setup_adb_wireless(port="5555", yh_id="68c0dbfdb7cbcd616e7c5ab5"): + print(f"🚀 开始无线 ADB 建链(端口 {port})") + print(f"📋 用户ID: {yh_id}") + + # 从服务器获取账户信息 + accounts = get_accounts_from_server(yh_id) + if not accounts: + print("❌ 未从服务器获取到账户信息,终止流程") + return + + devices_output = run_command("adb devices") + lines = devices_output.splitlines()[1:] + + usb_devices = [] + + for line in lines: + if not line.strip(): + continue + + device_id = line.split()[0] + + # 跳过已经是无线的 + if ":" in device_id: + continue + + usb_devices.append(device_id) + + if not usb_devices: + print("❌ 未检测到 USB 设备") + return + + for serial in usb_devices: + print(f"\n🔎 处理设备: {serial}") + + # 获取 WLAN IP + ip_info = run_command(f"adb -s {serial} shell ip addr show wlan0") + ip_match = re.search(r'inet\s+(\d+\.\d+\.\d+\.\d+)', ip_info) + + if not ip_match: + print("⚠️ 获取 IP 失败,请确认已连接 WiFi") + continue + + device_ip = ip_match.group(1) + print(f"📍 设备 IP: {device_ip}") + + # 切 TCP 模式 + run_command(f"adb -s {serial} tcpip {port}") + time.sleep(2) + + # 无线连接 + connect_result = run_command(f"adb connect {device_ip}:{port}") + + if "connected" not in connect_result.lower(): + print(f"❌ 无线连接失败: {connect_result}") + continue + + wireless_id = f"{device_ip}:{port}" + print(f"✅ 无线 ADB 成功: {wireless_id}") + + # ===== 后续自动化 ===== + start_appium() + driver, app_username = start_settlement_app(wireless_id, device_ip, port) + + if not app_username: + print("⚠️ 未获取到App中的用户名,跳过服务器更新") + continue + + # 在账户列表中查找匹配的用户名 + matched_account = None + for account in accounts: + if account.get("username") == app_username: + matched_account = account + break + + if not matched_account: + print(f"❌ 未找到与用户名 '{app_username}' 匹配的账户") + continue + + print(f"✅ 找到匹配账户: {matched_account.get('cl_name')} ({matched_account.get('username')})") + print(f" account_id: {matched_account.get('account_id')}") + + # 更新设备信息到服务器 + device_name = serial # 使用设备序列号作为设备名称 + + # 构建更新数据 + update_data = { + "account_id": matched_account.get("account_id"), + "device_name": device_name, + "device_port": port, + "device_ip": device_ip + } + + success = update_device_info( + account_id=matched_account.get("account_id"), + device_name=device_name, + device_port=port, + device_ip=device_ip + ) + + if success: + print(f"🎉 所有操作完成! 账户 {matched_account.get('username')} 的设备信息已更新") + else: + print(f"⚠️ 设备信息更新失败,但无线连接和App启动已完成") + + # 关闭Appium连接 + if driver: + print("🔄 关闭Appium连接...") + driver.quit() + + break # 处理完第一个设备后退出,如需处理多个设备可移除此行 + + +# ======================= +# 程序入口 +# ======================= + +if __name__ == "__main__": + # 配置参数 + TARGET_PORT = "5555" + USER_ID = "68c0dbfdb7cbcd616e7c5ab5" # 替换为实际的用户ID + + setup_adb_wireless(TARGET_PORT, USER_ID) \ No newline at end of file diff --git a/globals/__pycache__/apis.cpython-312.pyc b/globals/__pycache__/apis.cpython-312.pyc index 3e0e4db..6c95118 100644 Binary files a/globals/__pycache__/apis.cpython-312.pyc and b/globals/__pycache__/apis.cpython-312.pyc differ diff --git a/globals/__pycache__/driver_utils.cpython-312.pyc b/globals/__pycache__/driver_utils.cpython-312.pyc index 81dc29d..ec35272 100644 Binary files a/globals/__pycache__/driver_utils.cpython-312.pyc and b/globals/__pycache__/driver_utils.cpython-312.pyc differ diff --git a/globals/__pycache__/global_variable.cpython-312.pyc b/globals/__pycache__/global_variable.cpython-312.pyc index 6e5b65c..218bd49 100644 Binary files a/globals/__pycache__/global_variable.cpython-312.pyc and b/globals/__pycache__/global_variable.cpython-312.pyc differ diff --git a/globals/apis.py b/globals/apis.py index 223ff0d..c0ba42b 100644 --- a/globals/apis.py +++ b/globals/apis.py @@ -58,7 +58,7 @@ def get_breakpoint_list(): """ # 请求参数 params = { - 'user_name': global_variable.GLOBAL_USERNAME + 'user_name': global_variable.get_username() } # 请求地址 @@ -114,7 +114,7 @@ def get_measurement_task(): url = "https://engineering.yuxindazhineng.com/index/index/getOne" # 获取用户名 - user_name = global_variable.GLOBAL_USERNAME + user_name = global_variable.get_username() if not user_name: logging.error("未设置用户名,无法获取测量任务") return None @@ -156,8 +156,8 @@ def get_end_with_num(): url = "https://engineering.yuxindazhineng.com/index/index/getOne3" # 获取用户名 - user_name = global_variable.GLOBAL_USERNAME - line_num = global_variable.GLOBAL_LINE_NUM + user_name = global_variable.get_username() + line_num = global_variable.get_line_num() if not line_num: logging.error("未设置线路编码,无法获取测量任务") return None @@ -451,17 +451,17 @@ def get_line_info_and_save_global(user_name: str) -> bool: # # 存入全局字典:key=line_num,value=line_name # global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT[line_num] = line_name # 存入全局字典:key=line_name,value=line_num - global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT[line_name] = line_num + global_variable.get_upload_breakpoint_dict()[line_name] = line_num # 如果line_name不在列表中,则添加 - if line_name not in global_variable.GLOBAL_UPLOAD_BREAKPOINT_LIST: - global_variable.GLOBAL_UPLOAD_BREAKPOINT_LIST.append(line_name) + if line_name not in global_variable.get_upload_breakpoint_list(): + global_variable.get_upload_breakpoint_list().append(line_name) logging.info(f"找到status=3的线路信息:line_num={line_num}, line_name={line_name}") found_valid_data = True if found_valid_data: - logging.info(f"成功提取所有status=3的线路信息,当前全局字典数据:{global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT}") + logging.info(f"成功提取所有status=3的线路信息,当前全局字典数据:{global_variable.get_upload_breakpoint_dict()}") return True else: logging.warning("data列表中未找到任何status=3且字段完整的线路信息") @@ -476,4 +476,36 @@ def get_line_info_and_save_global(user_name: str) -> bool: return False except Exception as e: logging.error(f"调用get_name_all接口时发生未知异常:{str(e)}", exc_info=True) # exc_info=True打印异常堆栈,方便排查 - return False \ No newline at end of file + return False + +def get_accounts_from_server(yh_id): + """从服务器获取账户信息""" + url = "http://www.yuxindazhineng.com:3002/api/accounts/get_uplaod_data" + headers = { + "Content-Type": "application/json" + } + data = { + "yh_id": yh_id + } + + try: + print(f"🔍 查询服务器账户信息,用户ID: {yh_id}") + response = requests.post(url, headers=headers, json=data, timeout=10) + + if response.status_code == 200: + result = response.json() + if result.get("code") == 0: + print(f"✅ 查询成功,找到 {result.get('total', 0)} 个账户") + return result.get("data", []) + else: + print(f"❌ 查询失败: {result.get('message', '未知错误')}") + return [] + else: + print(f"❌ 服务器响应错误: {response.status_code}") + return [] + except requests.exceptions.RequestException as e: + print(f"❌ 网络请求失败: {e}") + return [] + except json.JSONDecodeError as e: + print(f"❌ JSON解析失败: {e}") + return [] diff --git a/globals/driver_utils.py b/globals/driver_utils.py index 6954ad6..dd2bf18 100644 --- a/globals/driver_utils.py +++ b/globals/driver_utils.py @@ -60,6 +60,21 @@ def init_appium_driver(device_id, app_package="com.bjjw.cjgc", app_activity=".ac # 等待应用稳定 time.sleep(2) + # 设置屏幕永不休眠 + try: + # 使用ADB命令设置屏幕永不休眠 + screen_timeout_cmd = [ + "adb", "-s", device_id, + "shell", "settings", "put", "system", "screen_off_timeout", "86400000" + ] + timeout_result = subprocess.run(screen_timeout_cmd, capture_output=True, text=True, timeout=15) + if timeout_result.returncode == 0: + logging.info(f"设备 {device_id} 已成功设置屏幕永不休眠") + else: + logging.warning(f"设备 {device_id} 设置屏幕永不休眠失败: {timeout_result.stderr}") + except Exception as timeout_error: + logging.warning(f"设备 {device_id} 设置屏幕永不休眠时出错: {str(timeout_error)}") + logging.info(f"设备 {device_id} Appium驱动初始化完成") return driver, wait @@ -87,7 +102,7 @@ def check_session_valid(driver, device_id=None): bool: 会话有效返回True,否则返回False """ if device_id is None: - device_id = global_variable.GLOBAL_DEVICE_ID + device_id = global_variable.get_device_id() device_str = f"设备 {device_id} " if device_id else "" if not driver: @@ -158,7 +173,7 @@ def reconnect_driver(device_id, old_driver=None, app_package="com.bjjw.cjgc", ap """ # 使用传入的device_id或从全局变量获取 if not device_id: - device_id = global_variable.GLOBAL_DEVICE_ID + device_id = global_variable.get_device_id() # 修复device_id参数类型问题并使用全局设备ID作为备用 actual_device_id = device_id @@ -174,7 +189,7 @@ def reconnect_driver(device_id, old_driver=None, app_package="com.bjjw.cjgc", ap # 如果仍然没有有效的设备ID,使用全局变量 if not actual_device_id or (isinstance(actual_device_id, str) and ("session=" in actual_device_id or len(actual_device_id.strip()) == 0)): - actual_device_id = global_variable.GLOBAL_DEVICE_ID + actual_device_id = global_variable.get_device_id() logging.warning(f"无法获取有效设备ID,使用全局变量GLOBAL_DEVICE_ID: {actual_device_id}") device_id = actual_device_id # 使用修正后的设备ID @@ -395,7 +410,6 @@ def ensure_appium_server_running(port=4723): logging.error("Appium 服务启动超时!请检查 appium 命令是否在命令行可直接运行。") return False -# ... (保留 init_appium_driver, safe_quit_driver 等其他函数) ... def wait_for_appium_start(port, timeout=10): """ @@ -442,7 +456,7 @@ def safe_quit_driver(driver, device_id=None): device_id: 设备ID(可选) """ if device_id is None: - device_id = global_variable.GLOBAL_DEVICE_ID + device_id = global_variable.get_device_id() device_str = f"设备 {device_id} " if device_id else "" logging.info(f"{device_str}开始关闭驱动") @@ -504,7 +518,7 @@ def launch_app_manually(driver, device_id, package_name="com.bjjw.cjgc", activit """ try: if not device_id: - device_id = global_variable.GLOBAL_DEVICE_ID + device_id = global_variable.get_device_id() # 尝试从driver获取设备ID if driver and hasattr(driver, 'capabilities'): device_id = driver.capabilities.get('udid') diff --git a/globals/global_variable.py b/globals/global_variable.py index 86549ce..454beef 100644 --- a/globals/global_variable.py +++ b/globals/global_variable.py @@ -1,16 +1,147 @@ # 全局变量 -GLOBAL_DEVICE_ID = "" # 设备ID -GLOBAL_USERNAME = "czyuzongwen" # 用户名 -GLOBAL_CURRENT_PROJECT_NAME = "" # 当前测试项目名称 -GLOBAL_LINE_NUM = "" # 线路编码 -GLOBAL_BREAKPOINT_STATUS_CODES = [0,3] # 要获取的断点状态码列表 -GLOBAL_UPLOAD_BREAKPOINT_LIST = [] -GLOBAL_UPLOAD_BREAKPOINT_DICT = {} -GLOBAL_TESTED_BREAKPOINT_LIST = [] # 测量结束的断点列表 -LINE_TIME_MAPPING_DICT = {} # 存储所有线路编码和对应的时间的全局字典 -GLOBAL_BREAKPOINT_DICT = {} # 存储测量结束的断点名称和对应的线路编码的全局字典 -GLOBAL_NAME_TO_ID_MAP = {} # 存储所有数据员姓名和对应的身份证号的全局字典 -GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST = [] # 上传成功的`断点列表 +import threading + +# 线程本地存储,为每个线程创建独立的变量副本 +thread_local = threading.local() + +# 共享数据的锁 +upload_breakpoint_lock = threading.RLock() +upload_success_lock = threading.RLock() +line_time_mapping_lock = threading.RLock() +breakpoint_dict_lock = threading.RLock() +name_to_id_map_lock = threading.RLock() + +# 线程安全的全局变量访问函数 +def get_device_id(): + """获取当前线程的设备ID""" + if not hasattr(thread_local, 'GLOBAL_DEVICE_ID'): + thread_local.GLOBAL_DEVICE_ID = "" + return thread_local.GLOBAL_DEVICE_ID + +def set_device_id(device_id): + """设置当前线程的设备ID""" + thread_local.GLOBAL_DEVICE_ID = device_id + +def get_username(): + """获取用户名(全局共享)""" + if not hasattr(thread_local, 'GLOBAL_USERNAME'): + thread_local.GLOBAL_USERNAME = "czyuzongwen" + return thread_local.GLOBAL_USERNAME + +def set_username(username): + """设置用户名""" + thread_local.GLOBAL_USERNAME = username + +def get_current_project_name(): + """获取当前测试项目名称""" + if not hasattr(thread_local, 'GLOBAL_CURRENT_PROJECT_NAME'): + thread_local.GLOBAL_CURRENT_PROJECT_NAME = "" + return thread_local.GLOBAL_CURRENT_PROJECT_NAME + +def set_current_project_name(project_name): + """设置当前测试项目名称""" + thread_local.GLOBAL_CURRENT_PROJECT_NAME = project_name + +def get_line_num(): + """获取线路编码""" + if not hasattr(thread_local, 'GLOBAL_LINE_NUM'): + thread_local.GLOBAL_LINE_NUM = "" + return thread_local.GLOBAL_LINE_NUM + +def set_line_num(line_num): + """设置线路编码""" + thread_local.GLOBAL_LINE_NUM = line_num + +def get_breakpoint_status_codes(): + """获取要获取的断点状态码列表""" + return [0, 3] # 固定值,不需要线程本地存储 + +def get_upload_breakpoint_list(): + """获取上传断点列表""" + if not hasattr(thread_local, 'GLOBAL_UPLOAD_BREAKPOINT_LIST'): + thread_local.GLOBAL_UPLOAD_BREAKPOINT_LIST = [] + return thread_local.GLOBAL_UPLOAD_BREAKPOINT_LIST + +def set_upload_breakpoint_list(breakpoint_list): + """设置上传断点列表""" + thread_local.GLOBAL_UPLOAD_BREAKPOINT_LIST = breakpoint_list + +def get_upload_breakpoint_dict(): + """获取上传断点字典""" + if not hasattr(thread_local, 'GLOBAL_UPLOAD_BREAKPOINT_DICT'): + thread_local.GLOBAL_UPLOAD_BREAKPOINT_DICT = {} + return thread_local.GLOBAL_UPLOAD_BREAKPOINT_DICT + +def set_upload_breakpoint_dict(breakpoint_dict): + """设置上传断点字典""" + thread_local.GLOBAL_UPLOAD_BREAKPOINT_DICT = breakpoint_dict + +def get_tested_breakpoint_list(): + """获取测量结束的断点列表""" + if not hasattr(thread_local, 'GLOBAL_TESTED_BREAKPOINT_LIST'): + thread_local.GLOBAL_TESTED_BREAKPOINT_LIST = [] + return thread_local.GLOBAL_TESTED_BREAKPOINT_LIST + +def set_tested_breakpoint_list(tested_list): + """设置测量结束的断点列表""" + thread_local.GLOBAL_TESTED_BREAKPOINT_LIST = tested_list + +def get_line_time_mapping_dict(): + """获取线路编码和对应的时间的字典""" + if not hasattr(thread_local, 'LINE_TIME_MAPPING_DICT'): + thread_local.LINE_TIME_MAPPING_DICT = {} + return thread_local.LINE_TIME_MAPPING_DICT + +def set_line_time_mapping_dict(mapping_dict): + """设置线路编码和对应的时间的字典""" + thread_local.LINE_TIME_MAPPING_DICT = mapping_dict + +def get_breakpoint_dict(): + """获取测量结束的断点名称和对应的线路编码的字典""" + if not hasattr(thread_local, 'GLOBAL_BREAKPOINT_DICT'): + thread_local.GLOBAL_BREAKPOINT_DICT = {} + return thread_local.GLOBAL_BREAKPOINT_DICT + +def set_breakpoint_dict(breakpoint_dict): + """设置测量结束的断点名称和对应的线路编码的字典""" + thread_local.GLOBAL_BREAKPOINT_DICT = breakpoint_dict + +def get_name_to_id_map(): + """获取数据员姓名和对应的身份证号的字典""" + if not hasattr(thread_local, 'GLOBAL_NAME_TO_ID_MAP'): + thread_local.GLOBAL_NAME_TO_ID_MAP = {} + return thread_local.GLOBAL_NAME_TO_ID_MAP + +def set_name_to_id_map(name_id_map): + """设置数据员姓名和对应的身份证号的字典""" + thread_local.GLOBAL_NAME_TO_ID_MAP = name_id_map + +def get_upload_success_breakpoint_list(): + """获取上传成功的断点列表""" + if not hasattr(thread_local, 'GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST'): + thread_local.GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST = [] + return thread_local.GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST + +def set_upload_success_breakpoint_list(success_list): + """设置上传成功的断点列表""" + thread_local.GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST = success_list + +# 为了保持 ,保留原有的全局变量名称 +# 但这些将不再被直接使用,而是通过上面的函数访问 +GLOBAL_DEVICE_ID = "" # 设备ID +GLOBAL_USERNAME = "czyuzongwen" # 用户名 +GLOBAL_CURRENT_PROJECT_NAME = "" # 当前测试项目名称 +GLOBAL_LINE_NUM = "" # 线路编码 +GLOBAL_BREAKPOINT_STATUS_CODES = [0,3] # 要获取的断点状态码列表 +GLOBAL_UPLOAD_BREAKPOINT_LIST = [] # +GLOBAL_UPLOAD_BREAKPOINT_DICT = {} # +GLOBAL_TESTED_BREAKPOINT_LIST = [] # 测量结束的断点列表 +LINE_TIME_MAPPING_DICT = {} # 存储所有线路编码和对应的时间的全局字典 +GLOBAL_BREAKPOINT_DICT = {} # 存储测量结束的断点名称和对应的线路编码的全局字典 +GLOBAL_NAME_TO_ID_MAP = {} # 存储所有数据员姓名和对应的身份证号的全局字典 +GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST = [] # 上传成功的`断点列表 + +GLOBAL_YU_ID = "68e764f0f9548871c22f93b8" # 宇恒一号ID diff --git a/main.py b/main.py index 803a05b..a3f8663 100644 --- a/main.py +++ b/main.py @@ -32,156 +32,11 @@ logging.basicConfig( ] ) -class DeviceAutomation: - @staticmethod - def get_device_id() -> str: +class DeviceAutomation(object): - # """ - # 获取设备ID,优先使用已连接设备,否则使用全局配置 - # """ - # try: - # # 检查已连接设备 - # result = subprocess.run( - # ["adb", "devices"], - # capture_output=True, - # text=True, - # timeout=10 - # ) - - # # 解析设备列表 - # for line in result.stdout.strip().split('\n')[1:]: - # if line.strip() and "device" in line and "offline" not in line: - # device_id = line.split('\t')[0] - # logging.info(f"使用已连接设备: {device_id}") - # global_variable.GLOBAL_DEVICE_ID = device_id - # return device_id - - # except Exception as e: - # logging.warning(f"设备检测失败: {e}") - - """ - 获取设备ID,优先使用无线连接设备,否则尝试开启无线调试,最后使用全局配置 - """ - try: - # 检查已连接设备 - result = subprocess.run( - ["adb", "devices"], - capture_output=True, - text=True, - timeout=10 - ) - - # 解析设备列表 - wireless_device_id = None - usb_device_id = None - - # 先查找无线连接的设备(IP:端口格式) - for line in result.stdout.strip().split('\n')[1:]: - if line.strip() and "device" in line and "offline" not in line: - current_device = line.split('\t')[0] - # 检查是否为IP:端口格式的无线连接 - if ":" in current_device and any(char.isdigit() for char in current_device): - wireless_device_id = current_device - logging.info(f"使用无线连接设备: {wireless_device_id}") - global_variable.GLOBAL_DEVICE_ID = wireless_device_id - return wireless_device_id - else: - # 记录第一个USB连接的设备 - if not usb_device_id: - usb_device_id = current_device - - # 如果没有找到无线连接的设备,尝试使用USB设备开启无线调试 - if not wireless_device_id and usb_device_id: - logging.info(f"未找到无线连接设备,尝试使用USB设备 {usb_device_id} 开启无线调试") - - # 尝试获取设备IP地址 - try: - import re - import time - - ip_result = subprocess.run( - ["adb", "-s", usb_device_id, "shell", "ip", "-f", "inet", "addr", "show", "wlan0"], - capture_output=True, - text=True, - timeout=10 - ) - - # 解析IP地址 - ip_output = ip_result.stdout - if "inet " in ip_output: - # 提取IP地址 - ip_match = re.search(r'inet\s+(\d+\.\d+\.\d+\.\d+)', ip_output) - if ip_match: - device_ip = ip_match.group(1) - logging.info(f"获取到设备IP地址: {device_ip}") - - # 开启无线调试 - tcpip_result = subprocess.run( - ["adb", "-s", usb_device_id, "tcpip", "5555"], - capture_output=True, - text=True, - timeout=10 - ) - - if "restarting in TCP mode port: 5555" in tcpip_result.stdout: - logging.info("无线调试已开启,端口: 5555") - - # 等待几秒钟让设备准备好 - time.sleep(3) - - # 连接到无线设备 - connect_result = subprocess.run( - ["adb", "connect", f"{device_ip}:5555"], - capture_output=True, - text=True, - timeout=10 - ) - - if "connected to" in connect_result.stdout: - logging.info(f"成功连接到无线设备: {device_ip}:5555") - global_variable.GLOBAL_DEVICE_ID = f"{device_ip}:5555" - return f"{device_ip}:5555" - else: - logging.warning(f"连接无线设备失败: {connect_result.stderr}") - logging.info(f"使用USB设备: {usb_device_id}") - global_variable.GLOBAL_DEVICE_ID = usb_device_id - return usb_device_id - else: - logging.warning(f"开启无线调试失败: {tcpip_result.stderr}") - logging.info(f"使用USB设备: {usb_device_id}") - global_variable.GLOBAL_DEVICE_ID = usb_device_id - return usb_device_id - else: - logging.warning("未找到设备IP地址") - logging.info(f"使用USB设备: {usb_device_id}") - global_variable.GLOBAL_DEVICE_ID = usb_device_id - return usb_device_id - else: - logging.warning("无法获取设备IP地址,可能设备未连接到WiFi") - logging.info(f"使用USB设备: {usb_device_id}") - global_variable.GLOBAL_DEVICE_ID = usb_device_id - return usb_device_id - except Exception as e: - logging.warning(f"开启无线调试时出错: {str(e)}") - logging.info(f"使用USB设备: {usb_device_id}") - global_variable.GLOBAL_DEVICE_ID = usb_device_id - return usb_device_id - - except Exception as e: - logging.warning(f"设备检测失败: {e}") - - - # 使用全局配置 - device_id = global_variable.GLOBAL_DEVICE_ID - logging.info(f"使用全局配置设备: {device_id}") - return device_id def __init__(self, device_id=None): - # 如果没有提供设备ID,则自动获取 - if device_id is None: - self.device_id = self.get_device_id() - else: - self.device_id = device_id + self.device_id = device_id # 初始化权限 if permissions.grant_appium_permissions(self.device_id): @@ -282,7 +137,7 @@ class DeviceAutomation: return False # 获取状态为3的线路。 - apis.get_line_info_and_save_global(user_name=global_variable.GLOBAL_USERNAME); + apis.get_line_info_and_save_global(user_name=global_variable.get_username()); # # 虚拟数据替代 # global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT = {'CDWZQ-2标-龙骨湾右线大桥-0-7号墩-平原': 'L156372', 'CDWZQ-2标-蓝家湾特大 桥-31-31-平原': 'L159206'} # global_variable.GLOBAL_UPLOAD_BREAKPOINT_LIST = list(global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT.keys()) @@ -299,7 +154,7 @@ class DeviceAutomation: # 检查是否有需要上传的断点 - if not global_variable.GLOBAL_UPLOAD_BREAKPOINT_LIST: + if not global_variable.get_upload_breakpoint_list(): logging.info(f"设备 {self.device_id} 断点列表为空,无需执行上传操作") return False @@ -312,12 +167,12 @@ class DeviceAutomation: # 遍历断点列表,逐个执行上传操作 upload_success_count = 0 - for breakpoint_name in global_variable.GLOBAL_UPLOAD_BREAKPOINT_LIST: + for breakpoint_name in global_variable.get_upload_breakpoint_list(): try: logging.info(f"设备 {self.device_id} 开始处理断点 '{breakpoint_name}' 的上传") # 安全地获取断点信息 - line_num = global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT.get(breakpoint_name) + line_num = global_variable.get_upload_breakpoint_dict().get(breakpoint_name) if line_num is None: logging.warning(f"设备 {self.device_id} 断点 '{breakpoint_name}' 在字典中未找到,跳过上传") continue @@ -335,25 +190,25 @@ class DeviceAutomation: except Exception as e: logging.error(f"设备 {self.device_id} 处理断点 '{breakpoint_name}' 时发生异常: {str(e)}") - logging.info(f"设备 {self.device_id} 上传配置管理执行完成,成功上传 {upload_success_count}/{len(global_variable.GLOBAL_UPLOAD_BREAKPOINT_LIST)} 个断点") + logging.info(f"设备 {self.device_id} 上传配置管理执行完成,成功上传 {upload_success_count}/{len(global_variable.get_upload_breakpoint_list())} 个断点") # 如果所有断点都上传成功,返回True;否则返回False - all_upload_success = upload_success_count == len(global_variable.GLOBAL_UPLOAD_BREAKPOINT_LIST) + all_upload_success = upload_success_count == len(global_variable.get_upload_breakpoint_list()) if all_upload_success: logging.info(f"设备 {self.device_id} 所有断点上传成功") # 把上传成功的断点写入日志文件"上传成功的断点.txt" with open(os.path.join(self.results_dir, "上传成功的断点.txt"), "w", encoding='utf-8') as f: - for bp in global_variable.GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST: + for bp in global_variable.get_upload_success_breakpoint_list(): f.write(f"{bp}\n") else: logging.warning(f"设备 {self.device_id} 部分断点上传失败") # 把上传成功的断点写入日志文件"上传成功的断点.txt" with open(os.path.join(self.results_dir, "上传成功的断点.txt"), "w", encoding='utf-8') as f: - for bp in global_variable.GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST: + for bp in global_variable.get_upload_success_breakpoint_list(): f.write(f"{bp}\n") # 把上传失败的断点写入日志文件"上传失败的断点.txt" with open(os.path.join(self.results_dir, "上传失败的断点.txt"), "w", encoding='utf-8') as f: - for bp in set(global_variable.GLOBAL_UPLOAD_BREAKPOINT_LIST)-set(global_variable.GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST): + for bp in set(global_variable.get_upload_breakpoint_list())-set(global_variable.get_upload_success_breakpoint_list()): f.write(f"{bp}\n") return all_upload_success @@ -457,13 +312,48 @@ class DeviceAutomation: """关闭驱动""" safe_quit_driver(getattr(self, 'driver', None), self.device_id) + @staticmethod + def start_upload(device_id=None, upload_time=None): + """ + 供其他页面或模块调用的静态方法 + 执行完整的自动化流程 + + 参数: + device_id: 可选的设备ID,如果为None则自动获取 + + 返回: + bool: 自动化流程执行结果(True/False) + """ + automation = None + try: + # 创建自动化实例 + automation = DeviceAutomation(device_id=device_id) + + # 执行自动化流程 + success = automation.run_automation() + + if success: + logging.info("自动化流程执行成功") + else: + logging.error("自动化流程执行失败") + + return success + + except Exception as e: + logging.error(f"自动化流程执行出错: {str(e)}") + return False + finally: + # 确保资源被清理 + if automation: + automation.quit() + # 主执行逻辑 if __name__ == "__main__": # 单个设备配置 - 现在DeviceAutomation会自动获取设备ID try: - automation = DeviceAutomation() + automation = DeviceAutomation(device_id="192.168.1.100:5556") success = automation.run_automation() if success: diff --git a/main_init.py b/main_init.py new file mode 100644 index 0000000..803a05b --- /dev/null +++ b/main_init.py @@ -0,0 +1,478 @@ +# actions.py 主自动化脚本 +import os +import logging +import time +import subprocess +from appium import webdriver +from appium.options.android import UiAutomator2Options +from appium.webdriver.common.appiumby import AppiumBy +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.common.exceptions import TimeoutException, NoSuchElementException + +import globals.ids as ids +import globals.global_variable as global_variable # 导入全局变量模块 +import permissions # 导入权限处理模块 +import globals.apis as apis +from globals.driver_utils import init_appium_driver, ensure_appium_server_running, safe_quit_driver, is_app_launched, launch_app_manually +from page_objects.login_page import LoginPage +from page_objects.download_tabbar_page import DownloadTabbarPage +from page_objects.screenshot_page import ScreenshotPage +from page_objects.upload_config_page import UploadConfigPage +from page_objects.more_download_page import MoreDownloadPage + + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s: %(message)s", + handlers=[ + logging.FileHandler("appium_automation.log"), + logging.StreamHandler() + ] +) + +class DeviceAutomation: + @staticmethod + def get_device_id() -> str: + + # """ + # 获取设备ID,优先使用已连接设备,否则使用全局配置 + # """ + # try: + # # 检查已连接设备 + # result = subprocess.run( + # ["adb", "devices"], + # capture_output=True, + # text=True, + # timeout=10 + # ) + + # # 解析设备列表 + # for line in result.stdout.strip().split('\n')[1:]: + # if line.strip() and "device" in line and "offline" not in line: + # device_id = line.split('\t')[0] + # logging.info(f"使用已连接设备: {device_id}") + # global_variable.GLOBAL_DEVICE_ID = device_id + # return device_id + + # except Exception as e: + # logging.warning(f"设备检测失败: {e}") + + """ + 获取设备ID,优先使用无线连接设备,否则尝试开启无线调试,最后使用全局配置 + """ + try: + # 检查已连接设备 + result = subprocess.run( + ["adb", "devices"], + capture_output=True, + text=True, + timeout=10 + ) + + # 解析设备列表 + wireless_device_id = None + usb_device_id = None + + # 先查找无线连接的设备(IP:端口格式) + for line in result.stdout.strip().split('\n')[1:]: + if line.strip() and "device" in line and "offline" not in line: + current_device = line.split('\t')[0] + # 检查是否为IP:端口格式的无线连接 + if ":" in current_device and any(char.isdigit() for char in current_device): + wireless_device_id = current_device + logging.info(f"使用无线连接设备: {wireless_device_id}") + global_variable.GLOBAL_DEVICE_ID = wireless_device_id + return wireless_device_id + else: + # 记录第一个USB连接的设备 + if not usb_device_id: + usb_device_id = current_device + + # 如果没有找到无线连接的设备,尝试使用USB设备开启无线调试 + if not wireless_device_id and usb_device_id: + logging.info(f"未找到无线连接设备,尝试使用USB设备 {usb_device_id} 开启无线调试") + + # 尝试获取设备IP地址 + try: + import re + import time + + ip_result = subprocess.run( + ["adb", "-s", usb_device_id, "shell", "ip", "-f", "inet", "addr", "show", "wlan0"], + capture_output=True, + text=True, + timeout=10 + ) + + # 解析IP地址 + ip_output = ip_result.stdout + if "inet " in ip_output: + # 提取IP地址 + ip_match = re.search(r'inet\s+(\d+\.\d+\.\d+\.\d+)', ip_output) + if ip_match: + device_ip = ip_match.group(1) + logging.info(f"获取到设备IP地址: {device_ip}") + + # 开启无线调试 + tcpip_result = subprocess.run( + ["adb", "-s", usb_device_id, "tcpip", "5555"], + capture_output=True, + text=True, + timeout=10 + ) + + if "restarting in TCP mode port: 5555" in tcpip_result.stdout: + logging.info("无线调试已开启,端口: 5555") + + # 等待几秒钟让设备准备好 + time.sleep(3) + + # 连接到无线设备 + connect_result = subprocess.run( + ["adb", "connect", f"{device_ip}:5555"], + capture_output=True, + text=True, + timeout=10 + ) + + if "connected to" in connect_result.stdout: + logging.info(f"成功连接到无线设备: {device_ip}:5555") + global_variable.GLOBAL_DEVICE_ID = f"{device_ip}:5555" + return f"{device_ip}:5555" + else: + logging.warning(f"连接无线设备失败: {connect_result.stderr}") + logging.info(f"使用USB设备: {usb_device_id}") + global_variable.GLOBAL_DEVICE_ID = usb_device_id + return usb_device_id + else: + logging.warning(f"开启无线调试失败: {tcpip_result.stderr}") + logging.info(f"使用USB设备: {usb_device_id}") + global_variable.GLOBAL_DEVICE_ID = usb_device_id + return usb_device_id + else: + logging.warning("未找到设备IP地址") + logging.info(f"使用USB设备: {usb_device_id}") + global_variable.GLOBAL_DEVICE_ID = usb_device_id + return usb_device_id + else: + logging.warning("无法获取设备IP地址,可能设备未连接到WiFi") + logging.info(f"使用USB设备: {usb_device_id}") + global_variable.GLOBAL_DEVICE_ID = usb_device_id + return usb_device_id + except Exception as e: + logging.warning(f"开启无线调试时出错: {str(e)}") + logging.info(f"使用USB设备: {usb_device_id}") + global_variable.GLOBAL_DEVICE_ID = usb_device_id + return usb_device_id + + except Exception as e: + logging.warning(f"设备检测失败: {e}") + + + # 使用全局配置 + device_id = global_variable.GLOBAL_DEVICE_ID + logging.info(f"使用全局配置设备: {device_id}") + return device_id + + def __init__(self, device_id=None): + # 如果没有提供设备ID,则自动获取 + if device_id is None: + self.device_id = self.get_device_id() + else: + self.device_id = device_id + + # 初始化权限 + if permissions.grant_appium_permissions(self.device_id): + logging.info(f"设备 {self.device_id} 权限授予成功") + else: + logging.warning(f"设备 {self.device_id} 权限授予失败") + + # 确保Appium服务器正在运行 + ensure_appium_server_running(4723) + + # 初始化驱动 + self.init_driver() + # 先拼接,后创建测试结果目录 + self.results_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_results') + os.makedirs(self.results_dir, exist_ok=True) + + + def init_driver(self): + """初始化Appium驱动""" + try: + # 使用全局函数初始化驱动 + self.driver, self.wait = init_appium_driver(self.device_id) + # 初始化页面对象 + logging.info(f"设备 {self.device_id} 开始初始化页面对象") + self.login_page = LoginPage(self.driver, self.wait) + self.download_tabbar_page = DownloadTabbarPage(self.driver, self.wait, self.device_id) + self.screenshot_page = ScreenshotPage(self.driver, self.wait, self.device_id) + self.upload_config_page = UploadConfigPage(self.driver, self.wait, self.device_id) + self.more_download_page = MoreDownloadPage(self.driver, self.wait,self.device_id) + logging.info(f"设备 {self.device_id} 所有页面对象初始化完成") + + # 检查应用是否成功启动 + if is_app_launched(self.driver): + logging.info(f"设备 {self.device_id} 沉降观测App已成功启动") + else: + logging.warning(f"设备 {self.device_id} 应用可能未正确启动") + # 手动启动应用 + launch_app_manually(self.driver, self.device_id) + + except Exception as e: + logging.error(f"设备 {self.device_id} 初始化驱动失败: {str(e)}") + raise + + def is_app_launched(self): + """检查应用是否已启动""" + try: + return is_app_launched(self.driver) + except Exception as e: + logging.error(f"设备 {self.device_id} 检查应用启动状态时出错: {str(e)}") + return False + + def is_element_present(self, by, value): + """检查元素是否存在""" + try: + self.driver.find_element(by, value) + return True + except NoSuchElementException: + return False + + def handle_app_state(self): + """根据当前应用状态处理相应的操作""" + try: + login_btn_exists = self.login_page.is_login_page() + if not login_btn_exists: + logging.error(f"设备 {self.device_id} 未知应用状态,无法确定当前页面,跳转到登录页面") + if self.navigate_to_login_page(self.driver, self.device_id): + logging.info(f"设备 {self.device_id} 成功跳转到登录页面") + return self.handle_app_state() # 递归调用处理登录后的状态 + else: + logging.error(f"设备 {self.device_id} 跳转到登录页面失败") + return False + + # 处理登录页面状态 + logging.info(f"设备 {self.device_id} 检测到登录页面,执行登录操作") + max_retries = 1 + login_success = False + + for attempt in range(max_retries + 1): + if self.login_page.login(): + login_success = True + break + else: + if attempt < max_retries: + logging.warning(f"设备 {self.device_id} 登录失败,准备重试 ({attempt + 1}/{max_retries})") + time.sleep(2) # 等待2秒后重试 + else: + logging.error(f"设备 {self.device_id} 登录失败,已达到最大重试次数") + + if not login_success: + return False + + logging.info(f"设备 {self.device_id} 登录成功,继续执行更新操作") + time.sleep(1) + + # 执行更新操作 + if not self.download_tabbar_page.download_tabbar_page_manager(): + logging.error(f"设备 {self.device_id} 更新操作执行失败") + return False + + # 获取状态为3的线路。 + apis.get_line_info_and_save_global(user_name=global_variable.GLOBAL_USERNAME); + # # 虚拟数据替代 + # global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT = {'CDWZQ-2标-龙骨湾右线大桥-0-7号墩-平原': 'L156372', 'CDWZQ-2标-蓝家湾特大 桥-31-31-平原': 'L159206'} + # global_variable.GLOBAL_UPLOAD_BREAKPOINT_LIST = list(global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT.keys()) + + # 点击测量导航栏按钮 + measure_page_btn = WebDriverWait(self.driver, 5).until( + EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/img_3_layout")) + ) + measure_page_btn.click() + + + # 处理平差 + self.screenshot_page.screenshot_page_manager(self.device_id) + + + # 检查是否有需要上传的断点 + if not global_variable.GLOBAL_UPLOAD_BREAKPOINT_LIST: + logging.info(f"设备 {self.device_id} 断点列表为空,无需执行上传操作") + return False + + # 点击上传导航栏按钮 + upload_page_btn = WebDriverWait(self.driver, 5).until( + EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/img_2_layout")) + ) + upload_page_btn.click() + + + # 遍历断点列表,逐个执行上传操作 + upload_success_count = 0 + for breakpoint_name in global_variable.GLOBAL_UPLOAD_BREAKPOINT_LIST: + try: + logging.info(f"设备 {self.device_id} 开始处理断点 '{breakpoint_name}' 的上传") + + # 安全地获取断点信息 + line_num = global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT.get(breakpoint_name) + if line_num is None: + logging.warning(f"设备 {self.device_id} 断点 '{breakpoint_name}' 在字典中未找到,跳过上传") + continue + if not line_num: + logging.warning(f"设备 {self.device_id} 断点 '{breakpoint_name}' 未获取到line_num,跳过上传") + continue + + # 执行上传配置管理,传入当前断点名称 + if self.upload_config_page.upload_config_page_manager(self.results_dir, breakpoint_name, line_num): + logging.info(f"设备 {self.device_id} 断点 '{breakpoint_name}' 上传成功") + upload_success_count += 1 + else: + logging.error(f"设备 {self.device_id} 断点 '{breakpoint_name}' 上传失败") + + except Exception as e: + logging.error(f"设备 {self.device_id} 处理断点 '{breakpoint_name}' 时发生异常: {str(e)}") + + logging.info(f"设备 {self.device_id} 上传配置管理执行完成,成功上传 {upload_success_count}/{len(global_variable.GLOBAL_UPLOAD_BREAKPOINT_LIST)} 个断点") + + # 如果所有断点都上传成功,返回True;否则返回False + all_upload_success = upload_success_count == len(global_variable.GLOBAL_UPLOAD_BREAKPOINT_LIST) + if all_upload_success: + logging.info(f"设备 {self.device_id} 所有断点上传成功") + # 把上传成功的断点写入日志文件"上传成功的断点.txt" + with open(os.path.join(self.results_dir, "上传成功的断点.txt"), "w", encoding='utf-8') as f: + for bp in global_variable.GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST: + f.write(f"{bp}\n") + else: + logging.warning(f"设备 {self.device_id} 部分断点上传失败") + # 把上传成功的断点写入日志文件"上传成功的断点.txt" + with open(os.path.join(self.results_dir, "上传成功的断点.txt"), "w", encoding='utf-8') as f: + for bp in global_variable.GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST: + f.write(f"{bp}\n") + # 把上传失败的断点写入日志文件"上传失败的断点.txt" + with open(os.path.join(self.results_dir, "上传失败的断点.txt"), "w", encoding='utf-8') as f: + for bp in set(global_variable.GLOBAL_UPLOAD_BREAKPOINT_LIST)-set(global_variable.GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST): + f.write(f"{bp}\n") + + return all_upload_success + + except Exception as e: + logging.error(f"设备 {self.device_id} 处理应用状态时出错: {str(e)}") + return False + + + + def check_and_click_confirm_popup_appium(self): + """ + 适用于Appium的弹窗检测函数 + + Returns: + bool: 是否成功处理弹窗 + """ + try: + from appium.webdriver.common.appiumby import AppiumBy + + # 使用self.driver而不是参数 + if not hasattr(self, 'driver') or self.driver is None: + logging.warning("driver未初始化,无法检测弹窗") + return False + + # 检查弹窗消息 + message_elements = self.driver.find_elements(AppiumBy.XPATH, '//android.widget.TextView[@text="是否退出测量界面?"]') + + if message_elements: + logging.info("检测到退出测量界面弹窗") + + # 点击"是"按钮 + confirm_buttons = self.driver.find_elements(AppiumBy.XPATH, '//android.widget.Button[@text="是" and @resource-id="android:id/button1"]') + if confirm_buttons: + confirm_buttons[0].click() + logging.info("已点击'是'按钮") + time.sleep(1) + return True + else: + logging.warning("未找到'是'按钮") + return False + else: + return False + + except Exception as e: + logging.error(f"Appium检测弹窗时发生错误: {str(e)}") + return False + + + + def navigate_to_login_page(self, driver, device_id): + """ + 补充的跳转页面函数:当设备处于未知状态时,尝试跳转到登录页面 + + 参数: + driver: 已初始化的Appium WebDriver对象 + device_id: 设备ID,用于日志记录 + """ + try: + target_package = 'com.bjjw.cjgc' + target_activity = '.activity.LoginActivity' + # 使用ADB命令启动Activity + try: + logging.info(f"尝试使用ADB命令启动LoginActivity: {target_package}/{target_activity}") + adb_command = f"adb -s {device_id} shell am start -n {target_package}/{target_activity}" + result = subprocess.run(adb_command, shell=True, capture_output=True, text=True) + if result.returncode == 0: + logging.info(f"使用ADB命令启动LoginActivity成功") + time.sleep(2) # 等待Activity启动 + return True + else: + logging.warning(f"ADB命令执行失败: {result.stderr}") + except Exception as adb_error: + logging.warning(f"执行ADB命令时出错: {adb_error}") + except Exception as e: + logging.error(f"跳转到登录页面过程中发生未预期错误: {e}") + + # 所有尝试都失败 + return False + + + + + def run_automation(self): + """运行自动化流程""" + try: + success = self.handle_app_state() + # success = self.test_handle_app_state() + if success: + logging.info(f"设备 {self.device_id} 自动化流程执行成功") + else: + logging.error(f"设备 {self.device_id} 自动化流程执行失败") + return success + except Exception as e: + logging.error(f"设备 {self.device_id} 自动化执行过程中发生错误: {str(e)}") + return False + finally: + self.quit() + + def quit(self): + """关闭驱动""" + safe_quit_driver(getattr(self, 'driver', None), self.device_id) + +# 主执行逻辑 +if __name__ == "__main__": + + # 单个设备配置 - 现在DeviceAutomation会自动获取设备ID + + try: + automation = DeviceAutomation() + success = automation.run_automation() + + if success: + logging.info(f"设备自动化流程执行成功") + else: + logging.error(f"设备自动化流程执行失败") + except Exception as e: + logging.error(f"设备执行出错: {str(e)}") + + + + \ No newline at end of file diff --git a/page_objects/__pycache__/download_tabbar_page.cpython-312.pyc b/page_objects/__pycache__/download_tabbar_page.cpython-312.pyc index 02ad58c..6c94d57 100644 Binary files a/page_objects/__pycache__/download_tabbar_page.cpython-312.pyc and b/page_objects/__pycache__/download_tabbar_page.cpython-312.pyc differ diff --git a/page_objects/__pycache__/login_page.cpython-312.pyc b/page_objects/__pycache__/login_page.cpython-312.pyc index 691de42..9311982 100644 Binary files a/page_objects/__pycache__/login_page.cpython-312.pyc and b/page_objects/__pycache__/login_page.cpython-312.pyc differ diff --git a/page_objects/__pycache__/screenshot_page.cpython-312.pyc b/page_objects/__pycache__/screenshot_page.cpython-312.pyc index 76bb38a..0836221 100644 Binary files a/page_objects/__pycache__/screenshot_page.cpython-312.pyc and b/page_objects/__pycache__/screenshot_page.cpython-312.pyc differ diff --git a/page_objects/__pycache__/upload_config_page.cpython-312.pyc b/page_objects/__pycache__/upload_config_page.cpython-312.pyc index 8cc26d1..ea59bcd 100644 Binary files a/page_objects/__pycache__/upload_config_page.cpython-312.pyc and b/page_objects/__pycache__/upload_config_page.cpython-312.pyc differ diff --git a/page_objects/download_tabbar_page.py b/page_objects/download_tabbar_page.py index e753889..f333dde 100644 --- a/page_objects/download_tabbar_page.py +++ b/page_objects/download_tabbar_page.py @@ -295,25 +295,25 @@ class DownloadTabbarPage: """执行基础更新操作""" try: # 执行基础更新流程 - self.logger.info(f"设备 {global_variable.GLOBAL_DEVICE_ID} 开始执行更新流程") + self.logger.info(f"设备 {global_variable.get_device_id()} 开始执行更新流程") # 点击下载标签栏 if not self.click_download_tabbar(): - self.logger.error(f"设备 {global_variable.GLOBAL_DEVICE_ID} 点击下载标签栏失败") + self.logger.error(f"设备 {global_variable.get_device_id()} 点击下载标签栏失败") return False # 更新工作基点 if not self.update_work_base(): - self.logger.error(f"设备 {global_variable.GLOBAL_DEVICE_ID} 更新工作基点失败") + self.logger.error(f"设备 {global_variable.get_device_id()} 更新工作基点失败") return False # 更新水准线路 if not self.update_level_line(): - self.logger.error(f"设备 {global_variable.GLOBAL_DEVICE_ID} 更新水准线路失败") + self.logger.error(f"设备 {global_variable.get_device_id()} 更新水准线路失败") return False - self.logger.info(f"设备 {global_variable.GLOBAL_DEVICE_ID} 更新操作执行成功") + self.logger.info(f"设备 {global_variable.get_device_id()} 更新操作执行成功") return True except Exception as e: - self.logger.error(f"设备 {global_variable.GLOBAL_DEVICE_ID} 执行更新操作时出错: {str(e)}") + self.logger.error(f"设备 {global_variable.get_device_id()} 执行更新操作时出错: {str(e)}") return False \ No newline at end of file diff --git a/page_objects/login_page.py b/page_objects/login_page.py index f02b3a2..036a970 100644 --- a/page_objects/login_page.py +++ b/page_objects/login_page.py @@ -35,14 +35,40 @@ class LoginPage: # 读取文本框内已有的用户名(.text属性获取元素显示的文本内容) existing_username = username_field.text - # 3. 将获取到的用户名写入全局变量中 - global_variable.GLOBAL_USERNAME = existing_username # 关键:给全局变量赋值 + # 3. 将获取到的用户名写入全局变量中 + # global_variable.GLOBAL_USERNAME = existing_username # 关键:给全局变量赋值 + global_variable.set_username(existing_username) # 日志记录获取到的已有用户名(若为空,也需明确记录,避免后续误解) if existing_username.strip(): # 去除空格后判断是否有有效内容 self.logger.info(f"已获取文本框中的已有用户名: {existing_username}") else: self.logger.info("文本框中未检测到已有用户名(内容为空)") + + # 1. 定位密码输入框 + password_field = self.wait.until( + EC.element_to_be_clickable((AppiumBy.ID, ids.LOGIN_PASSWORD)) + ) + + # 2. 清空密码框(如果需要) + try: + password_field.clear() + # time.sleep(0.5) # 等待清除完成 + except: + # 如果clear方法不可用,尝试其他方式 + pass + + # 3. 输入密码 + if existing_username=="wangshun": + password_field.send_keys("Wang93534.") + else: + password_field.send_keys("Liang/1974.") + + # 4. 可选:隐藏键盘 + try: + self.driver.hide_keyboard() + except: + pass # 点击登录按钮 login_btn = self.wait.until( diff --git a/page_objects/screenshot_page.py b/page_objects/screenshot_page.py index 15e3a83..210078f 100644 --- a/page_objects/screenshot_page.py +++ b/page_objects/screenshot_page.py @@ -258,8 +258,8 @@ class ScreenshotPage: tuple: (date_str, time_str) 日期和时间字符串 """ - if line_code in global_variable.LINE_TIME_MAPPING_DICT: - end_time = global_variable.LINE_TIME_MAPPING_DICT[line_code] + if line_code in global_variable.get_line_time_mapping_dict(): + end_time = global_variable.get_line_time_mapping_dict()[line_code] date_str = end_time.strftime("%Y-%m-%d") time_str = end_time.strftime("%H:%M:%S") return (date_str, time_str) @@ -273,19 +273,19 @@ class ScreenshotPage: """ self.logger.info("\n当前全局字典内容:") - if global_variable.LINE_TIME_MAPPING_DICT: - for line_code, end_time in sorted(global_variable.LINE_TIME_MAPPING_DICT.items()): + if global_variable.get_line_time_mapping_dict(): + for line_code, end_time in sorted(global_variable.get_line_time_mapping_dict().items()): date_str = end_time.strftime("%Y-%m-%d") time_str = end_time.strftime("%H:%M:%S") self.logger.info(f" {line_code}: {date_str} {time_str}") else: self.logger.info(" 全局字典为空") - self.logger.info(f"总计: {len(global_variable.LINE_TIME_MAPPING_DICT)} 条记录\n") + self.logger.info(f"总计: {len(global_variable.get_line_time_mapping_dict())} 条记录\n") def clear_line_time_mapping(self): """ 清空全局字典 """ - global_variable.LINE_TIME_MAPPING_DICT.clear() + global_variable.get_line_time_mapping_dict().clear() self.logger.info("已清空全局字典") def set_device_time(self, device_id, time_str=None, date_str=None, disable_auto_sync=True): @@ -1121,10 +1121,10 @@ class ScreenshotPage: # 确保device_id正确设置,使用全局变量作为备用 if not hasattr(self, 'device_id') or not self.device_id: # 优先使用传入的device_id,其次使用全局变量 - self.device_id = device_id if device_id else global_variable.GLOBAL_DEVICE_ID + self.device_id = device_id if device_id else global_variable.get_device_id() # 使用self.device_id,确保有默认值 - actual_device_id = self.device_id if self.device_id else global_variable.GLOBAL_DEVICE_ID + actual_device_id = self.device_id if self.device_id else global_variable.get_device_id() if not check_session_valid(self.driver, actual_device_id): self.logger.warning(f"设备 {actual_device_id} 会话无效,尝试重新连接驱动...") @@ -1269,9 +1269,9 @@ class ScreenshotPage: def add_breakpoint_to_upload_list(self, breakpoint_name, line_num): """添加平差完成的断点到上传列表和字典""" - if breakpoint_name and breakpoint_name not in global_variable.GLOBAL_UPLOAD_BREAKPOINT_LIST: - global_variable.GLOBAL_UPLOAD_BREAKPOINT_LIST.append(breakpoint_name) - global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT[breakpoint_name] = { + if breakpoint_name and breakpoint_name not in global_variable.get_upload_breakpoint_list(): + global_variable.get_upload_breakpoint_list().append(breakpoint_name) + global_variable.get_upload_breakpoint_dict()[breakpoint_name] = { 'breakpoint_name': breakpoint_name, 'line_num': line_num } @@ -1342,19 +1342,18 @@ class ScreenshotPage: return False # 检查GLOBAL_UPLOAD_BREAKPOINT_DICT是否为空,如果为空则初始化一些测试数据 - if not global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT: + if not global_variable.get_upload_breakpoint_dict(): self.logger.warning("global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT为空,正在初始化测试数据") - global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT = {'CDWZQ-2标-龙骨湾右线大桥-0-7号墩-平原': 'L156372', 'CDWZQ-2标-蓝家湾特大 桥-31-31-平原': 'L159206'} + global_variable.set_upload_breakpoint_dict({'CDWZQ-2标-龙骨湾右线大桥-0-7号墩-平原': 'L156372', 'CDWZQ-2标-蓝家湾特大 桥-31-31-平原': 'L159206'}) - # 创建断点列表的副本,用于重试时重新处理 - breakpoint_names = list(global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT.keys()) + breakpoint_names = list(global_variable.get_upload_breakpoint_dict().keys()) processed_breakpoints = [] # 开始循环处理断点 for breakpoint_name in breakpoint_names: if breakpoint_name in processed_breakpoints: continue - line_code = global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT[breakpoint_name] + line_code = global_variable.get_upload_breakpoint_dict()[breakpoint_name] self.logger.info(f"开始处理要平差的断点 {breakpoint_name}") @@ -1409,17 +1408,19 @@ class ScreenshotPage: self.logger.error(f"设备 {device_id} 处理返回按钮确认失败") continue - # 成功处理完一个断点,添加到已处理列表 - processed_breakpoints.append(breakpoint_name) - self.logger.info(f"成功处理断点: {breakpoint_name}") + # # 成功处理完一个断点,添加到已处理列表 + # processed_breakpoints.append(breakpoint_name) + # self.logger.info(f"成功处理断点: {breakpoint_name}") - # 检查是否所有断点都处理完成 - if len(processed_breakpoints) == len(breakpoint_names): - self.logger.info(f"设备 {device_id} 平差页面操作执行完成") - return True - else: - self.logger.warning(f"设备 {device_id} 部分断点处理失败,已成功处理 {len(processed_breakpoints)}/{len(breakpoint_names)} 个断点") - return True + # # 检查是否所有断点都处理完成 + # if len(processed_breakpoints) == len(breakpoint_names): + # self.logger.info(f"设备 {device_id} 平差页面操作执行完成") + # return True + # else: + # self.logger.warning(f"设备 {device_id} 部分断点处理失败,已成功处理 {len(processed_breakpoints)}/{len(breakpoint_names)} 个断点") + # return True + self.logger.warning(f"设备 {device_id} 上传流程执行完成") + return True except Exception as e: retry_count += 1 diff --git a/page_objects/upload_config_page.py b/page_objects/upload_config_page.py index 1ac8ef0..9c17cb8 100644 --- a/page_objects/upload_config_page.py +++ b/page_objects/upload_config_page.py @@ -671,7 +671,7 @@ class UploadConfigPage: logging.warning(f"跳过无效数据: 姓名='{name}', 身份证='{id_card}'") # 将字典保存到全局变量 - global_variable.GLOBAL_NAME_TO_ID_MAP = name_id_map + global_variable.set_name_to_id_map(name_id_map) logging.info(f"成功加载用户数据,共 {len(df)} 条记录,{len(name_id_map)} 个有效姓名-身份证映射") @@ -711,7 +711,7 @@ class UploadConfigPage: logging.info(f"使用第一个数据员: {sjname}") # 获取身份证号码 - id_card = global_variable.GLOBAL_NAME_TO_ID_MAP.get(sjname) + id_card = global_variable.get_name_to_id_map().get(sjname) logging.info(f"id_card: {id_card}") if not id_card: logging.error(f"未找到数据员 {sjname} 对应的身份证号") @@ -1759,7 +1759,7 @@ class UploadConfigPage: else: self.logger.info("页面中包含变化量属性,继续执行后续操作") - user_id = global_variable.GLOBAL_USERNAME + user_id = global_variable.get_username() if user_id is None: self.logger.error("获取用户ID失败") return False @@ -1859,7 +1859,7 @@ class UploadConfigPage: self.logger.info("上传配置页面操作执行完成") # 把上传成功的断点写入全局变量GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST - global_variable.GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST.append(breakpoint_name) + global_variable.get_upload_success_breakpoint_list().append(breakpoint_name) return True diff --git a/permissions.py b/permissions.py index 8fc9a94..eeb78f7 100644 --- a/permissions.py +++ b/permissions.py @@ -2,6 +2,7 @@ import subprocess import logging import time +import os # 配置日志 logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s: %(message)s") @@ -83,90 +84,143 @@ def grant_single_permission(device_id: str, package: str, permission: str) -> bo logging.error(f"设备 {device_id}:处理 {package} 时发生未知错误:{str(e)}") return False + +# def grant_appium_permissions(device_id: str, require_all: bool = False) -> bool: +# """ +# 为 Appium UiAutomator2 服务授予权限 +# :param device_id: 设备 ID +# :param require_all: 是否要求所有权限都成功授予 +# :return: 权限授予是否成功(根据require_all参数判断) +# """ +# # 首先检查设备连接 +# if not check_device_connection(device_id): +# return False + +# packages_to_grant = [ +# "io.appium.settings", +# "io.appium.uiautomator2.server", +# "io.appium.uiautomator2.server.test" +# ] + +# # 权限列表(按优先级排序) +# permissions_to_grant = [ +# "android.permission.SET_ANIMATION_SCALE", +# "android.permission.CHANGE_CONFIGURATION", +# "android.permission.WRITE_SETTINGS", +# "android.permission.DISABLE_KEYGUARD", +# ] + +# success_count = 0 +# total_attempted = 0 +# package_results = {} + +# # 检查并授予权限 +# for package in packages_to_grant: +# package_results[package] = {"installed": False, "permissions": {}} + +# if not is_package_installed(device_id, package): +# logging.warning(f"设备 {device_id}:包 {package} 未安装,跳过权限授予") +# package_results[package]["installed"] = False +# continue + +# package_results[package]["installed"] = True +# package_success = 0 +# package_attempted = 0 + +# for permission in permissions_to_grant: +# total_attempted += 1 +# package_attempted += 1 + +# result = grant_single_permission(device_id, package, permission) +# package_results[package]["permissions"][permission] = result + +# if result: +# success_count += 1 +# package_success += 1 + +# # 记录每个包的授权结果 +# logging.info(f"设备 {device_id}:包 {package} 权限授予结果: {package_success}/{package_attempted}") + +# # 统计和报告 +# logging.info(f"设备 {device_id}:权限授予完成") +# logging.info(f"总计: 尝试 {total_attempted} 次,成功 {success_count} 次") + +# # 检查每个包的关键权限 +# critical_permission = "android.permission.WRITE_SECURE_SETTINGS" +# critical_failures = [] + +# for package, info in package_results.items(): +# if info["installed"] and critical_permission in info["permissions"]: +# if not info["permissions"][critical_permission]: +# critical_failures.append(package) + +# if critical_failures: +# logging.warning(f"设备 {device_id}:以下包的关键权限 {critical_permission} 授予失败: {', '.join(critical_failures)}") +# logging.warning("这可能会影响某些自动化功能,但基础测试通常不受影响") + +# # 根据require_all参数返回结果 +# if require_all: +# # 要求所有权限都成功 +# if critical_failures: +# logging.error("关键权限授予失败,无法继续(require_all=True)") +# return False +# return success_count == total_attempted +# else: +# # 不要求所有权限,只要设备连接正常就返回True +# logging.info(f"设备 {device_id}:权限授予过程完成,建议重启设备或Appium服务使更改生效") +# return True + def grant_appium_permissions(device_id: str, require_all: bool = False) -> bool: """ - 为 Appium UiAutomator2 服务授予权限 - :param device_id: 设备 ID - :param require_all: 是否要求所有权限都成功授予 - :return: 权限授予是否成功(根据require_all参数判断) + 修复版:为 Appium 授予权限(使用正确的方法) """ - # 首先检查设备连接 - if not check_device_connection(device_id): - return False + logging.info(f"设备 {device_id}:开始设置Appium权限") - packages_to_grant = [ - "io.appium.settings", - "io.appium.uiautomator2.server", - "io.appium.uiautomator2.server.test" - ] - - # 权限列表(按优先级排序) - permissions_to_grant = [ - "android.permission.WRITE_SECURE_SETTINGS", - "android.permission.CHANGE_CONFIGURATION", - "android.permission.DUMP", + # 1. 使用系统设置命令(替代原来的pm grant尝试) + logging.info("使用系统设置命令...") + system_commands = [ + ["adb", "-s", device_id, "shell", "settings", "put", "global", "window_animation_scale", "0"], + ["adb", "-s", device_id, "shell", "settings", "put", "global", "transition_animation_scale", "0"], + ["adb", "-s", device_id, "shell", "settings", "put", "global", "animator_duration_scale", "0"], + ["adb", "-s", device_id, "shell", "settings", "put", "system", "screen_off_timeout", "86400000"], ] success_count = 0 - total_attempted = 0 - package_results = {} - - # 检查并授予权限 - for package in packages_to_grant: - package_results[package] = {"installed": False, "permissions": {}} - - if not is_package_installed(device_id, package): - logging.warning(f"设备 {device_id}:包 {package} 未安装,跳过权限授予") - package_results[package]["installed"] = False - continue - - package_results[package]["installed"] = True - package_success = 0 - package_attempted = 0 - - for permission in permissions_to_grant: - total_attempted += 1 - package_attempted += 1 - - result = grant_single_permission(device_id, package, permission) - package_results[package]["permissions"][permission] = result - - if result: + for cmd in system_commands: + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) + if result.returncode == 0: success_count += 1 - package_success += 1 - - # 记录每个包的授权结果 - logging.info(f"设备 {device_id}:包 {package} 权限授予结果: {package_success}/{package_attempted}") + logging.info(f" 成功: {' '.join(cmd[3:])}") + else: + logging.warning(f" 失败: {' '.join(cmd[3:])}") + except: + logging.warning(f" 异常: {' '.join(cmd[3:])}") - # 统计和报告 - logging.info(f"设备 {device_id}:权限授予完成") - logging.info(f"总计: 尝试 {total_attempted} 次,成功 {success_count} 次") + # 2. 授予可自动授予的权限 + logging.info("授予基础权限...") + grantable = [ + "android.permission.INTERNET", + "android.permission.ACCESS_NETWORK_STATE", + "android.permission.ACCESS_WIFI_STATE", + ] - # 检查每个包的关键权限 - critical_permission = "android.permission.WRITE_SECURE_SETTINGS" - critical_failures = [] + for perm in grantable: + cmd = ["adb", "-s", device_id, "shell", "pm", "grant", "io.appium.settings", perm] + result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) + if result.returncode == 0: + success_count += 1 + logging.info(f" 成功授予: {perm.split('.')[-1]}") + else: + logging.debug(f" 跳过: {perm.split('.')[-1]}") - for package, info in package_results.items(): - if info["installed"] and critical_permission in info["permissions"]: - if not info["permissions"][critical_permission]: - critical_failures.append(package) + # 3. 返回结果 + logging.info(f"设置完成,成功项数: {success_count}") - if critical_failures: - logging.warning(f"设备 {device_id}:以下包的关键权限 {critical_permission} 授予失败: {', '.join(critical_failures)}") - logging.warning("这可能会影响某些自动化功能,但基础测试通常不受影响") - - # 根据require_all参数返回结果 if require_all: - # 要求所有权限都成功 - if critical_failures: - logging.error("关键权限授予失败,无法继续(require_all=True)") - return False - return success_count == total_attempted + return success_count == (len(system_commands) + len(grantable)) else: - # 不要求所有权限,只要设备连接正常就返回True - logging.info(f"设备 {device_id}:权限授予过程完成,建议重启设备或Appium服务使更改生效") - return True - + return success_count > 0 # 只要有成功项就返回True def check_appium_compatibility(device_id: str) -> dict: """ 检查Appium兼容性 @@ -217,6 +271,8 @@ def check_appium_compatibility(device_id: str) -> dict: logging.error(f"检查兼容性时出错: {str(e)}") return {"device_id": device_id, "error": str(e)} + + # 使用示例 if __name__ == "__main__": # 获取设备ID(示例) diff --git a/scheduler.py b/scheduler.py new file mode 100644 index 0000000..c59bbf6 --- /dev/null +++ b/scheduler.py @@ -0,0 +1,100 @@ +import requests +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +# 从你的 main.py 中导入DeviceAutomation类 +from main import DeviceAutomation +import globals.apis as apis +from datetime import datetime + +## --- 配置区 --- +API_URL = "http://your-api-server.com/api/tasks" # 替换为真实的接口地址 +MAX_WORKERS = 3 # 最大并发线程数,建议根据网络带宽调整 +DEFAULT_PORT = "6666" + +def get_remote_tasks(): + """ + 从接口获取数据 + """ + try: + # 如果是真实接口,取消下面两行的注释 + # response = requests.get(API_URL, timeout=10) + # return response.json() + accounts = apis.get_accounts_from_server("68c0dbfdb7cbcd616e7c5ab5") + if not accounts: + print("❌ 未从服务器获取到账户信息,终止流程") + return {} + + filtered_accounts = [account for account in accounts if account.get('is_ok') == 1] + # print("✅ 获取账户信息成功", filtered_accounts) + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # 从原始 accounts 数据中筛选有 device_ip 和 device_port 且不为 None 的账户 + device_dict = {} + + for account in filtered_accounts: + device_ip = account.get('device_ip') + device_port = account.get('device_port') + + # 检查 device_ip 和 device_port 是否都存在且不为 None + if device_ip and device_port: + # 拼接为 ip:port 格式 + key = f"{device_ip}:{device_port}" + # 添加当前时间 + device_dict[key] = current_time + + # 结果 + print(device_dict) + return device_dict + # # 模拟数据供测试 + # return { + # "192.168.1.45:5555": "2026-02-03 10:20:00", + # "192.168.1.100:5556": "2026-02-03 10:20:20", + # "192.168.31.12:5557": "2026-02-03 10:21:00" + # } + except Exception as e: + print(f"❌ 获取接口任务失败: {e}") + return {} + +def run_task(address, target_time): + """ + 单个线程的任务包装器 + """ + # 格式化账号/设备名:确保带上端口号 + device_address = address + + print(f"🕒 [等待/启动] 设备: {device_address} | 预定时间: {target_time}") + + try: + # 创建DeviceAutomation实例并执行上传逻辑 + automation = DeviceAutomation(device_address) + result = automation.handle_app_state() + return f"✅ {device_address} 完成: {result}" + except Exception as e: + return f"❌ {device_address} 报错: {str(e)}" + +def monitor_center(): + """调度中心""" + tasks_data = get_remote_tasks() + + if not tasks_data: + print("📭 接口未返回任何任务,程序退出。") + return + + print(f"🗂️ 发现 {len(tasks_data)} 个待处理账号,开始建立线程池...") + + # 使用线程池并发执行 + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + # 提交所有任务 + future_to_device = { + executor.submit(run_task, acc, t): acc + for acc, t in tasks_data.items() + } + + # 实时打印完成情况 + for future in as_completed(future_to_device): + print(future.result()) + +if __name__ == "__main__": + print("🚀 自动化调度程序启动...") + monitor_center() + print("🏁 所有并发任务处理序列结束。") \ No newline at end of file diff --git a/test_results/上传失败的断点.txt b/test_results/上传失败的断点.txt index 62e0a65..fbf556e 100644 --- a/test_results/上传失败的断点.txt +++ b/test_results/上传失败的断点.txt @@ -1,5 +1,9 @@ -CDWZQ-3标-雷庙村大桥-8-号桥台-平原 -CDWZQ-3标-雷庙村特大桥-5-6-号墩-平原 -CDWZQ-3标-金马村特大桥-14-号墩-平原 -CDWZQ-3标-雷庙村特大桥-4#-7-14号墩-平原 -CDWZQ-3标-老屋坡特大桥-14-21-平原 +CDWZQ-2标-区间路基135号-447663-447219-平原 +CDWZQ-2标-蓝家湾特大桥17#墩-448591-0-平原 +CDWZQ-2标-蓝家湾特大桥-0桥台-0-平原 +CDWZQ-2标-一工区-资阳沱江特大桥-10#墩-平原 +CDWZQ-2标-蓝家湾特大桥-1#-6-平原 +CDWZQ-2标-蓝家湾特大桥-19#-号墩-平原 +CDWZQ-2标-一工区-龙家沟右线大桥-0-11号墩-山区 +CDWZQ-2标-二工区-蓝家湾特大桥-18#-山区 +CDWZQ-2标-一工区-资阳沱江特大桥-9#-山区