Files
cjgc_data/globals/create_link.py
2026-03-12 17:03:56 +08:00

273 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import subprocess
import re
import time
import requests
import json
from appium import webdriver
from appium.webdriver.common.appiumby import AppiumBy
from appium.options.android import UiAutomator2Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from urllib3.connection import port_by_scheme
# =======================
# 基础工具函数
# =======================
def run_command(command):
"""执行系统命令并返回输出"""
result = subprocess.run(command, shell=True, capture_output=True, text=True)
return result.stdout.strip()
# =======================
# 无线ADB连接管理
# =======================
def check_wireless_connections(target_port=4723):
"""
检查当前无线ADB连接状态
返回: (list) 当前无线连接的设备列表,每个元素为(device_id, ip, port)
"""
devices_output = run_command("adb devices")
lines = devices_output.splitlines()[1:]
wireless_connections = []
for line in lines:
if not line.strip():
continue
parts = line.split()
if len(parts) < 2:
continue
device_id = parts[0]
status = parts[1]
# 检查是否为无线连接(包含:端口)
if ":" in device_id and status == "device":
# 解析IP和端口
ip_port = device_id.split(":")
if len(ip_port) == 2:
ip, port = ip_port[0], ip_port[1]
wireless_connections.append((device_id, ip, int(port)))
return wireless_connections
def disconnect_wireless_connection(connection_id):
"""断开指定的无线ADB连接"""
print(f" 断开连接: {connection_id}")
result = run_command(f"adb disconnect {connection_id}")
return result
def cleanup_wireless_connections(target_device_ip=None, target_port=4723):
"""
清理无线ADB连接
- 如果target_device_ip为None断开所有端口为4723的连接
- 如果target_device_ip有值断开所有端口为4723且IP不是目标设备的连接
返回: (bool) 是否需要建立新连接
"""
print("\n🔍 检查无线ADB连接状态...")
# 获取当前所有无线连接
wireless_connections = check_wireless_connections(target_port)
if not wireless_connections:
print("📡 当前没有无线ADB连接")
return True # 需要建立新连接
print(f"📡 发现 {len(wireless_connections)} 个无线连接:")
for conn_id, ip, port in wireless_connections:
print(f" - {conn_id} (IP: {ip}, 端口: {port})")
need_new_connection = True
connections_to_disconnect = []
for conn_id, ip, port in wireless_connections:
# 检查端口是否为4723
if port != target_port:
print(f" ⚠️ 连接 {conn_id} 端口不是 {target_port},保持不动")
continue
# 如果没有指定目标IP断开所有4723端口的连接
if target_device_ip is None:
connections_to_disconnect.append(conn_id)
continue
# 如果指定了目标IP检查IP是否匹配
if ip == target_device_ip:
print(f" ✅ 发现目标设备的连接: {conn_id}")
need_new_connection = False # 已有正确连接,不需要新建
else:
print(f" ⚠️ 发现其他设备的4723端口连接: {conn_id}")
connections_to_disconnect.append(conn_id)
# 断开需要清理的连接
for conn_id in connections_to_disconnect:
disconnect_wireless_connection(conn_id)
time.sleep(1) # 等待断开完成
# 如果断开了一些连接,重新检查状态
if connections_to_disconnect:
print("🔄 重新检查连接状态...")
time.sleep(2)
remaining = check_wireless_connections(target_port)
if remaining:
for conn_id, ip, port in remaining:
if ip == target_device_ip and port == target_port:
print(f" ✅ 目标设备连接仍然存在: {conn_id}")
need_new_connection = False
break
return need_new_connection
# =======================
# Appium 启动
# =======================
def start_appium():
appium_port = 4723
print(f"🚀 启动 Appium Server端口 {appium_port}...")
subprocess.Popen(
["appium.cmd", "-a", "127.0.0.1", "-p", str(appium_port)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
# 检查端口是否就绪替代固定sleep
max_wait = 30 # 最大等待30秒
start_time = time.time()
while time.time() - start_time < max_wait:
try:
# 尝试连接Appium端口验证是否就绪
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex(("127.0.0.1", appium_port))
sock.close()
if result == 0: # 端口就绪
print(f"✅ Appium Server 启动成功(端口 {appium_port}")
return True
except Exception:
pass
time.sleep(1)
print(f"❌ Appium Server 启动超时({max_wait}秒)")
return False
# =======================
# 无线 ADB 建链主流程
# =======================
def setup_adb_wireless():
target_port = 4723
print(f"🚀 开始无线 ADB 建链(端口 {target_port}")
# 获取USB连接的设备
devices_output = run_command("adb devices")
lines = devices_output.splitlines()[1:]
usb_devices = []
for line in lines:
if not line.strip():
continue
parts = line.split()
if len(parts) < 2 or parts[1] != "device":
continue
device_id = parts[0]
# 跳过已经是无线的
if ":" in device_id:
continue
usb_devices.append(device_id)
if not usb_devices:
print("❌ 未检测到 USB 设备")
return
for serial in usb_devices:
print(f"\n🔎 处理设备: {serial}")
# 获取WLAN IP
ip_info = run_command(f"adb -s {serial} shell ip addr show wlan0")
ip_match = re.search(r'inet\s+(\d+\.\d+\.\d+\.\d+)', ip_info)
if not ip_match:
print("⚠️ 获取 IP 失败,请确认已连接 WiFi")
continue
device_ip = ip_match.group(1)
print(f"📍 设备 IP: {device_ip}")
# ===== 清理现有无线连接 =====
need_new_connection = cleanup_wireless_connections(
target_device_ip=device_ip,
target_port=target_port
)
# ===== 建立新连接(如果需要) =====
if need_new_connection:
print(f"\n🔌 建立新的无线连接: {device_ip}:{target_port}")
# 切 TCP 模式
print(f" 设置设备 {serial} 为 TCP 模式,端口 {target_port}...")
run_command(f"adb -s {serial} tcpip {target_port}")
time.sleep(3) # 等待模式切换
# 无线连接
connect_result = run_command(f"adb connect {device_ip}:{target_port}")
time.sleep(2) # 等待连接稳定
if "connected" not in connect_result.lower():
print(f"❌ 无线连接失败: {connect_result}")
continue
else:
print(f"✅ 无线连接成功: {device_ip}:{target_port}")
else:
print(f"✅ 已存在目标设备的有效连接,跳过新建")
# 验证连接
wireless_id = f"{device_ip}:{target_port}"
verify_result = run_command("adb devices")
if wireless_id in verify_result and "device" in verify_result.split(wireless_id)[1][:10]:
print(f"✅ 连接验证通过: {wireless_id}")
else:
print(f"❌ 连接验证失败,请检查")
continue
# ===== 后续自动化 =====
if not start_appium():
print("❌ Appium启动失败跳过后续操作")
continue
# driver, app_started = start_settlement_app(wireless_id, device_ip, target_port)
# if not app_started:
# print("⚠️ App启动失败跳过后续操作")
# continue
print(f"🎉 所有操作完成! 设备 {serial} 已就绪")
# # 关闭Appium连接
# if driver:
# print("🔄 关闭Appium连接...")
# driver.quit()
break # 处理完第一个设备后退出,如需处理多个设备可移除此行
# =======================
# 程序入口
# =======================
if __name__ == "__main__":
# 配置参数
setup_adb_wireless()