diff --git a/__pycache__/actions.cpython-312.pyc b/__pycache__/actions.cpython-312.pyc new file mode 100644 index 0000000..2dec727 Binary files /dev/null and b/__pycache__/actions.cpython-312.pyc differ diff --git a/__pycache__/check_station.cpython-312.pyc b/__pycache__/check_station.cpython-312.pyc index 8ca335c..d3d0e8f 100644 Binary files a/__pycache__/check_station.cpython-312.pyc and b/__pycache__/check_station.cpython-312.pyc differ diff --git a/actions.py b/actions.py index d70f38e..5f1d664 100644 --- a/actions.py +++ b/actions.py @@ -23,13 +23,31 @@ import globals.create_link as create_link class DeviceAutomation: - def __init__(self, device_id=None): - # 如果没有提供设备ID,则自动获取 - if device_id is None: - self.device_id = self.get_device_id() - else: - self.device_id = device_id + # def __init__(self, device_id=None): + # # 如果没有提供设备ID,则自动获取 + # if device_id is None: + # self.device_id = driver_utils.get_device_id() + # else: + # self.device_id = device_id + # # 初始化权限 + # if driver_utils.grant_appium_permissions(self.device_id): + # logging.info(f"设备 {self.device_id} 授予Appium权限成功") + # else: + # logging.warning(f"设备 {self.device_id} 授予Appium权限失败") + + # # 确保Appium服务器正在运行,不在运行则启动 + # if not driver_utils.check_server_status(4723): + # driver_utils.start_appium_server() + + # # 初始化Appium驱动和页面对象 + # self.init_driver() + # # 创建测试结果目录 + # self.results_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_results') + def __init__(self, driver=None, wait=None, device_id=None): + self.driver = driver + self.wait = wait + self.device_id = device_id # 初始化权限 if driver_utils.grant_appium_permissions(self.device_id): logging.info(f"设备 {self.device_id} 授予Appium权限成功") @@ -42,80 +60,70 @@ class DeviceAutomation: # 初始化Appium驱动和页面对象 self.init_driver() - # 创建测试结果目录 - self.results_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_results') - @staticmethod - def get_device_id() -> str: - """ - 获取设备ID,优先使用已连接设备,否则使用全局配置 - """ - try: - # 检查已连接设备 - result = subprocess.run( - ["adb", "devices"], - capture_output=True, - text=True, - timeout=10 - ) + # @staticmethod + # def get_device_id() -> str: + # """ + # 获取设备ID,优先使用已连接设备,否则使用全局配置 + # """ + # try: + # # 检查已连接设备 + # result = subprocess.run( + # ["adb", "devices"], + # capture_output=True, + # text=True, + # timeout=10 + # ) - # # 解析设备列表 - # for line in result.stdout.strip().split('\n')[1:]: - # if line.strip() and "device" in line and "offline" not in line: - # device_id = line.split('\t')[0] - # logging.info(f"使用已连接设备: {device_id}") - # global_variable.GLOBAL_DEVICE_ID = device_id - # return device_id - - target_port = "4723" - for line in result.stdout.strip().split('\n')[1:]: - if line.strip() and "device" in line and "offline" not in line: - device_id = line.split('\t')[0] + # target_port = "4723" + # for line in result.stdout.strip().split('\n')[1:]: + # if line.strip() and "device" in line and "offline" not in line: + # device_id = line.split('\t')[0] - # 检查是否为无线设备且端口为4723 - if ':' in device_id: - ip_port = device_id.split(':') - if len(ip_port) == 2 and ip_port[1] == target_port: - logging.info(f"找到目标无线设备(端口{target_port}): {device_id}") - global_variable.GLOBAL_DEVICE_ID = device_id - return device_id + # # 检查是否为无线设备且端口为4723 + # if ':' in device_id: + # ip_port = device_id.split(':') + # if len(ip_port) == 2 and ip_port[1] == target_port: + # logging.info(f"找到目标无线设备(端口{target_port}): {device_id}") + # global_variable.GLOBAL_DEVICE_ID = device_id + # return device_id - # 如果没有找到端口4723的设备,找其他无线设备 - for line in result.stdout.strip().split('\n')[1:]: - if line.strip() and "device" in line and "offline" not in line: - device_id = line.split('\t')[0] + # # 如果没有找到端口4723的设备,找其他无线设备 + # for line in result.stdout.strip().split('\n')[1:]: + # if line.strip() and "device" in line and "offline" not in line: + # device_id = line.split('\t')[0] - # 检查是否为无线设备(任何端口) - if ':' in device_id and device_id.split(':')[-1].isdigit(): - logging.info(f"未找到端口{target_port}的设备,使用其他无线设备: {device_id}") - global_variable.GLOBAL_DEVICE_ID = device_id - return device_id + # # 检查是否为无线设备(任何端口) + # if ':' in device_id and device_id.split(':')[-1].isdigit(): + # logging.info(f"未找到端口{target_port}的设备,使用其他无线设备: {device_id}") + # global_variable.GLOBAL_DEVICE_ID = device_id + # return device_id - # 如果没有任何无线设备,找有线设备 - for line in result.stdout.strip().split('\n')[1:]: - if line.strip() and "device" in line and "offline" not in line: - device_id = line.split('\t')[0] - logging.info(f"未找到无线设备,使用有线设备: {device_id}") - global_variable.GLOBAL_DEVICE_ID = device_id - return device_id + # # 如果没有任何无线设备,找有线设备 + # for line in result.stdout.strip().split('\n')[1:]: + # if line.strip() and "device" in line and "offline" not in line: + # device_id = line.split('\t')[0] + # logging.info(f"未找到无线设备,使用有线设备: {device_id}") + # global_variable.GLOBAL_DEVICE_ID = device_id + # return device_id - logging.error("未找到任何可用设备") - return None + # logging.error("未找到任何可用设备") + # return None - except Exception as e: - logging.warning(f"设备检测失败: {e}") + # except Exception as e: + # logging.warning(f"设备检测失败: {e}") - # 使用全局配置 - device_id = global_variable.GLOBAL_DEVICE_ID - logging.info(f"使用全局配置设备: {device_id}") - return device_id + # # 使用全局配置 + # device_id = global_variable.GLOBAL_DEVICE_ID + # logging.info(f"使用全局配置设备: {device_id}") + # return device_id def init_driver(self): """初始化Appium驱动""" try: - # 使用全局函数初始化驱动 - self.driver, self.wait = driver_utils.init_appium_driver(self.device_id) + # # 使用全局函数初始化驱动 + # self.driver, self.wait = driver_utils.init_appium_driver(self.device_id) # 初始化页面对象 logging.info(f"设备 {self.device_id} 开始初始化页面对象") self.login_page = LoginPage(self.driver, self.wait) @@ -273,9 +281,9 @@ class DeviceAutomation: # GLOBAL_TESTED_BREAKPOINT_LIST 把已打完的写入日志文件 - with open(os.path.join(self.results_dir, "打数据完成线路.txt"), "w", encoding='utf-8') as f: - for bp in global_variable.GLOBAL_TESTED_BREAKPOINT_LIST: - f.write(f"{bp}\n") + # with open(os.path.join(self.results_dir, "打数据完成线路.txt"), "w", encoding='utf-8') as f: + # for bp in global_variable.GLOBAL_TESTED_BREAKPOINT_LIST: + # f.write(f"{bp}\n") return task_count > 0 diff --git a/check_station.png b/check_station.png index e054821..ff218a3 100644 Binary files a/check_station.png and b/check_station.png differ diff --git a/check_station.py b/check_station.py index 13ff4a0..17f59c5 100644 --- a/check_station.py +++ b/check_station.py @@ -6,6 +6,11 @@ from io import BytesIO import subprocess import globals.global_variable as global_variable import globals.driver_utils as driver_utils # 导入驱动工具模块 +from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException +from appium.webdriver.common.appiumby import AppiumBy +from selenium.webdriver.support import expected_conditions as EC + + @@ -13,7 +18,7 @@ class CheckStation: def __init__(self, driver=None, wait=None,device_id=None): """初始化CheckStation对象""" if device_id is None: - self.device_id = self.get_device_id() + self.device_id = driver_utils.get_device_id() else: self.device_id = device_id if driver is None or wait is None: @@ -21,14 +26,14 @@ class CheckStation: else: self.driver = driver self.wait = wait - if driver_utils.grant_appium_permissions(self.device_id): - logging.info(f"设备 {self.device_id} 授予Appium权限成功") - else: - logging.warning(f"设备 {self.device_id} 授予Appium权限失败") + # if driver_utils.grant_appium_permissions(self.device_id): + # logging.info(f"设备 {self.device_id} 授予Appium权限成功") + # else: + # logging.warning(f"设备 {self.device_id} 授予Appium权限失败") - # 确保Appium服务器正在运行,不在运行则启动 - if not driver_utils.check_server_status(4723): - driver_utils.start_appium_server() + # # 确保Appium服务器正在运行,不在运行则启动 + # if not driver_utils.check_server_status(4723): + # driver_utils.start_appium_server() # 检查应用是否成功启动 if driver_utils.is_app_launched(self.driver): @@ -37,44 +42,60 @@ class CheckStation: logging.warning(f"设备 {self.device_id} 应用可能未正确启动") driver_utils.check_app_status(self.driver) - @staticmethod - def get_device_id() -> str: - """ - 获取设备ID,优先使用已连接设备,否则使用全局配置 - """ - try: - # 检查已连接设备 - result = subprocess.run( - ["adb", "devices"], - capture_output=True, - text=True, - timeout=10 - ) + # @staticmethod + # def get_device_id() -> str: + # """ + # 获取设备ID,优先使用已连接设备,否则使用全局配置 + # """ + # try: + # # 检查已连接设备 + # result = subprocess.run( + # ["adb", "devices"], + # capture_output=True, + # text=True, + # timeout=10 + # ) - # 解析设备列表 - for line in result.stdout.strip().split('\n')[1:]: - if line.strip() and "device" in line and "offline" not in line: - device_id = line.split('\t')[0] - logging.info(f"使用已连接设备: {device_id}") - global_variable.GLOBAL_DEVICE_ID = device_id - return device_id + # # 解析设备列表 + # for line in result.stdout.strip().split('\n')[1:]: + # if line.strip() and "device" in line and "offline" not in line: + # device_id = line.split('\t')[0] + # logging.info(f"使用已连接设备: {device_id}") + # global_variable.GLOBAL_DEVICE_ID = device_id + # return device_id - except Exception as e: - logging.warning(f"设备检测失败: {e}") + # except Exception as e: + # logging.warning(f"设备检测失败: {e}") - # 使用全局配置 - device_id = global_variable.GLOBAL_DEVICE_ID - logging.info(f"使用全局配置设备: {device_id}") - return device_id + # # 使用全局配置 + # device_id = global_variable.GLOBAL_DEVICE_ID + # logging.info(f"使用全局配置设备: {device_id}") + # return device_id def get_measure_data(self): # 模拟获取测量数据 pass + # def add_transition_point(self): + # # 添加转点逻辑 + # print("添加转点") + # return True def add_transition_point(self): - # 添加转点逻辑 - print("添加转点") - return True + """添加转点""" + try: + # 查找并点击添加转点按钮 + add_transition_btn = self.wait.until( + EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/btn_add_ZPoint")) + ) + add_transition_btn.click() + logging.info("已点击添加转点按钮") + return True + except TimeoutException: + logging.error("等待添加转点按钮超时") + return False + except Exception as e: + logging.error(f"添加转点时出错: {str(e)}") + return False def get_excel_from_url(self, url): """ @@ -144,16 +165,18 @@ class CheckStation: print(f"站点{station_num}: {value} -> {result}") return result - - def run(self): - last_station_num = 0 + def main_run(self): + return self.add_transition_point() + + def run(self, station_num: int): + # last_station_num = 0 url = f"https://database.yuxindazhineng.com/team-bucket/69378c5b4f42d83d9504560d/前测点表/20260309/CDWZQ-2标-龙家沟左线大桥-0-11号墩-平原.xlsx" station_data = self.get_excel_from_url(url) print(station_data) - station_quantity = len(station_data) - over_station_num = 0 - over_station_list = [] + station_quantity = len(station_data) #总站点数量 + over_station_num = 0 #已完成的站点数量 + over_station_list = [] #已完成的站点列表 while over_station_num < station_quantity: try: # 键盘输出线路编号 @@ -167,10 +190,10 @@ class CheckStation: print("已处理该站点,跳过") continue - if last_station_num == station_num: - print("输入与上次相同,跳过处理") - continue - last_station_num = station_num + # if last_station_num == station_num: + # print("输入与上次相同,跳过处理") + # continue + # last_station_num = station_num result = self.check_station_exists(station_data, station_num) if result == "error": @@ -205,6 +228,50 @@ class CheckStation: # 截图 self.driver.save_screenshot("check_station.png") return True +def get_excel_from_url(url): + """ + 从URL获取Excel文件并解析为字典 + Excel只有一列数据(A列),每行是站点值 + + Args: + url: Excel文件的URL地址 + + Returns: + dict: 解析后的站点数据字典 {行号: 值},失败返回None + """ + try: + print(f"正在从URL获取数据: {url}") + response = requests.get(url, timeout=30) + response.raise_for_status() # 检查请求是否成功 + + # 使用pandas读取Excel数据,指定没有表头,只读第一个sheet + excel_data = pd.read_excel( + BytesIO(response.content), + header=None, # 没有表头 + sheet_name=0, # 只读取第一个sheet + dtype=str # 全部作为字符串读取 + ) + + station_dict = {} + + # 解析Excel数据:使用行号+1作为站点编号,A列的值作为站点值 + print("解析Excel数据(使用行号作为站点编号)...") + for index, row in excel_data.iterrows(): + station_num = index + 1 # 行号从1开始作为站点编号 + station_value = str(row[0]).strip() if pd.notna(row[0]) else "" + + if station_value: # 只保存非空值 + station_dict[station_num] = station_value + + print(f"成功解析Excel,共{len(station_dict)}条数据") + return station_dict + + except requests.exceptions.RequestException as e: + print(f"请求URL失败: {e}") + return None + except Exception as e: + print(f"解析Excel失败: {e}") + return None if __name__ == "__main__": check_station = CheckStation() diff --git a/ck/device_b.py b/ck/device_b.py index a761684..1185c13 100644 --- a/ck/device_b.py +++ b/ck/device_b.py @@ -1,117 +1,87 @@ -""" -设备B示例 - 接收端 -模拟第二台设备,接收站点数据(用户名、线路名称、站点编号)并应答 -""" - -from serial_protocol import SerialProtocol, Command -from data_models import StationData, ResponseData +import json +import sys import time -import subprocess +import serial +import os + +# 添加项目根目录到Python搜索路径 +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from globals.driver_utils import DriverManager +import globals.driver_utils as driver_utils +import actions +import check_station +from check_station import get_excel_from_url + +PORT = "COM9" +BAUD = 115200 + +RESPONSE_PAYLOAD = { + "1": "xxxx", + "2": "xxxx", + "3": "xxxx", +} -def on_receive(cmd: int, data: bytes): - """接收数据回调函数""" - print("\n" + "="*60) - print("[设备B] 收到数据") - print("="*60) - cmd_name = (Command(cmd).name if cmd in Command._value2member_map_ - else 'UNKNOWN') - print(f"命令类型: 0x{cmd:02X} ({cmd_name})") - - # 根据不同命令进行处理 - if cmd == Command.HEARTBEAT: - print(">>> 收到心跳包") - # 可以回复ACK - device.send_ack() - print("<<< 已发送ACK应答\n") - - elif cmd == Command.DATA_RESPONSE: - # 尝试解析站点数据 - station_data = StationData.from_bytes(data) - if station_data: - print("\n📍 站点数据详情:") - print(f" 用户名称: {station_data.username}") - print(f" 线路名称: {station_data.line_name}") - print(f" 站点编号: 第{station_data.station_no}站") - - # 根据站点编号执行不同逻辑 - if station_data.station_no == 0: - print("\n🚀 站点编号为0,启动 actions.py") - # 启动 actions.py - - subprocess.Popen(["python", "actions.py"], cwd="d:\\Projects\\cjgc_data") - - if station_data.station_no > 0: - print(f"\n🚀 站点编号为{station_data.station_no},启动 check_station.py") - # 启动 check_station.py - - subprocess.Popen(["python", "check_station.py"], cwd="d:\\Projects\\cjgc_data") - - # 创建统一格式的响应字典 {1:"xxx", 2:"xxx", 3:"xxx", ...} - response_dict = ResponseData.create_response(station_data) - print("\n📤 设备B统一响应格式:") - for key, value in response_dict.items(): - print(f" {key}: \"{value}\"") - - # 发送响应 - device.send_data(ResponseData.to_bytes(response_dict)) - print("\n<<< 已发送响应字典\n") - else: - # 如果不是站点数据,按普通消息处理 - message = data.decode('utf-8', errors='ignore') - print(f">>> 收到普通消息: {message}") - device.send_ack() - print("<<< 已发送ACK\n") - - elif cmd == Command.CONTROL: - if len(data) >= 1: - control_code = data[0] - params = data[1:] - print(f">>> 收到控制命令: 0x{control_code:02X}, " - f"参数: {params.hex()}") - # 执行控制逻辑... - device.send_ack() - print("<<< 已发送ACK\n") - - -# 全局变量,用于在回调中访问 -device = None - - -def main(): - global device - - # 创建串口协议实例 - # 注意: 设备B应该使用另一个串口,或者通过虚拟串口对连接 - # Windows: 'COM2', Linux: '/dev/ttyUSB1' - device = SerialProtocol(port='COM2', baudrate=115200) - - print("=" * 60) - print("设备B - 接收端") - print("=" * 60) - - # 打开串口 - if not device.open(): - print("❌ 无法打开串口") - return - - print(f"✓ 串口已打开: {device.port} @ {device.baudrate}") - - # 启动接收线程 - device.start_receive(on_receive) - print("✓ 接收线程已启动,等待接收数据...") +def main() -> int: + driver, wait, device_id = DriverManager.get_driver() + url = f"https://database.yuxindazhineng.com/team-bucket/69378c5b4f42d83d9504560d/前测点表/20260309/CDWZQ-2标-龙家沟左线大桥-0-11号墩-平原.xlsx" + station_data = get_excel_from_url(url) + print(station_data) + station_quantity = len(station_data) #总站点数量 + over_station_num = 0 #已完成的站点数量 + over_station_list = [] try: - # 保持运行 - while True: - time.sleep(1) - - except KeyboardInterrupt: - print("\n\n用户中断") + with serial.Serial(PORT, BAUD, timeout=1) as ser: + print(f"Opened {PORT} at {BAUD} baud") + while True: + line = ser.readline().decode("utf-8", errors="ignore").strip() + if line: + print(f"RX: {line}") + # 解析接收到的JSON数据 + try: + data = json.loads(line) + + # 检查station字段 + if "station" in data: + station_value = data["station"] + + # station等于0时启动actions.py + if station_value == 0: + print("\n🚀 站点编号为0,启动 actions.py") + # 启动 actions.py + app = actions.DeviceAutomation(driver=driver, wait=wait, device_id=device_id) + app.run_automation() + # subprocess.Popen(["python", "actions.py"], cwd="d:\\Projects\\cjgc_data") + + # station大于0时启动check_station.py + elif station_value > 0 and over_station_num < station_quantity and station_value not in over_station_list: + print(f"\n🚀 站点编号为{station_value},启动 check_station.py") + # 启动 check_station.py + # driver_utils.ensure_appium_server_running() + app = check_station.CheckStation(driver, wait, device_id) + station = app.main_run() + if station: + over_station_num += 1 + over_station_list.append(station_value) + # subprocess.Popen(["python", "check_station.py"], cwd="d:\\Projects\\cjgc_data") + + except json.JSONDecodeError: + print("Received non-JSON data") + reply = ( + json.dumps(RESPONSE_PAYLOAD, ensure_ascii=False) + "\n" + ) + ser.write(reply.encode("utf-8")) + print(f"TX: {reply.strip()}") + time.sleep(0.05) + except serial.SerialException as exc: + print(f"Serial error: {exc}") + return 1 finally: - device.close() - print("串口已关闭") + if driver: + DriverManager.quit_driver(device_id) # 安全退出驱动 -if __name__ == '__main__': - main() +if __name__ == "__main__": + sys.exit(main()) diff --git a/globals/__pycache__/driver_utils.cpython-312.pyc b/globals/__pycache__/driver_utils.cpython-312.pyc index 0bbd255..ba0ea22 100644 Binary files a/globals/__pycache__/driver_utils.cpython-312.pyc and b/globals/__pycache__/driver_utils.cpython-312.pyc differ diff --git a/globals/driver_utils.py b/globals/driver_utils.py index 89b1a12..27941c1 100644 --- a/globals/driver_utils.py +++ b/globals/driver_utils.py @@ -16,6 +16,89 @@ import globals.global_variable as global_variable logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s: %(message)s") +class DriverManager: + _driver = None + _wait = None + _device_id = None + + @classmethod + def get_driver(cls): + + # 如果驱动不存在,或者 session 失效,则初始化/重连 + if cls._device_id is None: + cls._device_id = get_device_id() + if grant_appium_permissions(cls._device_id): + logging.info(f"设备 {cls._device_id} 授予Appium权限成功") + else: + logging.warning(f"设备 {cls._device_id} 授予Appium权限失败") + if not check_server_status(4723): + start_appium_server() + if cls._driver is None: + cls._driver, cls._wait = init_appium_driver(cls._device_id) + elif not check_session_valid(cls._driver, cls._device_id): + logging.info("检测到 Session 失效,正在重连...") + cls._driver, cls._wait = reconnect_driver(cls._device_id, cls._driver) + + return cls._driver, cls._wait, cls._device_id + + @classmethod + def quit_driver(cls, device_id): + if cls._driver: + safe_quit_driver(cls._driver, device_id) + cls._driver = None + cls._wait = None + cls._device_id = None +def get_device_id() -> str: + """ + 获取设备ID,优先使用已连接设备,否则使用全局配置 + """ + try: + # 检查已连接设备 + result = subprocess.run( + ["adb", "devices"], + capture_output=True, + text=True, + timeout=10 + ) + + target_port = "4723" + for line in result.stdout.strip().split('\n')[1:]: + if line.strip() and "device" in line and "offline" not in line: + device_id = line.split('\t')[0] + + # 检查是否为无线设备且端口为4723 + if ':' in device_id: + ip_port = device_id.split(':') + if len(ip_port) == 2 and ip_port[1] == target_port: + logging.info(f"找到目标无线设备(端口{target_port}): {device_id}") + global_variable.GLOBAL_DEVICE_ID = device_id + return device_id + + # 如果没有找到端口4723的设备,找其他无线设备 + for line in result.stdout.strip().split('\n')[1:]: + if line.strip() and "device" in line and "offline" not in line: + device_id = line.split('\t')[0] + + # 检查是否为无线设备(任何端口) + if ':' in device_id and device_id.split(':')[-1].isdigit(): + logging.info(f"未找到端口{target_port}的设备,使用其他无线设备: {device_id}") + global_variable.GLOBAL_DEVICE_ID = device_id + return device_id + + # 如果没有任何无线设备,找有线设备 + for line in result.stdout.strip().split('\n')[1:]: + if line.strip() and "device" in line and "offline" not in line: + device_id = line.split('\t')[0] + logging.info(f"未找到无线设备,使用有线设备: {device_id}") + global_variable.GLOBAL_DEVICE_ID = device_id + return device_id + + logging.error("未找到任何可用设备") + return None + + except Exception as e: + logging.warning(f"设备检测失败: {e}") + return None def init_appium_driver(device_id, app_package="com.bjjw.cjgc", app_activity=".activity.LoginActivity"): """ 初始化Appium驱动的全局函数 @@ -599,27 +682,40 @@ def safe_quit_driver(driver, device_id=None): if not hasattr(driver, 'quit'): logging.warning(f"{device_str}驱动对象类型无效,不具有quit方法: {type(driver).__name__}") return - - max_quit_attempts = 3 - for attempt in range(max_quit_attempts): - try: - logging.info(f"{device_str}尝试关闭驱动 (尝试 {attempt + 1}/{max_quit_attempts})") - driver.quit() - logging.info(f"{device_str}驱动已成功关闭") - return - except InvalidSessionIdException: - # 会话已经失效,不需要重试 - logging.info(f"{device_str}会话已经失效,无需关闭") - return - except Exception as e: - logging.error(f"{device_str}关闭驱动时出错 (尝试 {attempt + 1}/{max_quit_attempts}): {str(e)}") - if attempt < max_quit_attempts - 1: - # 等待一段时间后重试 - wait_time = 2 - logging.info(f"{device_str}将在 {wait_time} 秒后重试") - time.sleep(wait_time) - else: - logging.critical(f"{device_str}尝试多次关闭驱动失败,可能导致资源泄漏") + + try: + logging.info(f"{device_str}尝试关闭驱动 (尝试)") + driver.quit() + logging.info(f"{device_str}驱动已成功关闭") + return + except InvalidSessionIdException: + # 会话已经失效,不需要重试 + logging.info(f"{device_str}会话已经失效,无需关闭") + return + except Exception as e: + logging.error(f"{device_str}关闭驱动时出错 : {str(e)}") + return + + # max_quit_attempts = 3 + # for attempt in range(max_quit_attempts): + # try: + # logging.info(f"{device_str}尝试关闭驱动 (尝试 {attempt + 1}/{max_quit_attempts})") + # driver.quit() + # logging.info(f"{device_str}驱动已成功关闭") + # return + # except InvalidSessionIdException: + # # 会话已经失效,不需要重试 + # logging.info(f"{device_str}会话已经失效,无需关闭") + # return + # except Exception as e: + # logging.error(f"{device_str}关闭驱动时出错 (尝试 {attempt + 1}/{max_quit_attempts}): {str(e)}") + # if attempt < max_quit_attempts - 1: + # # 等待一段时间后重试 + # wait_time = 2 + # logging.info(f"{device_str}将在 {wait_time} 秒后重试") + # time.sleep(wait_time) + # else: + # logging.critical(f"{device_str}尝试多次关闭驱动失败,可能导致资源泄漏") def check_app_status(driver, package_name="com.bjjw.cjgc", activity=".activity.LoginActivity"): """ diff --git a/page_objects/__pycache__/section_mileage_config_page.cpython-312.pyc b/page_objects/__pycache__/section_mileage_config_page.cpython-312.pyc index 69e759b..53f2bf0 100644 Binary files a/page_objects/__pycache__/section_mileage_config_page.cpython-312.pyc and b/page_objects/__pycache__/section_mileage_config_page.cpython-312.pyc differ diff --git a/page_objects/section_mileage_config_page.py b/page_objects/section_mileage_config_page.py index c1e8dac..85fa1e5 100644 --- a/page_objects/section_mileage_config_page.py +++ b/page_objects/section_mileage_config_page.py @@ -383,9 +383,9 @@ class SectionMileageConfigPage: # 本次连接失败,判断是否还有剩余重试次数 remaining_times = max_retry_times - current_retry if remaining_times > 0: - self.logger.warning(f"第 {current_retry} 次尝试连接失败,剩余 {remaining_times} 次重试机会") + self.logger.warning(f"1:第 {current_retry} 次尝试连接失败,剩余 {remaining_times} 次重试机会") else: - self.logger.error(f"第 {current_retry} 次尝试连接失败,已达到最大重试次数({max_retry_times}次)") + self.logger.error(f"1:第 {current_retry} 次尝试连接失败,已达到最大重试次数({max_retry_times}次)") else: # 未检测到弹窗,直接尝试连接(逻辑与原代码一致,仅增加重试计数) # self.logger.info("未检测到弹窗,尝试连接设备") @@ -398,9 +398,9 @@ class SectionMileageConfigPage: else: remaining_times = max_retry_times - current_retry if remaining_times > 0: - self.logger.warning(f"第 {current_retry} 次尝试连接失败,剩余 {remaining_times} 次重试机会") + self.logger.warning(f"2:第 {current_retry} 次尝试连接失败,剩余 {remaining_times} 次重试机会") else: - self.logger.error(f"第 {current_retry} 次尝试连接失败,已达到最大重试次数({max_retry_times}次)") + self.logger.error(f"2:第 {current_retry} 次尝试连接失败,已达到最大重试次数({max_retry_times}次)") # 循环结束:3次均失败,返回False return False else: