import subprocess import re import time import requests import json from appium import webdriver from appium.webdriver.common.appiumby import AppiumBy from appium.options.android import UiAutomator2Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from urllib3.connection import port_by_scheme # ======================= # 基础工具函数 # ======================= def run_command(command): """执行系统命令并返回输出""" result = subprocess.run(command, shell=True, capture_output=True, text=True) return result.stdout.strip() # ======================= # 无线ADB连接管理 # ======================= def check_wireless_connections(target_port=4723): """ 检查当前无线ADB连接状态 返回: (list) 当前无线连接的设备列表,每个元素为(device_id, ip, port) """ devices_output = run_command("adb devices") lines = devices_output.splitlines()[1:] wireless_connections = [] for line in lines: if not line.strip(): continue parts = line.split() if len(parts) < 2: continue device_id = parts[0] status = parts[1] # 检查是否为无线连接(包含:端口) if ":" in device_id and status == "device": # 解析IP和端口 ip_port = device_id.split(":") if len(ip_port) == 2: ip, port = ip_port[0], ip_port[1] wireless_connections.append((device_id, ip, int(port))) return wireless_connections def disconnect_wireless_connection(connection_id): """断开指定的无线ADB连接""" print(f" 断开连接: {connection_id}") result = run_command(f"adb disconnect {connection_id}") return result def cleanup_wireless_connections(target_device_ip=None, target_port=4723): """ 清理无线ADB连接 - 如果target_device_ip为None:断开所有端口为4723的连接 - 如果target_device_ip有值:断开所有端口为4723且IP不是目标设备的连接 返回: (bool) 是否需要建立新连接 """ print("\n🔍 检查无线ADB连接状态...") # 获取当前所有无线连接 wireless_connections = check_wireless_connections(target_port) if not wireless_connections: print("📡 当前没有无线ADB连接") return True # 需要建立新连接 print(f"📡 发现 {len(wireless_connections)} 个无线连接:") for conn_id, ip, port in wireless_connections: print(f" - {conn_id} (IP: {ip}, 端口: {port})") need_new_connection = True connections_to_disconnect = [] for conn_id, ip, port in wireless_connections: # 检查端口是否为4723 if port != target_port: print(f" ⚠️ 连接 {conn_id} 端口不是 {target_port},保持不动") continue # 如果没有指定目标IP,断开所有4723端口的连接 if target_device_ip is None: connections_to_disconnect.append(conn_id) continue # 如果指定了目标IP,检查IP是否匹配 if ip == target_device_ip: print(f" ✅ 发现目标设备的连接: {conn_id}") need_new_connection = False # 已有正确连接,不需要新建 else: print(f" ⚠️ 发现其他设备的4723端口连接: {conn_id}") connections_to_disconnect.append(conn_id) # 断开需要清理的连接 for conn_id in connections_to_disconnect: disconnect_wireless_connection(conn_id) time.sleep(1) # 等待断开完成 # 如果断开了一些连接,重新检查状态 if connections_to_disconnect: print("🔄 重新检查连接状态...") time.sleep(2) remaining = check_wireless_connections(target_port) if remaining: for conn_id, ip, port in remaining: if ip == target_device_ip and port == target_port: print(f" ✅ 目标设备连接仍然存在: {conn_id}") need_new_connection = False break return need_new_connection # ======================= # Appium 启动 # ======================= def start_appium(): appium_port = 4723 print(f"🚀 启动 Appium Server(端口 {appium_port})...") subprocess.Popen( ["appium.cmd", "-a", "127.0.0.1", "-p", str(appium_port)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) # 检查端口是否就绪(替代固定sleep) max_wait = 30 # 最大等待30秒 start_time = time.time() while time.time() - start_time < max_wait: try: # 尝试连接Appium端口,验证是否就绪 import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(1) result = sock.connect_ex(("127.0.0.1", appium_port)) sock.close() if result == 0: # 端口就绪 print(f"✅ Appium Server 启动成功(端口 {appium_port})") return True except Exception: pass time.sleep(1) print(f"❌ Appium Server 启动超时({max_wait}秒)") return False # ======================= # 无线 ADB 建链主流程 # ======================= def setup_adb_wireless(): target_port = 4723 print(f"🚀 开始无线 ADB 建链(端口 {target_port})") # 获取USB连接的设备 devices_output = run_command("adb devices") lines = devices_output.splitlines()[1:] usb_devices = [] for line in lines: if not line.strip(): continue parts = line.split() if len(parts) < 2 or parts[1] != "device": continue device_id = parts[0] # 跳过已经是无线的 if ":" in device_id: continue usb_devices.append(device_id) if not usb_devices: print("❌ 未检测到 USB 设备") return for serial in usb_devices: print(f"\n🔎 处理设备: {serial}") # 获取WLAN IP ip_info = run_command(f"adb -s {serial} shell ip addr show wlan0") ip_match = re.search(r'inet\s+(\d+\.\d+\.\d+\.\d+)', ip_info) if not ip_match: print("⚠️ 获取 IP 失败,请确认已连接 WiFi") continue device_ip = ip_match.group(1) print(f"📍 设备 IP: {device_ip}") # ===== 清理现有无线连接 ===== need_new_connection = cleanup_wireless_connections( target_device_ip=device_ip, target_port=target_port ) # ===== 建立新连接(如果需要) ===== if need_new_connection: print(f"\n🔌 建立新的无线连接: {device_ip}:{target_port}") # 切 TCP 模式 print(f" 设置设备 {serial} 为 TCP 模式,端口 {target_port}...") run_command(f"adb -s {serial} tcpip {target_port}") time.sleep(3) # 等待模式切换 # 无线连接 connect_result = run_command(f"adb connect {device_ip}:{target_port}") time.sleep(2) # 等待连接稳定 if "connected" not in connect_result.lower(): print(f"❌ 无线连接失败: {connect_result}") continue else: print(f"✅ 无线连接成功: {device_ip}:{target_port}") else: print(f"✅ 已存在目标设备的有效连接,跳过新建") # 验证连接 wireless_id = f"{device_ip}:{target_port}" verify_result = run_command("adb devices") if wireless_id in verify_result and "device" in verify_result.split(wireless_id)[1][:10]: print(f"✅ 连接验证通过: {wireless_id}") else: print(f"❌ 连接验证失败,请检查") continue # ===== 后续自动化 ===== if not start_appium(): print("❌ Appium启动失败,跳过后续操作") continue # driver, app_started = start_settlement_app(wireless_id, device_ip, target_port) # if not app_started: # print("⚠️ App启动失败,跳过后续操作") # continue print(f"🎉 所有操作完成! 设备 {serial} 已就绪") # # 关闭Appium连接 # if driver: # print("🔄 关闭Appium连接...") # driver.quit() break # 处理完第一个设备后退出,如需处理多个设备可移除此行 # ======================= # 程序入口 # ======================= if __name__ == "__main__": # 配置参数 setup_adb_wireless()