# screenshot_page.py import subprocess import logging import time import re import os from datetime import datetime from appium.webdriver.common.appiumby import AppiumBy from selenium.common.exceptions import NoSuchElementException, TimeoutException from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC import globals.global_variable as global_variable # 导入全局变量模块 import globals.ids as ids # 导入全局驱动工具函数 from globals.driver_utils import check_session_valid, reconnect_driver import globals.driver_utils as driver_utils import globals.global_variable as global_variable # 导入全局变量模块 class ScreenshotPage: def __init__(self, driver, wait, device_id=None): self.driver = driver self.wait = wait self.device_id = device_id self.logger = logging.getLogger(__name__) self.all_items = set() def scroll_list(self, direction="down"): """滑动列表以加载更多项目 Args: direction: 滑动方向,"down"表示向下滑动,"up"表示向上滑动 Returns: bool: 滑动是否成功执行,对于向上滑动,如果滑动到顶则返回False """ try: # 获取列表容器 list_container = self.driver.find_element(AppiumBy.ID, ids.MEASURE_LIST_ID) # 计算滑动坐标 start_x = list_container.location['x'] + list_container.size['width'] // 2 if direction == "down": # 向下滑动 start_y = list_container.location['y'] + list_container.size['height'] * 0.95 end_y = list_container.location['y'] + list_container.size['height'] * 0.05 self.logger.info("向下滑动列表") else: # 向上滑动 # 记录滑动前的项目,用于判断是否滑动到顶 before_scroll_items = self.get_current_items() start_y = list_container.location['y'] + list_container.size['height'] * 0.05 end_y = list_container.location['y'] + list_container.size['height'] * 0.95 self.logger.info("向上滑动列表") # 执行滑动 self.driver.swipe(start_x, start_y, start_x, end_y, 1000) # 向上滑动时,检查是否滑动到顶 if direction == "up": after_scroll_items = self.get_current_items() # 如果滑动后的项目与滑动前的项目相同,说明已经滑动到顶 if after_scroll_items == before_scroll_items: self.logger.info("已滑动到列表顶部,列表内容不变") return False return True except Exception as e: self.logger.error(f"滑动列表失败: {str(e)}") return False def get_current_items(self): """获取当前页面中的所有项目文本""" try: items = self.driver.find_elements(AppiumBy.ID, ids.MEASURE_LISTVIEW_ID) item_texts = [] for item in items: try: title_element = item.find_element(AppiumBy.ID, ids.MEASURE_NAME_TEXT_ID) if title_element and title_element.text: item_texts.append(title_element.text) except NoSuchElementException: continue return item_texts except Exception as e: self.logger.error(f"获取当前项目失败: {str(e)}") return [] def click_item_by_text(self, text): """点击指定文本的项目""" try: # 查找包含指定文本的项目 items = self.driver.find_elements(AppiumBy.ID, ids.MEASURE_LISTVIEW_ID) for item in items: try: title_element = item.find_element(AppiumBy.ID, ids.MEASURE_NAME_TEXT_ID) if title_element and title_element.text == text: title_element.click() self.logger.info(f"已点击项目: {text}") return True except NoSuchElementException: continue self.logger.warning(f"未找到可点击的项目: {text}") return False except Exception as e: self.logger.error(f"点击项目失败: {str(e)}") return False def find_keyword(self, fixed_filename): """查找指定关键词并点击,支持向下和向上滑动查找""" try: if not check_session_valid(self.driver, self.device_id): self.logger.warning(f"设备 {self.device_id} 会话无效,尝试重新连接驱动...") if not reconnect_driver(self.device_id, self.driver): self.logger.error(f"设备 {self.device_id} 驱动重连失败") # 等待线路列表容器出现 self.wait.until( EC.presence_of_element_located((AppiumBy.ID, ids.MEASURE_LIST_ID)) ) self.logger.info("线路列表容器已找到") max_scroll_attempts = 100 # 最大滚动尝试次数 scroll_count = 0 previous_items = set() # 记录前一次获取的项目集合,用于检测是否到达边界 # 首先尝试向下滑动查找 while scroll_count < max_scroll_attempts: # 获取当前页面中的所有项目 current_items = self.get_current_items() # self.logger.info(f"当前页面找到 {len(current_items)} 个项目: {current_items}") # 检查目标文件是否在当前页面中 if fixed_filename in current_items: self.logger.info(f"找到目标文件: {fixed_filename}") # 点击目标文件 if self.click_item_by_text(fixed_filename): return True else: self.logger.error(f"点击目标文件失败: {fixed_filename}") return False # 检查是否到达底部:连续两次获取的项目相同 if current_items == previous_items and len(current_items) > 0: self.logger.info("连续两次获取的项目相同,已到达列表底部") break # 更新前一次项目集合 previous_items = current_items.copy() # 向下滑动列表以加载更多项目 if not self.scroll_list(direction="down"): self.logger.error("向下滑动列表失败") return False scroll_count += 1 self.logger.info(f"第 {scroll_count} 次向下滑动,继续查找...") # 如果向下滑动未找到,尝试向上滑动查找 self.logger.info("向下滑动未找到目标,开始向上滑动查找") # 重置滚动计数 scroll_count = 0 while scroll_count < max_scroll_attempts: # 向上滑动列表 # 如果返回False,说明已经滑动到顶 if not self.scroll_list(direction="up"): # 检查是否是因为滑动到顶而返回False if "已滑动到列表顶部" in self.logger.handlers[0].buffer[-1].message: self.logger.info("已滑动到列表顶部,停止向上滑动") break else: self.logger.error("向上滑动列表失败") return False # 获取当前页面中的所有项目 current_items = self.get_current_items() # self.logger.info(f"向上滑动后找到 {len(current_items)} 个项目: {current_items}") # 检查目标文件是否在当前页面中 if fixed_filename in current_items: self.logger.info(f"找到目标文件: {fixed_filename}") # 点击目标文件 if self.click_item_by_text(fixed_filename): return True else: self.logger.error(f"点击目标文件失败: {fixed_filename}") return False scroll_count += 1 self.logger.info(f"第 {scroll_count} 次向上滑动,继续查找...") self.logger.warning(f"经过 {max_scroll_attempts * 2} 次滑动仍未找到目标文件") return False except TimeoutException: self.logger.error("等待线路列表元素超时") return False except Exception as e: self.logger.error(f"查找关键词时出错: {str(e)}") return False def handle_measurement_dialog(self): """处理测量弹窗 - 选择继续测量""" try: self.logger.info("检查测量弹窗...") # 直接尝试点击"继续测量"按钮 continue_btn = WebDriverWait(self.driver, 2).until( EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/measure_continue_btn")) ) continue_btn.click() self.logger.info("已点击'继续测量'按钮") return True except TimeoutException: self.logger.info("未找到继续测量按钮,可能没有弹窗") return True # 没有弹窗也认为是成功的 except Exception as e: self.logger.error(f"点击继续测量按钮时出错: {str(e)}") return False # 检查有没有平差处理按钮 def check_apply_btn(self): """检查是否有平差处理按钮""" try: apply_btn = WebDriverWait(self.driver, 5).until( EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/point_measure_btn")) ) if apply_btn.is_displayed(): logging.info("进入平差页面") else: self.logger.info("没有找到'平差处理'按钮") return True except TimeoutException: self.logger.info("未找到平差处理按钮") return False # 没有弹窗也认为是成功的 except Exception as e: self.logger.error(f"点击平差处理按钮时出错: {str(e)}") return False def get_line_end_time(self, line_code): """ 从全局字典中获取线路编码对应的结束时间 参数: line_code: 线路编码 返回: tuple: (date_str, time_str) 日期和时间字符串 """ if line_code in global_variable.get_line_time_mapping_dict(): end_time = global_variable.get_line_time_mapping_dict()[line_code] date_str = end_time.strftime("%Y-%m-%d") time_str = end_time.strftime("%H:%M:%S") return (date_str, time_str) else: self.logger.warning(f"未找到线路编码 {line_code} 的结束时间") return (None, None) def show_line_time_mapping(self): """ 显示当前全局字典中的所有线路编码和时间 """ self.logger.info("\n当前全局字典内容:") if global_variable.get_line_time_mapping_dict(): for line_code, end_time in sorted(global_variable.get_line_time_mapping_dict().items()): date_str = end_time.strftime("%Y-%m-%d") time_str = end_time.strftime("%H:%M:%S") self.logger.info(f" {line_code}: {date_str} {time_str}") else: self.logger.info(" 全局字典为空") self.logger.info(f"总计: {len(global_variable.get_line_time_mapping_dict())} 条记录\n") def clear_line_time_mapping(self): """ 清空全局字典 """ global_variable.get_line_time_mapping_dict().clear() self.logger.info("已清空全局字典") def set_device_time(self, device_id, time_str=None, date_str=None, disable_auto_sync=True): """ 通过ADB设置设备时间(带管理员权限) 参数: device_id: 设备ID time_str: 时间字符串,格式 "HH:MM:SS" (例如: "14:30:00") date_str: 日期字符串,格式 "YYYY-MM-DD" (例如: "2024-10-15") disable_auto_sync: 是否禁用自动时间同步(防止设置的时间被网络时间覆盖) 返回: bool: 操作是否成功 """ try: if time_str is None and date_str is None: return True # 首先尝试获取设备的root权限 self.logger.info(f"尝试获取设备 {device_id} 的root权限...") root_result = subprocess.run( ["adb", "-s", device_id, "root"], capture_output=True, text=True, timeout=10 ) # 检查root权限获取是否成功(有些设备可能返回非0但实际已获取权限) if root_result.returncode != 0: self.logger.warning(f"获取root权限返回非0状态码,但继续尝试操作: {root_result.stderr.strip()}") now = datetime.now() hour, minute, second = map(int, (time_str or f"{now.hour}:{now.minute}:{now.second}").split(":")) year, month, day = map(int, (date_str or f"{now.year}-{now.month}-{now.day}").split("-")) # 禁用自动同步 if disable_auto_sync: # 使用su命令以root权限执行设置 subprocess.run( ["adb", "-s", device_id, "shell", "su", "-c", "settings put global auto_time 0"], timeout=5 ) subprocess.run( ["adb", "-s", device_id, "shell", "su", "-c", "settings put global auto_time_zone 0"], timeout=5 ) # 优先尝试旧格式 (MMDDhhmmYYYY.ss) adb_time_str_old = f"{month:02d}{day:02d}{hour:02d}{minute:02d}{year:04d}.{second:02d}" cmd_old = [ "adb", "-s", device_id, "shell", "su", "-c", f"date {adb_time_str_old}" ] result = subprocess.run(cmd_old, capture_output=True, text=True, timeout=10) if result.returncode != 0: self.logger.warning(f"旧格式失败,尝试新格式设置日期时间") # 尝试新格式(Toybox),使用su -c确保以root权限执行 adb_time_str_new = f"{year:04d}{month:02d}{day:02d}.{hour:02d}{minute:02d}{second:02d}" cmd_new = [ "adb", "-s", device_id, "shell", "su", "-c", f"date {adb_time_str_new}" ] result = subprocess.run(cmd_new, capture_output=True, text=True, timeout=10) if result.returncode == 0: self.logger.info(f"设备 {device_id} 时间设置成功: {year}-{month}-{day} {hour}:{minute}:{second}") return True else: self.logger.error(f"设备 {device_id} 设置时间失败: {result.stderr.strip()}") return False except subprocess.TimeoutExpired: self.logger.error(f"设备 {device_id} 设置时间命令执行超时") return False except Exception as e: self.logger.error(f"设备 {device_id} 设置时间时发生异常: {str(e)}") return False def disable_wifi(self, device_id): """ 通过ADB关闭设备WiFi 返回: bool: 操作是否成功 """ try: # 关闭WiFi cmd_disable_wifi = [ "adb", "-s", device_id, "shell", "svc", "wifi", "disable" ] result = subprocess.run(cmd_disable_wifi, capture_output=True, text=True, timeout=10) if result.returncode == 0: self.logger.info(f"设备 {device_id} WiFi已关闭") time.sleep(1) # 等待WiFi完全关闭 return True else: self.logger.error(f"设备 {device_id} 关闭WiFi失败: {result.stderr}") return False except subprocess.TimeoutExpired: self.logger.error(f"设备 {device_id} 关闭WiFi命令执行超时") return False except Exception as e: self.logger.error(f"设备 {device_id} 关闭WiFi时发生错误: {str(e)}") return False def enable_wifi(self, device_id): """ 通过ADB打开设备WiFi 返回: bool: 操作是否成功 """ try: # 打开WiFi cmd_enable_wifi = [ "adb", "-s", device_id, "shell", "svc", "wifi", "enable" ] result = subprocess.run(cmd_enable_wifi, capture_output=True, text=True, timeout=10) if result.returncode == 0: self.logger.info(f"设备 {device_id} WiFi已打开") time.sleep(3) # 等待WiFi完全连接 return True else: self.logger.error(f"设备 {device_id} 打开WiFi失败: {result.stderr}") return False except subprocess.TimeoutExpired: self.logger.error(f"设备 {device_id} 打开WiFi命令执行超时") return False except Exception as e: self.logger.error(f"设备 {device_id} 打开WiFi时发生错误: {str(e)}") return False def get_current_time(self, device_id): """ 获取设备当前时间 返回: str: 设备当前时间字符串,如果获取失败则返回None """ try: cmd_get_time = [ "adb", "-s", device_id, "shell", "date" ] result = subprocess.run(cmd_get_time, capture_output=True, text=True, timeout=10) if result.returncode == 0: current_time = result.stdout.strip() self.logger.info(f"设备 {device_id} 当前时间: {current_time}") return current_time else: self.logger.error(f"设备 {device_id} 获取时间失败: {result.stderr}") return None except subprocess.TimeoutExpired: self.logger.error(f"设备 {device_id} 获取时间命令执行超时") return None except Exception as e: self.logger.error(f"设备 {device_id} 获取时间时发生错误: {str(e)}") return None def check_wifi_status(self, device_id): """ 检查设备WiFi状态 返回: str: "enabled"表示已开启, "disabled"表示已关闭, None表示获取失败 """ try: cmd_check_wifi = [ "adb", "-s", device_id, "shell", "dumpsys", "wifi", "|", "grep", "Wi-Fi" ] result = subprocess.run(cmd_check_wifi, capture_output=True, text=True, timeout=10) if result.returncode == 0: wifi_status = result.stdout.strip() if "enabled" in wifi_status.lower(): self.logger.info(f"设备 {device_id} WiFi状态: 已开启") return "enabled" elif "disabled" in wifi_status.lower(): self.logger.info(f"设备 {device_id} WiFi状态: 已关闭") return "disabled" else: self.logger.warning(f"设备 {device_id} 无法确定WiFi状态: {wifi_status}") return None else: # 尝试另一种方法检查WiFi状态 cmd_check_wifi_alt = [ "adb", "-s", device_id, "shell", "settings", "get", "global", "wifi_on" ] result_alt = subprocess.run(cmd_check_wifi_alt, capture_output=True, text=True, timeout=10) if result_alt.returncode == 0: wifi_on = result_alt.stdout.strip() if wifi_on == "1": self.logger.info(f"设备 {device_id} WiFi状态: 已开启") return "enabled" elif wifi_on == "0": self.logger.info(f"设备 {device_id} WiFi状态: 已关闭") return "disabled" else: self.logger.warning(f"设备 {device_id} 无法确定WiFi状态: {wifi_on}") return None else: self.logger.error(f"设备 {device_id} 检查WiFi状态失败: {result_alt.stderr}") return None except subprocess.TimeoutExpired: self.logger.error(f"设备 {device_id} 检查WiFi状态命令执行超时") return None except Exception as e: self.logger.error(f"设备 {device_id} 检查WiFi状态时发生错误: {str(e)}") return None def take_screenshot(self, filename_prefix="screenshot"): """ 通过Appium驱动截取设备屏幕 参数: filename_prefix: 截图文件前缀 返回: bool: 操作是否成功 """ try: # 创建测试结果目录 screenshots_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../test_results/screenshots') if not os.path.exists(screenshots_dir): os.makedirs(screenshots_dir) self.logger.info(f"创建截图目录: {screenshots_dir}") # 截图保存 screenshot_file = os.path.join( screenshots_dir, f"{filename_prefix}_{datetime.now().strftime('%Y%m%d')}.png" ) self.driver.save_screenshot(screenshot_file) self.logger.info(f"截图已保存: {screenshot_file}") return True except Exception as e: self.logger.error(f"截图时发生错误: {str(e)}") return False def wait_for_measurement_end(self, timeout=900): """ 等待按钮变成"测量结束",最多15分钟,包含驱动重新初始化机制 Args: timeout: 超时时间,默认900秒(15分钟) Returns: bool: 是否成功等到测量结束按钮 """ try: # 更新WebDriverWait等待时间为900秒 self.wait = WebDriverWait(self.driver, 900) self.logger.info(f"设备等待测量结束按钮出现,最多等待 {timeout} 秒") start_time = time.time() reinit_attempts = 0 max_reinit_attempts = 3 # 最大重新初始化次数 while time.time() - start_time < timeout: try: # 使用XPath查找文本为"测量结束"的按钮 measurement_end_button = self.driver.find_element( AppiumBy.XPATH, "//android.widget.Button[@text='测量结束']" ) if measurement_end_button.is_displayed() and measurement_end_button.is_enabled(): self.logger.info(f"设备检测到测量结束按钮") return True except NoSuchElementException: # 按钮未找到,继续等待 pass except Exception as e: error_msg = str(e) self.logger.warning(f"设备查找测量结束按钮时出现异常: {error_msg}") # 检测是否是UiAutomator2服务崩溃 if 'UiAutomator2 server' in error_msg and 'instrumentation process is not running' in error_msg and reinit_attempts < max_reinit_attempts: reinit_attempts += 1 self.logger.info(f"设备检测到UiAutomator2服务崩溃,尝试第 {reinit_attempts} 次重新初始化驱动") # 尝试重新初始化驱动 if self._reinit_driver(): self.logger.info(f"设备驱动重新初始化成功") else: self.logger.error(f"设备驱动重新初始化失败") # 继续尝试,而不是立即失败 # 等待一段时间后再次检查 time.sleep(3) # 每30秒输出一次等待状态 if int(time.time() - start_time) % 30 == 0: elapsed = int(time.time() - start_time) self.logger.info(f"设备 {self.device_id} 已等待 {elapsed} 秒,仍在等待测量结束...") self.logger.error(f"设备 {self.device_id} 等待测量结束按钮超时") return False except Exception as e: self.logger.error(f"设备 {self.device_id} 等待测量结束时发生错误: {str(e)}") return False def _reinit_driver(self): """ 重新初始化Appium驱动 Returns: bool: 是否成功重新初始化 """ try: # 首先尝试关闭现有的驱动 if hasattr(self, 'driver') and self.driver: try: self.driver.quit() except: self.logger.warning("关闭现有驱动时出现异常") # 导入必要的模块 from appium import webdriver from appium.options.android import UiAutomator2Options # 重新创建驱动配置 options = UiAutomator2Options() options.platform_name = "Android" options.device_name = self.device_id options.app_package = "com.bjjw.cjgc" options.app_activity = ".activity.LoginActivity" options.automation_name = "UiAutomator2" options.no_reset = True options.auto_grant_permissions = True options.new_command_timeout = 300 options.udid = self.device_id # 重新连接驱动 self.logger.info(f"正在重新初始化设备 {self.device_id} 的驱动...") self.driver = webdriver.Remote("http://localhost:4723", options=options) # 重新初始化等待对象 from selenium.webdriver.support.ui import WebDriverWait self.wait = WebDriverWait(self.driver, 1) self.logger.info(f"设备 {self.device_id} 驱动重新初始化完成") return True except Exception as e: self.logger.error(f"设备 {self.device_id} 驱动重新初始化失败: {str(e)}") return False def handle_confirmation_dialog(self, device_id, timeout=2): """ 处理确认弹窗,点击"是"按钮 Args: device_id: 设备ID timeout: 等待弹窗的超时时间 Returns: bool: 是否成功处理弹窗 """ # 等待弹窗出现(最多等待2秒) try: dialog_message = WebDriverWait(self.driver, timeout).until( EC.presence_of_element_located((AppiumBy.XPATH, "//android.widget.TextView[@text='是否退出测量界面?']")) ) self.logger.info(f"设备 {device_id} 检测到确认弹窗") # 查找并点击"是"按钮 confirm_button = self.driver.find_element( AppiumBy.XPATH, "//android.widget.Button[@text='是' and @resource-id='android:id/button1']" ) if confirm_button.is_displayed() and confirm_button.is_enabled(): self.logger.info(f"设备 {device_id} 点击确认弹窗的'是'按钮") confirm_button.click() time.sleep(0.5) return True else: self.logger.error(f"设备 {device_id} '是'按钮不可点击") return False except TimeoutException: # 超时未找到弹窗,认为没有弹窗,返回成功 self.logger.info(f"设备 {device_id} 等待 {timeout} 秒未发现确认弹窗,可能没有弹窗,返回成功") return True def click_back_button(self, device_id): """点击手机系统返回按钮""" try: self.driver.back() self.logger.info("已点击手机系统返回按钮") return True except Exception as e: self.logger.error(f"点击手机系统返回按钮失败: {str(e)}") return False def handle_back_button_with_confirmation(self, device_id, timeout=10): """ 处理返回按钮的确认弹窗 Args: device_id: 设备ID timeout: 等待弹窗的超时时间 Returns: bool: 是否成功处理返回确认弹窗 """ logging.info(f"进入handle_back_button_with_confirmation函数") try: self.logger.info(f"设备 {device_id} 等待返回确认弹窗出现") start_time = time.time() while time.time() - start_time < timeout: try: # 检查是否存在确认弹窗 - 使用多种定位策略提高兼容性 dialog_selectors = [ "//android.widget.TextView[@text='是否退出测量界面?']", "//android.widget.TextView[contains(@text, '退出测量界面')]", "//android.widget.TextView[contains(@text, '是否退出')]" ] dialog_message = None for selector in dialog_selectors: try: dialog_message = self.driver.find_element(AppiumBy.XPATH, selector) if dialog_message.is_displayed(): break except NoSuchElementException: continue if dialog_message and dialog_message.is_displayed(): self.logger.info(f"设备 {device_id} 检测到返回确认弹窗") # 查找并点击"是"按钮 - 使用多种定位策略 confirm_selectors = [ "//android.widget.Button[@text='是' and @resource-id='android:id/button1']", "//android.widget.Button[@text='是']", "//android.widget.Button[@resource-id='android:id/button1']", "//android.widget.Button[contains(@text, '是')]" ] confirm_button = None for selector in confirm_selectors: try: confirm_button = self.driver.find_element(AppiumBy.XPATH, selector) if confirm_button.is_displayed() and confirm_button.is_enabled(): break except NoSuchElementException: continue if confirm_button and confirm_button.is_displayed() and confirm_button.is_enabled(): self.logger.info(f"设备 {device_id} 点击确认弹窗的'是'按钮") confirm_button.click() time.sleep(1) # 验证弹窗是否消失 try: self.driver.find_element(AppiumBy.XPATH, "//android.widget.TextView[@text='是否退出测量界面?']") self.logger.warning(f"设备 {device_id} 确认弹窗可能未正确关闭") return False except NoSuchElementException: self.logger.info(f"设备 {device_id} 确认弹窗已成功关闭") return True else: self.logger.error(f"设备 {device_id} 未找到可点击的'是'按钮") return False except NoSuchElementException: # 弹窗未找到,继续等待 pass except Exception as e: self.logger.warning(f"设备 {device_id} 查找确认弹窗时出现异常: {str(e)}") time.sleep(1) self.logger.error(f"设备 {device_id} 等待返回确认弹窗超时") return False except Exception as e: self.logger.error(f"设备 {device_id} 处理返回确认弹窗时发生错误: {str(e)}") return False def handle_adjustment_result_dialog(self): """处理平差结果确认弹窗""" try: self.logger.info("开始检测平差结果弹窗") # 等待弹窗出现(最多等待5秒) warning_dialog = WebDriverWait(self.driver, 5).until( EC.presence_of_element_located((AppiumBy.ID, "android:id/parentPanel")) ) # 验证弹窗内容 alert_title = warning_dialog.find_element(AppiumBy.ID, "android:id/alertTitle") alert_message = warning_dialog.find_element(AppiumBy.ID, "android:id/message") self.logger.info(f"检测到弹窗 - 标题: {alert_title.text}, 消息: {alert_message.text}") # 确认是目标弹窗 if "警告" in alert_title.text and "是否保留测量成果" in alert_message.text: self.logger.info("确认是平差结果确认弹窗") # 点击"是 保留成果"按钮 yes_button = warning_dialog.find_element(AppiumBy.ID, "android:id/button1") if yes_button.text == "是 保留成果": yes_button.click() self.logger.info("已点击'是 保留成果'按钮") # 等待弹窗消失 WebDriverWait(self.driver, 5).until( EC.invisibility_of_element_located((AppiumBy.ID, "android:id/parentPanel")) ) self.logger.info("弹窗已关闭") return True else: self.logger.error(f"按钮文本不匹配,期望'是 保留成果',实际: {yes_button.text}") return False else: self.logger.warning("弹窗内容不匹配,不是目标弹窗") return False except TimeoutException: self.logger.info("未检测到平差结果弹窗,继续流程") return True # 没有弹窗也是正常情况 except Exception as e: self.logger.error(f"处理平差结果弹窗时出错: {str(e)}") return False def check_measurement_list(self, device_id): """ 检查是否存在测量列表 Args: device_id: 设备ID Returns: bool: 如果不存在测量列表返回True,存在返回False """ try: # 等待线路列表容器出现 self.wait.until( EC.presence_of_element_located((AppiumBy.ID, ids.MEASURE_LIST_ID)) ) self.logger.info("线路列表容器已找到") # 如果存在MEASURE_LIST_ID,说明有测量列表,不需要执行后续步骤 self.logger.info(f"设备 {device_id} 存在测量列表,无需执行后续返回操作") return False except TimeoutException: # 等待超时,说明没有测量列表 self.logger.info(f"设备 {device_id} 未找到测量列表,可以继续执行后续步骤") return True except Exception as e: self.logger.error(f"设备 {device_id} 检查测量列表时发生错误: {str(e)}") return True def handle_back_navigation(self, breakpoint_name, device_id): """ 完整的返回导航处理流程 Args: breakpoint_name: 断点名称 device_id: 设备ID Returns: bool: 整个返回导航流程是否成功 """ try: # time.sleep(2) self.logger.info(f"已点击平差处理按钮,检查是否在测量页面") # 检测是否存在测量列表(修正逻辑) has_measurement_list = self.check_measurement_list(device_id) if not has_measurement_list: self.logger.info(f"设备 {device_id} 存在测量列表,重新执行平差流程") # 把断点名称给find_keyword if not self.find_keyword(breakpoint_name): self.logger.error(f"设备 {device_id} 未找到包含 {breakpoint_name} 的文件名") return False if not self.handle_measurement_dialog(): self.logger.error(f"设备 {device_id} 处理测量弹窗失败") return False if not self.check_apply_btn(): self.logger.error(f"设备 {device_id} 检查平差处理按钮失败") return False # 4. 点击平差处理按钮 if not self.click_adjustment_button(device_id): self.logger.error(f"设备 {device_id} 点击平差处理按钮失败") return False self.logger.info(f"重新选择断点并点击平差处理按钮成功") return True else: self.logger.info(f"不在测量页面,继续执行后续返回操作") return True except Exception as e: self.logger.error(f"设备 {device_id} 处理返回导航时发生错误: {str(e)}") return False def execute_back_navigation_steps(self, device_id): """ 执行实际的返回导航步骤 Args: device_id: 设备ID Returns: bool: 导航是否成功 """ try: # 1. 首先点击返回按钮 if not self.click_back_button(device_id): self.logger.error(f"设备 {device_id} 点击返回按钮失败") return False # 2. 处理返回确认弹窗 self.logger.info(f"已点击返回按钮,等待处理返回确认弹窗") if not self.handle_confirmation_dialog(device_id): self.logger.error(f"设备 {device_id} 处理返回确认弹窗失败") return False # 3. 验证是否成功返回到上一页面 time.sleep(1) # 等待页面跳转完成 # 可以添加页面验证逻辑,比如检查是否返回到预期的页面 # 这里可以根据实际应用添加特定的页面元素验证 self.logger.info(f"设备 {device_id} 返回导航流程完成") return True except Exception as e: self.logger.error(f"设备 {device_id} 执行返回导航步骤时发生错误: {str(e)}") return False def scroll_to_bottom_and_screenshot(self, device_id): """ 检测到测量结束后,下滑列表到最底端,点击最后一个spinner,再下滑一次,点击平差处理按钮后截图 Args: device_id: 设备ID Returns: bool: 操作是否成功 """ try: self.logger.info(f"设备 {device_id} 开始执行测量结束后的操作流程") time.sleep(5) # 1. 下滑列表到最底端 if not self.scroll_list_to_bottom(device_id): self.logger.error(f"设备 {device_id} 下滑列表到底部失败") return False # 2. 点击最后一个spinner if not self.click_last_spinner_with_retry(device_id): self.logger.error(f"设备 {device_id} 点击最后一个spinner失败") return False # 3. 再下滑一次 if not self.scroll_down_once(device_id): self.logger.warning(f"设备 {device_id} 再次下滑失败,但继续执行") # 4. 点击平差处理按钮 if not self.click_adjustment_button(device_id): self.logger.error(f"设备 {device_id} 点击平差处理按钮失败") return False # # 5. 在点击平差处理按钮后截图 # time.sleep(2) # 等待平差处理按钮点击后的界面变化 # if not self.take_screenshot("after_adjustment_button_click"): # self.logger.error(f"设备 {device_id} 截图失败") # return False self.logger.info(f"设备 {device_id} 测量结束后操作流程完成") return True except Exception as e: self.logger.error(f"设备 {device_id} 执行测量结束后操作时发生错误: {str(e)}") return False def scroll_list_to_bottom(self, device_id, max_swipes=60): """ 下滑列表到最底端 Args: device_id: 设备ID max_swipes: 最大下滑次数 Returns: bool: 是否滑动到底部 """ try: self.logger.info(f"设备 {device_id} 开始下滑列表到底部") # 获取列表元素 list_view = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/auto_data_list") same_content_count = 0 # 初始化第一次的子元素文本 initial_child_elements = list_view.find_elements(AppiumBy.CLASS_NAME, "android.widget.TextView") current_child_texts = "|".join([ elem.text.strip() for elem in initial_child_elements if elem.text and elem.text.strip() ]) for i in range(max_swipes): # 执行下滑操作 self.driver.execute_script("mobile: scrollGesture", { 'elementId': list_view.id, 'direction': 'down', 'percent': 0.8, 'duration': 500 }) time.sleep(0.5) # 获取滑动后的子元素文本 new_child_elements = list_view.find_elements(AppiumBy.CLASS_NAME, "android.widget.TextView") new_child_texts = "|".join([ elem.text.strip() for elem in new_child_elements if elem.text and elem.text.strip() ]) # 判断内容是否变化:若连续3次相同,认为到达底部 if new_child_texts == current_child_texts: same_content_count += 1 if same_content_count >= 2: self.logger.info(f"设备 {device_id} 列表已滑动到底部,共滑动 {i+1} 次") return True else: same_content_count = 0 # 内容变化,重置计数 current_child_texts = new_child_texts # 更新上一次内容 self.logger.debug(f"设备 {device_id} 第 {i+1} 次下滑完成,当前子元素文本: {new_child_texts[:50]}...") # 打印部分文本 self.logger.warning(f"设备 {device_id} 达到最大下滑次数 {max_swipes},可能未完全到底部") return True except Exception as e: self.logger.error(f"设备 {device_id} 下滑列表时发生错误: {str(e)}") return False def click_last_spinner_with_retry(self, device_id, max_retries=2): """带重试机制的点击方法""" for attempt in range(max_retries): try: if self.click_last_spinner(device_id): return True self.logger.warning(f"设备 {device_id} 第{attempt + 1}次点击失败,准备重试") time.sleep(1) # 重试前等待 except Exception as e: self.logger.error(f"设备 {device_id} 第{attempt + 1}次尝试失败: {str(e)}") self.logger.error(f"设备 {device_id} 所有重试次数已用尽") return False def click_last_spinner(self, device_id): """ 点击最后一个spinner Args: device_id: 设备ID Returns: bool: 是否成功点击 """ try: self.logger.info(f"设备 {device_id} 查找最后一个spinner") # 查找所有的spinner元素 spinners = self.driver.find_elements(AppiumBy.ID, "com.bjjw.cjgc:id/spinner") if not spinners: self.logger.error(f"设备 {device_id} 未找到任何spinner元素") return False # 获取最后一个spinner last_spinner = spinners[-1] if not (last_spinner.is_displayed() and last_spinner.is_enabled()): self.logger.error(f"设备 {device_id} 最后一个spinner不可点击") return False # 点击操作 self.logger.info(f"设备 {device_id} 点击最后一个spinner") last_spinner.click() # 执行额外一次下滑操作 self.scroll_down_once(device_id) max_retries = 3 # 最大重试次数 retry_count = 0 wait_timeout = 5 # 增加等待时间到5秒 while retry_count < max_retries: try: # 确保device_id正确设置,使用全局变量作为备用 if not hasattr(self, 'device_id') or not self.device_id: # 优先使用传入的device_id,其次使用全局变量 self.device_id = device_id if device_id else global_variable.get_device_id() # 使用self.device_id,确保有默认值 actual_device_id = self.device_id if self.device_id else global_variable.get_device_id() if not check_session_valid(self.driver, actual_device_id): self.logger.warning(f"设备 {actual_device_id} 会话无效,尝试重新连接驱动...") try: # 使用正确的设备ID进行重连 new_driver, new_wait = reconnect_driver(actual_device_id, self.driver) if new_driver: self.driver = new_driver self.wait = new_wait self.logger.info(f"设备 {actual_device_id} 驱动重连成功") else: self.logger.error(f"设备 {actual_device_id} 驱动重连失败") retry_count += 1 continue except Exception as e: self.logger.error(f"设备 {actual_device_id} 驱动重连异常: {str(e)}") retry_count += 1 continue # 点击spinner(如果是重试,需要重新获取元素) if retry_count > 0: spinners = self.driver.find_elements(AppiumBy.CLASS_NAME, "android.widget.Spinner") if not spinners: self.logger.error(f"设备 {device_id} 未找到spinner元素") retry_count += 1 continue last_spinner = spinners[-1] if not (last_spinner.is_displayed() and last_spinner.is_enabled()): self.logger.error(f"设备 {device_id} spinner不可点击") retry_count += 1 continue self.logger.info(f"设备 {device_id} 重新点击spinner") last_spinner.click() # 重试时也执行下滑操作 self.scroll_down_once(device_id) # 等待下拉菜单出现,增加等待时间到5秒 wait = WebDriverWait(self.driver, wait_timeout) detail_show = wait.until( EC.presence_of_element_located((AppiumBy.ID, "com.bjjw.cjgc:id/detailshow")) ) if detail_show.is_displayed(): self.logger.info(f"设备 {device_id} spinner点击成功,下拉菜单已展开") return True else: self.logger.error(f"设备 {device_id} 下拉菜单未显示") retry_count += 1 continue except Exception as wait_error: error_msg = str(wait_error) self.logger.error(f"设备 {device_id} 等待下拉菜单超时 (第{retry_count+1}次尝试): {error_msg}") # 检查是否是连接断开相关的错误 if not check_session_valid(self.driver, self.device_id): self.logger.warning(f"设备 {self.device_id} 会话无效,尝试重新连接驱动...") # if any(keyword in error_msg for keyword in ['socket hang up', 'Could not proxy command']): # self.logger.warning(f"设备 {device_id} 检测到连接相关错误,尝试重连...") if not reconnect_driver(self.device_id, self.driver): self.logger.error(f"设备 {device_id} 驱动重连失败") retry_count += 1 if retry_count < max_retries: self.logger.info(f"设备 {device_id} 将在1秒后进行第{retry_count+1}次重试") time.sleep(1) # 等待1秒后重试 self.logger.error(f"设备 {device_id} 经过{max_retries}次重试后仍无法展开下拉菜单") return False except Exception as e: self.logger.error(f"设备 {device_id} 点击最后一个spinner时发生错误: {str(e)}") return False def scroll_down_once(self, device_id): """ 再次下滑一次 Args: device_id: 设备ID Returns: bool: 是否成功下滑 """ try: self.logger.info(f"设备 {device_id} 执行额外一次下滑") # 获取列表元素 list_view = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/auto_data_list") # 执行下滑操作 self.driver.execute_script("mobile: scrollGesture", { 'elementId': list_view.id, 'direction': 'down', 'percent': 0.5 }) time.sleep(1) self.logger.info(f"设备 {device_id} 额外下滑完成") return True except Exception as e: self.logger.error(f"设备 {device_id} 额外下滑时发生错误: {str(e)}") return False def click_adjustment_button(self, device_id): """ 点击平差处理按钮 Args: device_id: 设备ID Returns: bool: 是否成功点击 """ try: self.logger.info(f"设备 {device_id} 查找平差处理按钮") # 查找平差处理按钮 adjustment_button = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/point_measure_btn") # 验证按钮文本 button_text = adjustment_button.text if "平差处理" not in button_text: self.logger.warning(f"设备 {device_id} 按钮文本不匹配,期望'平差处理',实际: {button_text}") if adjustment_button.is_displayed() and adjustment_button.is_enabled(): self.logger.info(f"设备 {device_id} 点击平差处理按钮") adjustment_button.click() time.sleep(3) # 等待平差处理完成 return True else: self.logger.error(f"设备 {device_id} 平差处理按钮不可点击") return False except NoSuchElementException: self.logger.error(f"设备 {device_id} 未找到平差处理按钮") return False except Exception as e: self.logger.error(f"设备 {device_id} 点击平差处理按钮时发生错误: {str(e)}") return False def add_breakpoint_to_upload_list(self, breakpoint_name, line_num): """添加平差完成的断点到上传列表和字典""" if breakpoint_name and breakpoint_name not in global_variable.get_upload_breakpoint_list(): global_variable.get_upload_breakpoint_list().append(breakpoint_name) global_variable.get_upload_breakpoint_dict()[breakpoint_name] = { 'breakpoint_name': breakpoint_name, 'line_num': line_num } logging.info(f"成功添加断点 '{breakpoint_name}' 到上传列表") logging.info(f"断点详细信息: 线路编码={line_num}") return True else: logging.warning(f"断点名为空或已存在于列表中") return False def handle_confirmation_dialog_save(self, device_id, timeout=2): """ 处理确认弹窗,点击"是"按钮 Args: device_id: 设备ID timeout: 等待弹窗的超时时间 Returns: bool: 是否成功处理弹窗 """ # 等待弹窗出现(最多等待2秒) try: dialog_message = WebDriverWait(self.driver, timeout).until( EC.presence_of_element_located((AppiumBy.ID, "android:id/content")) ) self.logger.info(f"设备 {device_id} 检测到确认弹窗") # 查找并点击"是"按钮 confirm_button = self.driver.find_element( AppiumBy.ID, "android:id/button1" ) if confirm_button.is_displayed() and confirm_button.is_enabled(): self.logger.info(f"设备 {device_id} 点击确认弹窗的'是'按钮") confirm_button.click() time.sleep(0.5) return True else: self.logger.error(f"设备 {device_id} '是'按钮不可点击") return False except TimeoutException: # 超时未找到弹窗,认为没有弹窗,返回成功 self.logger.info(f"设备 {device_id} 等待 {timeout} 秒未发现确认弹窗,可能没有弹窗,返回成功") return True def screenshot_page_manager(self, device_id): """执行截图页面管理操作""" max_retries = 3 retry_count = 0 while retry_count < max_retries: try: # 检查Appium是否运行,如果没有运行则重启 if not driver_utils.is_appium_running(4723): self.logger.warning("Appium服务器未运行,尝试重启...") if not driver_utils.restart_appium_server(4723): self.logger.error("重启Appium服务器失败") return False # 重新初始化driver if not reconnect_driver(device_id): self.logger.error("重新初始化driver失败") return False # 检查GLOBAL_UPLOAD_BREAKPOINT_DICT是否为空,如果为空则初始化一些测试数据 if not global_variable.get_upload_breakpoint_dict(): self.logger.warning("global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT为空,正在初始化测试数据") global_variable.set_upload_breakpoint_dict({'CDWZQ-2标-龙骨湾右线大桥-0-7号墩-平原': 'L156372', 'CDWZQ-2标-蓝家湾特大 桥-31-31-平原': 'L159206'}) breakpoint_names = list(global_variable.get_upload_breakpoint_dict().keys()) processed_breakpoints = [] # 开始循环处理断点 for breakpoint_name in breakpoint_names: if breakpoint_name in processed_breakpoints: continue line_code = global_variable.get_upload_breakpoint_dict()[breakpoint_name] self.logger.info(f"开始处理要平差的断点 {breakpoint_name}") # 把断点名称给find_keyword if not self.find_keyword(breakpoint_name): self.logger.error(f"设备 {device_id} 未找到包含 {breakpoint_name} 的文件名") continue # 继续处理下一个断点 if not self.handle_measurement_dialog(): self.logger.error(f"设备 {device_id} 处理测量弹窗失败") continue if not self.check_apply_btn(): self.logger.error(f"设备 {device_id} 检查平差处理按钮失败") self.execute_back_navigation_steps(device_id) continue # 4. 点击平差处理按钮 if not self.click_adjustment_button(device_id): self.logger.error(f"设备 {device_id} 点击平差处理按钮失败") continue # 检查是否在测量页面,在就重新执行选择断点,滑动列表到底部,点击最后一个spinner, 再下滑一次,点击平差处理按钮平差 if not self.handle_back_navigation(breakpoint_name, device_id): self.logger.error(f"{breakpoint_name}平差失败,未截图") continue # 检测并处理"是 保留成果"弹窗 if not self.handle_adjustment_result_dialog(): self.logger.error("处理平差结果弹窗失败") continue # # 平差完成,将断点数据保存到上传列表中 # if not self.add_breakpoint_to_upload_list(breakpoint_name, line_code): # self.logger.error(f"设备 {device_id} 保存断点 {breakpoint_name} 到上传列表失败") # continue # # 检查是否在测量页面,在就重新执行选择断点,滑动列表到底部,点击最后一个spinner, 再下滑一次,点击平差处理按钮平差 # if not self.handle_back_navigation(breakpoint_name, device_id): # self.logger.error(f"{breakpoint_name}平差失败,未截图") # continue # # 检测并处理"是 保留成果"弹窗 # if not self.handle_adjustment_result_dialog(): # self.logger.error("处理平差结果弹窗失败") # continue # 点击返回按钮并处理弹窗 if not self.execute_back_navigation_steps(device_id): self.logger.error(f"设备 {device_id} 处理返回按钮确认失败") continue # # 成功处理完一个断点,添加到已处理列表 # processed_breakpoints.append(breakpoint_name) # self.logger.info(f"成功处理断点: {breakpoint_name}") # # 检查是否所有断点都处理完成 # if len(processed_breakpoints) == len(breakpoint_names): # self.logger.info(f"设备 {device_id} 平差页面操作执行完成") # return True # else: # self.logger.warning(f"设备 {device_id} 部分断点处理失败,已成功处理 {len(processed_breakpoints)}/{len(breakpoint_names)} 个断点") # return True self.logger.warning(f"设备 {device_id} 上传流程执行完成") return True except Exception as e: retry_count += 1 self.logger.error(f"设备 {device_id} 执行截图页面操作时出错 (重试 {retry_count}/{max_retries}): {str(e)}") # 检查是否为连接错误 if driver_utils.check_connection_error(e): self.logger.warning("检测到连接错误,尝试重启Appium服务器...") if not driver_utils.restart_appium_server(4723): self.logger.error("重启Appium服务器失败") else: self.logger.info("Appium服务器重启成功,等待重新连接...") time.sleep(10) # 重新初始化driver if not reconnect_driver(device_id): self.logger.error("重新初始化driver失败") if retry_count >= max_retries: break continue if retry_count >= max_retries: self.logger.error(f"设备 {device_id} 达到最大重试次数,停止执行") break self.logger.info(f"等待10秒后重试...") time.sleep(10) return False def run_automation_test(self): # 滑动列表到底部 if not self.scroll_list_to_bottom(self.device_id): self.logger.error(f"设备 {self.device_id} 下滑列表到底部失败") return False # 2. 点击最后一个spinner if not self.click_last_spinner_with_retry(self.device_id): self.logger.error(f"设备 {self.device_id} 点击最后一个spinner失败") return False # 3. 再下滑一次 if not self.scroll_down_once(self.device_id): self.logger.warning(f"设备 {self.device_id} 再次下滑失败,但继续执行") # 4. 点击平差处理按钮 if not self.click_adjustment_button(self.device_id): self.logger.error(f"设备 {self.device_id} 点击平差处理按钮失败") return False