Files
cjgc_screenshot/page_objects/screenshot_page.py
2026-02-10 17:39:58 +08:00

1577 lines
70 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# screenshot_page.py
import subprocess
import logging
import time
import re
import os
import threading
from datetime import datetime
from appium.webdriver.common.appiumby import AppiumBy
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import globals.global_variable as global_variable # 导入全局变量模块
import globals.ids as ids
# 导入全局驱动工具函数
from globals.driver_utils import check_session_valid, reconnect_driver
import globals.driver_utils as driver_utils
import globals.global_variable as global_variable # 导入全局变量模块
class ScreenshotPage:
def __init__(self, driver, wait, device_id=None):
self.driver = driver
self.wait = WebDriverWait(driver, 2)
self.device_id = device_id
self.logger = logging.getLogger(__name__)
self.all_items = set()
def scroll_list(self, direction="down"):
"""滑动列表以加载更多项目
Args:
direction: 滑动方向,"down"表示向下滑动,"up"表示向上滑动
Returns:
bool: 滑动是否成功执行对于向上滑动如果滑动到顶则返回False
"""
try:
# 获取列表容器
list_container = self.driver.find_element(AppiumBy.ID, ids.MEASURE_LIST_ID)
# 计算滑动坐标
start_x = list_container.location['x'] + list_container.size['width'] // 2
if direction == "down":
# 向下滑动
start_y = list_container.location['y'] + list_container.size['height'] * 0.95
end_y = list_container.location['y'] + list_container.size['height'] * 0.05
self.logger.info("向下滑动列表")
else:
# 向上滑动
# 记录滑动前的项目,用于判断是否滑动到顶
before_scroll_items = self.get_current_items()
start_y = list_container.location['y'] + list_container.size['height'] * 0.05
end_y = list_container.location['y'] + list_container.size['height'] * 0.95
self.logger.info("向上滑动列表")
# 执行滑动
self.driver.swipe(start_x, start_y, start_x, end_y, 1000)
# 向上滑动时,检查是否滑动到顶
if direction == "up":
after_scroll_items = self.get_current_items()
# 如果滑动后的项目与滑动前的项目相同,说明已经滑动到顶
if after_scroll_items == before_scroll_items:
self.logger.info("已滑动到列表顶部,列表内容不变")
return False
return True
except Exception as e:
self.logger.error(f"滑动列表失败: {str(e)}")
return False
def get_current_items(self):
"""获取当前页面中的所有项目文本"""
try:
items = self.driver.find_elements(AppiumBy.ID, ids.MEASURE_LISTVIEW_ID)
item_texts = []
for item in items:
try:
title_element = item.find_element(AppiumBy.ID, ids.MEASURE_NAME_TEXT_ID)
if title_element and title_element.text:
item_texts.append(title_element.text)
except NoSuchElementException:
continue
return item_texts
except Exception as e:
self.logger.error(f"获取当前项目失败: {str(e)}")
return []
def click_item_by_text(self, text):
"""点击指定文本的项目"""
try:
# 查找包含指定文本的项目
items = self.driver.find_elements(AppiumBy.ID, ids.MEASURE_LISTVIEW_ID)
for item in items:
try:
title_element = item.find_element(AppiumBy.ID, ids.MEASURE_NAME_TEXT_ID)
if title_element and title_element.text == text:
title_element.click()
self.logger.info(f"已点击项目: {text}")
return True
except NoSuchElementException:
continue
self.logger.warning(f"未找到可点击的项目: {text}")
return False
except Exception as e:
self.logger.error(f"点击项目失败: {str(e)}")
return False
def find_keyword(self, fixed_filename):
"""查找指定关键词并点击,支持向下和向上滑动查找"""
try:
if not check_session_valid(self.driver, self.device_id):
self.logger.warning(f"设备 {self.device_id} 会话无效,尝试重新连接驱动...")
if not reconnect_driver(self.device_id, self.driver):
self.logger.error(f"设备 {self.device_id} 驱动重连失败")
# 等待线路列表容器出现
self.wait.until(
EC.presence_of_element_located((AppiumBy.ID, ids.MEASURE_LIST_ID))
)
self.logger.info("线路列表容器已找到")
max_scroll_attempts = 100 # 最大滚动尝试次数
scroll_count = 0
previous_items = set() # 记录前一次获取的项目集合,用于检测是否到达边界
# 首先尝试向下滑动查找
while scroll_count < max_scroll_attempts:
# 获取当前页面中的所有项目
current_items = self.get_current_items()
# self.logger.info(f"当前页面找到 {len(current_items)} 个项目: {current_items}")
# 检查目标文件是否在当前页面中
if fixed_filename in current_items:
self.logger.info(f"找到目标文件: {fixed_filename}")
# 点击目标文件
if self.click_item_by_text(fixed_filename):
return True
else:
self.logger.error(f"点击目标文件失败: {fixed_filename}")
return False
# 检查是否到达底部:连续两次获取的项目相同
if current_items == previous_items and len(current_items) > 0:
self.logger.info("连续两次获取的项目相同,已到达列表底部")
break
# 更新前一次项目集合
previous_items = current_items.copy()
# 向下滑动列表以加载更多项目
if not self.scroll_list(direction="down"):
self.logger.error("向下滑动列表失败")
return False
scroll_count += 1
self.logger.info(f"{scroll_count} 次向下滑动,继续查找...")
# 如果向下滑动未找到,尝试向上滑动查找
self.logger.info("向下滑动未找到目标,开始向上滑动查找")
# 重置滚动计数
scroll_count = 0
while scroll_count < max_scroll_attempts:
# 向上滑动列表
# 如果返回False说明已经滑动到顶
if not self.scroll_list(direction="up"):
# 检查是否是因为滑动到顶而返回False
if "已滑动到列表顶部" in self.logger.handlers[0].buffer[-1].message:
self.logger.info("已滑动到列表顶部,停止向上滑动")
break
else:
self.logger.error("向上滑动列表失败")
return False
# 获取当前页面中的所有项目
current_items = self.get_current_items()
# self.logger.info(f"向上滑动后找到 {len(current_items)} 个项目: {current_items}")
# 检查目标文件是否在当前页面中
if fixed_filename in current_items:
self.logger.info(f"找到目标文件: {fixed_filename}")
# 点击目标文件
if self.click_item_by_text(fixed_filename):
return True
else:
self.logger.error(f"点击目标文件失败: {fixed_filename}")
return False
scroll_count += 1
self.logger.info(f"{scroll_count} 次向上滑动,继续查找...")
self.logger.warning(f"经过 {max_scroll_attempts * 2} 次滑动仍未找到目标文件")
return False
except TimeoutException:
self.logger.error("等待线路列表元素超时")
return False
except Exception as e:
self.logger.error(f"查找关键词时出错: {str(e)}")
return False
def handle_measurement_dialog(self):
"""处理测量弹窗 - 选择继续测量"""
try:
self.logger.info("检查测量弹窗...")
# 直接尝试点击"继续测量"按钮
continue_btn = WebDriverWait(self.driver, 2).until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/measure_continue_btn"))
)
continue_btn.click()
self.logger.info("已点击'继续测量'按钮")
return True
except TimeoutException:
self.logger.info("未找到继续测量按钮,可能没有弹窗")
return True # 没有弹窗也认为是成功的
except Exception as e:
self.logger.error(f"点击继续测量按钮时出错: {str(e)}")
return False
# 检查有没有平差处理按钮
def check_apply_btn(self):
"""检查是否有平差处理按钮"""
try:
apply_btn = WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/point_measure_btn"))
)
if apply_btn.is_displayed():
logging.info("进入平差页面")
else:
self.logger.info("没有找到'平差处理'按钮")
return True
except TimeoutException:
self.logger.info("未找到平差处理按钮")
return False # 没有弹窗也认为是成功的
except Exception as e:
self.logger.error(f"点击平差处理按钮时出错: {str(e)}")
return False
def get_line_end_time(self, line_code):
"""
从全局字典中获取线路编码对应的结束时间
参数:
line_code: 线路编码
返回:
tuple: (date_str, time_str) 日期和时间字符串
"""
if line_code in global_variable.get_line_time_mapping_dict():
end_time = global_variable.get_line_time_mapping_dict()[line_code]
date_str = end_time.strftime("%Y-%m-%d")
time_str = end_time.strftime("%H:%M:%S")
return (date_str, time_str)
else:
self.logger.warning(f"未找到线路编码 {line_code} 的结束时间")
return (None, None)
def show_line_time_mapping(self):
"""
显示当前全局字典中的所有线路编码和时间
"""
self.logger.info("\n当前全局字典内容:")
if global_variable.get_line_time_mapping_dict():
for line_code, end_time in sorted(global_variable.get_line_time_mapping_dict().items()):
date_str = end_time.strftime("%Y-%m-%d")
time_str = end_time.strftime("%H:%M:%S")
self.logger.info(f" {line_code}: {date_str} {time_str}")
else:
self.logger.info(" 全局字典为空")
self.logger.info(f"总计: {len(global_variable.get_line_time_mapping_dict())} 条记录\n")
def clear_line_time_mapping(self):
"""
清空全局字典
"""
global_variable.get_line_time_mapping_dict().clear()
self.logger.info("已清空全局字典")
def set_device_time(self, device_id, time_str=None, date_str=None, disable_auto_sync=True):
"""
通过ADB设置设备时间带管理员权限
参数:
device_id: 设备ID
time_str: 时间字符串,格式 "HH:MM:SS" (例如: "14:30:00")
date_str: 日期字符串,格式 "YYYY-MM-DD" (例如: "2024-10-15")
disable_auto_sync: 是否禁用自动时间同步(防止设置的时间被网络时间覆盖)
返回:
bool: 操作是否成功
"""
try:
if time_str is None and date_str is None:
return True
# 首先尝试获取设备的root权限
self.logger.info(f"尝试获取设备 {device_id} 的root权限...")
root_result = subprocess.run(
["adb", "-s", device_id, "root"],
capture_output=True,
text=True,
timeout=10
)
# 检查root权限获取是否成功有些设备可能返回非0但实际已获取权限
if root_result.returncode != 0:
self.logger.warning(f"获取root权限返回非0状态码但继续尝试操作: {root_result.stderr.strip()}")
now = datetime.now()
hour, minute, second = map(int, (time_str or f"{now.hour}:{now.minute}:{now.second}").split(":"))
year, month, day = map(int, (date_str or f"{now.year}-{now.month}-{now.day}").split("-"))
# 禁用自动同步
if disable_auto_sync:
# 使用su命令以root权限执行设置
subprocess.run(
["adb", "-s", device_id, "shell", "su", "-c",
"settings put global auto_time 0"],
timeout=5
)
subprocess.run(
["adb", "-s", device_id, "shell", "su", "-c",
"settings put global auto_time_zone 0"],
timeout=5
)
# 优先尝试旧格式 (MMDDhhmmYYYY.ss)
adb_time_str_old = f"{month:02d}{day:02d}{hour:02d}{minute:02d}{year:04d}.{second:02d}"
cmd_old = [
"adb", "-s", device_id, "shell", "su", "-c",
f"date {adb_time_str_old}"
]
result = subprocess.run(cmd_old, capture_output=True, text=True, timeout=10)
if result.returncode != 0:
self.logger.warning(f"旧格式失败,尝试新格式设置日期时间")
# 尝试新格式Toybox使用su -c确保以root权限执行
adb_time_str_new = f"{year:04d}{month:02d}{day:02d}.{hour:02d}{minute:02d}{second:02d}"
cmd_new = [
"adb", "-s", device_id, "shell", "su", "-c",
f"date {adb_time_str_new}"
]
result = subprocess.run(cmd_new, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
self.logger.info(f"设备 {device_id} 时间设置成功: {year}-{month}-{day} {hour}:{minute}:{second}")
return True
else:
self.logger.error(f"设备 {device_id} 设置时间失败: {result.stderr.strip()}")
return False
except subprocess.TimeoutExpired:
self.logger.error(f"设备 {device_id} 设置时间命令执行超时")
return False
except Exception as e:
self.logger.error(f"设备 {device_id} 设置时间时发生异常: {str(e)}")
return False
def disable_wifi(self, device_id):
"""
通过ADB关闭设备WiFi
返回:
bool: 操作是否成功
"""
try:
# 关闭WiFi
cmd_disable_wifi = [
"adb", "-s", device_id,
"shell", "svc", "wifi", "disable"
]
result = subprocess.run(cmd_disable_wifi, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
self.logger.info(f"设备 {device_id} WiFi已关闭")
time.sleep(1) # 等待WiFi完全关闭
return True
else:
self.logger.error(f"设备 {device_id} 关闭WiFi失败: {result.stderr}")
return False
except subprocess.TimeoutExpired:
self.logger.error(f"设备 {device_id} 关闭WiFi命令执行超时")
cmd_check_wifi = [
"adb", "-s", device_id,
"shell", "settings get global wifi_on"
]
check_result = subprocess.run(cmd_check_wifi, capture_output=True, text=True, timeout=5)
wifi_state = check_result.stdout.strip()
if wifi_state == "0":
self.logger.info(f"设备 {device_id} WiFi已关闭校验通过")
return True
return False
except Exception as e:
self.logger.error(f"设备 {device_id} 关闭WiFi时发生错误: {str(e)}")
return False
def enable_wifi(self, device_id):
"""
通过ADB打开设备WiFi
返回:
bool: 操作是否成功
"""
try:
# 打开WiFi
cmd_enable_wifi = [
"adb", "-s", device_id,
"shell", "svc", "wifi", "enable"
]
result = subprocess.run(cmd_enable_wifi, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
self.logger.info(f"设备 {device_id} WiFi已打开")
time.sleep(3) # 等待WiFi完全连接
return True
else:
self.logger.error(f"设备 {device_id} 打开WiFi失败: {result.stderr}")
return False
except subprocess.TimeoutExpired:
self.logger.error(f"设备 {device_id} 打开WiFi命令执行超时")
return False
except Exception as e:
self.logger.error(f"设备 {device_id} 打开WiFi时发生错误: {str(e)}")
return False
def get_current_time(self, device_id):
"""
获取设备当前时间
返回:
str: 设备当前时间字符串如果获取失败则返回None
"""
try:
cmd_get_time = [
"adb", "-s", device_id,
"shell", "date"
]
result = subprocess.run(cmd_get_time, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
current_time = result.stdout.strip()
self.logger.info(f"设备 {device_id} 当前时间: {current_time}")
return current_time
else:
self.logger.error(f"设备 {device_id} 获取时间失败: {result.stderr}")
return None
except subprocess.TimeoutExpired:
self.logger.error(f"设备 {device_id} 获取时间命令执行超时")
return None
except Exception as e:
self.logger.error(f"设备 {device_id} 获取时间时发生错误: {str(e)}")
return None
def check_wifi_status(self, device_id):
"""
检查设备WiFi状态
返回:
str: "enabled"表示已开启, "disabled"表示已关闭, None表示获取失败
"""
try:
cmd_check_wifi = [
"adb", "-s", device_id,
"shell", "dumpsys", "wifi", "|", "grep", "Wi-Fi"
]
result = subprocess.run(cmd_check_wifi, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
wifi_status = result.stdout.strip()
if "enabled" in wifi_status.lower():
self.logger.info(f"设备 {device_id} WiFi状态: 已开启")
return "enabled"
elif "disabled" in wifi_status.lower():
self.logger.info(f"设备 {device_id} WiFi状态: 已关闭")
return "disabled"
else:
self.logger.warning(f"设备 {device_id} 无法确定WiFi状态: {wifi_status}")
return None
else:
# 尝试另一种方法检查WiFi状态
cmd_check_wifi_alt = [
"adb", "-s", device_id,
"shell", "settings", "get", "global", "wifi_on"
]
result_alt = subprocess.run(cmd_check_wifi_alt, capture_output=True, text=True, timeout=10)
if result_alt.returncode == 0:
wifi_on = result_alt.stdout.strip()
if wifi_on == "1":
self.logger.info(f"设备 {device_id} WiFi状态: 已开启")
return "enabled"
elif wifi_on == "0":
self.logger.info(f"设备 {device_id} WiFi状态: 已关闭")
return "disabled"
else:
self.logger.warning(f"设备 {device_id} 无法确定WiFi状态: {wifi_on}")
return None
else:
self.logger.error(f"设备 {device_id} 检查WiFi状态失败: {result_alt.stderr}")
return None
except subprocess.TimeoutExpired:
self.logger.error(f"设备 {device_id} 检查WiFi状态命令执行超时")
return None
except Exception as e:
self.logger.error(f"设备 {device_id} 检查WiFi状态时发生错误: {str(e)}")
return None
def take_screenshot(self, filename_prefix="screenshot", date_str=None, time_str=None):
"""
通过Appium驱动截取设备屏幕
参数:
filename_prefix: 截图文件前缀
返回:
bool: 操作是否成功
"""
try:
# 获取项目名称
project_name = global_variable.get_current_project_name() or "默认项目"
# 获取当前日期(如果没有提供)
if not date_str:
date_str = datetime.now().strftime("%Y%m%d")
# 获取当前时间(如果没有提供),并确保格式合法(不含冒号)
if not time_str:
time_str = datetime.now().strftime("%H%M%S")
else:
# 移除时间中的冒号,确保文件名合法
time_str = time_str.replace(":", "")
# 创建D盘下的截图目录结构D:\uploadInfo\picture\项目名\年月日
screenshots_dir = os.path.join("D:\\", "uploadInfo", "picture", project_name, date_str)
# 确保目录存在
try:
os.makedirs(screenshots_dir, exist_ok=True)
self.logger.info(f"截图目录: {screenshots_dir}")
except Exception as dir_error:
self.logger.error(f"创建截图目录失败: {str(dir_error)}")
return False
line_code = global_variable.get_upload_breakpoint_dict().get(filename_prefix)
if not line_code:
self.logger.error(f"未找到与断点名称 {filename_prefix} 对应的线路编码")
line_code = "unknown"
# 截图保存
screenshot_file = os.path.join(
screenshots_dir,
f"{line_code}_{filename_prefix}_{time_str}.png"
)
# 尝试保存截图
try:
success = self.driver.save_screenshot(screenshot_file)
if success:
self.logger.info(f"截图已保存: {screenshot_file}")
# 验证文件是否真的存在
if os.path.exists(screenshot_file):
self.logger.info(f"截图文件验证存在: {screenshot_file}")
else:
self.logger.warning(f"截图文件保存成功但验证不存在: {screenshot_file}")
return True
else:
self.logger.error(f"Appium截图保存失败: {screenshot_file}")
return False
except Exception as save_error:
self.logger.error(f"保存截图时发生错误: {str(save_error)}")
return False
except Exception as e:
self.logger.error(f"截图时发生错误: {str(e)}")
return False
def write_screenshot_status(self, breakpoint_name, success=True):
"""
将截图状态写入D盘的txt文件
参数:
breakpoint_name: 断点名称
success: 截图是否成功
返回:
bool: 操作是否成功
"""
try:
# D盘txt文件路径
txt_file_path = "D:\\uploadInfo\\screenshot_status.txt"
# 获取当前时间
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 状态信息
status = "成功" if success else "失败"
status_line = f"{current_time} - {breakpoint_name} - 截图{status}\n"
# 追加写入文件
with open(txt_file_path, 'a', encoding='utf-8') as f:
f.write(status_line)
self.logger.info(f"截图状态已写入文件: {txt_file_path} - {status_line.strip()}")
return True
except Exception as e:
self.logger.error(f"写入截图状态文件时发生错误: {str(e)}")
return False
def update_file_status(self, username, from_status, to_status):
"""
安全地更新 time.txt 中该用户的状态
例如: 将 'true' 改为 'running', 或将 'running' 改为 'done'
"""
TIME_FILE_PATH = r"D:\uploadInfo\time.txt"
if not os.path.exists(TIME_FILE_PATH):
return False
success = False
file_lock = threading.Lock()
with file_lock:
try:
with open(TIME_FILE_PATH, 'r', encoding='utf-8') as f:
lines = f.readlines()
# new_lines = []
# for line in lines:
# clean_line = line.strip()
# # 匹配逻辑:包含用户名 且 以 from_status 结尾
# if f" {username} " in line and clean_line.endswith(from_status):
# line = line.replace(from_status, to_status)
# success = True
# new_lines.append(line)
new_lines = []
for line in lines:
# 使用正则确保精准匹配用户名和结尾状态
# 匹配规则:行内包含该用户名,且该行以 from_status 结尾
if re.search(rf'\b{username}\b', line) and line.strip().endswith(from_status):
# 只替换行尾的那个状态词
line = re.sub(rf'{from_status}$', to_status, line.rstrip()) + '\n'
success = True
new_lines.append(line)
with open(TIME_FILE_PATH, 'w', encoding='utf-8') as f:
f.writelines(new_lines)
if success:
print(f"📝 [文件更新] 用户 {username}: {from_status} -> {to_status}")
return success
except Exception as e:
print(f"❌ 更新文件状态失败 ({username}): {e}")
return False
def update_upload_info_status(self, status):
"""
更新D:/uploadInfo文件夹下的time.txt文件的状态
参数:
status: 状态值,如"ok""again"
返回:
bool: 操作是否成功
"""
try:
# time.txt文件路径
time_file_path = "D:\\uploadInfo\\time.txt"
# 确保文件夹存在
os.makedirs(os.path.dirname(time_file_path), exist_ok=True)
# 写入状态
with open(time_file_path, 'w', encoding='utf-8') as f:
f.write(status)
self.logger.info(f"已更新上传状态文件: {time_file_path} -> {status}")
return True
except Exception as e:
self.logger.error(f"更新上传状态文件时发生错误: {str(e)}")
return False
def handle_confirmation_dialog(self, device_id, timeout=2):
"""
处理确认弹窗,点击""按钮
Args:
device_id: 设备ID
timeout: 等待弹窗的超时时间
Returns:
bool: 是否成功处理弹窗
"""
# 等待弹窗出现最多等待2秒
try:
dialog_message = WebDriverWait(self.driver, timeout).until(
EC.presence_of_element_located((AppiumBy.XPATH, "//android.widget.TextView[@text='是否退出测量界面?']"))
)
self.logger.info(f"设备 {device_id} 检测到确认弹窗")
# 查找并点击"是"按钮
confirm_button = self.driver.find_element(
AppiumBy.XPATH,
"//android.widget.Button[@text='' and @resource-id='android:id/button1']"
)
if confirm_button.is_displayed() and confirm_button.is_enabled():
self.logger.info(f"设备 {device_id} 点击确认弹窗的''按钮")
confirm_button.click()
time.sleep(0.5)
return True
else:
self.logger.error(f"设备 {device_id} ''按钮不可点击")
return False
except TimeoutException:
# 超时未找到弹窗,认为没有弹窗,返回成功
self.logger.info(f"设备 {device_id} 等待 {timeout} 秒未发现确认弹窗,可能没有弹窗,返回成功")
return True
def click_back_button(self, device_id):
"""点击手机系统返回按钮"""
try:
self.driver.back()
self.logger.info("已点击手机系统返回按钮")
return True
except Exception as e:
self.logger.error(f"点击手机系统返回按钮失败: {str(e)}")
return False
def handle_back_button_with_confirmation(self, device_id, timeout=10):
"""
处理返回按钮的确认弹窗
Args:
device_id: 设备ID
timeout: 等待弹窗的超时时间
Returns:
bool: 是否成功处理返回确认弹窗
"""
logging.info(f"进入handle_back_button_with_confirmation函数")
try:
self.logger.info(f"设备 {device_id} 等待返回确认弹窗出现")
start_time = time.time()
while time.time() - start_time < timeout:
try:
# 检查是否存在确认弹窗 - 使用多种定位策略提高兼容性
dialog_selectors = [
"//android.widget.TextView[@text='是否退出测量界面?']",
"//android.widget.TextView[contains(@text, '退出测量界面')]",
"//android.widget.TextView[contains(@text, '是否退出')]"
]
dialog_message = None
for selector in dialog_selectors:
try:
dialog_message = self.driver.find_element(AppiumBy.XPATH, selector)
if dialog_message.is_displayed():
break
except NoSuchElementException:
continue
if dialog_message and dialog_message.is_displayed():
self.logger.info(f"设备 {device_id} 检测到返回确认弹窗")
# 查找并点击"是"按钮 - 使用多种定位策略
confirm_selectors = [
"//android.widget.Button[@text='' and @resource-id='android:id/button1']",
"//android.widget.Button[@text='']",
"//android.widget.Button[@resource-id='android:id/button1']",
"//android.widget.Button[contains(@text, '')]"
]
confirm_button = None
for selector in confirm_selectors:
try:
confirm_button = self.driver.find_element(AppiumBy.XPATH, selector)
if confirm_button.is_displayed() and confirm_button.is_enabled():
break
except NoSuchElementException:
continue
if confirm_button and confirm_button.is_displayed() and confirm_button.is_enabled():
self.logger.info(f"设备 {device_id} 点击确认弹窗的''按钮")
confirm_button.click()
time.sleep(0.5)
# 验证弹窗是否消失
try:
self.driver.find_element(AppiumBy.XPATH, "//android.widget.TextView[@text='是否退出测量界面?']")
self.logger.warning(f"设备 {device_id} 确认弹窗可能未正确关闭")
return False
except NoSuchElementException:
self.logger.info(f"设备 {device_id} 确认弹窗已成功关闭")
return True
else:
self.logger.error(f"设备 {device_id} 未找到可点击的''按钮")
return False
except NoSuchElementException:
# 弹窗未找到,继续等待
pass
except Exception as e:
self.logger.warning(f"设备 {device_id} 查找确认弹窗时出现异常: {str(e)}")
time.sleep(0.5)
self.logger.error(f"设备 {device_id} 等待返回确认弹窗超时")
return False
except Exception as e:
self.logger.error(f"设备 {device_id} 处理返回确认弹窗时发生错误: {str(e)}")
return False
def handle_adjustment_result_dialog(self):
"""处理平差结果确认弹窗"""
try:
self.logger.info("开始检测平差结果弹窗")
# 等待弹窗出现最多等待5秒
warning_dialog = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((AppiumBy.ID, "android:id/parentPanel"))
)
# 验证弹窗内容
alert_title = warning_dialog.find_element(AppiumBy.ID, "android:id/alertTitle")
alert_message = warning_dialog.find_element(AppiumBy.ID, "android:id/message")
self.logger.info(f"检测到弹窗 - 标题: {alert_title.text}, 消息: {alert_message.text}")
# 确认是目标弹窗
if "警告" in alert_title.text and "是否保留测量成果" in alert_message.text:
self.logger.info("确认是平差结果确认弹窗")
# 点击"是 保留成果"按钮
yes_button = warning_dialog.find_element(AppiumBy.ID, "android:id/button1")
if yes_button.text == "是 保留成果":
yes_button.click()
self.logger.info("已点击'是 保留成果'按钮")
# 等待弹窗消失
WebDriverWait(self.driver, 5).until(
EC.invisibility_of_element_located((AppiumBy.ID, "android:id/parentPanel"))
)
self.logger.info("弹窗已关闭")
return True
else:
self.logger.error(f"按钮文本不匹配,期望'是 保留成果',实际: {yes_button.text}")
return False
else:
self.logger.warning("弹窗内容不匹配,不是目标弹窗")
return False
except TimeoutException:
self.logger.info("未检测到平差结果弹窗,继续流程")
return True # 没有弹窗也是正常情况
except Exception as e:
self.logger.error(f"处理平差结果弹窗时出错: {str(e)}")
return False
def check_measurement_list(self, device_id):
"""
检查是否存在测量列表
Args:
device_id: 设备ID
Returns:
bool: 如果不存在测量列表返回True存在返回False
"""
try:
# 等待线路列表容器出现
self.wait.until(
EC.presence_of_element_located((AppiumBy.ID, ids.MEASURE_LIST_ID))
)
self.logger.info("线路列表容器已找到")
# 如果存在MEASURE_LIST_ID说明有测量列表不需要执行后续步骤
self.logger.info(f"设备 {device_id} 存在测量列表,无需执行后续返回操作")
return False
except TimeoutException:
# 等待超时,说明没有测量列表
self.logger.info(f"设备 {device_id} 未找到测量列表,可以继续执行后续步骤")
return True
except Exception as e:
self.logger.error(f"设备 {device_id} 检查测量列表时发生错误: {str(e)}")
return True
def handle_back_navigation(self, breakpoint_name, device_id):
"""
完整的返回导航处理流程
Args:
breakpoint_name: 断点名称
device_id: 设备ID
Returns:
bool: 整个返回导航流程是否成功
"""
try:
# time.sleep(2)
self.logger.info(f"已点击平差处理按钮,检查是否在测量页面")
# 检测是否存在测量列表(修正逻辑)
has_measurement_list = self.check_measurement_list(device_id)
if not has_measurement_list:
self.logger.info(f"设备 {device_id} 存在测量列表,重新执行平差流程")
# 把断点名称给find_keyword
if not self.find_keyword(breakpoint_name):
self.logger.error(f"设备 {device_id} 未找到包含 {breakpoint_name} 的文件名")
return False
if not self.handle_measurement_dialog():
self.logger.error(f"设备 {device_id} 处理测量弹窗失败")
return False
if not self.check_apply_btn():
self.logger.error(f"设备 {device_id} 检查平差处理按钮失败")
return False
# 滑动列表到底部
if not self.scroll_list_to_bottom(device_id):
self.logger.error(f"设备 {device_id} 下滑列表到底部失败")
return False
# 2. 点击最后一个spinner
if not self.click_last_spinner_with_retry(device_id):
self.logger.error(f"设备 {device_id} 点击最后一个spinner失败")
return False
# 3. 再下滑一次
if not self.scroll_down_once(device_id):
self.logger.warning(f"设备 {device_id} 再次下滑失败,但继续执行")
# 4. 点击平差处理按钮
if not self.click_adjustment_button(device_id):
self.logger.error(f"设备 {device_id} 点击平差处理按钮失败")
return False
self.logger.info(f"重新选择断点并点击平差处理按钮成功")
return True
else:
self.logger.info(f"不在测量页面,继续执行后续返回操作")
return True
except Exception as e:
self.logger.error(f"设备 {device_id} 处理返回导航时发生错误: {str(e)}")
return False
def execute_back_navigation_steps(self, device_id):
"""
执行实际的返回导航步骤
Args:
device_id: 设备ID
Returns:
bool: 导航是否成功
"""
try:
# 1. 首先点击返回按钮
if not self.click_back_button(device_id):
self.logger.error(f"设备 {device_id} 点击返回按钮失败")
return False
# 2. 处理返回确认弹窗
self.logger.info(f"已点击返回按钮,等待处理返回确认弹窗")
if not self.handle_confirmation_dialog(device_id):
self.logger.error(f"设备 {device_id} 处理返回确认弹窗失败")
return False
# 3. 验证是否成功返回到上一页面
time.sleep(0.5) # 等待页面跳转完成
# 可以添加页面验证逻辑,比如检查是否返回到预期的页面
# 这里可以根据实际应用添加特定的页面元素验证
self.logger.info(f"设备 {device_id} 返回导航流程完成")
return True
except Exception as e:
self.logger.error(f"设备 {device_id} 执行返回导航步骤时发生错误: {str(e)}")
return False
def scroll_list_to_bottom(self, device_id, max_swipes=60):
"""
下滑列表到最底端
Args:
device_id: 设备ID
max_swipes: 最大下滑次数
Returns:
bool: 是否滑动到底部
"""
try:
self.logger.info(f"设备 {device_id} 开始下滑列表到底部")
# 获取列表元素
list_view = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/auto_data_list")
same_content_count = 0
# 初始化第一次的子元素文本
initial_child_elements = list_view.find_elements(AppiumBy.CLASS_NAME, "android.widget.TextView")
current_child_texts = "|".join([
elem.text.strip() for elem in initial_child_elements
if elem.text and elem.text.strip()
])
for i in range(max_swipes):
# 执行下滑操作
self.driver.execute_script("mobile: scrollGesture", {
'elementId': list_view.id,
'direction': 'down',
'percent': 0.8,
'duration': 500
})
# time.sleep(0.5)
# 获取滑动后的子元素文本
new_child_elements = list_view.find_elements(AppiumBy.CLASS_NAME, "android.widget.TextView")
new_child_texts = "|".join([
elem.text.strip() for elem in new_child_elements
if elem.text and elem.text.strip()
])
# 判断内容是否变化若连续3次相同认为到达底部
if new_child_texts == current_child_texts:
same_content_count += 1
if same_content_count >= 2:
self.logger.info(f"设备 {device_id} 列表已滑动到底部,共滑动 {i+1}")
return True
else:
same_content_count = 0 # 内容变化,重置计数
current_child_texts = new_child_texts # 更新上一次内容
self.logger.debug(f"设备 {device_id}{i+1} 次下滑完成,当前子元素文本: {new_child_texts[:50]}...") # 打印部分文本
self.logger.warning(f"设备 {device_id} 达到最大下滑次数 {max_swipes},可能未完全到底部")
return True
except Exception as e:
self.logger.error(f"设备 {device_id} 下滑列表时发生错误: {str(e)}")
return False
def click_last_spinner_with_retry(self, device_id, max_retries=2):
"""带重试机制的点击方法"""
for attempt in range(max_retries):
try:
if self.click_last_spinner(device_id):
return True
self.logger.warning(f"设备 {device_id}{attempt + 1}次点击失败,准备重试")
time.sleep(0.5) # 重试前等待
except Exception as e:
self.logger.error(f"设备 {device_id}{attempt + 1}次尝试失败: {str(e)}")
self.logger.error(f"设备 {device_id} 所有重试次数已用尽")
return False
def click_last_spinner(self, device_id):
"""
点击最后一个spinner
Args:
device_id: 设备ID
Returns:
bool: 是否成功点击
"""
try:
self.logger.info(f"设备 {device_id} 查找最后一个spinner")
# 查找所有的spinner元素
spinners = self.driver.find_elements(AppiumBy.ID, "com.bjjw.cjgc:id/spinner")
if not spinners:
self.logger.error(f"设备 {device_id} 未找到任何spinner元素")
return False
# 获取最后一个spinner
last_spinner = spinners[-1]
if not (last_spinner.is_displayed() and last_spinner.is_enabled()):
self.logger.error(f"设备 {device_id} 最后一个spinner不可点击")
return False
# 点击操作
self.logger.info(f"设备 {device_id} 点击最后一个spinner")
last_spinner.click()
# 执行额外一次下滑操作
self.scroll_down_once(device_id)
max_retries = 3 # 最大重试次数
retry_count = 0
wait_timeout = 5 # 增加等待时间到5秒
while retry_count < max_retries:
try:
# 确保device_id正确设置使用全局变量作为备用
if not hasattr(self, 'device_id') or not self.device_id:
# 优先使用传入的device_id其次使用全局变量
self.device_id = device_id if device_id else global_variable.get_device_id()
# 使用self.device_id确保有默认值
actual_device_id = self.device_id if self.device_id else global_variable.get_device_id()
if not check_session_valid(self.driver, actual_device_id):
self.logger.warning(f"设备 {actual_device_id} 会话无效,尝试重新连接驱动...")
try:
# 使用正确的设备ID进行重连
new_driver, new_wait = reconnect_driver(actual_device_id, self.driver)
if new_driver:
self.driver = new_driver
self.wait = new_wait
self.logger.info(f"设备 {actual_device_id} 驱动重连成功")
else:
self.logger.error(f"设备 {actual_device_id} 驱动重连失败")
retry_count += 1
continue
except Exception as e:
self.logger.error(f"设备 {actual_device_id} 驱动重连异常: {str(e)}")
retry_count += 1
continue
# 点击spinner如果是重试需要重新获取元素
if retry_count > 0:
spinners = self.driver.find_elements(AppiumBy.CLASS_NAME, "android.widget.Spinner")
if not spinners:
self.logger.error(f"设备 {device_id} 未找到spinner元素")
retry_count += 1
continue
last_spinner = spinners[-1]
if not (last_spinner.is_displayed() and last_spinner.is_enabled()):
self.logger.error(f"设备 {device_id} spinner不可点击")
retry_count += 1
continue
self.logger.info(f"设备 {device_id} 重新点击spinner")
last_spinner.click()
# 重试时也执行下滑操作
self.scroll_down_once(device_id)
# 等待下拉菜单出现增加等待时间到5秒
wait = WebDriverWait(self.driver, wait_timeout)
detail_show = wait.until(
EC.presence_of_element_located((AppiumBy.ID, "com.bjjw.cjgc:id/detailshow"))
)
if detail_show.is_displayed():
self.logger.info(f"设备 {device_id} spinner点击成功下拉菜单已展开")
return True
else:
self.logger.error(f"设备 {device_id} 下拉菜单未显示")
retry_count += 1
continue
except Exception as wait_error:
error_msg = str(wait_error)
self.logger.error(f"设备 {device_id} 等待下拉菜单超时 (第{retry_count+1}次尝试): {error_msg}")
# 检查是否是连接断开相关的错误
if not check_session_valid(self.driver, self.device_id):
self.logger.warning(f"设备 {self.device_id} 会话无效,尝试重新连接驱动...")
# if any(keyword in error_msg for keyword in ['socket hang up', 'Could not proxy command']):
# self.logger.warning(f"设备 {device_id} 检测到连接相关错误,尝试重连...")
if not reconnect_driver(self.device_id, self.driver):
self.logger.error(f"设备 {device_id} 驱动重连失败")
retry_count += 1
if retry_count < max_retries:
self.logger.info(f"设备 {device_id} 将在1秒后进行第{retry_count+1}次重试")
time.sleep(1) # 等待1秒后重试
self.logger.error(f"设备 {device_id} 经过{max_retries}次重试后仍无法展开下拉菜单")
return False
except Exception as e:
self.logger.error(f"设备 {device_id} 点击最后一个spinner时发生错误: {str(e)}")
return False
def scroll_down_once(self, device_id):
"""
再次下滑一次
Args:
device_id: 设备ID
Returns:
bool: 是否成功下滑
"""
try:
self.logger.info(f"设备 {device_id} 执行额外一次下滑")
# 获取列表元素
list_view = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/auto_data_list")
# 执行下滑操作
self.driver.execute_script("mobile: scrollGesture", {
'elementId': list_view.id,
'direction': 'down',
'percent': 0.5
})
time.sleep(0.2)
self.logger.info(f"设备 {device_id} 额外下滑完成")
return True
except Exception as e:
self.logger.error(f"设备 {device_id} 额外下滑时发生错误: {str(e)}")
return False
def click_adjustment_button(self, device_id):
"""
点击平差处理按钮
Args:
device_id: 设备ID
Returns:
bool: 是否成功点击
"""
try:
self.logger.info(f"设备 {device_id} 查找平差处理按钮")
# 查找平差处理按钮
adjustment_button = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/point_measure_btn")
# 验证按钮文本
button_text = adjustment_button.text
if "平差处理" not in button_text:
self.logger.warning(f"设备 {device_id} 按钮文本不匹配,期望'平差处理',实际: {button_text}")
if adjustment_button.is_displayed() and adjustment_button.is_enabled():
self.logger.info(f"设备 {device_id} 点击平差处理按钮")
adjustment_button.click()
time.sleep(3) # 等待平差处理完成
return True
else:
self.logger.error(f"设备 {device_id} 平差处理按钮不可点击")
return False
except NoSuchElementException:
self.logger.error(f"设备 {device_id} 未找到平差处理按钮")
return False
except Exception as e:
self.logger.error(f"设备 {device_id} 点击平差处理按钮时发生错误: {str(e)}")
return False
def add_breakpoint_to_upload_list(self, breakpoint_name, line_num):
"""添加平差完成的断点到上传列表和字典"""
if breakpoint_name and breakpoint_name not in global_variable.get_upload_breakpoint_list():
global_variable.get_upload_breakpoint_list().append(breakpoint_name)
global_variable.get_upload_breakpoint_dict()[breakpoint_name] = {
'breakpoint_name': breakpoint_name,
'line_num': line_num
}
logging.info(f"成功添加断点 '{breakpoint_name}' 到上传列表")
logging.info(f"断点详细信息: 线路编码={line_num}")
return True
else:
logging.warning(f"断点名为空或已存在于列表中")
return False
def handle_confirmation_dialog_save(self, device_id, timeout=2):
"""
处理确认弹窗,点击""按钮
Args:
device_id: 设备ID
timeout: 等待弹窗的超时时间
Returns:
bool: 是否成功处理弹窗
"""
# 等待弹窗出现最多等待2秒
try:
dialog_message = WebDriverWait(self.driver, timeout).until(
EC.presence_of_element_located((AppiumBy.ID, "android:id/content"))
)
self.logger.info(f"设备 {device_id} 检测到确认弹窗")
# 查找并点击"是"按钮
confirm_button = self.driver.find_element(
AppiumBy.ID, "android:id/button1"
)
if confirm_button.is_displayed() and confirm_button.is_enabled():
self.logger.info(f"设备 {device_id} 点击确认弹窗的''按钮")
confirm_button.click()
time.sleep(0.5)
return True
else:
self.logger.error(f"设备 {device_id} ''按钮不可点击")
return False
except TimeoutException:
# 超时未找到弹窗,认为没有弹窗,返回成功
self.logger.info(f"设备 {device_id} 等待 {timeout} 秒未发现确认弹窗,可能没有弹窗,返回成功")
return True
def load_line_time_mapping_dict(self, filename="20251022.1.CZSCZQ-3fhg0410.txt", log_directory="D:\\soft\\安卓时间修改-v0.7.13-1\\Logs\\202510", poll_interval=120, max_wait_time=18000):
"""
加载指定文件中的线路编码和时间到全局字典
参数:
filename: 文件名 (例如: "20251022.1.CZSCZQ-3fhg0410.txt")
log_directory: 文件所在目录
poll_interval: 轮询间隔时间默认2分钟
max_wait_time: 最大等待时间默认5小时
"""
try:
current_year_month = datetime.now().strftime("%Y%m")
# 将Logs\\后的内容替换为实际年月
log_directory = log_directory.split("Logs\\")[0] + "Logs\\" + current_year_month
# 获取当前年月日例如20251023
current_date = datetime.now().strftime("%Y%m%d")
# 拼接格式:年月日.1.用户名.txt
filename = f"{current_date}.1.{global_variable.get_username()}.txt"
file_path = os.path.join(log_directory, filename)
# 轮询等待文件出现
start_time = time.time()
wait_count = 0
while not os.path.exists(file_path):
wait_count += 1
elapsed_time = time.time() - start_time
if elapsed_time >= max_wait_time:
self.logger.error(f"等待文件超时 ({max_wait_time}秒),文件仍未出现: {file_path}")
return False
self.logger.info(f"{wait_count}次检查 - 文件不存在,等待 {poll_interval} 秒后重试... (已等待 {elapsed_time:.0f} 秒)")
time.sleep(poll_interval)
# 文件存在,继续处理
self.logger.info(f"文件已找到,正在加载: {file_path}")
# 临时字典,用于存储当前文件中的线路编码和时间
temp_mapping = {}
with open(file_path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
# 解析日志行格式: "2025-10-22 16:22:50.171, INFO, 已修改线路时间L205413, 结束时间2025-10-22 09:18:17.460"
if "已修改线路时间" in line and "结束时间" in line:
# 提取线路编码
line_code_match = re.search(r'已修改线路时间[:]\s*([^,]+)', line)
# 提取结束时间
end_time_match = re.search(r'结束时间[:]\s*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})', line)
if line_code_match and end_time_match:
line_code = line_code_match.group(1).strip()
end_time_str = end_time_match.group(1)
# 解析日期时间
try:
end_time = datetime.strptime(end_time_str, "%Y-%m-%d %H:%M:%S")
# 存入临时字典(同一个线路编码,后面的会覆盖前面的,实现取最后一条)
temp_mapping[line_code] = end_time
self.logger.info(f"{line_num}行: 找到线路编码 {line_code} 时间: {end_time}")
except ValueError as e:
self.logger.warning(f"{line_num}行: 解析时间格式错误: {end_time_str}, 错误: {e}")
continue
else:
self.logger.warning(f"{line_num}行: 无法解析线路编码或结束时间")
# 将临时字典的内容更新到全局字典
for line_code, end_time in temp_mapping.items():
global_variable.get_line_time_mapping_dict()[line_code] = end_time
self.logger.info(f"更新全局字典: {line_code} -> {end_time}")
total_wait_time = time.time() - start_time
self.logger.info(f"文件加载完成,等待时间: {total_wait_time:.0f}秒,共处理 {len(temp_mapping)} 条记录")
self.logger.info(f"当前全局字典总数: {len(global_variable.get_line_time_mapping_dict())} 条记录")
return True
except Exception as e:
self.logger.error(f"加载文件 {filename} 时出错: {str(e)}")
return False
def screenshot_page_manager(self, device_id):
"""执行截图页面管理操作"""
try:
# 加载指定文件中的线路编码和时间到全局字典
if not self.load_line_time_mapping_dict("20251022.1.CZSCZQ-3fhg0410.txt", "D:\\soft\\安卓时间修改-v0.7.13-1\\Logs\\202510"):
self.logger.error(f"设备 {device_id} 加载线路时间映射字典失败")
return False
# time.sleep(5)
# 循环检查数据数量是否一致,直到获取到完整数据
retry_count = 0
while True:
# 获取断点列表和线路时间字典的数量
breakpoint_count = len(global_variable.get_upload_breakpoint_dict())
line_time_count = len(global_variable.get_line_time_mapping_dict())
self.logger.info(f"设备 {device_id} 断点列表数量: {breakpoint_count}, 文件中获取的线路时间数量: {line_time_count}")
# 如果断点列表为空,无法比较,直接跳出循环
if breakpoint_count == 0:
self.logger.warning(f"设备 {device_id} 断点列表为空,无法进行数量比较")
break
# 如果数量一致,获取到完整数据,跳出循环
if line_time_count == breakpoint_count:
self.logger.info(f"设备 {device_id} 数据数量一致,已获取完整数据")
break
# 数量不一致,等待三分钟后再次获取文件
retry_count += 1
self.logger.warning(f"设备 {device_id} 数据数量不一致: 断点列表({breakpoint_count}) != 线路时间({line_time_count}),第{retry_count}次重试等待1分钟后重新加载文件")
time.sleep(60) # 等待3分钟
# 重新加载文件
if not self.load_line_time_mapping_dict("20251022.1.CZSCZQ-3fhg0410.txt", "D:\\soft\\安卓时间修改-v0.7.13-1\\Logs\\202510"):
self.logger.error(f"设备 {device_id} 重新加载线路时间映射字典失败")
else:
self.logger.info(f"设备 {device_id} 重新加载完成,新的线路时间数量: {len(global_variable.get_line_time_mapping_dict())}")
# # 禁用WiFi
# if not self.disable_wifi(device_id):
# self.logger.error(f"设备 {device_id} 禁用WiFi失败")
# return False
# 获取GLOBAL_BREAKPOINT_DICT中的断点名称和对应的线路编码
# 检查GLOBAL_BREAKPOINT_DICT是否为空如果为空则初始化一些测试数据
if not global_variable.get_upload_breakpoint_dict():
self.logger.warning("global_variable.get_upload_breakpoint_dict()为空,正在初始化测试数据")
# 添加一些测试断点数据,实际使用时应该从其他地方加载
# 注意这里的值应该是字典与section_mileage_config_page.py中的数据结构保持一致
global_variable.set_breakpoint_dict({
"CZSCZQ-3-康定2号隧道-DK297+201-DK297+199-山区": {
'breakpoint_name': "CZSCZQ-3-康定2号隧道-DK297+201-DK297+199-山区",
'line_num': "L205413"
},
"CZSCZQ-3-康定2号隧道-DK296+701-DK296+699-山区": {
'breakpoint_name': "CZSCZQ-3-康定2号隧道-DK296+701-DK296+699-山区",
'line_num': "L205414"
}
})
# 开始循环
all_success = True
for breakpoint_name in global_variable.get_upload_breakpoint_dict().keys():
self.logger.info(f"开始处理要平差截图的断点 {breakpoint_name}")
# 把断点名称给find_keyword
if not self.find_keyword(breakpoint_name):
self.logger.error(f"设备 {device_id} 未找到包含 {breakpoint_name} 的文件名")
all_success = False
continue
if not self.handle_measurement_dialog():
self.logger.error(f"设备 {device_id} 处理测量弹窗失败")
all_success = False
continue
if not self.check_apply_btn():
self.logger.error(f"设备 {device_id} 检查平差处理按钮失败")
all_success = False
self.click_back_button(device_id)
continue
# 根据断点名称在get_upload_breakpoint_dict()中获取线路编码
line_code = global_variable.get_upload_breakpoint_dict().get(breakpoint_name)
if not line_code:
self.logger.error(f"设备 {device_id} 未找到断点 {breakpoint_name} 对应的线路编码")
all_success = False
continue
# 根据线路编码查找对应的时间
date_str, time_str = self.get_line_end_time(line_code)
if not time_str or not date_str:
self.logger.error(f"设备 {device_id} 未找到线路 {line_code} 对应的时间")
all_success = False
continue
# 修改时间
if not self.set_device_time(device_id, time_str, date_str):
self.logger.error(f"设备 {device_id} 设置设备时间失败")
all_success = False
continue
# 滑动列表到底部
if not self.scroll_list_to_bottom(device_id):
self.logger.error(f"设备 {device_id} 下滑列表到底部失败")
all_success = False
continue
# 2. 点击最后一个spinner
if not self.click_last_spinner_with_retry(device_id):
self.logger.error(f"设备 {device_id} 点击最后一个spinner失败")
all_success = False
continue
# 3. 再下滑一次
if not self.scroll_down_once(device_id):
self.logger.warning(f"设备 {device_id} 再次下滑失败,但继续执行")
# 平差处理完成后截图
time.sleep(0.2) # 等待平差处理按钮点击后的界面变化
logging.info("断点保存到上传列表成功,开始截图")
# png_time = date_str + " " + time_str
if not self.take_screenshot(breakpoint_name, date_str, time_str):
self.logger.error(f"设备 {device_id} 截图失败")
self.write_screenshot_status(breakpoint_name, success=False)
all_success = False
else:
self.write_screenshot_status(breakpoint_name, success=True)
# 点击返回按钮并处理弹窗
if not self.execute_back_navigation_steps(device_id):
self.logger.error(f"设备 {device_id} 处理返回按钮确认失败")
all_success = False
# 启用WiFi
# if not self.enable_wifi(device_id):
# self.logger.error(f"设备 {device_id} 启用WiFi失败")
# return False
# 根据截图结果更新time.txt文件状态
status = "ok" if all_success else "true"
# self.update_upload_info_status(status)
username = global_variable.get_username()
self.update_file_status(username, "running", status)
self.logger.info(f"{username} 截图完成状态为 {status}")
self.logger.info(f"设备 {device_id} 截图页面操作执行完成,状态: {status}")
return True
except Exception as e:
self.logger.error(f"设备 {device_id} 执行截图页面操作时出错: {str(e)}")
# 保存错误截图
# error_screenshot_file = os.path.join(
# results_dir,
# f"screenshot_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
# )
# self.driver.save_screenshot(error_screenshot_file)
# self.logger.info(f"错误截图已保存: {error_screenshot_file}")
return False