import logging import time import requests import pandas as pd from io import BytesIO from datetime import datetime import os import subprocess import globals.global_variable as global_variable import globals.driver_utils as driver_utils # 导入驱动工具模块 from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException from appium.webdriver.common.appiumby import AppiumBy from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.ui import WebDriverWait import globals.ids as ids class CheckStation: def __init__(self, driver=None, wait=None,device_id=None): """初始化CheckStation对象""" if device_id is None: self.device_id = driver_utils.get_device_id() else: self.device_id = device_id if driver is None or wait is None: self.driver, self.wait = driver_utils.init_appium_driver(self.device_id) else: self.driver = driver self.wait = wait # if driver_utils.grant_appium_permissions(self.device_id): # logging.info(f"设备 {self.device_id} 授予Appium权限成功") # else: # logging.warning(f"设备 {self.device_id} 授予Appium权限失败") # # 确保Appium服务器正在运行,不在运行则启动 # if not driver_utils.check_server_status(4723): # driver_utils.start_appium_server() try: if not driver_utils.check_session_valid(self.driver, self.device_id): logging.warning(f"设备 {self.device_id} 会话无效,尝试重新连接驱动...") self.driver, self.wait = driver_utils.reconnect_driver(self.device_id, self.driver) if not self.driver: logging.error(f"设备 {self.device_id} 驱动重连失败") return False except Exception as inner_e: logging.warning(f"设备 {self.device_id} 检查会话状态时出错: {str(inner_e)}") return False # 检查应用是否成功启动 if driver_utils.is_app_launched(self.driver): logging.info(f"设备 {self.device_id} 沉降观测App已成功启动") else: logging.warning(f"设备 {self.device_id} 应用可能未正确启动") driver_utils.check_app_status(self.driver) # @staticmethod # def get_device_id() -> str: # """ # 获取设备ID,优先使用已连接设备,否则使用全局配置 # """ # try: # # 检查已连接设备 # result = subprocess.run( # ["adb", "devices"], # capture_output=True, # text=True, # timeout=10 # ) # # 解析设备列表 # for line in result.stdout.strip().split('\n')[1:]: # if line.strip() and "device" in line and "offline" not in line: # device_id = line.split('\t')[0] # logging.info(f"使用已连接设备: {device_id}") # global_variable.GLOBAL_DEVICE_ID = device_id # return device_id # except Exception as e: # logging.warning(f"设备检测失败: {e}") # # 使用全局配置 # device_id = global_variable.GLOBAL_DEVICE_ID # logging.info(f"使用全局配置设备: {device_id}") # return device_id def get_measure_data(self): # 模拟获取测量数据 pass # def add_transition_point(self): # # 添加转点逻辑 # print("添加转点") # return True def add_transition_point(self): """添加转点""" try: if not driver_utils.check_session_valid(self.driver, self.device_id): logging.warning(f"设备 {self.device_id} 会话无效,尝试重新连接驱动...") self.driver, self.wait = driver_utils.reconnect_driver(self.device_id, self.driver) if not self.driver: logging.error(f"设备 {self.device_id} 驱动重连失败") return False except Exception as inner_e: logging.warning(f"设备 {self.device_id} 检查会话状态时出错: {str(inner_e)}") return False try: # 查找并点击添加转点按钮 add_transition_btn = self.wait.until( EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/btn_add_ZPoint")) ) add_transition_btn.click() logging.info("已点击添加转点按钮") return True except TimeoutException: logging.error("等待添加转点按钮超时") return False except Exception as e: logging.error(f"添加转点时出错: {str(e)}") return False def get_excel_from_url(self, url): """ 从URL获取Excel文件并解析为字典 Excel只有一列数据(A列),每行是站点值 Args: url: Excel文件的URL地址 Returns: dict: 解析后的站点数据字典 {行号: 值},失败返回None """ try: print(f"正在从URL获取数据: {url}") response = requests.get(url, timeout=30) response.raise_for_status() # 检查请求是否成功 # 使用pandas读取Excel数据,指定没有表头,只读第一个sheet excel_data = pd.read_excel( BytesIO(response.content), header=None, # 没有表头 sheet_name=0, # 只读取第一个sheet dtype=str # 全部作为字符串读取 ) station_dict = {} # 解析Excel数据:使用行号+1作为站点编号,A列的值作为站点值 print("解析Excel数据(使用行号作为站点编号)...") for index, row in excel_data.iterrows(): station_num = index + 1 # 行号从1开始作为站点编号 station_value = str(row[0]).strip() if pd.notna(row[0]) else "" if station_value: # 只保存非空值 station_dict[station_num] = station_value print(f"成功解析Excel,共{len(station_dict)}条数据") return station_dict except requests.exceptions.RequestException as e: print(f"请求URL失败: {e}") return None except Exception as e: print(f"解析Excel失败: {e}") return None def check_station_exists(self, station_data: dict, station_num: int) -> str: """ 根据站点编号检查该站点的值是否以Z开头 Args: station_data: 站点数据字典 {编号: 值} station_num: 要检查的站点编号 Returns: str: 如果站点存在且以Z开头返回"add",否则返回"pass" """ if station_num not in station_data: print(f"站点{station_num}不存在") return "error" value = station_data[station_num] str_value = str(value).strip() is_z = str_value.upper().startswith('Z') result = "add" if is_z else "pass" print(f"站点{station_num}: {value} -> {result}") return result def main_run(self): if not self.add_transition_point(): logging.error("添加转点失败") return False if not self.take_screenshot(): logging.error("截图失败") return False logging.info("检查站点成功") return True def run(self): # last_station_num = 0 url = f"https://database.yuxindazhineng.com/team-bucket/69378c5b4f42d83d9504560d/前测点表/20260309/CDWZQ-2标-龙家沟左线大桥-0-11号墩-平原.xlsx" station_data = self.get_excel_from_url(url) print(station_data) station_quantity = len(station_data) #总站点数量 over_station_num = 0 #已完成的站点数量 over_station_list = [] #已完成的站点列表 while over_station_num < station_quantity: try: # 键盘输出线路编号 station_num_input = input("请输入线路编号:") if not station_num_input.isdigit(): # 检查输入是否为数字 print("输入错误:请输入一个整数") continue station_num = int(station_num_input) # 转为整数 if station_num in over_station_list: print("已处理该站点,跳过") continue # if last_station_num == station_num: # print("输入与上次相同,跳过处理") # continue # last_station_num = station_num result = self.check_station_exists(station_data, station_num) if result == "error": print("处理错误:站点不存在") # 错误处理逻辑,比如记录日志、发送警报等 elif result == "add": print("执行添加操作") # 添加转点 if not self.add_transition_point(): print("添加转点失败") # 可以决定是否继续循环 continue over_station_num += 1 else: # result == "pass" print("跳过处理") over_station_num += 1 over_station_list.append(station_num) # 可以添加适当的延时,避免CPU占用过高 # time.sleep(1) except KeyboardInterrupt: print("程序被用户中断") break except Exception as e: print(f"发生错误: {e}") time.sleep(20) # 错误处理,可以继续循环或退出 print(f"已处理{over_station_num}个站点") # 滑动列表到底部 if not self.scroll_list_to_bottom(self.device_id): logging.error(f"设备 {self.device_id} 下滑列表到底部失败") return False # 2. 点击最后一个spinner if not self.click_last_spinner_with_retry(self.device_id): logging.error(f"设备 {self.device_id} 点击最后一个spinner失败") return False # 3. 再下滑一次 if not self.scroll_down_once(self.device_id): logging.warning(f"设备 {self.device_id} 再次下滑失败,但继续执行") # # 截图 # self.driver.save_screenshot("check_station.png") if not self.take_screenshot(): logging.error(f"设备 {self.device_id} 截图失败") # 打完数据,截图完毕,点击平差处理按钮 if not self.click_adjustment_button(self.device_id): logging.error(f"设备 {self.device_id} 点击平差处理按钮失败") return False item = global_variable.GLOBAL_CURRENT_PROJECT_NAME if item.endswith('-平原'): item = item[:-3] # 去掉最后3个字符"-平原" # 检查是否在测量页面,在就重新执行选择断点,滑动列表到底部,点击最后一个spinner, 再下滑一次,点击平差处理按钮平差 if not self.handle_back_navigation(item, self.device_id): logging.error(f"{item}平差失败") # 检测并处理"是 保留成果"弹窗 if not self.handle_adjustment_result_dialog(): logging.error("处理平差结果弹窗失败") # 检查在不在测量列表页面,不在就点击返回按钮并处理弹窗 if self.check_measurement_list(self.device_id): logging.error(f"设备 {self.device_id} 未在测量列表页面") # 点击返回按钮并处理弹窗 if not self.execute_back_navigation_steps(self.device_id): logging.error(f"设备 {self.device_id} 处理返回按钮确认失败") # # 点击返回按钮并处理弹窗 # if not self.execute_back_navigation_steps(self.device_id): # logging.error(f"设备 {self.device_id} 处理返回按钮确认失败") return True def click_adjustment_button(self, device_id): """ 点击平差处理按钮 Args: device_id: 设备ID Returns: bool: 是否成功点击 """ try: logging.info(f"设备 {device_id} 查找平差处理按钮") # 查找平差处理按钮 adjustment_button = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/point_measure_btn") # 验证按钮文本 button_text = adjustment_button.text if "平差处理" not in button_text: logging.warning(f"设备 {device_id} 按钮文本不匹配,期望'平差处理',实际: {button_text}") if adjustment_button.is_displayed() and adjustment_button.is_enabled(): logging.info(f"设备 {device_id} 点击平差处理按钮") adjustment_button.click() time.sleep(3) # 等待平差处理完成 return True else: logging.error(f"设备 {device_id} 平差处理按钮不可点击") return False except NoSuchElementException: logging.error(f"设备 {device_id} 未找到平差处理按钮") return False except Exception as e: logging.error(f"设备 {device_id} 点击平差处理按钮时发生错误: {str(e)}") return False def handle_back_navigation(self, breakpoint_name, device_id): """ 完整的返回导航处理流程 Args: breakpoint_name: 断点名称 device_id: 设备ID Returns: bool: 整个返回导航流程是否成功 """ try: # time.sleep(2) logging.info(f"已点击平差处理按钮,检查是否在测量页面") # 检测是否存在测量列表 has_measurement_list = self.check_measurement_list(device_id) if not has_measurement_list: logging.info(f"设备 {device_id} 存在测量列表,重新执行平差流程") # 把断点名称给find_keyword if not self.find_keyword(breakpoint_name): logging.error(f"设备 {device_id} 未找到包含 {breakpoint_name} 的文件名") return False if not self.handle_measurement_dialog(): logging.error(f"设备 {device_id} 处理测量弹窗失败") return False if not self.check_apply_btn(): logging.error(f"设备 {device_id} 检查平差处理按钮失败") return False # 4. 点击平差处理按钮 if not self.click_adjustment_button(device_id): logging.error(f"设备 {device_id} 点击平差处理按钮失败") return False logging.info(f"重新选择断点并点击平差处理按钮成功") return True else: logging.info(f"不在测量页面,继续执行后续返回操作") return True except Exception as e: logging.error(f"设备 {device_id} 处理返回导航时发生错误: {str(e)}") return False def handle_adjustment_result_dialog(self): """处理平差结果确认弹窗""" try: logging.info("开始检测平差结果弹窗") # 等待弹窗出现(最多等待5秒) warning_dialog = WebDriverWait(self.driver, 5).until( EC.presence_of_element_located((AppiumBy.ID, "android:id/parentPanel")) ) # 验证弹窗内容 alert_title = warning_dialog.find_element(AppiumBy.ID, "android:id/alertTitle") alert_message = warning_dialog.find_element(AppiumBy.ID, "android:id/message") logging.info(f"检测到弹窗 - 标题: {alert_title.text}, 消息: {alert_message.text}") # 确认是目标弹窗 if "警告" in alert_title.text and "是否保留测量成果" in alert_message.text: logging.info("确认是平差结果确认弹窗") # 点击"是 保留成果"按钮 yes_button = warning_dialog.find_element(AppiumBy.ID, "android:id/button1") if yes_button.text == "是 保留成果": yes_button.click() logging.info("已点击'是 保留成果'按钮") # 等待弹窗消失 WebDriverWait(self.driver, 5).until( EC.invisibility_of_element_located((AppiumBy.ID, "android:id/parentPanel")) ) logging.info("弹窗已关闭") return True else: logging.error(f"按钮文本不匹配,期望'是 保留成果',实际: {yes_button.text}") return False else: logging.warning("弹窗内容不匹配,不是目标弹窗") return False except TimeoutException: logging.info("未检测到平差结果弹窗,继续流程") return True # 没有弹窗也是正常情况 except Exception as e: logging.error(f"处理平差结果弹窗时出错: {str(e)}") return False def check_measurement_list(self, device_id): """ 检查是否存在测量列表 Args: device_id: 设备ID Returns: bool: 如果不存在测量列表返回True,存在返回False """ try: # 等待线路列表容器出现 self.wait.until( EC.presence_of_element_located((AppiumBy.ID, ids.MEASURE_LIST_ID)) ) logging.info("线路列表容器已找到") # 如果存在MEASURE_LIST_ID,说明有测量列表,不需要执行后续步骤 logging.info(f"设备 {device_id} 存在测量列表,无需执行后续返回操作") return False except TimeoutException: # 等待超时,说明没有测量列表 logging.info(f"设备 {device_id} 未找到测量列表,可以继续执行后续步骤") return True except Exception as e: logging.error(f"设备 {device_id} 检查测量列表时发生错误: {str(e)}") return True def find_keyword(self, fixed_filename): """查找指定关键词并点击,支持向下和向上滑动查找""" try: # if not check_session_valid(self.driver, self.device_id): # logging.warning(f"设备 {self.device_id} 会话无效,尝试重新连接驱动...") # if not reconnect_driver(self.device_id, self.driver): # logging.error(f"设备 {self.device_id} 驱动重连失败") # 等待线路列表容器出现 self.wait.until( EC.presence_of_element_located((AppiumBy.ID, ids.MEASURE_LIST_ID)) ) logging.info("线路列表容器已找到") max_scroll_attempts = 100 # 最大滚动尝试次数 scroll_count = 0 previous_items = set() # 记录前一次获取的项目集合,用于检测是否到达边界 # 首先尝试向下滑动查找 while scroll_count < max_scroll_attempts: # 获取当前页面中的所有项目 current_items = self.get_current_items() logging.info(f"当前页面找到 {len(current_items)} 个项目: {current_items}") # 检查目标文件是否在当前页面中 if fixed_filename in current_items: logging.info(f"找到目标文件: {fixed_filename}") # 点击目标文件 if self.click_item_by_text(fixed_filename): return True else: logging.error(f"点击目标文件失败: {fixed_filename}") return False # 检查是否到达底部:连续两次获取的项目相同 if current_items == previous_items and len(current_items) > 0: logging.info("连续两次获取的项目相同,已到达列表底部") break # 更新前一次项目集合 previous_items = current_items.copy() # 向下滑动列表以加载更多项目 if not self.scroll_list(direction="down"): logging.error("向下滑动列表失败") return False scroll_count += 1 logging.info(f"第 {scroll_count} 次向下滑动,继续查找...") # 如果向下滑动未找到,尝试向上滑动查找 logging.info("向下滑动未找到目标,开始向上滑动查找") # 重置滚动计数 scroll_count = 0 while scroll_count < max_scroll_attempts: # 向上滑动列表 # 如果返回False,说明已经滑动到顶 if not self.scroll_list(direction="up"): # 检查是否是因为滑动到顶而返回False if "已滑动到列表顶部" in logging.handlers[0].buffer[-1].message: logging.info("已滑动到列表顶部,停止向上滑动") break else: logging.error("向上滑动列表失败") return False # 获取当前页面中的所有项目 current_items = self.get_current_items() # logging.info(f"向上滑动后找到 {len(current_items)} 个项目: {current_items}") # 检查目标文件是否在当前页面中 if fixed_filename in current_items: logging.info(f"找到目标文件: {fixed_filename}") # 点击目标文件 if self.click_item_by_text(fixed_filename): return True else: logging.error(f"点击目标文件失败: {fixed_filename}") return False scroll_count += 1 logging.info(f"第 {scroll_count} 次向上滑动,继续查找...") logging.warning(f"经过 {max_scroll_attempts * 2} 次滑动仍未找到目标文件") return False except TimeoutException: logging.error("等待线路列表元素超时") return False except Exception as e: logging.error(f"查找关键词时出错: {str(e)}") return False def get_current_items(self): """获取当前页面中的所有项目文本""" try: items = self.driver.find_elements(AppiumBy.ID, ids.MEASURE_LISTVIEW_ID) item_texts = [] for item in items: try: title_element = item.find_element(AppiumBy.ID, ids.MEASURE_NAME_TEXT_ID) if title_element and title_element.text: item_texts.append(title_element.text) except NoSuchElementException: continue return item_texts except Exception as e: logging.error(f"获取当前项目失败: {str(e)}") return [] def click_item_by_text(self, text): """点击指定文本的项目""" try: # 查找包含指定文本的项目 items = self.driver.find_elements(AppiumBy.ID, ids.MEASURE_LISTVIEW_ID) for item in items: try: title_element = item.find_element(AppiumBy.ID, ids.MEASURE_NAME_TEXT_ID) if title_element and title_element.text == text: title_element.click() logging.info(f"已点击项目: {text}") return True except NoSuchElementException: continue logging.warning(f"未找到可点击的项目: {text}") return False except Exception as e: logging.error(f"点击项目失败: {str(e)}") return False def handle_measurement_dialog(self): """处理测量弹窗 - 选择继续测量""" try: logging.info("检查测量弹窗...") # 直接尝试点击"继续测量"按钮 continue_btn = WebDriverWait(self.driver, 2).until( EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/measure_continue_btn")) ) continue_btn.click() logging.info("已点击'继续测量'按钮") return True except TimeoutException: logging.info("未找到继续测量按钮,可能没有弹窗") return True # 没有弹窗也认为是成功的 except Exception as e: logging.error(f"点击继续测量按钮时出错: {str(e)}") return False # 检查有没有平差处理按钮 def check_apply_btn(self): """检查是否有平差处理按钮""" try: apply_btn = WebDriverWait(self.driver, 1).until( EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/point_measure_btn")) ) if apply_btn.is_displayed(): logging.info("进入平差页面") else: logging.info("没有找到'平差处理'按钮") return True except TimeoutException: logging.info("未找到平差处理按钮") return False # 没有弹窗也认为是成功的 except Exception as e: logging.error(f"点击平差处理按钮时出错: {str(e)}") return False def execute_back_navigation_steps(self, device_id): """ 执行实际的返回导航步骤 Args: device_id: 设备ID Returns: bool: 导航是否成功 """ try: # 1. 首先点击返回按钮 if not self.click_back_button(device_id): logging.error(f"设备 {device_id} 点击返回按钮失败") return False # 2. 处理返回确认弹窗 logging.info(f"已点击返回按钮,等待处理返回确认弹窗") if not self.handle_confirmation_dialog(device_id): logging.error(f"设备 {device_id} 处理返回确认弹窗失败") # return False # 3. 验证是否成功返回到上一页面 time.sleep(0.5) # 等待页面跳转完成 # 可以添加页面验证逻辑,比如检查是否返回到预期的页面 # 这里可以根据实际应用添加特定的页面元素验证 logging.info(f"设备 {device_id} 返回导航流程完成") return True except Exception as e: logging.error(f"设备 {device_id} 执行返回导航步骤时发生错误: {str(e)}") return False def click_back_button(self, device_id): """点击手机系统返回按钮""" try: self.driver.back() logging.info("已点击手机系统返回按钮") return True except Exception as e: logging.error(f"点击手机系统返回按钮失败: {str(e)}") return False def handle_confirmation_dialog(self, device_id, timeout=2): """ 处理确认弹窗,点击"是"按钮 Args: device_id: 设备ID timeout: 等待弹窗的超时时间 Returns: bool: 是否成功处理弹窗 """ # 等待弹窗出现(最多等待2秒) try: max_attempts = 2 for attempt in range(max_attempts): try: dialog_message = WebDriverWait(self.driver, timeout).until( EC.presence_of_element_located((AppiumBy.XPATH, "//android.widget.TextView[@text='是否退出测量界面?']")) ) logging.info(f"设备 {device_id} 检测到确认弹窗 (第 {attempt + 1} 次)") # 查找并点击"是"按钮 confirm_button = self.driver.find_element( AppiumBy.XPATH, "//android.widget.Button[@text='是' and @resource-id='android:id/button1']" ) if confirm_button.is_displayed() and confirm_button.is_enabled(): logging.info(f"设备 {device_id} 点击确认弹窗的'是'按钮 (第 {attempt + 1} 次)") confirm_button.click() time.sleep(0.5) # 如果是第一次尝试,继续检查是否还有弹窗 if attempt < max_attempts - 1: logging.info(f"设备 {device_id} 等待 1 秒后检查是否还有弹窗") time.sleep(0.5) continue return True else: logging.error(f"设备 {device_id} '是'按钮不可点击") return False except TimeoutException: # 超时未找到弹窗,认为没有弹窗,返回成功 logging.info(f"设备 {device_id} 等待 {timeout} 秒未发现确认弹窗,可能没有弹窗,返回成功") return True except Exception as e: logging.error(f"设备 {device_id} 处理确认弹窗时出错: {str(e)}") return False def scroll_list_to_bottom(self, device_id, max_swipes=60): """ 下滑列表到最底端 Args: device_id: 设备ID max_swipes: 最大下滑次数 Returns: bool: 是否滑动到底部 """ try: logging.info(f"设备 {device_id} 开始下滑列表到底部") # 获取列表元素 list_view = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/auto_data_list") logging.info(f"时间戳1: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}") same_content_count = 0 # 初始化第一次的子元素文本 initial_child_elements = list_view.find_elements(AppiumBy.CLASS_NAME, "android.widget.TextView") logging.info(f"时间戳2: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}") # current_child_texts = "|".join([ # elem.text.strip() for elem in initial_child_elements # if elem.text and elem.text.strip() # ]) current_child_texts = "|".join( elem_text.strip() for elem in initial_child_elements if (elem_text := elem.text) and elem_text.strip() ) logging.info(f"时间戳3: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}") for i in range(max_swipes): # 执行下滑操作 self.driver.execute_script("mobile: scrollGesture", { 'elementId': list_view.id, 'direction': 'down', 'percent': 0.8, 'duration': 500 }) # 获取滑动后的子元素文本 new_child_elements = list_view.find_elements(AppiumBy.CLASS_NAME, "android.widget.TextView") new_child_texts = "|".join( elem_text.strip() for elem in new_child_elements if (elem_text := elem.text) and elem_text.strip() ) # 判断内容是否变化:若连续3次相同,认为到达底部 if new_child_texts == current_child_texts: same_content_count += 1 if same_content_count >= 2: logging.info(f"设备 {device_id} 列表已滑动到底部,共滑动 {i+1} 次") return True else: same_content_count = 0 # 内容变化,重置计数 current_child_texts = new_child_texts # 更新上一次内容 logging.debug(f"设备 {device_id} 第 {i+1} 次下滑完成,当前子元素文本: {new_child_texts[:50]}...") # 打印部分文本 logging.warning(f"设备 {device_id} 达到最大下滑次数 {max_swipes},可能未完全到底部") return True except Exception as e: logging.error(f"设备 {device_id} 下滑列表时发生错误: {str(e)}") return False def click_last_spinner_with_retry(self, device_id, max_retries=2): """带重试机制的点击方法""" for attempt in range(max_retries): try: if self.click_last_spinner(device_id): return True logging.warning(f"设备 {device_id} 第{attempt + 1}次点击失败,准备重试") time.sleep(0.5) # 重试前等待 except Exception as e: logging.error(f"设备 {device_id} 第{attempt + 1}次尝试失败: {str(e)}") logging.error(f"设备 {device_id} 所有重试次数已用尽") return False def click_last_spinner(self, device_id): """ 点击最后一个spinner Args: device_id: 设备ID Returns: bool: 是否成功点击 """ try: logging.info(f"设备 {device_id} 查找最后一个spinner") # 查找所有的spinner元素 spinners = self.driver.find_elements(AppiumBy.ID, "com.bjjw.cjgc:id/spinner") if not spinners: logging.error(f"设备 {device_id} 未找到任何spinner元素") return False # 获取最后一个spinner last_spinner = spinners[-1] if not (last_spinner.is_displayed() and last_spinner.is_enabled()): logging.error(f"设备 {device_id} 最后一个spinner不可点击") return False # 点击操作 logging.info(f"设备 {device_id} 点击最后一个spinner") last_spinner.click() # 执行额外一次下滑操作 self.scroll_down_once(device_id) max_retries = 3 # 最大重试次数 retry_count = 0 wait_timeout = 5 # 增加等待时间到5秒 while retry_count < max_retries: try: # 确保device_id正确设置,使用全局变量作为备用 if not hasattr(self, 'device_id') or not self.device_id: # 优先使用传入的device_id,其次使用全局变量 self.device_id = device_id if device_id else global_variable.get_device_id() # 使用self.device_id,确保有默认值 actual_device_id = self.device_id if self.device_id else global_variable.get_device_id() if not driver_utils.check_session_valid(self.driver, actual_device_id): logging.warning(f"设备 {actual_device_id} 会话无效,尝试重新连接驱动...") try: # 使用正确的设备ID进行重连 new_driver, new_wait = reconnect_driver(actual_device_id, self.driver) if new_driver: self.driver = new_driver self.wait = new_wait logging.info(f"设备 {actual_device_id} 驱动重连成功") else: logging.error(f"设备 {actual_device_id} 驱动重连失败") retry_count += 1 continue except Exception as e: logging.error(f"设备 {actual_device_id} 驱动重连异常: {str(e)}") retry_count += 1 continue # 点击spinner(如果是重试,需要重新获取元素) if retry_count > 0: spinners = self.driver.find_elements(AppiumBy.CLASS_NAME, "android.widget.Spinner") if not spinners: logging.error(f"设备 {device_id} 未找到spinner元素") retry_count += 1 continue last_spinner = spinners[-1] if not (last_spinner.is_displayed() and last_spinner.is_enabled()): logging.error(f"设备 {device_id} spinner不可点击") retry_count += 1 continue logging.info(f"设备 {device_id} 重新点击spinner") last_spinner.click() # 重试时也执行下滑操作 self.scroll_down_once(device_id) # 等待下拉菜单出现,增加等待时间到5秒 wait = WebDriverWait(self.driver, wait_timeout) detail_show = wait.until( EC.presence_of_element_located((AppiumBy.ID, "com.bjjw.cjgc:id/detailshow")) ) if detail_show.is_displayed(): logging.info(f"设备 {device_id} spinner点击成功,下拉菜单已展开") return True else: logging.error(f"设备 {device_id} 下拉菜单未显示") retry_count += 1 continue except Exception as wait_error: error_msg = str(wait_error) logging.error(f"设备 {device_id} 等待下拉菜单超时 (第{retry_count+1}次尝试): {error_msg}") # 检查是否是连接断开相关的错误 if not driver_utils.check_session_valid(self.driver, self.device_id): logging.warning(f"设备 {self.device_id} 会话无效,尝试重新连接驱动...") # if any(keyword in error_msg for keyword in ['socket hang up', 'Could not proxy command']): # logging.warning(f"设备 {device_id} 检测到连接相关错误,尝试重连...") if not reconnect_driver(self.device_id, self.driver): logging.error(f"设备 {device_id} 驱动重连失败") retry_count += 1 if retry_count < max_retries: logging.info(f"设备 {device_id} 将在1秒后进行第{retry_count+1}次重试") time.sleep(1) # 等待1秒后重试 logging.error(f"设备 {device_id} 经过{max_retries}次重试后仍无法展开下拉菜单") return False except Exception as e: logging.error(f"设备 {device_id} 点击最后一个spinner时发生错误: {str(e)}") return False def scroll_down_once(self, device_id): """ 再次下滑一次 Args: device_id: 设备ID Returns: bool: 是否成功下滑 """ try: logging.info(f"设备 {device_id} 执行额外一次下滑") # 获取列表元素 list_view = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/auto_data_list") # 执行下滑操作 self.driver.execute_script("mobile: scrollGesture", { 'elementId': list_view.id, 'direction': 'down', 'percent': 0.5 }) time.sleep(0.2) logging.info(f"设备 {device_id} 额外下滑完成") return True except Exception as e: logging.error(f"设备 {device_id} 额外下滑时发生错误: {str(e)}") return False def take_screenshot(self): """ 通过Appium驱动截取设备屏幕 参数: filename_prefix: 断点名称 返回: bool: 操作是否成功 """ try: # 获取项目名称 project_name = global_variable.GLOBAL_USERNAME or "用户名" filename_prefix = global_variable.GLOBAL_CURRENT_PROJECT_NAME or "平差页面截图" # 获取当前日期 date_str = datetime.now().strftime("%Y%m%d") # if not date_str: # date_str = datetime.now().strftime("%Y%m%d") # 获取当前时间(如果没有提供),并确保格式合法(不含冒号) time_str = datetime.now().strftime("%H%M%S") # 创建D盘下的截图目录结构:D:\uploadInfo\picture\项目名\年月日 screenshots_dir = os.path.join("D:\\", "uploadInfo", "picture", project_name, date_str) # 确保目录存在 try: os.makedirs(screenshots_dir, exist_ok=True) logging.info(f"截图目录: {screenshots_dir}") except Exception as dir_error: logging.error(f"创建截图目录失败: {str(dir_error)}") return False line_code = global_variable.GLOBAL_LINE_NUM if not line_code: logging.error(f"未找到与断点名称 {filename_prefix} 对应的线路编码") line_code = "unknown" # 截图保存 screenshot_file = os.path.join( screenshots_dir, f"{line_code}_{filename_prefix}_{time_str}.png" ) # 尝试保存截图 try: success = self.driver.save_screenshot(screenshot_file) if success: logging.info(f"截图已保存: {screenshot_file}") # 验证文件是否真的存在 if os.path.exists(screenshot_file): logging.info(f"截图文件验证存在: {screenshot_file}") else: logging.warning(f"截图文件保存成功但验证不存在: {screenshot_file}") return True else: logging.error(f"Appium截图保存失败: {screenshot_file}") return False except Exception as save_error: logging.error(f"保存截图时发生错误: {str(save_error)}") return False except Exception as e: logging.error(f"截图时发生错误: {str(e)}") return False def get_excel_from_url(url): """ 从URL获取Excel文件并解析为字典 Excel只有一列数据(A列),每行是站点值 Args: url: Excel文件的URL地址 Returns: dict: 解析后的站点数据字典 {行号: 值},失败返回None """ try: print(f"正在从URL获取数据: {url}") response = requests.get(url, timeout=30) response.raise_for_status() # 检查请求是否成功 # 使用pandas读取Excel数据,指定没有表头,只读第一个sheet excel_data = pd.read_excel( BytesIO(response.content), header=None, # 没有表头 sheet_name=0, # 只读取第一个sheet dtype=str # 全部作为字符串读取 ) station_dict = {} # 解析Excel数据:使用行号+1作为站点编号,A列的值作为站点值 print("解析Excel数据(使用行号作为站点编号)...") for index, row in excel_data.iterrows(): station_num = index + 1 # 行号从1开始作为站点编号 station_value = str(row[0]).strip() if pd.notna(row[0]) else "" if station_value: # 只保存非空值 station_dict[station_num] = station_value print(f"成功解析Excel,共{len(station_dict)}条数据") return station_dict except requests.exceptions.RequestException as e: print(f"请求URL失败: {e}") return None except Exception as e: print(f"解析Excel失败: {e}") return None if __name__ == "__main__": check_station = CheckStation() check_station.run()