1577 lines
70 KiB
Python
1577 lines
70 KiB
Python
# screenshot_page.py
|
||
import subprocess
|
||
import logging
|
||
import time
|
||
import re
|
||
import os
|
||
import threading
|
||
from datetime import datetime
|
||
from appium.webdriver.common.appiumby import AppiumBy
|
||
from selenium.common.exceptions import NoSuchElementException, TimeoutException
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
import globals.global_variable as global_variable # 导入全局变量模块
|
||
import globals.ids as ids
|
||
# 导入全局驱动工具函数
|
||
from globals.driver_utils import check_session_valid, reconnect_driver
|
||
import globals.driver_utils as driver_utils
|
||
import globals.global_variable as global_variable # 导入全局变量模块
|
||
|
||
class ScreenshotPage:
|
||
def __init__(self, driver, wait, device_id=None):
|
||
self.driver = driver
|
||
self.wait = WebDriverWait(driver, 2)
|
||
self.device_id = device_id
|
||
self.logger = logging.getLogger(__name__)
|
||
self.all_items = set()
|
||
|
||
def scroll_list(self, direction="down"):
|
||
"""滑动列表以加载更多项目
|
||
|
||
Args:
|
||
direction: 滑动方向,"down"表示向下滑动,"up"表示向上滑动
|
||
|
||
Returns:
|
||
bool: 滑动是否成功执行,对于向上滑动,如果滑动到顶则返回False
|
||
"""
|
||
try:
|
||
# 获取列表容器
|
||
list_container = self.driver.find_element(AppiumBy.ID, ids.MEASURE_LIST_ID)
|
||
|
||
# 计算滑动坐标
|
||
start_x = list_container.location['x'] + list_container.size['width'] // 2
|
||
|
||
if direction == "down":
|
||
# 向下滑动
|
||
start_y = list_container.location['y'] + list_container.size['height'] * 0.95
|
||
end_y = list_container.location['y'] + list_container.size['height'] * 0.05
|
||
self.logger.info("向下滑动列表")
|
||
else:
|
||
# 向上滑动
|
||
# 记录滑动前的项目,用于判断是否滑动到顶
|
||
before_scroll_items = self.get_current_items()
|
||
start_y = list_container.location['y'] + list_container.size['height'] * 0.05
|
||
end_y = list_container.location['y'] + list_container.size['height'] * 0.95
|
||
self.logger.info("向上滑动列表")
|
||
|
||
# 执行滑动
|
||
self.driver.swipe(start_x, start_y, start_x, end_y, 1000)
|
||
|
||
|
||
# 向上滑动时,检查是否滑动到顶
|
||
if direction == "up":
|
||
after_scroll_items = self.get_current_items()
|
||
# 如果滑动后的项目与滑动前的项目相同,说明已经滑动到顶
|
||
if after_scroll_items == before_scroll_items:
|
||
self.logger.info("已滑动到列表顶部,列表内容不变")
|
||
return False
|
||
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"滑动列表失败: {str(e)}")
|
||
return False
|
||
|
||
|
||
def get_current_items(self):
|
||
"""获取当前页面中的所有项目文本"""
|
||
try:
|
||
items = self.driver.find_elements(AppiumBy.ID, ids.MEASURE_LISTVIEW_ID)
|
||
item_texts = []
|
||
|
||
for item in items:
|
||
try:
|
||
title_element = item.find_element(AppiumBy.ID, ids.MEASURE_NAME_TEXT_ID)
|
||
if title_element and title_element.text:
|
||
item_texts.append(title_element.text)
|
||
except NoSuchElementException:
|
||
continue
|
||
|
||
return item_texts
|
||
except Exception as e:
|
||
self.logger.error(f"获取当前项目失败: {str(e)}")
|
||
return []
|
||
|
||
def click_item_by_text(self, text):
|
||
"""点击指定文本的项目"""
|
||
try:
|
||
# 查找包含指定文本的项目
|
||
items = self.driver.find_elements(AppiumBy.ID, ids.MEASURE_LISTVIEW_ID)
|
||
|
||
for item in items:
|
||
try:
|
||
title_element = item.find_element(AppiumBy.ID, ids.MEASURE_NAME_TEXT_ID)
|
||
if title_element and title_element.text == text:
|
||
title_element.click()
|
||
self.logger.info(f"已点击项目: {text}")
|
||
return True
|
||
except NoSuchElementException:
|
||
continue
|
||
|
||
self.logger.warning(f"未找到可点击的项目: {text}")
|
||
return False
|
||
except Exception as e:
|
||
self.logger.error(f"点击项目失败: {str(e)}")
|
||
return False
|
||
|
||
|
||
|
||
def find_keyword(self, fixed_filename):
|
||
"""查找指定关键词并点击,支持向下和向上滑动查找"""
|
||
try:
|
||
if not check_session_valid(self.driver, self.device_id):
|
||
self.logger.warning(f"设备 {self.device_id} 会话无效,尝试重新连接驱动...")
|
||
if not reconnect_driver(self.device_id, self.driver):
|
||
self.logger.error(f"设备 {self.device_id} 驱动重连失败")
|
||
|
||
# 等待线路列表容器出现
|
||
self.wait.until(
|
||
EC.presence_of_element_located((AppiumBy.ID, ids.MEASURE_LIST_ID))
|
||
)
|
||
self.logger.info("线路列表容器已找到")
|
||
|
||
max_scroll_attempts = 100 # 最大滚动尝试次数
|
||
scroll_count = 0
|
||
previous_items = set() # 记录前一次获取的项目集合,用于检测是否到达边界
|
||
|
||
# 首先尝试向下滑动查找
|
||
while scroll_count < max_scroll_attempts:
|
||
# 获取当前页面中的所有项目
|
||
current_items = self.get_current_items()
|
||
# self.logger.info(f"当前页面找到 {len(current_items)} 个项目: {current_items}")
|
||
|
||
# 检查目标文件是否在当前页面中
|
||
if fixed_filename in current_items:
|
||
self.logger.info(f"找到目标文件: {fixed_filename}")
|
||
# 点击目标文件
|
||
if self.click_item_by_text(fixed_filename):
|
||
return True
|
||
else:
|
||
self.logger.error(f"点击目标文件失败: {fixed_filename}")
|
||
return False
|
||
|
||
# 检查是否到达底部:连续两次获取的项目相同
|
||
if current_items == previous_items and len(current_items) > 0:
|
||
self.logger.info("连续两次获取的项目相同,已到达列表底部")
|
||
break
|
||
|
||
# 更新前一次项目集合
|
||
previous_items = current_items.copy()
|
||
|
||
# 向下滑动列表以加载更多项目
|
||
if not self.scroll_list(direction="down"):
|
||
self.logger.error("向下滑动列表失败")
|
||
return False
|
||
|
||
scroll_count += 1
|
||
self.logger.info(f"第 {scroll_count} 次向下滑动,继续查找...")
|
||
|
||
# 如果向下滑动未找到,尝试向上滑动查找
|
||
self.logger.info("向下滑动未找到目标,开始向上滑动查找")
|
||
|
||
# 重置滚动计数
|
||
scroll_count = 0
|
||
|
||
while scroll_count < max_scroll_attempts:
|
||
# 向上滑动列表
|
||
# 如果返回False,说明已经滑动到顶
|
||
if not self.scroll_list(direction="up"):
|
||
# 检查是否是因为滑动到顶而返回False
|
||
if "已滑动到列表顶部" in self.logger.handlers[0].buffer[-1].message:
|
||
self.logger.info("已滑动到列表顶部,停止向上滑动")
|
||
break
|
||
else:
|
||
self.logger.error("向上滑动列表失败")
|
||
return False
|
||
|
||
# 获取当前页面中的所有项目
|
||
current_items = self.get_current_items()
|
||
# self.logger.info(f"向上滑动后找到 {len(current_items)} 个项目: {current_items}")
|
||
|
||
# 检查目标文件是否在当前页面中
|
||
if fixed_filename in current_items:
|
||
self.logger.info(f"找到目标文件: {fixed_filename}")
|
||
# 点击目标文件
|
||
if self.click_item_by_text(fixed_filename):
|
||
return True
|
||
else:
|
||
self.logger.error(f"点击目标文件失败: {fixed_filename}")
|
||
return False
|
||
|
||
scroll_count += 1
|
||
self.logger.info(f"第 {scroll_count} 次向上滑动,继续查找...")
|
||
|
||
self.logger.warning(f"经过 {max_scroll_attempts * 2} 次滑动仍未找到目标文件")
|
||
return False
|
||
|
||
except TimeoutException:
|
||
self.logger.error("等待线路列表元素超时")
|
||
return False
|
||
except Exception as e:
|
||
self.logger.error(f"查找关键词时出错: {str(e)}")
|
||
return False
|
||
|
||
def handle_measurement_dialog(self):
|
||
"""处理测量弹窗 - 选择继续测量"""
|
||
try:
|
||
self.logger.info("检查测量弹窗...")
|
||
|
||
# 直接尝试点击"继续测量"按钮
|
||
continue_btn = WebDriverWait(self.driver, 2).until(
|
||
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/measure_continue_btn"))
|
||
)
|
||
continue_btn.click()
|
||
self.logger.info("已点击'继续测量'按钮")
|
||
return True
|
||
|
||
except TimeoutException:
|
||
self.logger.info("未找到继续测量按钮,可能没有弹窗")
|
||
return True # 没有弹窗也认为是成功的
|
||
except Exception as e:
|
||
self.logger.error(f"点击继续测量按钮时出错: {str(e)}")
|
||
return False
|
||
|
||
# 检查有没有平差处理按钮
|
||
def check_apply_btn(self):
|
||
"""检查是否有平差处理按钮"""
|
||
try:
|
||
apply_btn = WebDriverWait(self.driver, 5).until(
|
||
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/point_measure_btn"))
|
||
)
|
||
if apply_btn.is_displayed():
|
||
logging.info("进入平差页面")
|
||
else:
|
||
self.logger.info("没有找到'平差处理'按钮")
|
||
return True
|
||
except TimeoutException:
|
||
self.logger.info("未找到平差处理按钮")
|
||
return False # 没有弹窗也认为是成功的
|
||
except Exception as e:
|
||
self.logger.error(f"点击平差处理按钮时出错: {str(e)}")
|
||
return False
|
||
|
||
|
||
def get_line_end_time(self, line_code):
|
||
"""
|
||
从全局字典中获取线路编码对应的结束时间
|
||
参数:
|
||
line_code: 线路编码
|
||
返回:
|
||
tuple: (date_str, time_str) 日期和时间字符串
|
||
"""
|
||
|
||
if line_code in global_variable.get_line_time_mapping_dict():
|
||
end_time = global_variable.get_line_time_mapping_dict()[line_code]
|
||
date_str = end_time.strftime("%Y-%m-%d")
|
||
time_str = end_time.strftime("%H:%M:%S")
|
||
return (date_str, time_str)
|
||
else:
|
||
self.logger.warning(f"未找到线路编码 {line_code} 的结束时间")
|
||
return (None, None)
|
||
|
||
def show_line_time_mapping(self):
|
||
"""
|
||
显示当前全局字典中的所有线路编码和时间
|
||
"""
|
||
|
||
self.logger.info("\n当前全局字典内容:")
|
||
if global_variable.get_line_time_mapping_dict():
|
||
for line_code, end_time in sorted(global_variable.get_line_time_mapping_dict().items()):
|
||
date_str = end_time.strftime("%Y-%m-%d")
|
||
time_str = end_time.strftime("%H:%M:%S")
|
||
self.logger.info(f" {line_code}: {date_str} {time_str}")
|
||
else:
|
||
self.logger.info(" 全局字典为空")
|
||
self.logger.info(f"总计: {len(global_variable.get_line_time_mapping_dict())} 条记录\n")
|
||
def clear_line_time_mapping(self):
|
||
"""
|
||
清空全局字典
|
||
"""
|
||
global_variable.get_line_time_mapping_dict().clear()
|
||
self.logger.info("已清空全局字典")
|
||
|
||
def set_device_time(self, device_id, time_str=None, date_str=None, disable_auto_sync=True):
|
||
"""
|
||
通过ADB设置设备时间(带管理员权限)
|
||
|
||
参数:
|
||
device_id: 设备ID
|
||
time_str: 时间字符串,格式 "HH:MM:SS" (例如: "14:30:00")
|
||
date_str: 日期字符串,格式 "YYYY-MM-DD" (例如: "2024-10-15")
|
||
disable_auto_sync: 是否禁用自动时间同步(防止设置的时间被网络时间覆盖)
|
||
|
||
返回:
|
||
bool: 操作是否成功
|
||
"""
|
||
try:
|
||
if time_str is None and date_str is None:
|
||
return True
|
||
|
||
# 首先尝试获取设备的root权限
|
||
self.logger.info(f"尝试获取设备 {device_id} 的root权限...")
|
||
root_result = subprocess.run(
|
||
["adb", "-s", device_id, "root"],
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=10
|
||
)
|
||
|
||
# 检查root权限获取是否成功(有些设备可能返回非0但实际已获取权限)
|
||
if root_result.returncode != 0:
|
||
self.logger.warning(f"获取root权限返回非0状态码,但继续尝试操作: {root_result.stderr.strip()}")
|
||
|
||
now = datetime.now()
|
||
hour, minute, second = map(int, (time_str or f"{now.hour}:{now.minute}:{now.second}").split(":"))
|
||
year, month, day = map(int, (date_str or f"{now.year}-{now.month}-{now.day}").split("-"))
|
||
|
||
# 禁用自动同步
|
||
if disable_auto_sync:
|
||
# 使用su命令以root权限执行设置
|
||
subprocess.run(
|
||
["adb", "-s", device_id, "shell", "su", "-c",
|
||
"settings put global auto_time 0"],
|
||
timeout=5
|
||
)
|
||
subprocess.run(
|
||
["adb", "-s", device_id, "shell", "su", "-c",
|
||
"settings put global auto_time_zone 0"],
|
||
timeout=5
|
||
)
|
||
|
||
# 优先尝试旧格式 (MMDDhhmmYYYY.ss)
|
||
adb_time_str_old = f"{month:02d}{day:02d}{hour:02d}{minute:02d}{year:04d}.{second:02d}"
|
||
cmd_old = [
|
||
"adb", "-s", device_id, "shell", "su", "-c",
|
||
f"date {adb_time_str_old}"
|
||
]
|
||
result = subprocess.run(cmd_old, capture_output=True, text=True, timeout=10)
|
||
|
||
if result.returncode != 0:
|
||
self.logger.warning(f"旧格式失败,尝试新格式设置日期时间")
|
||
|
||
# 尝试新格式(Toybox),使用su -c确保以root权限执行
|
||
adb_time_str_new = f"{year:04d}{month:02d}{day:02d}.{hour:02d}{minute:02d}{second:02d}"
|
||
cmd_new = [
|
||
"adb", "-s", device_id, "shell", "su", "-c",
|
||
f"date {adb_time_str_new}"
|
||
]
|
||
result = subprocess.run(cmd_new, capture_output=True, text=True, timeout=10)
|
||
|
||
if result.returncode == 0:
|
||
self.logger.info(f"设备 {device_id} 时间设置成功: {year}-{month}-{day} {hour}:{minute}:{second}")
|
||
return True
|
||
else:
|
||
self.logger.error(f"设备 {device_id} 设置时间失败: {result.stderr.strip()}")
|
||
return False
|
||
|
||
except subprocess.TimeoutExpired:
|
||
self.logger.error(f"设备 {device_id} 设置时间命令执行超时")
|
||
return False
|
||
except Exception as e:
|
||
self.logger.error(f"设备 {device_id} 设置时间时发生异常: {str(e)}")
|
||
return False
|
||
|
||
def disable_wifi(self, device_id):
|
||
"""
|
||
通过ADB关闭设备WiFi
|
||
|
||
返回:
|
||
bool: 操作是否成功
|
||
"""
|
||
try:
|
||
# 关闭WiFi
|
||
cmd_disable_wifi = [
|
||
"adb", "-s", device_id,
|
||
"shell", "svc", "wifi", "disable"
|
||
]
|
||
|
||
result = subprocess.run(cmd_disable_wifi, capture_output=True, text=True, timeout=10)
|
||
|
||
if result.returncode == 0:
|
||
self.logger.info(f"设备 {device_id} WiFi已关闭")
|
||
time.sleep(1) # 等待WiFi完全关闭
|
||
return True
|
||
else:
|
||
self.logger.error(f"设备 {device_id} 关闭WiFi失败: {result.stderr}")
|
||
return False
|
||
|
||
except subprocess.TimeoutExpired:
|
||
self.logger.error(f"设备 {device_id} 关闭WiFi命令执行超时")
|
||
cmd_check_wifi = [
|
||
"adb", "-s", device_id,
|
||
"shell", "settings get global wifi_on"
|
||
]
|
||
check_result = subprocess.run(cmd_check_wifi, capture_output=True, text=True, timeout=5)
|
||
wifi_state = check_result.stdout.strip()
|
||
|
||
if wifi_state == "0":
|
||
self.logger.info(f"设备 {device_id} WiFi已关闭(校验通过)")
|
||
return True
|
||
return False
|
||
except Exception as e:
|
||
self.logger.error(f"设备 {device_id} 关闭WiFi时发生错误: {str(e)}")
|
||
return False
|
||
|
||
def enable_wifi(self, device_id):
|
||
"""
|
||
通过ADB打开设备WiFi
|
||
|
||
返回:
|
||
bool: 操作是否成功
|
||
"""
|
||
try:
|
||
# 打开WiFi
|
||
cmd_enable_wifi = [
|
||
"adb", "-s", device_id,
|
||
"shell", "svc", "wifi", "enable"
|
||
]
|
||
|
||
result = subprocess.run(cmd_enable_wifi, capture_output=True, text=True, timeout=10)
|
||
|
||
if result.returncode == 0:
|
||
self.logger.info(f"设备 {device_id} WiFi已打开")
|
||
time.sleep(3) # 等待WiFi完全连接
|
||
return True
|
||
else:
|
||
self.logger.error(f"设备 {device_id} 打开WiFi失败: {result.stderr}")
|
||
return False
|
||
|
||
except subprocess.TimeoutExpired:
|
||
self.logger.error(f"设备 {device_id} 打开WiFi命令执行超时")
|
||
return False
|
||
except Exception as e:
|
||
self.logger.error(f"设备 {device_id} 打开WiFi时发生错误: {str(e)}")
|
||
return False
|
||
|
||
def get_current_time(self, device_id):
|
||
"""
|
||
获取设备当前时间
|
||
|
||
返回:
|
||
str: 设备当前时间字符串,如果获取失败则返回None
|
||
"""
|
||
try:
|
||
cmd_get_time = [
|
||
"adb", "-s", device_id,
|
||
"shell", "date"
|
||
]
|
||
|
||
result = subprocess.run(cmd_get_time, capture_output=True, text=True, timeout=10)
|
||
|
||
if result.returncode == 0:
|
||
current_time = result.stdout.strip()
|
||
self.logger.info(f"设备 {device_id} 当前时间: {current_time}")
|
||
return current_time
|
||
else:
|
||
self.logger.error(f"设备 {device_id} 获取时间失败: {result.stderr}")
|
||
return None
|
||
|
||
except subprocess.TimeoutExpired:
|
||
self.logger.error(f"设备 {device_id} 获取时间命令执行超时")
|
||
return None
|
||
except Exception as e:
|
||
self.logger.error(f"设备 {device_id} 获取时间时发生错误: {str(e)}")
|
||
return None
|
||
|
||
def check_wifi_status(self, device_id):
|
||
"""
|
||
检查设备WiFi状态
|
||
|
||
返回:
|
||
str: "enabled"表示已开启, "disabled"表示已关闭, None表示获取失败
|
||
"""
|
||
try:
|
||
cmd_check_wifi = [
|
||
"adb", "-s", device_id,
|
||
"shell", "dumpsys", "wifi", "|", "grep", "Wi-Fi"
|
||
]
|
||
|
||
result = subprocess.run(cmd_check_wifi, capture_output=True, text=True, timeout=10)
|
||
|
||
if result.returncode == 0:
|
||
wifi_status = result.stdout.strip()
|
||
if "enabled" in wifi_status.lower():
|
||
self.logger.info(f"设备 {device_id} WiFi状态: 已开启")
|
||
return "enabled"
|
||
elif "disabled" in wifi_status.lower():
|
||
self.logger.info(f"设备 {device_id} WiFi状态: 已关闭")
|
||
return "disabled"
|
||
else:
|
||
self.logger.warning(f"设备 {device_id} 无法确定WiFi状态: {wifi_status}")
|
||
return None
|
||
else:
|
||
# 尝试另一种方法检查WiFi状态
|
||
cmd_check_wifi_alt = [
|
||
"adb", "-s", device_id,
|
||
"shell", "settings", "get", "global", "wifi_on"
|
||
]
|
||
|
||
result_alt = subprocess.run(cmd_check_wifi_alt, capture_output=True, text=True, timeout=10)
|
||
|
||
if result_alt.returncode == 0:
|
||
wifi_on = result_alt.stdout.strip()
|
||
if wifi_on == "1":
|
||
self.logger.info(f"设备 {device_id} WiFi状态: 已开启")
|
||
return "enabled"
|
||
elif wifi_on == "0":
|
||
self.logger.info(f"设备 {device_id} WiFi状态: 已关闭")
|
||
return "disabled"
|
||
else:
|
||
self.logger.warning(f"设备 {device_id} 无法确定WiFi状态: {wifi_on}")
|
||
return None
|
||
else:
|
||
self.logger.error(f"设备 {device_id} 检查WiFi状态失败: {result_alt.stderr}")
|
||
return None
|
||
|
||
except subprocess.TimeoutExpired:
|
||
self.logger.error(f"设备 {device_id} 检查WiFi状态命令执行超时")
|
||
return None
|
||
except Exception as e:
|
||
self.logger.error(f"设备 {device_id} 检查WiFi状态时发生错误: {str(e)}")
|
||
return None
|
||
|
||
def take_screenshot(self, filename_prefix="screenshot", date_str=None, time_str=None):
|
||
"""
|
||
通过Appium驱动截取设备屏幕
|
||
|
||
参数:
|
||
filename_prefix: 截图文件前缀
|
||
|
||
返回:
|
||
bool: 操作是否成功
|
||
"""
|
||
try:
|
||
# 获取项目名称
|
||
project_name = global_variable.get_current_project_name() or "默认项目"
|
||
|
||
# 获取当前日期(如果没有提供)
|
||
if not date_str:
|
||
date_str = datetime.now().strftime("%Y%m%d")
|
||
|
||
# 获取当前时间(如果没有提供),并确保格式合法(不含冒号)
|
||
if not time_str:
|
||
time_str = datetime.now().strftime("%H%M%S")
|
||
else:
|
||
# 移除时间中的冒号,确保文件名合法
|
||
time_str = time_str.replace(":", "")
|
||
|
||
# 创建D盘下的截图目录结构:D:\uploadInfo\picture\项目名\年月日
|
||
screenshots_dir = os.path.join("D:\\", "uploadInfo", "picture", project_name, date_str)
|
||
|
||
# 确保目录存在
|
||
try:
|
||
os.makedirs(screenshots_dir, exist_ok=True)
|
||
self.logger.info(f"截图目录: {screenshots_dir}")
|
||
except Exception as dir_error:
|
||
self.logger.error(f"创建截图目录失败: {str(dir_error)}")
|
||
return False
|
||
line_code = global_variable.get_upload_breakpoint_dict().get(filename_prefix)
|
||
if not line_code:
|
||
self.logger.error(f"未找到与断点名称 {filename_prefix} 对应的线路编码")
|
||
line_code = "unknown"
|
||
|
||
# 截图保存
|
||
screenshot_file = os.path.join(
|
||
screenshots_dir,
|
||
f"{line_code}_{filename_prefix}_{time_str}.png"
|
||
)
|
||
|
||
# 尝试保存截图
|
||
try:
|
||
success = self.driver.save_screenshot(screenshot_file)
|
||
if success:
|
||
self.logger.info(f"截图已保存: {screenshot_file}")
|
||
# 验证文件是否真的存在
|
||
if os.path.exists(screenshot_file):
|
||
self.logger.info(f"截图文件验证存在: {screenshot_file}")
|
||
else:
|
||
self.logger.warning(f"截图文件保存成功但验证不存在: {screenshot_file}")
|
||
return True
|
||
else:
|
||
self.logger.error(f"Appium截图保存失败: {screenshot_file}")
|
||
return False
|
||
except Exception as save_error:
|
||
self.logger.error(f"保存截图时发生错误: {str(save_error)}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"截图时发生错误: {str(e)}")
|
||
return False
|
||
|
||
def write_screenshot_status(self, breakpoint_name, success=True):
|
||
"""
|
||
将截图状态写入D盘的txt文件
|
||
|
||
参数:
|
||
breakpoint_name: 断点名称
|
||
success: 截图是否成功
|
||
|
||
返回:
|
||
bool: 操作是否成功
|
||
"""
|
||
try:
|
||
# D盘txt文件路径
|
||
txt_file_path = "D:\\uploadInfo\\screenshot_status.txt"
|
||
|
||
# 获取当前时间
|
||
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
# 状态信息
|
||
status = "成功" if success else "失败"
|
||
status_line = f"{current_time} - {breakpoint_name} - 截图{status}\n"
|
||
|
||
# 追加写入文件
|
||
with open(txt_file_path, 'a', encoding='utf-8') as f:
|
||
f.write(status_line)
|
||
|
||
self.logger.info(f"截图状态已写入文件: {txt_file_path} - {status_line.strip()}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"写入截图状态文件时发生错误: {str(e)}")
|
||
return False
|
||
|
||
def update_file_status(self, username, from_status, to_status):
|
||
"""
|
||
安全地更新 time.txt 中该用户的状态
|
||
例如: 将 'true' 改为 'running', 或将 'running' 改为 'done'
|
||
"""
|
||
TIME_FILE_PATH = r"D:\uploadInfo\time.txt"
|
||
if not os.path.exists(TIME_FILE_PATH):
|
||
return False
|
||
|
||
success = False
|
||
file_lock = threading.Lock()
|
||
with file_lock:
|
||
try:
|
||
with open(TIME_FILE_PATH, 'r', encoding='utf-8') as f:
|
||
lines = f.readlines()
|
||
|
||
# new_lines = []
|
||
# for line in lines:
|
||
# clean_line = line.strip()
|
||
# # 匹配逻辑:包含用户名 且 以 from_status 结尾
|
||
# if f" {username} " in line and clean_line.endswith(from_status):
|
||
# line = line.replace(from_status, to_status)
|
||
# success = True
|
||
# new_lines.append(line)
|
||
new_lines = []
|
||
for line in lines:
|
||
# 使用正则确保精准匹配用户名和结尾状态
|
||
# 匹配规则:行内包含该用户名,且该行以 from_status 结尾
|
||
if re.search(rf'\b{username}\b', line) and line.strip().endswith(from_status):
|
||
# 只替换行尾的那个状态词
|
||
line = re.sub(rf'{from_status}$', to_status, line.rstrip()) + '\n'
|
||
success = True
|
||
new_lines.append(line)
|
||
|
||
with open(TIME_FILE_PATH, 'w', encoding='utf-8') as f:
|
||
f.writelines(new_lines)
|
||
|
||
if success:
|
||
print(f"📝 [文件更新] 用户 {username}: {from_status} -> {to_status}")
|
||
return success
|
||
except Exception as e:
|
||
print(f"❌ 更新文件状态失败 ({username}): {e}")
|
||
return False
|
||
|
||
def update_upload_info_status(self, status):
|
||
"""
|
||
更新D:/uploadInfo文件夹下的time.txt文件的状态
|
||
|
||
参数:
|
||
status: 状态值,如"ok"或"again"
|
||
|
||
返回:
|
||
bool: 操作是否成功
|
||
"""
|
||
try:
|
||
# time.txt文件路径
|
||
time_file_path = "D:\\uploadInfo\\time.txt"
|
||
|
||
# 确保文件夹存在
|
||
os.makedirs(os.path.dirname(time_file_path), exist_ok=True)
|
||
|
||
# 写入状态
|
||
with open(time_file_path, 'w', encoding='utf-8') as f:
|
||
f.write(status)
|
||
|
||
self.logger.info(f"已更新上传状态文件: {time_file_path} -> {status}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"更新上传状态文件时发生错误: {str(e)}")
|
||
return False
|
||
|
||
|
||
def handle_confirmation_dialog(self, device_id, timeout=2):
|
||
"""
|
||
处理确认弹窗,点击"是"按钮
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
timeout: 等待弹窗的超时时间
|
||
|
||
Returns:
|
||
bool: 是否成功处理弹窗
|
||
"""
|
||
# 等待弹窗出现(最多等待2秒)
|
||
try:
|
||
dialog_message = WebDriverWait(self.driver, timeout).until(
|
||
EC.presence_of_element_located((AppiumBy.XPATH, "//android.widget.TextView[@text='是否退出测量界面?']"))
|
||
)
|
||
|
||
self.logger.info(f"设备 {device_id} 检测到确认弹窗")
|
||
|
||
# 查找并点击"是"按钮
|
||
confirm_button = self.driver.find_element(
|
||
AppiumBy.XPATH,
|
||
"//android.widget.Button[@text='是' and @resource-id='android:id/button1']"
|
||
)
|
||
|
||
if confirm_button.is_displayed() and confirm_button.is_enabled():
|
||
self.logger.info(f"设备 {device_id} 点击确认弹窗的'是'按钮")
|
||
confirm_button.click()
|
||
time.sleep(0.5)
|
||
return True
|
||
else:
|
||
self.logger.error(f"设备 {device_id} '是'按钮不可点击")
|
||
return False
|
||
|
||
except TimeoutException:
|
||
# 超时未找到弹窗,认为没有弹窗,返回成功
|
||
self.logger.info(f"设备 {device_id} 等待 {timeout} 秒未发现确认弹窗,可能没有弹窗,返回成功")
|
||
return True
|
||
|
||
def click_back_button(self, device_id):
|
||
"""点击手机系统返回按钮"""
|
||
try:
|
||
self.driver.back()
|
||
self.logger.info("已点击手机系统返回按钮")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"点击手机系统返回按钮失败: {str(e)}")
|
||
return False
|
||
|
||
def handle_back_button_with_confirmation(self, device_id, timeout=10):
|
||
"""
|
||
处理返回按钮的确认弹窗
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
timeout: 等待弹窗的超时时间
|
||
|
||
Returns:
|
||
bool: 是否成功处理返回确认弹窗
|
||
"""
|
||
logging.info(f"进入handle_back_button_with_confirmation函数")
|
||
try:
|
||
self.logger.info(f"设备 {device_id} 等待返回确认弹窗出现")
|
||
|
||
start_time = time.time()
|
||
while time.time() - start_time < timeout:
|
||
try:
|
||
# 检查是否存在确认弹窗 - 使用多种定位策略提高兼容性
|
||
dialog_selectors = [
|
||
"//android.widget.TextView[@text='是否退出测量界面?']",
|
||
"//android.widget.TextView[contains(@text, '退出测量界面')]",
|
||
"//android.widget.TextView[contains(@text, '是否退出')]"
|
||
]
|
||
|
||
dialog_message = None
|
||
for selector in dialog_selectors:
|
||
try:
|
||
dialog_message = self.driver.find_element(AppiumBy.XPATH, selector)
|
||
if dialog_message.is_displayed():
|
||
break
|
||
except NoSuchElementException:
|
||
continue
|
||
|
||
if dialog_message and dialog_message.is_displayed():
|
||
self.logger.info(f"设备 {device_id} 检测到返回确认弹窗")
|
||
|
||
# 查找并点击"是"按钮 - 使用多种定位策略
|
||
confirm_selectors = [
|
||
"//android.widget.Button[@text='是' and @resource-id='android:id/button1']",
|
||
"//android.widget.Button[@text='是']",
|
||
"//android.widget.Button[@resource-id='android:id/button1']",
|
||
"//android.widget.Button[contains(@text, '是')]"
|
||
]
|
||
|
||
confirm_button = None
|
||
for selector in confirm_selectors:
|
||
try:
|
||
confirm_button = self.driver.find_element(AppiumBy.XPATH, selector)
|
||
if confirm_button.is_displayed() and confirm_button.is_enabled():
|
||
break
|
||
except NoSuchElementException:
|
||
continue
|
||
|
||
if confirm_button and confirm_button.is_displayed() and confirm_button.is_enabled():
|
||
self.logger.info(f"设备 {device_id} 点击确认弹窗的'是'按钮")
|
||
confirm_button.click()
|
||
time.sleep(0.5)
|
||
|
||
# 验证弹窗是否消失
|
||
try:
|
||
self.driver.find_element(AppiumBy.XPATH, "//android.widget.TextView[@text='是否退出测量界面?']")
|
||
self.logger.warning(f"设备 {device_id} 确认弹窗可能未正确关闭")
|
||
return False
|
||
except NoSuchElementException:
|
||
self.logger.info(f"设备 {device_id} 确认弹窗已成功关闭")
|
||
return True
|
||
else:
|
||
self.logger.error(f"设备 {device_id} 未找到可点击的'是'按钮")
|
||
return False
|
||
|
||
except NoSuchElementException:
|
||
# 弹窗未找到,继续等待
|
||
pass
|
||
except Exception as e:
|
||
self.logger.warning(f"设备 {device_id} 查找确认弹窗时出现异常: {str(e)}")
|
||
|
||
time.sleep(0.5)
|
||
|
||
self.logger.error(f"设备 {device_id} 等待返回确认弹窗超时")
|
||
return False
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"设备 {device_id} 处理返回确认弹窗时发生错误: {str(e)}")
|
||
return False
|
||
|
||
def handle_adjustment_result_dialog(self):
|
||
"""处理平差结果确认弹窗"""
|
||
try:
|
||
self.logger.info("开始检测平差结果弹窗")
|
||
|
||
# 等待弹窗出现(最多等待5秒)
|
||
warning_dialog = WebDriverWait(self.driver, 5).until(
|
||
EC.presence_of_element_located((AppiumBy.ID, "android:id/parentPanel"))
|
||
)
|
||
|
||
# 验证弹窗内容
|
||
alert_title = warning_dialog.find_element(AppiumBy.ID, "android:id/alertTitle")
|
||
alert_message = warning_dialog.find_element(AppiumBy.ID, "android:id/message")
|
||
|
||
self.logger.info(f"检测到弹窗 - 标题: {alert_title.text}, 消息: {alert_message.text}")
|
||
|
||
# 确认是目标弹窗
|
||
if "警告" in alert_title.text and "是否保留测量成果" in alert_message.text:
|
||
self.logger.info("确认是平差结果确认弹窗")
|
||
|
||
# 点击"是 保留成果"按钮
|
||
yes_button = warning_dialog.find_element(AppiumBy.ID, "android:id/button1")
|
||
if yes_button.text == "是 保留成果":
|
||
yes_button.click()
|
||
self.logger.info("已点击'是 保留成果'按钮")
|
||
|
||
# 等待弹窗消失
|
||
WebDriverWait(self.driver, 5).until(
|
||
EC.invisibility_of_element_located((AppiumBy.ID, "android:id/parentPanel"))
|
||
)
|
||
self.logger.info("弹窗已关闭")
|
||
return True
|
||
else:
|
||
self.logger.error(f"按钮文本不匹配,期望'是 保留成果',实际: {yes_button.text}")
|
||
return False
|
||
else:
|
||
self.logger.warning("弹窗内容不匹配,不是目标弹窗")
|
||
return False
|
||
|
||
except TimeoutException:
|
||
self.logger.info("未检测到平差结果弹窗,继续流程")
|
||
return True # 没有弹窗也是正常情况
|
||
except Exception as e:
|
||
self.logger.error(f"处理平差结果弹窗时出错: {str(e)}")
|
||
return False
|
||
|
||
def check_measurement_list(self, device_id):
|
||
"""
|
||
检查是否存在测量列表
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
|
||
Returns:
|
||
bool: 如果不存在测量列表返回True,存在返回False
|
||
"""
|
||
try:
|
||
# 等待线路列表容器出现
|
||
self.wait.until(
|
||
EC.presence_of_element_located((AppiumBy.ID, ids.MEASURE_LIST_ID))
|
||
)
|
||
self.logger.info("线路列表容器已找到")
|
||
|
||
# 如果存在MEASURE_LIST_ID,说明有测量列表,不需要执行后续步骤
|
||
self.logger.info(f"设备 {device_id} 存在测量列表,无需执行后续返回操作")
|
||
return False
|
||
|
||
except TimeoutException:
|
||
# 等待超时,说明没有测量列表
|
||
self.logger.info(f"设备 {device_id} 未找到测量列表,可以继续执行后续步骤")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"设备 {device_id} 检查测量列表时发生错误: {str(e)}")
|
||
return True
|
||
|
||
def handle_back_navigation(self, breakpoint_name, device_id):
|
||
"""
|
||
完整的返回导航处理流程
|
||
|
||
Args:
|
||
breakpoint_name: 断点名称
|
||
device_id: 设备ID
|
||
|
||
Returns:
|
||
bool: 整个返回导航流程是否成功
|
||
"""
|
||
try:
|
||
# time.sleep(2)
|
||
self.logger.info(f"已点击平差处理按钮,检查是否在测量页面")
|
||
|
||
# 检测是否存在测量列表(修正逻辑)
|
||
has_measurement_list = self.check_measurement_list(device_id)
|
||
if not has_measurement_list:
|
||
self.logger.info(f"设备 {device_id} 存在测量列表,重新执行平差流程")
|
||
|
||
# 把断点名称给find_keyword
|
||
if not self.find_keyword(breakpoint_name):
|
||
self.logger.error(f"设备 {device_id} 未找到包含 {breakpoint_name} 的文件名")
|
||
return False
|
||
|
||
if not self.handle_measurement_dialog():
|
||
self.logger.error(f"设备 {device_id} 处理测量弹窗失败")
|
||
return False
|
||
|
||
if not self.check_apply_btn():
|
||
self.logger.error(f"设备 {device_id} 检查平差处理按钮失败")
|
||
return False
|
||
|
||
# 滑动列表到底部
|
||
if not self.scroll_list_to_bottom(device_id):
|
||
self.logger.error(f"设备 {device_id} 下滑列表到底部失败")
|
||
return False
|
||
|
||
# 2. 点击最后一个spinner
|
||
if not self.click_last_spinner_with_retry(device_id):
|
||
self.logger.error(f"设备 {device_id} 点击最后一个spinner失败")
|
||
return False
|
||
|
||
# 3. 再下滑一次
|
||
if not self.scroll_down_once(device_id):
|
||
self.logger.warning(f"设备 {device_id} 再次下滑失败,但继续执行")
|
||
|
||
|
||
|
||
# 4. 点击平差处理按钮
|
||
if not self.click_adjustment_button(device_id):
|
||
self.logger.error(f"设备 {device_id} 点击平差处理按钮失败")
|
||
return False
|
||
|
||
self.logger.info(f"重新选择断点并点击平差处理按钮成功")
|
||
return True
|
||
|
||
else:
|
||
self.logger.info(f"不在测量页面,继续执行后续返回操作")
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"设备 {device_id} 处理返回导航时发生错误: {str(e)}")
|
||
return False
|
||
|
||
def execute_back_navigation_steps(self, device_id):
|
||
"""
|
||
执行实际的返回导航步骤
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
|
||
Returns:
|
||
bool: 导航是否成功
|
||
"""
|
||
try:
|
||
# 1. 首先点击返回按钮
|
||
if not self.click_back_button(device_id):
|
||
self.logger.error(f"设备 {device_id} 点击返回按钮失败")
|
||
return False
|
||
|
||
# 2. 处理返回确认弹窗
|
||
self.logger.info(f"已点击返回按钮,等待处理返回确认弹窗")
|
||
if not self.handle_confirmation_dialog(device_id):
|
||
self.logger.error(f"设备 {device_id} 处理返回确认弹窗失败")
|
||
return False
|
||
|
||
|
||
# 3. 验证是否成功返回到上一页面
|
||
time.sleep(0.5) # 等待页面跳转完成
|
||
|
||
# 可以添加页面验证逻辑,比如检查是否返回到预期的页面
|
||
# 这里可以根据实际应用添加特定的页面元素验证
|
||
|
||
self.logger.info(f"设备 {device_id} 返回导航流程完成")
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"设备 {device_id} 执行返回导航步骤时发生错误: {str(e)}")
|
||
return False
|
||
|
||
|
||
def scroll_list_to_bottom(self, device_id, max_swipes=60):
|
||
"""
|
||
下滑列表到最底端
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
max_swipes: 最大下滑次数
|
||
|
||
Returns:
|
||
bool: 是否滑动到底部
|
||
"""
|
||
try:
|
||
self.logger.info(f"设备 {device_id} 开始下滑列表到底部")
|
||
|
||
# 获取列表元素
|
||
list_view = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/auto_data_list")
|
||
|
||
same_content_count = 0
|
||
|
||
# 初始化第一次的子元素文本
|
||
initial_child_elements = list_view.find_elements(AppiumBy.CLASS_NAME, "android.widget.TextView")
|
||
current_child_texts = "|".join([
|
||
elem.text.strip() for elem in initial_child_elements
|
||
if elem.text and elem.text.strip()
|
||
])
|
||
|
||
for i in range(max_swipes):
|
||
# 执行下滑操作
|
||
self.driver.execute_script("mobile: scrollGesture", {
|
||
'elementId': list_view.id,
|
||
'direction': 'down',
|
||
'percent': 0.8,
|
||
'duration': 500
|
||
})
|
||
|
||
time.sleep(0.5)
|
||
|
||
# 获取滑动后的子元素文本
|
||
new_child_elements = list_view.find_elements(AppiumBy.CLASS_NAME, "android.widget.TextView")
|
||
new_child_texts = "|".join([
|
||
elem.text.strip() for elem in new_child_elements
|
||
if elem.text and elem.text.strip()
|
||
])
|
||
|
||
# 判断内容是否变化:若连续3次相同,认为到达底部
|
||
if new_child_texts == current_child_texts:
|
||
same_content_count += 1
|
||
if same_content_count >= 2:
|
||
self.logger.info(f"设备 {device_id} 列表已滑动到底部,共滑动 {i+1} 次")
|
||
return True
|
||
else:
|
||
same_content_count = 0 # 内容变化,重置计数
|
||
current_child_texts = new_child_texts # 更新上一次内容
|
||
|
||
self.logger.debug(f"设备 {device_id} 第 {i+1} 次下滑完成,当前子元素文本: {new_child_texts[:50]}...") # 打印部分文本
|
||
|
||
self.logger.warning(f"设备 {device_id} 达到最大下滑次数 {max_swipes},可能未完全到底部")
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"设备 {device_id} 下滑列表时发生错误: {str(e)}")
|
||
return False
|
||
|
||
def click_last_spinner_with_retry(self, device_id, max_retries=2):
|
||
"""带重试机制的点击方法"""
|
||
for attempt in range(max_retries):
|
||
try:
|
||
if self.click_last_spinner(device_id):
|
||
return True
|
||
self.logger.warning(f"设备 {device_id} 第{attempt + 1}次点击失败,准备重试")
|
||
time.sleep(0.5) # 重试前等待
|
||
except Exception as e:
|
||
self.logger.error(f"设备 {device_id} 第{attempt + 1}次尝试失败: {str(e)}")
|
||
|
||
self.logger.error(f"设备 {device_id} 所有重试次数已用尽")
|
||
return False
|
||
|
||
def click_last_spinner(self, device_id):
|
||
"""
|
||
点击最后一个spinner
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
|
||
Returns:
|
||
bool: 是否成功点击
|
||
"""
|
||
try:
|
||
self.logger.info(f"设备 {device_id} 查找最后一个spinner")
|
||
|
||
# 查找所有的spinner元素
|
||
spinners = self.driver.find_elements(AppiumBy.ID, "com.bjjw.cjgc:id/spinner")
|
||
|
||
if not spinners:
|
||
self.logger.error(f"设备 {device_id} 未找到任何spinner元素")
|
||
return False
|
||
|
||
# 获取最后一个spinner
|
||
last_spinner = spinners[-1]
|
||
|
||
if not (last_spinner.is_displayed() and last_spinner.is_enabled()):
|
||
self.logger.error(f"设备 {device_id} 最后一个spinner不可点击")
|
||
return False
|
||
|
||
# 点击操作
|
||
self.logger.info(f"设备 {device_id} 点击最后一个spinner")
|
||
last_spinner.click()
|
||
|
||
# 执行额外一次下滑操作
|
||
self.scroll_down_once(device_id)
|
||
|
||
max_retries = 3 # 最大重试次数
|
||
retry_count = 0
|
||
wait_timeout = 5 # 增加等待时间到5秒
|
||
|
||
while retry_count < max_retries:
|
||
try:
|
||
# 确保device_id正确设置,使用全局变量作为备用
|
||
if not hasattr(self, 'device_id') or not self.device_id:
|
||
# 优先使用传入的device_id,其次使用全局变量
|
||
self.device_id = device_id if device_id else global_variable.get_device_id()
|
||
|
||
# 使用self.device_id,确保有默认值
|
||
actual_device_id = self.device_id if self.device_id else global_variable.get_device_id()
|
||
|
||
if not check_session_valid(self.driver, actual_device_id):
|
||
self.logger.warning(f"设备 {actual_device_id} 会话无效,尝试重新连接驱动...")
|
||
try:
|
||
# 使用正确的设备ID进行重连
|
||
new_driver, new_wait = reconnect_driver(actual_device_id, self.driver)
|
||
if new_driver:
|
||
self.driver = new_driver
|
||
self.wait = new_wait
|
||
self.logger.info(f"设备 {actual_device_id} 驱动重连成功")
|
||
else:
|
||
self.logger.error(f"设备 {actual_device_id} 驱动重连失败")
|
||
retry_count += 1
|
||
continue
|
||
except Exception as e:
|
||
self.logger.error(f"设备 {actual_device_id} 驱动重连异常: {str(e)}")
|
||
retry_count += 1
|
||
continue
|
||
|
||
# 点击spinner(如果是重试,需要重新获取元素)
|
||
if retry_count > 0:
|
||
spinners = self.driver.find_elements(AppiumBy.CLASS_NAME, "android.widget.Spinner")
|
||
if not spinners:
|
||
self.logger.error(f"设备 {device_id} 未找到spinner元素")
|
||
retry_count += 1
|
||
continue
|
||
last_spinner = spinners[-1]
|
||
if not (last_spinner.is_displayed() and last_spinner.is_enabled()):
|
||
self.logger.error(f"设备 {device_id} spinner不可点击")
|
||
retry_count += 1
|
||
continue
|
||
self.logger.info(f"设备 {device_id} 重新点击spinner")
|
||
last_spinner.click()
|
||
# 重试时也执行下滑操作
|
||
self.scroll_down_once(device_id)
|
||
|
||
# 等待下拉菜单出现,增加等待时间到5秒
|
||
wait = WebDriverWait(self.driver, wait_timeout)
|
||
detail_show = wait.until(
|
||
EC.presence_of_element_located((AppiumBy.ID, "com.bjjw.cjgc:id/detailshow"))
|
||
)
|
||
|
||
if detail_show.is_displayed():
|
||
self.logger.info(f"设备 {device_id} spinner点击成功,下拉菜单已展开")
|
||
return True
|
||
else:
|
||
self.logger.error(f"设备 {device_id} 下拉菜单未显示")
|
||
retry_count += 1
|
||
continue
|
||
|
||
except Exception as wait_error:
|
||
error_msg = str(wait_error)
|
||
self.logger.error(f"设备 {device_id} 等待下拉菜单超时 (第{retry_count+1}次尝试): {error_msg}")
|
||
|
||
# 检查是否是连接断开相关的错误
|
||
if not check_session_valid(self.driver, self.device_id):
|
||
self.logger.warning(f"设备 {self.device_id} 会话无效,尝试重新连接驱动...")
|
||
# if any(keyword in error_msg for keyword in ['socket hang up', 'Could not proxy command']):
|
||
# self.logger.warning(f"设备 {device_id} 检测到连接相关错误,尝试重连...")
|
||
if not reconnect_driver(self.device_id, self.driver):
|
||
self.logger.error(f"设备 {device_id} 驱动重连失败")
|
||
|
||
retry_count += 1
|
||
if retry_count < max_retries:
|
||
self.logger.info(f"设备 {device_id} 将在1秒后进行第{retry_count+1}次重试")
|
||
time.sleep(1) # 等待1秒后重试
|
||
|
||
self.logger.error(f"设备 {device_id} 经过{max_retries}次重试后仍无法展开下拉菜单")
|
||
return False
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"设备 {device_id} 点击最后一个spinner时发生错误: {str(e)}")
|
||
return False
|
||
|
||
def scroll_down_once(self, device_id):
|
||
"""
|
||
再次下滑一次
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
|
||
Returns:
|
||
bool: 是否成功下滑
|
||
"""
|
||
try:
|
||
self.logger.info(f"设备 {device_id} 执行额外一次下滑")
|
||
|
||
# 获取列表元素
|
||
list_view = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/auto_data_list")
|
||
|
||
# 执行下滑操作
|
||
self.driver.execute_script("mobile: scrollGesture", {
|
||
'elementId': list_view.id,
|
||
'direction': 'down',
|
||
'percent': 0.5
|
||
})
|
||
|
||
time.sleep(0.2)
|
||
self.logger.info(f"设备 {device_id} 额外下滑完成")
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"设备 {device_id} 额外下滑时发生错误: {str(e)}")
|
||
return False
|
||
|
||
def click_adjustment_button(self, device_id):
|
||
"""
|
||
点击平差处理按钮
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
|
||
Returns:
|
||
bool: 是否成功点击
|
||
"""
|
||
try:
|
||
self.logger.info(f"设备 {device_id} 查找平差处理按钮")
|
||
|
||
# 查找平差处理按钮
|
||
adjustment_button = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/point_measure_btn")
|
||
|
||
# 验证按钮文本
|
||
button_text = adjustment_button.text
|
||
if "平差处理" not in button_text:
|
||
self.logger.warning(f"设备 {device_id} 按钮文本不匹配,期望'平差处理',实际: {button_text}")
|
||
|
||
if adjustment_button.is_displayed() and adjustment_button.is_enabled():
|
||
self.logger.info(f"设备 {device_id} 点击平差处理按钮")
|
||
adjustment_button.click()
|
||
time.sleep(3) # 等待平差处理完成
|
||
return True
|
||
else:
|
||
self.logger.error(f"设备 {device_id} 平差处理按钮不可点击")
|
||
return False
|
||
|
||
except NoSuchElementException:
|
||
self.logger.error(f"设备 {device_id} 未找到平差处理按钮")
|
||
return False
|
||
except Exception as e:
|
||
self.logger.error(f"设备 {device_id} 点击平差处理按钮时发生错误: {str(e)}")
|
||
return False
|
||
|
||
def add_breakpoint_to_upload_list(self, breakpoint_name, line_num):
|
||
"""添加平差完成的断点到上传列表和字典"""
|
||
if breakpoint_name and breakpoint_name not in global_variable.get_upload_breakpoint_list():
|
||
global_variable.get_upload_breakpoint_list().append(breakpoint_name)
|
||
global_variable.get_upload_breakpoint_dict()[breakpoint_name] = {
|
||
'breakpoint_name': breakpoint_name,
|
||
'line_num': line_num
|
||
}
|
||
|
||
logging.info(f"成功添加断点 '{breakpoint_name}' 到上传列表")
|
||
logging.info(f"断点详细信息: 线路编码={line_num}")
|
||
return True
|
||
else:
|
||
logging.warning(f"断点名为空或已存在于列表中")
|
||
return False
|
||
|
||
|
||
def handle_confirmation_dialog_save(self, device_id, timeout=2):
|
||
"""
|
||
处理确认弹窗,点击"是"按钮
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
timeout: 等待弹窗的超时时间
|
||
|
||
Returns:
|
||
bool: 是否成功处理弹窗
|
||
"""
|
||
# 等待弹窗出现(最多等待2秒)
|
||
try:
|
||
dialog_message = WebDriverWait(self.driver, timeout).until(
|
||
EC.presence_of_element_located((AppiumBy.ID, "android:id/content"))
|
||
)
|
||
|
||
self.logger.info(f"设备 {device_id} 检测到确认弹窗")
|
||
|
||
# 查找并点击"是"按钮
|
||
confirm_button = self.driver.find_element(
|
||
AppiumBy.ID, "android:id/button1"
|
||
)
|
||
|
||
if confirm_button.is_displayed() and confirm_button.is_enabled():
|
||
self.logger.info(f"设备 {device_id} 点击确认弹窗的'是'按钮")
|
||
confirm_button.click()
|
||
time.sleep(0.5)
|
||
return True
|
||
else:
|
||
self.logger.error(f"设备 {device_id} '是'按钮不可点击")
|
||
return False
|
||
|
||
except TimeoutException:
|
||
# 超时未找到弹窗,认为没有弹窗,返回成功
|
||
self.logger.info(f"设备 {device_id} 等待 {timeout} 秒未发现确认弹窗,可能没有弹窗,返回成功")
|
||
return True
|
||
|
||
def load_line_time_mapping_dict(self, filename="20251022.1.CZSCZQ-3fhg0410.txt", log_directory="D:\\soft\\安卓时间修改-v0.7.13-1\\Logs\\202510", poll_interval=120, max_wait_time=18000):
|
||
"""
|
||
加载指定文件中的线路编码和时间到全局字典
|
||
参数:
|
||
filename: 文件名 (例如: "20251022.1.CZSCZQ-3fhg0410.txt")
|
||
log_directory: 文件所在目录
|
||
poll_interval: 轮询间隔时间(秒),默认2分钟
|
||
max_wait_time: 最大等待时间(秒),默认5小时
|
||
"""
|
||
|
||
try:
|
||
current_year_month = datetime.now().strftime("%Y%m")
|
||
# 将Logs\\后的内容替换为实际年月
|
||
log_directory = log_directory.split("Logs\\")[0] + "Logs\\" + current_year_month
|
||
|
||
# 获取当前年月日(例如:20251023)
|
||
current_date = datetime.now().strftime("%Y%m%d")
|
||
# 拼接格式:年月日.1.用户名.txt
|
||
filename = f"{current_date}.1.{global_variable.get_username()}.txt"
|
||
file_path = os.path.join(log_directory, filename)
|
||
|
||
# 轮询等待文件出现
|
||
start_time = time.time()
|
||
wait_count = 0
|
||
|
||
while not os.path.exists(file_path):
|
||
wait_count += 1
|
||
elapsed_time = time.time() - start_time
|
||
|
||
if elapsed_time >= max_wait_time:
|
||
self.logger.error(f"等待文件超时 ({max_wait_time}秒),文件仍未出现: {file_path}")
|
||
return False
|
||
|
||
self.logger.info(f"第{wait_count}次检查 - 文件不存在,等待 {poll_interval} 秒后重试... (已等待 {elapsed_time:.0f} 秒)")
|
||
time.sleep(poll_interval)
|
||
|
||
# 文件存在,继续处理
|
||
self.logger.info(f"文件已找到,正在加载: {file_path}")
|
||
|
||
# 临时字典,用于存储当前文件中的线路编码和时间
|
||
temp_mapping = {}
|
||
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
for line_num, line in enumerate(f, 1):
|
||
line = line.strip()
|
||
# 解析日志行格式: "2025-10-22 16:22:50.171, INFO, 已修改线路时间:L205413, 结束时间:2025-10-22 09:18:17.460"
|
||
if "已修改线路时间" in line and "结束时间" in line:
|
||
# 提取线路编码
|
||
line_code_match = re.search(r'已修改线路时间[::]\s*([^,]+)', line)
|
||
# 提取结束时间
|
||
end_time_match = re.search(r'结束时间[::]\s*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})', line)
|
||
|
||
if line_code_match and end_time_match:
|
||
line_code = line_code_match.group(1).strip()
|
||
end_time_str = end_time_match.group(1)
|
||
|
||
# 解析日期时间
|
||
try:
|
||
end_time = datetime.strptime(end_time_str, "%Y-%m-%d %H:%M:%S")
|
||
# 存入临时字典(同一个线路编码,后面的会覆盖前面的,实现取最后一条)
|
||
temp_mapping[line_code] = end_time
|
||
self.logger.info(f"第{line_num}行: 找到线路编码 {line_code} 时间: {end_time}")
|
||
except ValueError as e:
|
||
self.logger.warning(f"第{line_num}行: 解析时间格式错误: {end_time_str}, 错误: {e}")
|
||
continue
|
||
else:
|
||
self.logger.warning(f"第{line_num}行: 无法解析线路编码或结束时间")
|
||
|
||
# 将临时字典的内容更新到全局字典
|
||
for line_code, end_time in temp_mapping.items():
|
||
global_variable.get_line_time_mapping_dict()[line_code] = end_time
|
||
self.logger.info(f"更新全局字典: {line_code} -> {end_time}")
|
||
|
||
total_wait_time = time.time() - start_time
|
||
self.logger.info(f"文件加载完成,等待时间: {total_wait_time:.0f}秒,共处理 {len(temp_mapping)} 条记录")
|
||
self.logger.info(f"当前全局字典总数: {len(global_variable.get_line_time_mapping_dict())} 条记录")
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"加载文件 {filename} 时出错: {str(e)}")
|
||
return False
|
||
|
||
def screenshot_page_manager(self, device_id):
|
||
"""执行截图页面管理操作"""
|
||
try:
|
||
# 加载指定文件中的线路编码和时间到全局字典
|
||
if not self.load_line_time_mapping_dict("20251022.1.CZSCZQ-3fhg0410.txt", "D:\\soft\\安卓时间修改-v0.7.13-1\\Logs\\202510"):
|
||
self.logger.error(f"设备 {device_id} 加载线路时间映射字典失败")
|
||
return False
|
||
|
||
# time.sleep(5)
|
||
|
||
# 循环检查数据数量是否一致,直到获取到完整数据
|
||
retry_count = 0
|
||
while True:
|
||
# 获取断点列表和线路时间字典的数量
|
||
breakpoint_count = len(global_variable.get_upload_breakpoint_dict())
|
||
line_time_count = len(global_variable.get_line_time_mapping_dict())
|
||
self.logger.info(f"设备 {device_id} 断点列表数量: {breakpoint_count}, 文件中获取的线路时间数量: {line_time_count}")
|
||
|
||
# 如果断点列表为空,无法比较,直接跳出循环
|
||
if breakpoint_count == 0:
|
||
self.logger.warning(f"设备 {device_id} 断点列表为空,无法进行数量比较")
|
||
break
|
||
|
||
# 如果数量一致,获取到完整数据,跳出循环
|
||
if line_time_count == breakpoint_count:
|
||
self.logger.info(f"设备 {device_id} 数据数量一致,已获取完整数据")
|
||
break
|
||
|
||
# 数量不一致,等待三分钟后再次获取文件
|
||
retry_count += 1
|
||
self.logger.warning(f"设备 {device_id} 数据数量不一致: 断点列表({breakpoint_count}) != 线路时间({line_time_count}),第{retry_count}次重试,等待1分钟后重新加载文件")
|
||
time.sleep(60) # 等待3分钟
|
||
|
||
# 重新加载文件
|
||
if not self.load_line_time_mapping_dict("20251022.1.CZSCZQ-3fhg0410.txt", "D:\\soft\\安卓时间修改-v0.7.13-1\\Logs\\202510"):
|
||
self.logger.error(f"设备 {device_id} 重新加载线路时间映射字典失败")
|
||
else:
|
||
self.logger.info(f"设备 {device_id} 重新加载完成,新的线路时间数量: {len(global_variable.get_line_time_mapping_dict())}")
|
||
|
||
# # 禁用WiFi
|
||
# if not self.disable_wifi(device_id):
|
||
# self.logger.error(f"设备 {device_id} 禁用WiFi失败")
|
||
# return False
|
||
|
||
# 获取GLOBAL_BREAKPOINT_DICT中的断点名称和对应的线路编码
|
||
# 检查GLOBAL_BREAKPOINT_DICT是否为空,如果为空则初始化一些测试数据
|
||
if not global_variable.get_upload_breakpoint_dict():
|
||
self.logger.warning("global_variable.get_upload_breakpoint_dict()为空,正在初始化测试数据")
|
||
# 添加一些测试断点数据,实际使用时应该从其他地方加载
|
||
# 注意:这里的值应该是字典,与section_mileage_config_page.py中的数据结构保持一致
|
||
global_variable.set_breakpoint_dict({
|
||
"CZSCZQ-3-康定2号隧道-DK297+201-DK297+199-山区": {
|
||
'breakpoint_name': "CZSCZQ-3-康定2号隧道-DK297+201-DK297+199-山区",
|
||
'line_num': "L205413"
|
||
},
|
||
"CZSCZQ-3-康定2号隧道-DK296+701-DK296+699-山区": {
|
||
'breakpoint_name': "CZSCZQ-3-康定2号隧道-DK296+701-DK296+699-山区",
|
||
'line_num': "L205414"
|
||
}
|
||
})
|
||
|
||
# 开始循环
|
||
all_success = True
|
||
for breakpoint_name in global_variable.get_upload_breakpoint_dict().keys():
|
||
self.logger.info(f"开始处理要平差截图的断点 {breakpoint_name}")
|
||
# 把断点名称给find_keyword
|
||
if not self.find_keyword(breakpoint_name):
|
||
self.logger.error(f"设备 {device_id} 未找到包含 {breakpoint_name} 的文件名")
|
||
all_success = False
|
||
continue
|
||
|
||
if not self.handle_measurement_dialog():
|
||
self.logger.error(f"设备 {device_id} 处理测量弹窗失败")
|
||
all_success = False
|
||
continue
|
||
|
||
if not self.check_apply_btn():
|
||
self.logger.error(f"设备 {device_id} 检查平差处理按钮失败")
|
||
all_success = False
|
||
self.click_back_button(device_id)
|
||
continue
|
||
|
||
# 根据断点名称在get_upload_breakpoint_dict()中获取线路编码
|
||
line_code = global_variable.get_upload_breakpoint_dict().get(breakpoint_name)
|
||
if not line_code:
|
||
self.logger.error(f"设备 {device_id} 未找到断点 {breakpoint_name} 对应的线路编码")
|
||
all_success = False
|
||
continue
|
||
|
||
# 根据线路编码查找对应的时间
|
||
date_str, time_str = self.get_line_end_time(line_code)
|
||
if not time_str or not date_str:
|
||
self.logger.error(f"设备 {device_id} 未找到线路 {line_code} 对应的时间")
|
||
all_success = False
|
||
continue
|
||
|
||
# 修改时间
|
||
if not self.set_device_time(device_id, time_str, date_str):
|
||
self.logger.error(f"设备 {device_id} 设置设备时间失败")
|
||
all_success = False
|
||
continue
|
||
|
||
# 滑动列表到底部
|
||
if not self.scroll_list_to_bottom(device_id):
|
||
self.logger.error(f"设备 {device_id} 下滑列表到底部失败")
|
||
all_success = False
|
||
continue
|
||
|
||
# 2. 点击最后一个spinner
|
||
if not self.click_last_spinner_with_retry(device_id):
|
||
self.logger.error(f"设备 {device_id} 点击最后一个spinner失败")
|
||
all_success = False
|
||
continue
|
||
|
||
# 3. 再下滑一次
|
||
if not self.scroll_down_once(device_id):
|
||
self.logger.warning(f"设备 {device_id} 再次下滑失败,但继续执行")
|
||
|
||
# 平差处理完成后截图
|
||
time.sleep(0.2) # 等待平差处理按钮点击后的界面变化
|
||
logging.info("断点保存到上传列表成功,开始截图")
|
||
# png_time = date_str + " " + time_str
|
||
if not self.take_screenshot(breakpoint_name, date_str, time_str):
|
||
self.logger.error(f"设备 {device_id} 截图失败")
|
||
self.write_screenshot_status(breakpoint_name, success=False)
|
||
all_success = False
|
||
else:
|
||
self.write_screenshot_status(breakpoint_name, success=True)
|
||
|
||
# 点击返回按钮并处理弹窗
|
||
if not self.execute_back_navigation_steps(device_id):
|
||
self.logger.error(f"设备 {device_id} 处理返回按钮确认失败")
|
||
all_success = False
|
||
# 启用WiFi
|
||
# if not self.enable_wifi(device_id):
|
||
# self.logger.error(f"设备 {device_id} 启用WiFi失败")
|
||
# return False
|
||
|
||
# 根据截图结果更新time.txt文件状态
|
||
status = "ok" if all_success else "again"
|
||
# self.update_upload_info_status(status)
|
||
username = global_variable.get_username()
|
||
self.update_file_status(username, "running", status)
|
||
self.logger.info(f"{username} 截图完成状态为 {status}")
|
||
|
||
self.logger.info(f"设备 {device_id} 截图页面操作执行完成,状态: {status}")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"设备 {device_id} 执行截图页面操作时出错: {str(e)}")
|
||
# 保存错误截图
|
||
# error_screenshot_file = os.path.join(
|
||
# results_dir,
|
||
# f"screenshot_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
|
||
# )
|
||
# self.driver.save_screenshot(error_screenshot_file)
|
||
# self.logger.info(f"错误截图已保存: {error_screenshot_file}")
|
||
return False |