Compare commits

...

6 Commits

23 changed files with 3225 additions and 279 deletions

View File

@@ -1,7 +1,9 @@
import logging
import os
import sys
import time
import subprocess
import argparse
from tkinter import E
from appium import webdriver
from appium.options.android import UiAutomator2Options
@@ -15,6 +17,7 @@ from page_objects.section_mileage_config_page import SectionMileageConfigPage
from page_objects.upload_config_page import UploadConfigPage
from page_objects.more_download_page import MoreDownloadPage
from page_objects.screenshot_page import ScreenshotPage
from check_station import CheckStation
import globals.driver_utils as driver_utils # 导入驱动工具模块
import globals.global_variable as global_variable
from page_objects.login_page import LoginPage
@@ -23,27 +26,6 @@ import globals.create_link as create_link
class DeviceAutomation:
# def __init__(self, device_id=None):
# # 如果没有提供设备ID则自动获取
# if device_id is None:
# self.device_id = driver_utils.get_device_id()
# else:
# self.device_id = device_id
# # 初始化权限
# if driver_utils.grant_appium_permissions(self.device_id):
# logging.info(f"设备 {self.device_id} 授予Appium权限成功")
# else:
# logging.warning(f"设备 {self.device_id} 授予Appium权限失败")
# # 确保Appium服务器正在运行,不在运行则启动
# if not driver_utils.check_server_status(4723):
# driver_utils.start_appium_server()
# # 初始化Appium驱动和页面对象
# self.init_driver()
# # 创建测试结果目录
# self.results_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_results')
def __init__(self, driver=None, wait=None, device_id=None):
self.driver = driver
self.wait = wait
@@ -138,6 +120,7 @@ class DeviceAutomation:
logging.warning(f"设备 {self.device_id} 检查会话状态时出错: {str(inner_e)}")
return False
# # 使用全局函数初始化驱动
self.results_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_results')
# self.driver, self.wait = driver_utils.init_appium_driver(self.device_id)
# 初始化页面对象
logging.info(f"设备 {self.device_id} 开始初始化页面对象")
@@ -148,6 +131,7 @@ class DeviceAutomation:
self.upload_config_page = UploadConfigPage(self.driver, self.wait, self.device_id)
self.more_download_page = MoreDownloadPage(self.driver, self.wait,self.device_id)
self.screenshot_page = ScreenshotPage(self.driver, self.wait, self.device_id)
self.check_station = CheckStation(self.driver, self.wait, self.device_id)
logging.info(f"设备 {self.device_id} 所有页面对象初始化完成")
# 检查应用是否成功启动
@@ -161,8 +145,14 @@ class DeviceAutomation:
logging.error(f"设备 {self.device_id} 初始化驱动失败: {str(e)}")
raise
def run_automation(self):
"""根据当前应用状态处理相应的操作"""
def run_automation(self, task_data=None):
"""根据当前应用状态处理相应的操作
Args:
task_data: 可选的任务数据字典,包含 user_name, line_name, line_num, id 等字段。
如果传入,则使用传入值设置全局变量;
如果不传,则使用硬编码的默认值。
"""
try:
max_retry = 3 # 限制最大重试次数
retry_count = 0
@@ -182,7 +172,7 @@ class DeviceAutomation:
login_success = False
for attempt in range(max_retries_login + 1):
if self.login_page.login("wangshun"):
if self.login_page.login("czsczq115ykl"):
login_success = True
break
else:
@@ -200,35 +190,6 @@ class DeviceAutomation:
else:
retry_count += 1
# login_btn_exists = self.login_page.is_login_page()
# if not login_btn_exists:
# logging.error(f"设备 {self.device_id} 未知应用状态,无法确定当前页面,跳转到登录页面")
# if self.login_page.navigate_to_login_page(self.driver, self.device_id):
# logging.info(f"设备 {self.device_id} 成功跳转到登录页面")
# return self.run_automation() # 递归调用处理登录后的状态
# else:
# logging.error(f"设备 {self.device_id} 跳转到登录页面失败")
# return False
# # 处理登录页面状态
# logging.info(f"设备 {self.device_id} 检测到登录页面,执行登录操作")
# max_retries = 1
# login_success = False
# for attempt in range(max_retries + 1):
# if self.login_page.login():
# login_success = True
# break
# else:
# if attempt < max_retries:
# logging.warning(f"设备 {self.device_id} 登录失败,准备重试 ({attempt + 1}/{max_retries})")
# time.sleep(2) # 等待2秒后重试
# else:
# logging.error(f"设备 {self.device_id} 登录失败,已达到最大重试次数")
# if not login_success:
# return False
logging.info(f"设备 {self.device_id} 登录成功,继续执行更新操作")
time.sleep(1)
@@ -239,43 +200,39 @@ class DeviceAutomation:
return False
task_count = 0
max_tasks = 1 # 最大任务数量,防止无限循环
while task_count < max_tasks:
# 获取测量任务
# 获取测量任务:优先使用传入的 task_data否则使用硬编码默认值
if task_data is None:
logging.info(f"设备 {self.device_id} 获取测量任务 (第{task_count + 1}次)")
# task_data = apis.get_measurement_task()
# logging.info(f"设备 {self.device_id} 获取到的测量任务: {task_data}")
task_data = {
"id": 39,
"id": 21,
"user_name": "czsczq115ykl",
"name": "czsczq115ykl",
"line_num": "L179451",
"line_name": "CDWZQ-2标-资阳沱江特大桥-23-35-山区",
"line_num": "L220179",
"line_name": "CZSCZQ-11-五工区-德达隧道6#斜井左线-DK632+707-DK632+705-山区",
"remaining": "0",
"status": 1
"status": 0
}
if not task_data:
logging.info(f"设备 {self.device_id} 未获取到状态为1的测量任务等待后重试")
time.sleep(1) # 等待1秒后重试
break
# break
# continue
# 设置全局变量
global_variable.GLOBAL_USERNAME = task_data.get('user_name', '')
global_variable.GLOBAL_CURRENT_PROJECT_NAME = task_data.get('line_name', '')
global_variable.GLOBAL_LINE_NUM = task_data.get('line_num', '')
global_variable.GLOBAL_ACCOUNT_ID = task_data.get('id', 0)
logging.info(f"设备 {self.device_id} 当前要处理的项目名称:{global_variable.GLOBAL_CURRENT_PROJECT_NAME}")
# 执行测量操作
# logging.info(f"设备 {self.device_id} 开始执行测量操作")
if not self.measure_tabbar_page.measure_tabbar_page_manager():
logging.error(f"设备 {self.device_id} 测量操作执行失败")
# # 返回到测量页面
# self.driver.back()
# self.check_and_click_confirm_popup_appium()
continue # 继续下一个任务
logging.info(f"设备 {self.device_id} 测量页面操作执行成功")
@@ -283,24 +240,33 @@ class DeviceAutomation:
logging.info(f"设备 {self.device_id} 开始执行断面里程配置")
if not self.section_mileage_config_page.section_mileage_config_page_manager():
logging.error(f"设备 {self.device_id} 断面里程配置执行失败")
continue # 继续下一个任务
# 任务完成后短暂等待
logging.info(f"设备 {self.device_id}{task_count}个任务完成")
task_count += 1
logging.info(f"设备 {self.device_id} 已完成{task_count}个任务,结束打数据流程")
if task_count == 0:
logging.error(f"没有完成打数据的线路,结束任务")
return False
# # 执行打数据加转点
# if not self.check_station.run():
# logging.error(f"设备 {self.device_id} 打数据加转点执行失败")
# return False
# # 执行上传配置管理,传入当前断点名称
# breakpoint_name = global_variable.GLOBAL_CURRENT_PROJECT_NAME
# line_num = global_variable.GLOBAL_LINE_NUM
# if self.upload_config_page.upload_config_page_manager(self.results_dir, breakpoint_name, line_num):
# logging.info(f"设备 {self.device_id} 断点 '{breakpoint_name}' 上传成功")
# upload_success_count += 1
# else:
# logging.error(f"设备 {self.device_id} 断点 '{breakpoint_name}' 上传失败")
# for i in range(3):
# if self.upload_config_page.upload_config_page_manager(self.results_dir, breakpoint_name, line_num):
# logging.info(f"设备 {self.device_id} 断点 '{breakpoint_name}' 重试上传成功")
# upload_success_count += 1
# break
# else:
# logging.error(f"设备 {self.device_id} 断点 '{breakpoint_name}' 上传失败,第 {i+1} 次重试")
# GLOBAL_TESTED_BREAKPOINT_LIST 把已打完的写入日志文件
# with open(os.path.join(self.results_dir, "打数据完成线路.txt"), "w", encoding='utf-8') as f:
# for bp in global_variable.GLOBAL_TESTED_BREAKPOINT_LIST:
# f.write(f"{bp}\n")
return task_count > 0
# return task_count > 0
except Exception as e:
logging.error(f"设备 {self.device_id} 处理应用状态时出错: {str(e)}")
@@ -308,6 +274,14 @@ class DeviceAutomation:
# 主执行逻辑
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='DeviceAutomation 脚本 - 执行设备自动化流程')
parser.add_argument('--user_name', type=str, default='', help='用户名')
parser.add_argument('--line_name', type=str, default='', help='项目/线路名称')
parser.add_argument('--line_num', type=str, default='', help='线路编码')
parser.add_argument('--account_id', type=int, default=0, help='账号ID')
args = parser.parse_args()
device_id = create_link.setup_adb_wireless()
automation = None
try:
@@ -315,12 +289,36 @@ if __name__ == "__main__":
logging.error("未能获取设备 ID无法继续执行自动化流程")
else:
automation = DeviceAutomation(device_id=device_id)
# 如果传入了命令行参数,则构建 task_data 并传入
if args.user_name or args.line_name or args.line_num or args.account_id:
task_data = {
"id": args.account_id,
"user_name": args.user_name,
"line_num": args.line_num,
"line_name": args.line_name,
}
logging.info(f"使用命令行参数: {task_data}")
success = automation.run_automation(task_data=task_data)
else:
logging.info("未传入命令行参数,使用默认 task_data")
success = automation.run_automation()
if success:
logging.info(f"设备 {automation.device_id} 自动化流程执行成功")
print("自动化流程执行成功")
else:
logging.error(f"设备 {automation.device_id} 自动化流程执行失败")
print("自动化流程执行失败")
# 保持脚本运行,不关闭连接
logging.info("自动化流程执行完成,保持连接状态...")
logging.info("按 Ctrl+C 退出并关闭连接")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
logging.info("用户中断,准备关闭连接...")
except Exception as e:
logging.error(f"设备执行出错: {str(e)}")
finally:

1070
add_trasition.py Normal file

File diff suppressed because it is too large Load Diff

Binary file not shown.

Before

Width:  |  Height:  |  Size: 143 KiB

View File

@@ -11,6 +11,8 @@ import globals.driver_utils as driver_utils # 导入驱动工具模块
from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException
from appium.webdriver.common.appiumby import AppiumBy
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
import globals.ids as ids
@@ -108,11 +110,13 @@ class CheckStation:
return False
try:
# 查找并点击添加转点按钮
# 查找并点击添加转点按钮(双击确保点到)
add_transition_btn = self.wait.until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/btn_add_ZPoint"))
)
add_transition_btn.click()
time.sleep(0.3)
add_transition_btn.click()
logging.info("已点击添加转点按钮")
return True
except TimeoutException:
@@ -200,7 +204,7 @@ class CheckStation:
logging.info("检查站点成功")
return True
def run(self, station_num: int):
def run(self):
# last_station_num = 0
url = f"https://database.yuxindazhineng.com/team-bucket/69378c5b4f42d83d9504560d/前测点表/20260309/CDWZQ-2标-龙家沟左线大桥-0-11号墩-平原.xlsx"
@@ -257,16 +261,54 @@ class CheckStation:
# 错误处理,可以继续循环或退出
print(f"已处理{over_station_num}个站点")
# 滑动列表到底部
if not self.scroll_list_to_bottom(self.device_id):
logging.error(f"设备 {self.device_id} 下滑列表到底部失败")
return False
# 2. 点击最后一个spinner
if not self.click_last_spinner_with_retry(self.device_id):
logging.error(f"设备 {self.device_id} 点击最后一个spinner失败")
return False
# 3. 再下滑一次
if not self.scroll_down_once(self.device_id):
logging.warning(f"设备 {self.device_id} 再次下滑失败,但继续执行")
# # 截图
# self.driver.save_screenshot("check_station.png")
if not self.take_screenshot():
logging.error(f"设备 {device_id} 截图失败")
logging.error(f"设备 {self.device_id} 截图失败")
# 打完数据,截图完毕,点击平差处理按钮
if not self.click_adjustment_button(device_id):
self.logger.error(f"设备 {device_id} 点击平差处理按钮失败")
if not self.click_adjustment_button(self.device_id):
logging.error(f"设备 {self.device_id} 点击平差处理按钮失败")
return False
item = global_variable.GLOBAL_CURRENT_PROJECT_NAME
if item.endswith('-平原'):
item = item[:-3] # 去掉最后3个字符"-平原"
# 检查是否在测量页面在就重新执行选择断点滑动列表到底部点击最后一个spinner 再下滑一次,点击平差处理按钮平差
if not self.handle_back_navigation(item, self.device_id):
logging.error(f"{item}平差失败")
# 检测并处理"是 保留成果"弹窗
if not self.handle_adjustment_result_dialog():
logging.error("处理平差结果弹窗失败")
# 检查在不在测量列表页面,不在就点击返回按钮并处理弹窗
if self.check_measurement_list(self.device_id):
logging.error(f"设备 {self.device_id} 未在测量列表页面")
# 点击返回按钮并处理弹窗
if not self.execute_back_navigation_steps(self.device_id):
logging.error(f"设备 {self.device_id} 处理返回按钮确认失败")
# # 点击返回按钮并处理弹窗
# if not self.execute_back_navigation_steps(self.device_id):
# logging.error(f"设备 {self.device_id} 处理返回按钮确认失败")
return True
def click_adjustment_button(self, device_id):
@@ -280,7 +322,7 @@ class CheckStation:
bool: 是否成功点击
"""
try:
self.logger.info(f"设备 {device_id} 查找平差处理按钮")
logging.info(f"设备 {device_id} 查找平差处理按钮")
# 查找平差处理按钮
adjustment_button = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/point_measure_btn")
@@ -288,24 +330,657 @@ class CheckStation:
# 验证按钮文本
button_text = adjustment_button.text
if "平差处理" not in button_text:
self.logger.warning(f"设备 {device_id} 按钮文本不匹配,期望'平差处理',实际: {button_text}")
logging.warning(f"设备 {device_id} 按钮文本不匹配,期望'平差处理',实际: {button_text}")
if adjustment_button.is_displayed() and adjustment_button.is_enabled():
self.logger.info(f"设备 {device_id} 点击平差处理按钮")
logging.info(f"设备 {device_id} 点击平差处理按钮")
adjustment_button.click()
time.sleep(3) # 等待平差处理完成
return True
else:
self.logger.error(f"设备 {device_id} 平差处理按钮不可点击")
logging.error(f"设备 {device_id} 平差处理按钮不可点击")
return False
except NoSuchElementException:
self.logger.error(f"设备 {device_id} 未找到平差处理按钮")
logging.error(f"设备 {device_id} 未找到平差处理按钮")
return False
except Exception as e:
self.logger.error(f"设备 {device_id} 点击平差处理按钮时发生错误: {str(e)}")
logging.error(f"设备 {device_id} 点击平差处理按钮时发生错误: {str(e)}")
return False
def handle_back_navigation(self, breakpoint_name, device_id):
"""
完整的返回导航处理流程
Args:
breakpoint_name: 断点名称
device_id: 设备ID
Returns:
bool: 整个返回导航流程是否成功
"""
try:
# time.sleep(2)
logging.info(f"已点击平差处理按钮,检查是否在测量页面")
# 检测是否存在测量列表
has_measurement_list = self.check_measurement_list(device_id)
if not has_measurement_list:
logging.info(f"设备 {device_id} 存在测量列表,重新执行平差流程")
# 把断点名称给find_keyword
if not self.find_keyword(breakpoint_name):
logging.error(f"设备 {device_id} 未找到包含 {breakpoint_name} 的文件名")
return False
if not self.handle_measurement_dialog():
logging.error(f"设备 {device_id} 处理测量弹窗失败")
return False
if not self.check_apply_btn():
logging.error(f"设备 {device_id} 检查平差处理按钮失败")
return False
# 4. 点击平差处理按钮
if not self.click_adjustment_button(device_id):
logging.error(f"设备 {device_id} 点击平差处理按钮失败")
return False
logging.info(f"重新选择断点并点击平差处理按钮成功")
return True
else:
logging.info(f"不在测量页面,继续执行后续返回操作")
return True
except Exception as e:
logging.error(f"设备 {device_id} 处理返回导航时发生错误: {str(e)}")
return False
def handle_adjustment_result_dialog(self):
"""处理平差结果确认弹窗"""
try:
logging.info("开始检测平差结果弹窗")
# 等待弹窗出现最多等待5秒
warning_dialog = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((AppiumBy.ID, "android:id/parentPanel"))
)
# 验证弹窗内容
alert_title = warning_dialog.find_element(AppiumBy.ID, "android:id/alertTitle")
alert_message = warning_dialog.find_element(AppiumBy.ID, "android:id/message")
logging.info(f"检测到弹窗 - 标题: {alert_title.text}, 消息: {alert_message.text}")
# 确认是目标弹窗
if "警告" in alert_title.text and "是否保留测量成果" in alert_message.text:
logging.info("确认是平差结果确认弹窗")
# 点击"是 保留成果"按钮
yes_button = warning_dialog.find_element(AppiumBy.ID, "android:id/button1")
if yes_button.text == "是 保留成果":
yes_button.click()
logging.info("已点击'是 保留成果'按钮")
# 等待弹窗消失
WebDriverWait(self.driver, 5).until(
EC.invisibility_of_element_located((AppiumBy.ID, "android:id/parentPanel"))
)
logging.info("弹窗已关闭")
return True
else:
logging.error(f"按钮文本不匹配,期望'是 保留成果',实际: {yes_button.text}")
return False
else:
logging.warning("弹窗内容不匹配,不是目标弹窗")
return False
except TimeoutException:
logging.info("未检测到平差结果弹窗,继续流程")
return True # 没有弹窗也是正常情况
except Exception as e:
logging.error(f"处理平差结果弹窗时出错: {str(e)}")
return False
def check_measurement_list(self, device_id):
"""
检查是否存在测量列表
Args:
device_id: 设备ID
Returns:
bool: 如果不存在测量列表返回True存在返回False
"""
try:
# 等待线路列表容器出现
self.wait.until(
EC.presence_of_element_located((AppiumBy.ID, ids.MEASURE_LIST_ID))
)
logging.info("线路列表容器已找到")
# 如果存在MEASURE_LIST_ID说明有测量列表不需要执行后续步骤
logging.info(f"设备 {device_id} 存在测量列表,无需执行后续返回操作")
return False
except TimeoutException:
# 等待超时,说明没有测量列表
logging.info(f"设备 {device_id} 未找到测量列表,可以继续执行后续步骤")
return True
except Exception as e:
logging.error(f"设备 {device_id} 检查测量列表时发生错误: {str(e)}")
return True
def find_keyword(self, fixed_filename):
"""查找指定关键词并点击,支持向下和向上滑动查找"""
try:
# if not check_session_valid(self.driver, self.device_id):
# logging.warning(f"设备 {self.device_id} 会话无效,尝试重新连接驱动...")
# if not reconnect_driver(self.device_id, self.driver):
# logging.error(f"设备 {self.device_id} 驱动重连失败")
# 等待线路列表容器出现
self.wait.until(
EC.presence_of_element_located((AppiumBy.ID, ids.MEASURE_LIST_ID))
)
logging.info("线路列表容器已找到")
max_scroll_attempts = 100 # 最大滚动尝试次数
scroll_count = 0
previous_items = set() # 记录前一次获取的项目集合,用于检测是否到达边界
# 首先尝试向下滑动查找
while scroll_count < max_scroll_attempts:
# 获取当前页面中的所有项目
current_items = self.get_current_items()
logging.info(f"当前页面找到 {len(current_items)} 个项目: {current_items}")
# 检查目标文件是否在当前页面中
if fixed_filename in current_items:
logging.info(f"找到目标文件: {fixed_filename}")
# 点击目标文件
if self.click_item_by_text(fixed_filename):
return True
else:
logging.error(f"点击目标文件失败: {fixed_filename}")
return False
# 检查是否到达底部:连续两次获取的项目相同
if current_items == previous_items and len(current_items) > 0:
logging.info("连续两次获取的项目相同,已到达列表底部")
break
# 更新前一次项目集合
previous_items = current_items.copy()
# 向下滑动列表以加载更多项目
if not self.scroll_list(direction="down"):
logging.error("向下滑动列表失败")
return False
scroll_count += 1
logging.info(f"{scroll_count} 次向下滑动,继续查找...")
# 如果向下滑动未找到,尝试向上滑动查找
logging.info("向下滑动未找到目标,开始向上滑动查找")
# 重置滚动计数
scroll_count = 0
while scroll_count < max_scroll_attempts:
# 向上滑动列表
# 如果返回False说明已经滑动到顶
if not self.scroll_list(direction="up"):
# 检查是否是因为滑动到顶而返回False
if "已滑动到列表顶部" in logging.handlers[0].buffer[-1].message:
logging.info("已滑动到列表顶部,停止向上滑动")
break
else:
logging.error("向上滑动列表失败")
return False
# 获取当前页面中的所有项目
current_items = self.get_current_items()
# logging.info(f"向上滑动后找到 {len(current_items)} 个项目: {current_items}")
# 检查目标文件是否在当前页面中
if fixed_filename in current_items:
logging.info(f"找到目标文件: {fixed_filename}")
# 点击目标文件
if self.click_item_by_text(fixed_filename):
return True
else:
logging.error(f"点击目标文件失败: {fixed_filename}")
return False
scroll_count += 1
logging.info(f"{scroll_count} 次向上滑动,继续查找...")
logging.warning(f"经过 {max_scroll_attempts * 2} 次滑动仍未找到目标文件")
return False
except TimeoutException:
logging.error("等待线路列表元素超时")
return False
except Exception as e:
logging.error(f"查找关键词时出错: {str(e)}")
return False
def get_current_items(self):
"""获取当前页面中的所有项目文本"""
try:
items = self.driver.find_elements(AppiumBy.ID, ids.MEASURE_LISTVIEW_ID)
item_texts = []
for item in items:
try:
title_element = item.find_element(AppiumBy.ID, ids.MEASURE_NAME_TEXT_ID)
if title_element and title_element.text:
item_texts.append(title_element.text)
except NoSuchElementException:
continue
return item_texts
except Exception as e:
logging.error(f"获取当前项目失败: {str(e)}")
return []
def click_item_by_text(self, text):
"""点击指定文本的项目"""
try:
# 查找包含指定文本的项目
items = self.driver.find_elements(AppiumBy.ID, ids.MEASURE_LISTVIEW_ID)
for item in items:
try:
title_element = item.find_element(AppiumBy.ID, ids.MEASURE_NAME_TEXT_ID)
if title_element and title_element.text == text:
title_element.click()
logging.info(f"已点击项目: {text}")
return True
except NoSuchElementException:
continue
logging.warning(f"未找到可点击的项目: {text}")
return False
except Exception as e:
logging.error(f"点击项目失败: {str(e)}")
return False
def handle_measurement_dialog(self):
"""处理测量弹窗 - 选择继续测量"""
try:
logging.info("检查测量弹窗...")
# 直接尝试点击"继续测量"按钮
continue_btn = WebDriverWait(self.driver, 2).until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/measure_continue_btn"))
)
continue_btn.click()
logging.info("已点击'继续测量'按钮")
return True
except TimeoutException:
logging.info("未找到继续测量按钮,可能没有弹窗")
return True # 没有弹窗也认为是成功的
except Exception as e:
logging.error(f"点击继续测量按钮时出错: {str(e)}")
return False
# 检查有没有平差处理按钮
def check_apply_btn(self):
"""检查是否有平差处理按钮"""
try:
apply_btn = WebDriverWait(self.driver, 1).until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/point_measure_btn"))
)
if apply_btn.is_displayed():
logging.info("进入平差页面")
else:
logging.info("没有找到'平差处理'按钮")
return True
except TimeoutException:
logging.info("未找到平差处理按钮")
return False # 没有弹窗也认为是成功的
except Exception as e:
logging.error(f"点击平差处理按钮时出错: {str(e)}")
return False
def execute_back_navigation_steps(self, device_id):
"""
执行实际的返回导航步骤
Args:
device_id: 设备ID
Returns:
bool: 导航是否成功
"""
try:
# 1. 首先点击返回按钮
if not self.click_back_button(device_id):
logging.error(f"设备 {device_id} 点击返回按钮失败")
return False
# 2. 处理返回确认弹窗
logging.info(f"已点击返回按钮,等待处理返回确认弹窗")
if not self.handle_confirmation_dialog(device_id):
logging.error(f"设备 {device_id} 处理返回确认弹窗失败")
# return False
# 3. 验证是否成功返回到上一页面
time.sleep(0.5) # 等待页面跳转完成
# 可以添加页面验证逻辑,比如检查是否返回到预期的页面
# 这里可以根据实际应用添加特定的页面元素验证
logging.info(f"设备 {device_id} 返回导航流程完成")
return True
except Exception as e:
logging.error(f"设备 {device_id} 执行返回导航步骤时发生错误: {str(e)}")
return False
def click_back_button(self, device_id):
"""点击手机系统返回按钮"""
try:
self.driver.back()
logging.info("已点击手机系统返回按钮")
return True
except Exception as e:
logging.error(f"点击手机系统返回按钮失败: {str(e)}")
return False
def handle_confirmation_dialog(self, device_id, timeout=2):
"""
处理确认弹窗,点击""按钮
Args:
device_id: 设备ID
timeout: 等待弹窗的超时时间
Returns:
bool: 是否成功处理弹窗
"""
# 等待弹窗出现最多等待2秒
try:
max_attempts = 2
for attempt in range(max_attempts):
try:
dialog_message = WebDriverWait(self.driver, timeout).until(
EC.presence_of_element_located((AppiumBy.XPATH, "//android.widget.TextView[@text='是否退出测量界面?']"))
)
logging.info(f"设备 {device_id} 检测到确认弹窗 (第 {attempt + 1} 次)")
# 查找并点击"是"按钮
confirm_button = self.driver.find_element(
AppiumBy.XPATH,
"//android.widget.Button[@text='' and @resource-id='android:id/button1']"
)
if confirm_button.is_displayed() and confirm_button.is_enabled():
logging.info(f"设备 {device_id} 点击确认弹窗的''按钮 (第 {attempt + 1} 次)")
confirm_button.click()
time.sleep(0.5)
# 如果是第一次尝试,继续检查是否还有弹窗
if attempt < max_attempts - 1:
logging.info(f"设备 {device_id} 等待 1 秒后检查是否还有弹窗")
time.sleep(0.5)
continue
return True
else:
logging.error(f"设备 {device_id} ''按钮不可点击")
return False
except TimeoutException:
# 超时未找到弹窗,认为没有弹窗,返回成功
logging.info(f"设备 {device_id} 等待 {timeout} 秒未发现确认弹窗,可能没有弹窗,返回成功")
return True
except Exception as e:
logging.error(f"设备 {device_id} 处理确认弹窗时出错: {str(e)}")
return False
def scroll_list_to_bottom(self, device_id, max_swipes=60):
"""
下滑列表到最底端
Args:
device_id: 设备ID
max_swipes: 最大下滑次数
Returns:
bool: 是否滑动到底部
"""
try:
logging.info(f"设备 {device_id} 开始下滑列表到底部")
# 获取列表元素
list_view = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/auto_data_list")
logging.info(f"时间戳1: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")
same_content_count = 0
# 初始化第一次的子元素文本
initial_child_elements = list_view.find_elements(AppiumBy.CLASS_NAME, "android.widget.TextView")
logging.info(f"时间戳2: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")
# current_child_texts = "|".join([
# elem.text.strip() for elem in initial_child_elements
# if elem.text and elem.text.strip()
# ])
current_child_texts = "|".join(
elem_text.strip() for elem in initial_child_elements
if (elem_text := elem.text) and elem_text.strip()
)
logging.info(f"时间戳3: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")
for i in range(max_swipes):
# 执行下滑操作
self.driver.execute_script("mobile: scrollGesture", {
'elementId': list_view.id,
'direction': 'down',
'percent': 0.8,
'duration': 500
})
# 获取滑动后的子元素文本
new_child_elements = list_view.find_elements(AppiumBy.CLASS_NAME, "android.widget.TextView")
new_child_texts = "|".join(
elem_text.strip() for elem in new_child_elements
if (elem_text := elem.text) and elem_text.strip()
)
# 判断内容是否变化若连续3次相同认为到达底部
if new_child_texts == current_child_texts:
same_content_count += 1
if same_content_count >= 2:
logging.info(f"设备 {device_id} 列表已滑动到底部,共滑动 {i+1}")
return True
else:
same_content_count = 0 # 内容变化,重置计数
current_child_texts = new_child_texts # 更新上一次内容
logging.debug(f"设备 {device_id}{i+1} 次下滑完成,当前子元素文本: {new_child_texts[:50]}...") # 打印部分文本
logging.warning(f"设备 {device_id} 达到最大下滑次数 {max_swipes},可能未完全到底部")
return True
except Exception as e:
logging.error(f"设备 {device_id} 下滑列表时发生错误: {str(e)}")
return False
def click_last_spinner_with_retry(self, device_id, max_retries=2):
"""带重试机制的点击方法"""
for attempt in range(max_retries):
try:
if self.click_last_spinner(device_id):
return True
logging.warning(f"设备 {device_id}{attempt + 1}次点击失败,准备重试")
time.sleep(0.5) # 重试前等待
except Exception as e:
logging.error(f"设备 {device_id}{attempt + 1}次尝试失败: {str(e)}")
logging.error(f"设备 {device_id} 所有重试次数已用尽")
return False
def click_last_spinner(self, device_id):
"""
点击最后一个spinner
Args:
device_id: 设备ID
Returns:
bool: 是否成功点击
"""
try:
logging.info(f"设备 {device_id} 查找最后一个spinner")
# 查找所有的spinner元素
spinners = self.driver.find_elements(AppiumBy.ID, "com.bjjw.cjgc:id/spinner")
if not spinners:
logging.error(f"设备 {device_id} 未找到任何spinner元素")
return False
# 获取最后一个spinner
last_spinner = spinners[-1]
if not (last_spinner.is_displayed() and last_spinner.is_enabled()):
logging.error(f"设备 {device_id} 最后一个spinner不可点击")
return False
# 点击操作
logging.info(f"设备 {device_id} 点击最后一个spinner")
last_spinner.click()
# 执行额外一次下滑操作
self.scroll_down_once(device_id)
max_retries = 3 # 最大重试次数
retry_count = 0
wait_timeout = 5 # 增加等待时间到5秒
while retry_count < max_retries:
try:
# 确保device_id正确设置使用全局变量作为备用
if not hasattr(self, 'device_id') or not self.device_id:
# 优先使用传入的device_id其次使用全局变量
self.device_id = device_id if device_id else global_variable.get_device_id()
# 使用self.device_id确保有默认值
actual_device_id = self.device_id if self.device_id else global_variable.get_device_id()
if not driver_utils.check_session_valid(self.driver, actual_device_id):
logging.warning(f"设备 {actual_device_id} 会话无效,尝试重新连接驱动...")
try:
# 使用正确的设备ID进行重连
new_driver, new_wait = reconnect_driver(actual_device_id, self.driver)
if new_driver:
self.driver = new_driver
self.wait = new_wait
logging.info(f"设备 {actual_device_id} 驱动重连成功")
else:
logging.error(f"设备 {actual_device_id} 驱动重连失败")
retry_count += 1
continue
except Exception as e:
logging.error(f"设备 {actual_device_id} 驱动重连异常: {str(e)}")
retry_count += 1
continue
# 点击spinner如果是重试需要重新获取元素
if retry_count > 0:
spinners = self.driver.find_elements(AppiumBy.CLASS_NAME, "android.widget.Spinner")
if not spinners:
logging.error(f"设备 {device_id} 未找到spinner元素")
retry_count += 1
continue
last_spinner = spinners[-1]
if not (last_spinner.is_displayed() and last_spinner.is_enabled()):
logging.error(f"设备 {device_id} spinner不可点击")
retry_count += 1
continue
logging.info(f"设备 {device_id} 重新点击spinner")
last_spinner.click()
# 重试时也执行下滑操作
self.scroll_down_once(device_id)
# 等待下拉菜单出现增加等待时间到5秒
wait = WebDriverWait(self.driver, wait_timeout)
detail_show = wait.until(
EC.presence_of_element_located((AppiumBy.ID, "com.bjjw.cjgc:id/detailshow"))
)
if detail_show.is_displayed():
logging.info(f"设备 {device_id} spinner点击成功下拉菜单已展开")
return True
else:
logging.error(f"设备 {device_id} 下拉菜单未显示")
retry_count += 1
continue
except Exception as wait_error:
error_msg = str(wait_error)
logging.error(f"设备 {device_id} 等待下拉菜单超时 (第{retry_count+1}次尝试): {error_msg}")
# 检查是否是连接断开相关的错误
if not driver_utils.check_session_valid(self.driver, self.device_id):
logging.warning(f"设备 {self.device_id} 会话无效,尝试重新连接驱动...")
# if any(keyword in error_msg for keyword in ['socket hang up', 'Could not proxy command']):
# logging.warning(f"设备 {device_id} 检测到连接相关错误,尝试重连...")
if not reconnect_driver(self.device_id, self.driver):
logging.error(f"设备 {device_id} 驱动重连失败")
retry_count += 1
if retry_count < max_retries:
logging.info(f"设备 {device_id} 将在1秒后进行第{retry_count+1}次重试")
time.sleep(1) # 等待1秒后重试
logging.error(f"设备 {device_id} 经过{max_retries}次重试后仍无法展开下拉菜单")
return False
except Exception as e:
logging.error(f"设备 {device_id} 点击最后一个spinner时发生错误: {str(e)}")
return False
def scroll_down_once(self, device_id):
"""
再次下滑一次
Args:
device_id: 设备ID
Returns:
bool: 是否成功下滑
"""
try:
logging.info(f"设备 {device_id} 执行额外一次下滑")
# 获取列表元素
list_view = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/auto_data_list")
# 执行下滑操作
self.driver.execute_script("mobile: scrollGesture", {
'elementId': list_view.id,
'direction': 'down',
'percent': 0.5
})
time.sleep(0.2)
logging.info(f"设备 {device_id} 额外下滑完成")
return True
except Exception as e:
logging.error(f"设备 {device_id} 额外下滑时发生错误: {str(e)}")
return False
def take_screenshot(self):
"""

1042
check_upload.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -87,11 +87,12 @@ def get_breakpoint_list():
return data
else:
logging.info(f"接口返回错误: {result.get('code')}")
return [{"id": 37,
"user_name": "wangshun",
"name": "wangshun",
"line_num": "L193588",
"line_name": "CDWZQ-2标-155号路基左线-461221-461570-155左-平原",
return [{
"id": 21,
"user_name": "czsczq115ykl",
"name": "czsczq115ykl",
"line_num": "L220179",
"line_name": "CZSCZQ-11-五工区-德达隧道6#斜井左线-DK632+707-DK632+705-山区",
"status": 3
}]
else:
@@ -433,7 +434,7 @@ def get_user_max_variation(username: str) -> Optional[int]:
try:
# 2. 发送POST请求
logging.info(f"向接口 {api_url} 发送请求,查询用户名:{username}")
logging.info(f"向接口 {api_url} 发送请求,查询用户名:{username}账号ID{account_id}")
response = requests.post(
url=api_url,
json=payload, # 自动将字典转为JSON字符串无需手动json.dumps()

View File

@@ -191,7 +191,24 @@ def setup_adb_wireless():
target_port = 4723
print(f"🚀 开始无线 ADB 建链(端口 {target_port}")
# 获取USB连接的设备
# ===== 优先检查是否已有无线连接 =====
wireless_connections = check_wireless_connections(target_port)
if wireless_connections:
print(f"📡 发现 {len(wireless_connections)} 个现有无线连接")
for conn_id, ip, port in wireless_connections:
if port == target_port:
print(f"✅ 已存在目标端口 {target_port} 的无线连接: {conn_id}")
# 验证连接是否有效
verify_result = run_command("adb devices")
if conn_id in verify_result and "device" in verify_result.split(conn_id)[1][:10]:
print(f"✅ 连接验证通过")
# 启动 Appium如果未运行
start_appium()
return conn_id
else:
print(f"⚠️ 连接验证失败,准备重新建立")
# ===== 获取USB连接的设备 =====
devices_output = run_command("adb devices")
lines = devices_output.splitlines()[1:]

View File

@@ -2,8 +2,8 @@
GLOBAL_DEVICE_ID = "" # 设备ID
GLOBAL_USERNAME = "czyuzongwen" # 用户名
GLOBAL_ACCOUNT_ID = "" # 账号ID
GLOBAL_CURRENT_PROJECT_NAME = "" # 当前测试项目名称
GLOBAL_LINE_NUM = "" # 线路编码
GLOBAL_CURRENT_PROJECT_NAME = "CZSCZQ-11-五工区-德达隧道6#斜井左线-DK632+707-DK632+705-山区" # 当前测试项目名称
GLOBAL_LINE_NUM = "L220179" # 线路编码
GLOBAL_BREAKPOINT_STATUS_CODES = [0,3] # 要获取的断点状态码列表
GLOBAL_UPLOAD_BREAKPOINT_LIST = []
GLOBAL_UPLOAD_BREAKPOINT_DICT = {}

View File

@@ -122,6 +122,109 @@ class LoginPage:
pass
accounts = apis.get_accounts_from_server("68ef0e02b0138d25e2ac9918")
# 检查accounts是否为None如果是则设为空列表
if accounts is None:
logging.warning("获取账户列表返回None设为空列表")
accounts = []
matches = [acc for acc in accounts if acc.get("username") == existing_username]
password = None
account_id = False
if matches:
password = matches[0].get("password")
# ✅ 关键:把 account_id 存入全局变量
account_id = matches[0].get("account_id", False)
# 只有 account_id 存在时才存全局
if account_id is not False:
global_variable.GLOBAL_ACCOUNT_ID = account_id
logging.info(f"匹配到账号信息username={existing_username}, account_id={account_id}")
else:
logging.warning(f"账号 {existing_username} 未返回 account_id已设为 False")
password_field.send_keys(password)
# 4. 可选:隐藏键盘
try:
self.driver.hide_keyboard()
except:
pass
# 点击登录按钮
max_retries = 3
retry_count = 0
while retry_count < max_retries:
login_btn = self.wait.until(
EC.element_to_be_clickable((AppiumBy.ID, ids.LOGIN_BTN))
)
login_btn.click()
logging.info(f"已点击登录按钮 (尝试 {retry_count + 1}/{max_retries})")
# 等待登录完成
time.sleep(3)
# 检查是否登录成功
if self.is_login_successful():
logging.info("登录成功")
return True
else:
logging.warning("登录后未检测到主页面元素,准备重试")
retry_count += 1
if retry_count < max_retries:
logging.info(f"等待2秒后重新尝试登录...")
time.sleep(2)
logging.error(f"登录失败,已尝试 {max_retries}")
return False
except Exception as e:
logging.error(f"登录过程中出错: {str(e)}")
return False
def login_get_username(self, username=None):
"""执行登录操作"""
try:
logging.info("正在执行登录操作...")
# 获取文本框中已有的用户名
username_field = self.wait.until(
EC.element_to_be_clickable((AppiumBy.ID, ids.LOGIN_USERNAME))
)
existing_username = username_field.text
# 日志记录获取到的已有用户名(若为空,也需明确记录,避免后续误解)
if existing_username.strip(): # 去除空格后判断是否有有效内容
logging.info(f"已获取文本框中的已有用户名: {existing_username}")
else:
logging.info("文本框中未检测到已有用户名(内容为空)")
# 将用户名写入全局变量中
global_variable.GLOBAL_USERNAME = existing_username # 关键:给全局变量赋值
# 1. 定位密码输入框
password_field = self.wait.until(
EC.element_to_be_clickable((AppiumBy.ID, ids.LOGIN_PASSWORD))
)
# 2. 清空密码框(如果需要)
try:
password_field.clear()
# time.sleep(0.5) # 等待清除完成
except:
# 如果clear方法不可用尝试其他方式
pass
accounts = apis.get_accounts_from_server("68ef0e02b0138d25e2ac9918")
# 检查accounts是否为None如果是则设为空列表
if accounts is None:
logging.warning("获取账户列表返回None设为空列表")
accounts = []
matches = [acc for acc in accounts if acc.get("username") == existing_username]
password = None
account_id = False
@@ -180,6 +283,7 @@ class LoginPage:
return False
def is_login_successful(self):
"""检查登录是否成功"""
try:

View File

@@ -1657,7 +1657,7 @@ class ScreenshotPage:
return True
def screenshot_page_manager(self, device_id, results_dir):
def screenshot_page_manager(self, device_id):
"""执行截图页面管理操作"""
max_retries = 3
retry_count = 0
@@ -1675,138 +1675,102 @@ class ScreenshotPage:
self.logger.error("重新初始化driver失败")
return False
# 加载指定文件中的线路编码和时间到全局字典
if not self.load_line_time_mapping_dict("20251022.1.CZSCZQ-3fhg0410.txt", "D:\\soft\\安卓时间修改-v0.7.13-1\\Logs\\202510"):
self.logger.error(f"设备 {device_id} 加载线路时间映射字典失败")
return False
# # 加载指定文件中的线路编码和时间到全局字典
# if not self.load_line_time_mapping_dict("20251022.1.CZSCZQ-3fhg0410.txt", "D:\\soft\\安卓时间修改-v0.7.13-1\\Logs\\202510"):
# self.logger.error(f"设备 {device_id} 加载线路时间映射字典失败")
# return False
time.sleep(5)
# time.sleep(5)
# 循环检查数据数量是否一致,直到获取到完整数据
data_retry_count = 0
while True:
# 获取断点列表和线路时间字典的数量
breakpoint_count = len(global_variable.GLOBAL_BREAKPOINT_DICT)
line_time_count = len(global_variable.LINE_TIME_MAPPING_DICT)
self.logger.info(f"设备 {device_id} 断点列表数量: {breakpoint_count}, 文件中获取的线路时间数量: {line_time_count}")
# # 循环检查数据数量是否一致,直到获取到完整数据
# data_retry_count = 0
# while True:
# # 获取断点列表和线路时间字典的数量
# breakpoint_count = len(global_variable.GLOBAL_BREAKPOINT_DICT)
# line_time_count = len(global_variable.LINE_TIME_MAPPING_DICT)
# self.logger.info(f"设备 {device_id} 断点列表数量: {breakpoint_count}, 文件中获取的线路时间数量: {line_time_count}")
# 如果断点列表为空,无法比较,直接跳出循环
if breakpoint_count == 0:
self.logger.warning(f"设备 {device_id} 断点列表为空,无法进行数量比较")
break
# # 如果断点列表为空,无法比较,直接跳出循环
# if breakpoint_count == 0:
# self.logger.warning(f"设备 {device_id} 断点列表为空,无法进行数量比较")
# break
# 如果数量一致,获取到完整数据,跳出循环
# if line_time_count >= 2:
if line_time_count >= breakpoint_count:
self.logger.info(f"设备 {device_id} 数据数量一致,已获取完整数据")
break
# # 如果数量一致,获取到完整数据,跳出循环
# # if line_time_count >= 2:
# if line_time_count >= breakpoint_count:
# self.logger.info(f"设备 {device_id} 数据数量一致,已获取完整数据")
# break
# 数量不一致,等待三分钟后再次获取文件
data_retry_count += 1
self.logger.warning(f"设备 {device_id} 数据数量不一致: 断点列表({breakpoint_count}) != 线路时间({line_time_count}),第{data_retry_count}次重试等待1分钟后重新加载文件")
time.sleep(60) # 等待1分钟
# # 数量不一致,等待三分钟后再次获取文件
# data_retry_count += 1
# self.logger.warning(f"设备 {device_id} 数据数量不一致: 断点列表({breakpoint_count}) != 线路时间({line_time_count}),第{data_retry_count}次重试等待1分钟后重新加载文件")
# time.sleep(60) # 等待1分钟
# 重新加载文件
if not self.load_line_time_mapping_dict("20251022.1.CZSCZQ-3fhg0410.txt", "D:\\soft\\安卓时间修改-v0.7.13-1\\Logs\\202510"):
self.logger.error(f"设备 {device_id} 重新加载线路时间映射字典失败")
else:
self.logger.info(f"设备 {device_id} 重新加载完成,新的线路时间数量: {len(global_variable.LINE_TIME_MAPPING_DICT)}")
# # 重新加载文件
# if not self.load_line_time_mapping_dict("20251022.1.CZSCZQ-3fhg0410.txt", "D:\\soft\\安卓时间修改-v0.7.13-1\\Logs\\202510"):
# self.logger.error(f"设备 {device_id} 重新加载线路时间映射字典失败")
# else:
# self.logger.info(f"设备 {device_id} 重新加载完成,新的线路时间数量: {len(global_variable.LINE_TIME_MAPPING_DICT)}")
# 检查GLOBAL_BREAKPOINT_DICT是否为空如果为空则初始化一些测试数据
if not global_variable.GLOBAL_BREAKPOINT_DICT:
self.logger.warning("global_variable.GLOBAL_BREAKPOINT_DICT为空正在初始化测试数据")
global_variable.GLOBAL_BREAKPOINT_DICT = {
"CZSCZQ-3-康定2号隧道-DK297+201-DK297+199-山区": {
'breakpoint_name': "CZSCZQ-3-康定2号隧道-DK297+201-DK297+199-山区",
'line_num': "L205413"
},
"CZSCZQ-3-康定2号隧道-DK296+701-DK296+699-山区": {
'breakpoint_name': "CZSCZQ-3-康定2号隧道-DK296+701-DK296+699-山区",
'line_num': "L205414"
}
}
# if not global_variable.GLOBAL_BREAKPOINT_DICT:
# self.logger.warning("global_variable.GLOBAL_BREAKPOINT_DICT为空正在初始化测试数据")
# global_variable.GLOBAL_BREAKPOINT_DICT = {
# "CZSCZQ-3-康定2号隧道-DK297+201-DK297+199-山区": {
# 'breakpoint_name': "CZSCZQ-3-康定2号隧道-DK297+201-DK297+199-山区",
# 'line_num': "L205413"
# },
# "CZSCZQ-3-康定2号隧道-DK296+701-DK296+699-山区": {
# 'breakpoint_name': "CZSCZQ-3-康定2号隧道-DK296+701-DK296+699-山区",
# 'line_num': "L205414"
# }
# }
# 创建断点列表的副本,用于重试时重新处理
breakpoint_names = list(global_variable.GLOBAL_BREAKPOINT_DICT.keys())
processed_breakpoints = []
# # 创建断点列表的副本,用于重试时重新处理
# breakpoint_names = list(global_variable.GLOBAL_BREAKPOINT_DICT.keys())
# processed_breakpoints = []
# 开始循环处理断点
for breakpoint_name in breakpoint_names:
if breakpoint_name in processed_breakpoints:
continue
breakpoint_name = global_variable.GLOBAL_CURRENT_PROJECT_NAME
self.logger.info(f"开始处理要平差截图的断点 {breakpoint_name}")
# 把断点名称给find_keyword
if not self.find_keyword(breakpoint_name):
self.logger.error(f"设备 {device_id} 未找到包含 {breakpoint_name} 的文件名")
continue # 继续处理下一个断点
if not self.handle_measurement_dialog():
self.logger.error(f"设备 {device_id} 处理测量弹窗失败")
continue
if not self.check_apply_btn():
self.logger.error(f"设备 {device_id} 检查平差处理按钮失败")
continue
# 根据断点名称在GLOBAL_BREAKPOINT_DICT中获取对应的字典
breakpoint_info = global_variable.GLOBAL_BREAKPOINT_DICT.get(breakpoint_name)
if not breakpoint_info:
self.logger.error(f"设备 {device_id} 未找到断点 {breakpoint_name} 对应的信息")
continue
# 从字典中获取线路编码
line_code = breakpoint_info.get('line_num')
if not line_code:
self.logger.error(f"设备 {device_id} 断点 {breakpoint_name} 没有线路编码信息")
continue
# 根据线路编码查找对应的时间
date_str, time_str = self.get_line_end_time(line_code)
if not time_str or not date_str:
self.logger.error(f"设备 {device_id} 未找到线路 {line_code} 对应的时间")
continue
# 修改时间
if not self.set_device_time(device_id, time_str, date_str):
self.logger.error(f"设备 {device_id} 设置设备时间失败")
continue
# 滑动列表到底部
if not self.scroll_list_to_bottom(device_id):
self.logger.error(f"设备 {device_id} 下滑列表到底部失败")
continue
# 2. 点击最后一个spinner
if not self.click_last_spinner_with_retry(device_id):
self.logger.error(f"设备 {device_id} 点击最后一个spinner失败")
continue
# 3. 再下滑一次
if not self.scroll_down_once(device_id):
self.logger.warning(f"设备 {device_id} 再次下滑失败,但继续执行")
if not self.take_screenshot(breakpoint_name):
self.logger.error(f"设备 {device_id} 截图失败")
# 4. 点击平差处理按钮
if not self.click_adjustment_button(device_id):
self.logger.error(f"设备 {device_id} 点击平差处理按钮失败")
continue
# 检查是否在测量页面在就重新执行选择断点滑动列表到底部点击最后一个spinner 再下滑一次,点击平差处理按钮平差
if not self.handle_back_navigation(breakpoint_name, device_id):
self.logger.error(f"{breakpoint_name}平差失败,未截图")
continue
# 检测并处理"是 保留成果"弹窗
if not self.handle_adjustment_result_dialog():
self.logger.error("处理平差结果弹窗失败")
continue
# 平差完成,将断点数据保存到上传列表中
if not self.add_breakpoint_to_upload_list(breakpoint_name, line_code):
self.logger.error(f"设备 {device_id} 保存断点 {breakpoint_name} 到上传列表失败")
continue
# 检查是否在测量页面在就重新执行选择断点滑动列表到底部点击最后一个spinner 再下滑一次,点击平差处理按钮平差
if not self.handle_back_navigation(breakpoint_name, device_id):
@@ -1818,21 +1782,113 @@ class ScreenshotPage:
self.logger.error("处理平差结果弹窗失败")
continue
# 平差处理完成后截图
time.sleep(3) # 等待平差处理按钮点击后的界面变化
logging.info("断点保存到上传列表成功,开始截图")
if not self.take_screenshot(breakpoint_name):
self.logger.error(f"设备 {device_id} 截图失败")
continue
# 点击返回按钮并处理弹窗
if not self.execute_back_navigation_steps(device_id):
self.logger.error(f"设备 {device_id} 处理返回按钮确认失败")
continue
# 成功处理完一个断点,添加到已处理列表
processed_breakpoints.append(breakpoint_name)
self.logger.info(f"成功处理断点: {breakpoint_name}")
# for breakpoint_name in breakpoint_names:
# if breakpoint_name in processed_breakpoints:
# continue
# self.logger.info(f"开始处理要平差截图的断点 {breakpoint_name}")
# # 把断点名称给find_keyword
# if not self.find_keyword(breakpoint_name):
# self.logger.error(f"设备 {device_id} 未找到包含 {breakpoint_name} 的文件名")
# continue # 继续处理下一个断点
# if not self.handle_measurement_dialog():
# self.logger.error(f"设备 {device_id} 处理测量弹窗失败")
# continue
# if not self.check_apply_btn():
# self.logger.error(f"设备 {device_id} 检查平差处理按钮失败")
# continue
# # 根据断点名称在GLOBAL_BREAKPOINT_DICT中获取对应的字典
# breakpoint_info = global_variable.GLOBAL_BREAKPOINT_DICT.get(breakpoint_name)
# if not breakpoint_info:
# self.logger.error(f"设备 {device_id} 未找到断点 {breakpoint_name} 对应的信息")
# continue
# # 从字典中获取线路编码
# line_code = breakpoint_info.get('line_num')
# if not line_code:
# self.logger.error(f"设备 {device_id} 断点 {breakpoint_name} 没有线路编码信息")
# continue
# # 根据线路编码查找对应的时间
# date_str, time_str = self.get_line_end_time(line_code)
# if not time_str or not date_str:
# self.logger.error(f"设备 {device_id} 未找到线路 {line_code} 对应的时间")
# continue
# # 修改时间
# if not self.set_device_time(device_id, time_str, date_str):
# self.logger.error(f"设备 {device_id} 设置设备时间失败")
# continue
# # 滑动列表到底部
# if not self.scroll_list_to_bottom(device_id):
# self.logger.error(f"设备 {device_id} 下滑列表到底部失败")
# continue
# # 2. 点击最后一个spinner
# if not self.click_last_spinner_with_retry(device_id):
# self.logger.error(f"设备 {device_id} 点击最后一个spinner失败")
# continue
# # 3. 再下滑一次
# if not self.scroll_down_once(device_id):
# self.logger.warning(f"设备 {device_id} 再次下滑失败,但继续执行")
# # 4. 点击平差处理按钮
# if not self.click_adjustment_button(device_id):
# self.logger.error(f"设备 {device_id} 点击平差处理按钮失败")
# continue
# # 检查是否在测量页面在就重新执行选择断点滑动列表到底部点击最后一个spinner 再下滑一次,点击平差处理按钮平差
# if not self.handle_back_navigation(breakpoint_name, device_id):
# self.logger.error(f"{breakpoint_name}平差失败,未截图")
# continue
# # 检测并处理"是 保留成果"弹窗
# if not self.handle_adjustment_result_dialog():
# self.logger.error("处理平差结果弹窗失败")
# continue
# # 平差完成,将断点数据保存到上传列表中
# if not self.add_breakpoint_to_upload_list(breakpoint_name, line_code):
# self.logger.error(f"设备 {device_id} 保存断点 {breakpoint_name} 到上传列表失败")
# continue
# # 检查是否在测量页面在就重新执行选择断点滑动列表到底部点击最后一个spinner 再下滑一次,点击平差处理按钮平差
# if not self.handle_back_navigation(breakpoint_name, device_id):
# self.logger.error(f"{breakpoint_name}平差失败,未截图")
# continue
# # 检测并处理"是 保留成果"弹窗
# if not self.handle_adjustment_result_dialog():
# self.logger.error("处理平差结果弹窗失败")
# continue
# # 平差处理完成后截图
# time.sleep(3) # 等待平差处理按钮点击后的界面变化
# logging.info("断点保存到上传列表成功,开始截图")
# if not self.take_screenshot(breakpoint_name):
# self.logger.error(f"设备 {device_id} 截图失败")
# continue
# # 点击返回按钮并处理弹窗
# if not self.execute_back_navigation_steps(device_id):
# self.logger.error(f"设备 {device_id} 处理返回按钮确认失败")
# continue
# # 成功处理完一个断点,添加到已处理列表
# processed_breakpoints.append(breakpoint_name)
# self.logger.info(f"成功处理断点: {breakpoint_name}")
# 检查是否所有断点都处理完成
if len(processed_breakpoints) == len(breakpoint_names):

View File

@@ -741,11 +741,13 @@ class SectionMileageConfigPage:
def add_transition_point(self):
"""添加转点"""
try:
# 查找并点击添加转点按钮
# 查找并点击添加转点按钮(双击确保点到)
add_transition_btn = self.wait.until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/btn_add_ZPoint"))
)
add_transition_btn.click()
time.sleep(0.3)
add_transition_btn.click()
self.logger.info("已点击添加转点按钮")
return True
except TimeoutException:
@@ -981,7 +983,7 @@ class SectionMileageConfigPage:
try:
self.logger.info("检查线路弹出测量弹窗...")
# 直接尝试点击"继续测量"按钮
# 直接尝试点击"重新测量"按钮
remeasure_btn = WebDriverWait(self.driver, 2).until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/measure_remeasure_all_btn"))
)
@@ -1087,26 +1089,7 @@ class SectionMileageConfigPage:
return False
# if not self.check_station_page.run():
# self.logger.error("检查站页面运行失败")
# return False
# # 添加断点到列表
# if not self.add_breakpoint_to_tested_list():
# return False
# # 点击返回按钮
# if not self.click_back_button():
# return False
# # 测量结束。点击手机物理返回按钮,返回测量页面
# # 点击了手机独步导航栏返回键
# if not self.click_system_back_button():
# return False
self.logger.info("断面里程配置完成,正在执行测量")
self.logger.info("断面里程配置完成,待执行测量")
return True
except Exception as e:

View File

@@ -2093,12 +2093,12 @@ class UploadConfigPage:
else:
self.logger.info("页面中包含变化量属性,继续执行后续操作")
user_id = global_variable.GLOBAL_USERNAME
if user_id is None:
self.logger.error("获取用户ID失败")
user_name = global_variable.GLOBAL_USERNAME
if user_name is None:
self.logger.error("获取用户失败")
return False
max_variation = apis.get_user_max_variation(user_id)
max_variation = apis.get_user_max_variation(user_name)
if max_variation is None:
self.logger.error("获取用户最大变化量失败")
return False

Binary file not shown.

After

Width:  |  Height:  |  Size: 312 KiB

BIN
上传人员信息.xlsx Normal file

Binary file not shown.