first commit

This commit is contained in:
2026-03-12 17:03:56 +08:00
commit aa4d4c7d7c
48 changed files with 10958 additions and 0 deletions

View File

@@ -0,0 +1,403 @@
# 测量标签栏页面操作
# page_objects/measure_tabbar_page.py
from appium.webdriver.common.appiumby import AppiumBy
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException
import logging
import time
import globals.ids as ids
import globals.global_variable as global_variable # 导入全局变量模块
from globals.driver_utils import check_session_valid, reconnect_driver, go_main_click_tabber_button # 导入会话检查和重连函数
# import globals.driver_utils as driver_utils # 导入全局变量模块
class MeasureTabbarPage:
def __init__(self, driver, wait,device_id):
self.driver = driver
self.wait = wait
self.logger = logging.getLogger(__name__)
self.seen_items = set() # 记录已经看到的项目,用于检测是否滚动到底部
self.all_items = set() # 记录所有看到的项目,用于检测是否已经查看过所有项目
# 获取设备ID用于重连操作
self.device_id = device_id
def is_measure_tabbar_visible(self):
"""文件列表是否可见"""
try:
return self.driver.find_element(AppiumBy.ID, ids.MEASURE_LIST_ID).is_displayed()
except NoSuchElementException:
self.logger.warning("文件列表未找到")
return False
except Exception as e:
self.logger.error(f"文件列表可见性时发生意外错误: {str(e)}")
return False
def click_measure_tabbar(self):
"""点击测量标签栏"""
try:
measure_tab = self.wait.until(
EC.element_to_be_clickable((AppiumBy.ID, ids.MEASURE_TABBAR_ID))
)
measure_tab.click()
self.logger.info("已点击测量标签栏")
time.sleep(1)
# 等待测量页面加载完成
self.wait.until(
lambda driver: self.is_measure_tabbar_visible()
)
return True
except TimeoutException:
self.logger.error("等待测量标签栏可点击超时")
return False
except Exception as e:
self.logger.error(f"点击测量标签栏时出错: {str(e)}")
return False
# def scroll_list(self):
# """滑动列表以加载更多项目"""
# try:
# # 获取列表容器
# list_container = self.driver.find_element(AppiumBy.ID, ids.MEASURE_LIST_ID)
# # 计算滑动坐标
# start_x = list_container.location['x'] + list_container.size['width'] // 2
# start_y = list_container.location['y'] + list_container.size['height'] * 0.8
# end_y = list_container.location['y'] + list_container.size['height'] * 0.2
# # 执行滑动
# self.driver.swipe(start_x, start_y, start_x, end_y, 1000)
# self.logger.info("已滑动列表")
# # 等待新内容加载
# time.sleep(2)
# return True
# except Exception as e:
# self.logger.error(f"滑动列表失败: {str(e)}")
# return False
def scroll_list(self, direction="down"):
"""滑动列表以加载更多项目
Args:
direction: 滑动方向,"down"表示向下滑动,"up"表示向上滑动
Returns:
bool: 滑动是否成功执行对于向上滑动如果滑动到顶则返回False
"""
max_retry = 2
retry_count = 0
while retry_count <= max_retry:
# 检查会话是否有效
if not check_session_valid(self.driver, self.device_id):
self.logger.warning(f"会话无效,尝试重新连接... (尝试 {retry_count + 1}/{max_retry})")
try:
# 尝试重新连接驱动
self.driver, self.wait = reconnect_driver(self.device_id, self.driver)
self.logger.info("驱动重新连接成功")
except Exception as reconnect_error:
self.logger.error(f"驱动重新连接失败: {str(reconnect_error)}")
retry_count += 1
if retry_count > max_retry:
self.logger.error("达到最大重试次数,滑动列表失败")
return False
time.sleep(1)
continue
try:
# 获取列表容器
list_container = self.driver.find_element(AppiumBy.ID, ids.MEASURE_LIST_ID)
# 计算滑动坐标
start_x = list_container.location['x'] + list_container.size['width'] // 2
if direction == "down":
# 向下滑动
start_y = list_container.location['y'] + list_container.size['height'] * 0.95
end_y = list_container.location['y'] + list_container.size['height'] * 0.05
else:
# 向上滑动
# 记录滑动前的项目,用于判断是否滑动到顶
before_scroll_items = self.get_current_items()
start_y = list_container.location['y'] + list_container.size['height'] * 0.05
end_y = list_container.location['y'] + list_container.size['height'] * 0.95
# 执行滑动
self.driver.swipe(start_x, start_y, start_x, end_y, 1000)
# 等待新内容加载
time.sleep(1)
# 向上滑动时,检查是否滑动到顶
if direction == "up":
after_scroll_items = self.get_current_items()
# 如果滑动后的项目与滑动前的项目相同,说明已经滑动到顶
if after_scroll_items == before_scroll_items:
self.logger.info("已滑动到列表顶部,列表内容不变")
return False
return True
except Exception as e:
error_msg = str(e)
self.logger.error(f"滑动列表失败: {error_msg}")
# 如果是连接相关的错误,尝试重连
if any(keyword in error_msg.lower() for keyword in ['socket hang up', 'could not proxy command', 'session']):
retry_count += 1
if retry_count > max_retry:
self.logger.error("达到最大重试次数,滑动列表失败")
return False
self.logger.warning(f"尝试重新连接后重试... (尝试 {retry_count}/{max_retry})")
time.sleep(1)
else:
# 非连接错误直接返回False
return False
def get_current_items(self):
"""获取当前页面中的所有项目文本"""
max_retry = 2
retry_count = 0
while retry_count <= max_retry:
# 检查会话是否有效
if not check_session_valid(self.driver, self.device_id):
self.logger.warning(f"会话无效,尝试重新连接... (尝试 {retry_count + 1}/{max_retry})")
try:
# 尝试重新连接驱动
self.driver, self.wait = reconnect_driver(self.device_id, self.driver)
self.logger.info("驱动重新连接成功")
except Exception as reconnect_error:
self.logger.error(f"驱动重新连接失败: {str(reconnect_error)}")
retry_count += 1
if retry_count > max_retry:
self.logger.error("达到最大重试次数,获取当前项目失败")
return []
time.sleep(0.1) # 等待1秒后重试
continue
try:
items = self.driver.find_elements(AppiumBy.ID, ids.MEASURE_LISTVIEW_ID)
item_texts = []
for item in items:
try:
title_element = item.find_element(AppiumBy.ID, ids.MEASURE_NAME_TEXT_ID)
if title_element and title_element.text:
item_texts.append(title_element.text)
except NoSuchElementException:
continue
except Exception as item_error:
self.logger.warning(f"处理项目时出错: {str(item_error)}")
continue
return item_texts
except Exception as e:
error_msg = str(e)
self.logger.error(f"获取当前项目失败: {error_msg}")
# 如果是连接相关的错误,尝试重连
if any(keyword in error_msg.lower() for keyword in ['socket hang up', 'could not proxy command', 'session']):
retry_count += 1
if retry_count > max_retry:
self.logger.error("达到最大重试次数,获取当前项目失败")
return []
self.logger.warning(f"尝试重新连接后重试... (尝试 {retry_count}/{max_retry})")
time.sleep(1)
else:
# 非连接错误,直接返回空列表
return []
def click_item_by_text(self, text):
"""点击指定文本的项目"""
max_retry = 2
retry_count = 0
while retry_count <= max_retry:
# 检查会话是否有效
if not check_session_valid(self.driver, self.device_id):
self.logger.warning(f"会话无效,尝试重新连接... (尝试 {retry_count + 1}/{max_retry})")
try:
# 尝试重新连接驱动
self.driver, self.wait = reconnect_driver(self.device_id, self.driver)
self.logger.info("驱动重新连接成功")
except Exception as reconnect_error:
self.logger.error(f"驱动重新连接失败: {str(reconnect_error)}")
retry_count += 1
if retry_count > max_retry:
self.logger.error("达到最大重试次数,点击项目失败")
return False
time.sleep(1)
continue
try:
# 查找包含指定文本的项目
items = self.driver.find_elements(AppiumBy.ID, ids.MEASURE_LISTVIEW_ID)
for item in items:
try:
title_element = item.find_element(AppiumBy.ID, ids.MEASURE_NAME_TEXT_ID)
if title_element and title_element.text == text:
title_element.click()
self.logger.info(f"已点击项目: {text}")
return True
except NoSuchElementException:
continue
except Exception as item_error:
self.logger.warning(f"处理项目时出错: {str(item_error)}")
continue
self.logger.warning(f"未找到可点击的项目: {text}")
return False
except Exception as e:
error_msg = str(e)
self.logger.error(f"点击项目失败: {error_msg}")
# 如果是连接相关的错误,尝试重连
if any(keyword in error_msg.lower() for keyword in ['socket hang up', 'could not proxy command', 'session']):
retry_count += 1
if retry_count > max_retry:
self.logger.error("达到最大重试次数,点击项目失败")
return False
self.logger.warning(f"尝试重新连接后重试... (尝试 {retry_count}/{max_retry})")
time.sleep(1)
else:
# 非连接错误直接返回False
return False
def find_keyword(self, fixed_filename):
"""查找指定关键词并点击,支持向下和向上滑动查找"""
try:
# 等待线路列表容器出现
self.wait.until(
EC.presence_of_element_located((AppiumBy.ID, ids.MEASURE_LIST_ID))
)
# self.logger.info("线路列表容器已找到")
max_scroll_attempts = 50 # 最大滚动尝试次数
scroll_count = 0
found_items_count = 0 # 记录已找到的项目数量
last_items_count = 0 # 记录上一次找到的项目数量
previous_items = set() # 记录前一次获取的项目集合,用于检测是否到达边界
# 首先尝试向下滑动查找
while scroll_count < max_scroll_attempts:
# 获取当前页面中的所有项目
current_items = self.get_current_items()
# self.logger.info(f"当前页面找到 {len(current_items)} 个项目: {current_items}")
# 检查目标文件是否在当前页面中
if fixed_filename in current_items:
# self.logger.info(f"找到目标文件: {fixed_filename}")
# 点击目标文件
if self.click_item_by_text(fixed_filename):
return True
else:
self.logger.error(f"点击目标文件失败: {fixed_filename}")
return False
# 检查是否到达底部:连续两次获取的项目相同
if current_items == previous_items and len(current_items) > 0:
self.logger.info("连续两次获取的项目相同,已到达列表底部")
break
# 更新前一次项目集合
previous_items = current_items.copy()
# # 记录所有看到的项目
# self.all_items.update(current_items)
# # 检查是否已经查看过所有项目
# if len(current_items) > 0 and found_items_count == len(self.all_items):
# self.logger.info("已向下查看所有项目,未找到目标文件")
# break
# # return False
# found_items_count = len(self.all_items)
# 向下滑动列表以加载更多项目
if not self.scroll_list(direction="down"):
self.logger.error("向下滑动列表失败")
return False
scroll_count += 1
self.logger.info(f"{scroll_count} 次向下滑动,继续查找...")
# 如果向下滑动未找到,尝试向上滑动查找
self.logger.info("向下滑动未找到目标,开始向上滑动查找")
# 重置滚动计数
scroll_count = 0
while scroll_count < max_scroll_attempts:
# 向上滑动列表
# 如果返回False说明已经滑动到顶
if not self.scroll_list(direction="up"):
# 检查是否是因为滑动到顶而返回False
if "已滑动到列表顶部" in self.logger.handlers[0].buffer[-1].message:
self.logger.info("已滑动到列表顶部,停止向上滑动")
break
else:
self.logger.error("向上滑动列表失败")
return False
# 获取当前页面中的所有项目
current_items = self.get_current_items()
# self.logger.info(f"向上滑动后找到 {len(current_items)} 个项目: {current_items}")
# 检查目标文件是否在当前页面中
if fixed_filename in current_items:
self.logger.info(f"找到目标文件: {fixed_filename}")
# 点击目标文件
if self.click_item_by_text(fixed_filename):
return True
else:
self.logger.error(f"点击目标文件失败: {fixed_filename}")
return False
scroll_count += 1
self.logger.info(f"{scroll_count} 次向上滑动,继续查找...")
self.logger.warning(f"经过 {max_scroll_attempts * 2} 次滑动仍未找到目标文件")
return False
except TimeoutException:
self.logger.error("等待线路列表元素超时")
return False
except Exception as e:
self.logger.error(f"查找关键词时出错: {str(e)}")
return False
def measure_tabbar_page_manager(self):
"""执行测量操作"""
try:
# 跳转到测量页面
if not go_main_click_tabber_button(self.driver, self.device_id, "com.bjjw.cjgc:id/img_3_layout"):
logging.error(f"设备 {self.device_id} 跳转到测量页面失败")
return False
# # 点击测量标签栏
# if not self.click_measure_tabbar():
# self.logger.error("点击测量标签栏失败")
# return False
# 固定文件名
fixed_filename = global_variable.GLOBAL_CURRENT_PROJECT_NAME
self.logger.info(f"开始查找测量数据: {fixed_filename}")
# 重置已看到的项目集合
self.seen_items = set()
self.all_items = set()
# 查找并点击测量数据
if self.find_keyword(fixed_filename):
self.logger.info("成功找到并点击测量数据")
return True
else:
self.logger.warning("未找到测量数据")
return False
except Exception as e:
self.logger.error(f"执行测量操作时出错: {str(e)}")
return False