Files
cjgc_data/check_upload.py

1042 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import time
import requests
import pandas as pd
from io import BytesIO
from datetime import datetime
import os
import sys
import subprocess
import argparse
import globals.global_variable as global_variable
import globals.driver_utils as driver_utils # 导入驱动工具模块
from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException
from appium.webdriver.common.appiumby import AppiumBy
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
import globals.ids as ids
from page_objects.upload_config_page import UploadConfigPage
class checkUpload:
def __init__(self, driver=None, wait=None,device_id=None):
"""初始化checkUpload对象"""
if device_id is None:
self.device_id = driver_utils.get_device_id()
else:
self.device_id = device_id
if driver is None or wait is None:
self.driver, self.wait = driver_utils.init_appium_driver(self.device_id)
else:
self.driver = driver
self.wait = wait
try:
if not driver_utils.check_session_valid(self.driver, self.device_id):
logging.warning(f"设备 {self.device_id} 会话无效,尝试重新连接驱动...")
self.driver, self.wait = driver_utils.reconnect_driver(self.device_id, self.driver)
if not self.driver:
logging.error(f"设备 {self.device_id} 驱动重连失败")
return False
except Exception as inner_e:
logging.warning(f"设备 {self.device_id} 检查会话状态时出错: {str(inner_e)}")
return False
# 检查应用是否成功启动
if driver_utils.is_app_launched(self.driver):
logging.info(f"设备 {self.device_id} 沉降观测App已成功启动")
else:
logging.warning(f"设备 {self.device_id} 应用可能未正确启动")
driver_utils.check_app_status(self.driver)
self.upload_config_page = UploadConfigPage(self.driver, self.wait, self.device_id)
def get_measure_data(self):
# 模拟获取测量数据
pass
def add_transition_point(self):
"""添加转点"""
try:
if not driver_utils.check_session_valid(self.driver, self.device_id):
logging.warning(f"设备 {self.device_id} 会话无效,尝试重新连接驱动...")
self.driver, self.wait = driver_utils.reconnect_driver(self.device_id, self.driver)
if not self.driver:
logging.error(f"设备 {self.device_id} 驱动重连失败")
return False
except Exception as inner_e:
logging.warning(f"设备 {self.device_id} 检查会话状态时出错: {str(inner_e)}")
return False
try:
# 查找并点击添加转点按钮(双击确保点到)
add_transition_btn = self.wait.until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/btn_add_ZPoint"))
)
add_transition_btn.click()
time.sleep(0.3)
add_transition_btn.click()
logging.info("已点击添加转点按钮")
return True
except TimeoutException:
logging.error("等待添加转点按钮超时")
return False
except Exception as e:
logging.error(f"添加转点时出错: {str(e)}")
return False
def get_excel_from_url(self, url):
"""
从URL获取Excel文件并解析为字典
Excel只有一列数据A列每行是站点值
Args:
url: Excel文件的URL地址
Returns:
dict: 解析后的站点数据字典 {行号: 值}失败返回None
"""
try:
print(f"正在从URL获取数据: {url}")
response = requests.get(url, timeout=30)
response.raise_for_status() # 检查请求是否成功
# 使用pandas读取Excel数据指定没有表头只读第一个sheet
excel_data = pd.read_excel(
BytesIO(response.content),
header=None, # 没有表头
sheet_name=0, # 只读取第一个sheet
dtype=str # 全部作为字符串读取
)
station_dict = {}
# 解析Excel数据使用行号+1作为站点编号A列的值作为站点值
print("解析Excel数据使用行号作为站点编号...")
for index, row in excel_data.iterrows():
station_num = index + 1 # 行号从1开始作为站点编号
station_value = str(row[0]).strip() if pd.notna(row[0]) else ""
if station_value: # 只保存非空值
station_dict[station_num] = station_value
print(f"成功解析Excel{len(station_dict)}条数据")
return station_dict
except requests.exceptions.RequestException as e:
print(f"请求URL失败: {e}")
return None
except Exception as e:
print(f"解析Excel失败: {e}")
return None
def check_upload_exists(self, station_data: dict, station_num: int) -> str:
"""
根据站点编号检查该站点的值是否以Z开头
Args:
station_data: 站点数据字典 {编号: 值}
station_num: 要检查的站点编号
Returns:
str: 如果站点存在且以Z开头返回"add",否则返回"pass"
"""
if station_num not in station_data:
print(f"站点{station_num}不存在")
return "error"
value = station_data[station_num]
str_value = str(value).strip()
is_z = str_value.upper().startswith('Z')
result = "add" if is_z else "pass"
print(f"站点{station_num}: {value} -> {result}")
return result
def run(self):
# 滑动列表到底部
if not self.scroll_list_to_bottom(self.device_id):
logging.error(f"设备 {self.device_id} 下滑列表到底部失败")
return False
# 2. 点击最后一个spinner
if not self.click_last_spinner_with_retry(self.device_id):
logging.error(f"设备 {self.device_id} 点击最后一个spinner失败")
return False
# 3. 再下滑一次
if not self.scroll_down_once(self.device_id):
logging.warning(f"设备 {self.device_id} 再次下滑失败,但继续执行")
# # 截图
# self.driver.save_screenshot("check_upload.png")
if not self.take_screenshot():
logging.error(f"设备 {self.device_id} 截图失败")
# 打完数据,截图完毕,点击平差处理按钮
if not self.click_adjustment_button(self.device_id):
logging.error(f"设备 {self.device_id} 点击平差处理按钮失败")
return False
item = global_variable.GLOBAL_CURRENT_PROJECT_NAME
if item.endswith('-平原'):
item = item[:-3] # 去掉最后3个字符"-平原"
# 检查是否在测量页面在就重新执行选择断点滑动列表到底部点击最后一个spinner 再下滑一次,点击平差处理按钮平差
if not self.handle_back_navigation(item, self.device_id):
logging.error(f"{item}平差失败")
# 检测并处理"是 保留成果"弹窗
if not self.handle_adjustment_result_dialog():
logging.error("处理平差结果弹窗失败")
# 检查在不在测量列表页面,不在就点击返回按钮并处理弹窗
if self.check_measurement_list(self.device_id):
logging.error(f"设备 {self.device_id} 未在测量列表页面")
# 点击返回按钮并处理弹窗
if not self.execute_back_navigation_steps(self.device_id):
logging.error(f"设备 {self.device_id} 处理返回按钮确认失败")
# # 点击返回按钮并处理弹窗
# if not self.execute_back_navigation_steps(self.device_id):
# logging.error(f"设备 {self.device_id} 处理返回按钮确认失败")
# 执行上传配置管理,传入当前断点名称
breakpoint_name = global_variable.GLOBAL_CURRENT_PROJECT_NAME
line_num = global_variable.GLOBAL_LINE_NUM
self.results_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_results')
if self.upload_config_page.upload_config_page_manager(self.results_dir, breakpoint_name, line_num):
logging.info(f"设备 {self.device_id} 断点 '{breakpoint_name}' 上传成功")
upload_success_count += 1
else:
logging.error(f"设备 {self.device_id} 断点 '{breakpoint_name}' 上传失败")
for i in range(3):
if self.upload_config_page.upload_config_page_manager(self.results_dir, breakpoint_name, line_num):
logging.info(f"设备 {self.device_id} 断点 '{breakpoint_name}' 重试上传成功")
upload_success_count += 1
break
else:
logging.error(f"设备 {self.device_id} 断点 '{breakpoint_name}' 上传失败,第 {i+1} 次重试")
return True
def click_adjustment_button(self, device_id):
"""
点击平差处理按钮
Args:
device_id: 设备ID
Returns:
bool: 是否成功点击
"""
try:
logging.info(f"设备 {device_id} 查找平差处理按钮")
# 查找平差处理按钮
adjustment_button = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/point_measure_btn")
# 验证按钮文本
button_text = adjustment_button.text
if "平差处理" not in button_text:
logging.warning(f"设备 {device_id} 按钮文本不匹配,期望'平差处理',实际: {button_text}")
if adjustment_button.is_displayed() and adjustment_button.is_enabled():
logging.info(f"设备 {device_id} 点击平差处理按钮")
adjustment_button.click()
time.sleep(3) # 等待平差处理完成
return True
else:
logging.error(f"设备 {device_id} 平差处理按钮不可点击")
return False
except NoSuchElementException:
logging.error(f"设备 {device_id} 未找到平差处理按钮")
return False
except Exception as e:
logging.error(f"设备 {device_id} 点击平差处理按钮时发生错误: {str(e)}")
return False
def handle_back_navigation(self, breakpoint_name, device_id):
"""
完整的返回导航处理流程
Args:
breakpoint_name: 断点名称
device_id: 设备ID
Returns:
bool: 整个返回导航流程是否成功
"""
try:
# time.sleep(2)
logging.info(f"已点击平差处理按钮,检查是否在测量页面")
# 检测是否存在测量列表
has_measurement_list = self.check_measurement_list(device_id)
if not has_measurement_list:
logging.info(f"设备 {device_id} 存在测量列表,重新执行平差流程")
# 把断点名称给find_keyword
if not self.find_keyword(breakpoint_name):
logging.error(f"设备 {device_id} 未找到包含 {breakpoint_name} 的文件名")
return False
if not self.handle_measurement_dialog():
logging.error(f"设备 {device_id} 处理测量弹窗失败")
return False
if not self.check_apply_btn():
logging.error(f"设备 {device_id} 检查平差处理按钮失败")
return False
# 4. 点击平差处理按钮
if not self.click_adjustment_button(device_id):
logging.error(f"设备 {device_id} 点击平差处理按钮失败")
return False
logging.info(f"重新选择断点并点击平差处理按钮成功")
return True
else:
logging.info(f"不在测量页面,继续执行后续返回操作")
return True
except Exception as e:
logging.error(f"设备 {device_id} 处理返回导航时发生错误: {str(e)}")
return False
def handle_adjustment_result_dialog(self):
"""处理平差结果确认弹窗"""
try:
logging.info("开始检测平差结果弹窗")
# 等待弹窗出现最多等待5秒
warning_dialog = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((AppiumBy.ID, "android:id/parentPanel"))
)
# 验证弹窗内容
alert_title = warning_dialog.find_element(AppiumBy.ID, "android:id/alertTitle")
alert_message = warning_dialog.find_element(AppiumBy.ID, "android:id/message")
logging.info(f"检测到弹窗 - 标题: {alert_title.text}, 消息: {alert_message.text}")
# 确认是目标弹窗
if "警告" in alert_title.text and "是否保留测量成果" in alert_message.text:
logging.info("确认是平差结果确认弹窗")
# 点击"是 保留成果"按钮
yes_button = warning_dialog.find_element(AppiumBy.ID, "android:id/button1")
if yes_button.text == "是 保留成果":
yes_button.click()
logging.info("已点击'是 保留成果'按钮")
# 等待弹窗消失
WebDriverWait(self.driver, 5).until(
EC.invisibility_of_element_located((AppiumBy.ID, "android:id/parentPanel"))
)
logging.info("弹窗已关闭")
return True
else:
logging.error(f"按钮文本不匹配,期望'是 保留成果',实际: {yes_button.text}")
return False
else:
logging.warning("弹窗内容不匹配,不是目标弹窗")
return False
except TimeoutException:
logging.info("未检测到平差结果弹窗,继续流程")
return True # 没有弹窗也是正常情况
except Exception as e:
logging.error(f"处理平差结果弹窗时出错: {str(e)}")
return False
def check_measurement_list(self, device_id):
"""
检查是否存在测量列表
Args:
device_id: 设备ID
Returns:
bool: 如果不存在测量列表返回True存在返回False
"""
try:
# 等待线路列表容器出现
self.wait.until(
EC.presence_of_element_located((AppiumBy.ID, ids.MEASURE_LIST_ID))
)
logging.info("线路列表容器已找到")
# 如果存在MEASURE_LIST_ID说明有测量列表不需要执行后续步骤
logging.info(f"设备 {device_id} 存在测量列表,无需执行后续返回操作")
return False
except TimeoutException:
# 等待超时,说明没有测量列表
logging.info(f"设备 {device_id} 未找到测量列表,可以继续执行后续步骤")
return True
except Exception as e:
logging.error(f"设备 {device_id} 检查测量列表时发生错误: {str(e)}")
return True
def find_keyword(self, fixed_filename):
"""查找指定关键词并点击,支持向下和向上滑动查找"""
try:
# if not check_session_valid(self.driver, self.device_id):
# logging.warning(f"设备 {self.device_id} 会话无效,尝试重新连接驱动...")
# if not reconnect_driver(self.device_id, self.driver):
# logging.error(f"设备 {self.device_id} 驱动重连失败")
# 等待线路列表容器出现
self.wait.until(
EC.presence_of_element_located((AppiumBy.ID, ids.MEASURE_LIST_ID))
)
logging.info("线路列表容器已找到")
max_scroll_attempts = 100 # 最大滚动尝试次数
scroll_count = 0
previous_items = set() # 记录前一次获取的项目集合,用于检测是否到达边界
# 首先尝试向下滑动查找
while scroll_count < max_scroll_attempts:
# 获取当前页面中的所有项目
current_items = self.get_current_items()
logging.info(f"当前页面找到 {len(current_items)} 个项目: {current_items}")
# 检查目标文件是否在当前页面中
if fixed_filename in current_items:
logging.info(f"找到目标文件: {fixed_filename}")
# 点击目标文件
if self.click_item_by_text(fixed_filename):
return True
else:
logging.error(f"点击目标文件失败: {fixed_filename}")
return False
# 检查是否到达底部:连续两次获取的项目相同
if current_items == previous_items and len(current_items) > 0:
logging.info("连续两次获取的项目相同,已到达列表底部")
break
# 更新前一次项目集合
previous_items = current_items.copy()
# 向下滑动列表以加载更多项目
if not self.scroll_list(direction="down"):
logging.error("向下滑动列表失败")
return False
scroll_count += 1
logging.info(f"{scroll_count} 次向下滑动,继续查找...")
# 如果向下滑动未找到,尝试向上滑动查找
logging.info("向下滑动未找到目标,开始向上滑动查找")
# 重置滚动计数
scroll_count = 0
while scroll_count < max_scroll_attempts:
# 向上滑动列表
# 如果返回False说明已经滑动到顶
if not self.scroll_list(direction="up"):
# 检查是否是因为滑动到顶而返回False
if "已滑动到列表顶部" in logging.handlers[0].buffer[-1].message:
logging.info("已滑动到列表顶部,停止向上滑动")
break
else:
logging.error("向上滑动列表失败")
return False
# 获取当前页面中的所有项目
current_items = self.get_current_items()
# logging.info(f"向上滑动后找到 {len(current_items)} 个项目: {current_items}")
# 检查目标文件是否在当前页面中
if fixed_filename in current_items:
logging.info(f"找到目标文件: {fixed_filename}")
# 点击目标文件
if self.click_item_by_text(fixed_filename):
return True
else:
logging.error(f"点击目标文件失败: {fixed_filename}")
return False
scroll_count += 1
logging.info(f"{scroll_count} 次向上滑动,继续查找...")
logging.warning(f"经过 {max_scroll_attempts * 2} 次滑动仍未找到目标文件")
return False
except TimeoutException:
logging.error("等待线路列表元素超时")
return False
except Exception as e:
logging.error(f"查找关键词时出错: {str(e)}")
return False
def get_current_items(self):
"""获取当前页面中的所有项目文本"""
try:
items = self.driver.find_elements(AppiumBy.ID, ids.MEASURE_LISTVIEW_ID)
item_texts = []
for item in items:
try:
title_element = item.find_element(AppiumBy.ID, ids.MEASURE_NAME_TEXT_ID)
if title_element and title_element.text:
item_texts.append(title_element.text)
except NoSuchElementException:
continue
return item_texts
except Exception as e:
logging.error(f"获取当前项目失败: {str(e)}")
return []
def click_item_by_text(self, text):
"""点击指定文本的项目"""
try:
# 查找包含指定文本的项目
items = self.driver.find_elements(AppiumBy.ID, ids.MEASURE_LISTVIEW_ID)
for item in items:
try:
title_element = item.find_element(AppiumBy.ID, ids.MEASURE_NAME_TEXT_ID)
if title_element and title_element.text == text:
title_element.click()
logging.info(f"已点击项目: {text}")
return True
except NoSuchElementException:
continue
logging.warning(f"未找到可点击的项目: {text}")
return False
except Exception as e:
logging.error(f"点击项目失败: {str(e)}")
return False
def handle_measurement_dialog(self):
"""处理测量弹窗 - 选择继续测量"""
try:
logging.info("检查测量弹窗...")
# 直接尝试点击"继续测量"按钮
continue_btn = WebDriverWait(self.driver, 2).until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/measure_continue_btn"))
)
continue_btn.click()
logging.info("已点击'继续测量'按钮")
return True
except TimeoutException:
logging.info("未找到继续测量按钮,可能没有弹窗")
return True # 没有弹窗也认为是成功的
except Exception as e:
logging.error(f"点击继续测量按钮时出错: {str(e)}")
return False
# 检查有没有平差处理按钮
def check_apply_btn(self):
"""检查是否有平差处理按钮"""
try:
apply_btn = WebDriverWait(self.driver, 1).until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/point_measure_btn"))
)
if apply_btn.is_displayed():
logging.info("进入平差页面")
else:
logging.info("没有找到'平差处理'按钮")
return True
except TimeoutException:
logging.info("未找到平差处理按钮")
return False # 没有弹窗也认为是成功的
except Exception as e:
logging.error(f"点击平差处理按钮时出错: {str(e)}")
return False
def execute_back_navigation_steps(self, device_id):
"""
执行实际的返回导航步骤
Args:
device_id: 设备ID
Returns:
bool: 导航是否成功
"""
try:
# 1. 首先点击返回按钮
if not self.click_back_button(device_id):
logging.error(f"设备 {device_id} 点击返回按钮失败")
return False
# 2. 处理返回确认弹窗
logging.info(f"已点击返回按钮,等待处理返回确认弹窗")
if not self.handle_confirmation_dialog(device_id):
logging.error(f"设备 {device_id} 处理返回确认弹窗失败")
# return False
# 3. 验证是否成功返回到上一页面
time.sleep(0.5) # 等待页面跳转完成
# 可以添加页面验证逻辑,比如检查是否返回到预期的页面
# 这里可以根据实际应用添加特定的页面元素验证
logging.info(f"设备 {device_id} 返回导航流程完成")
return True
except Exception as e:
logging.error(f"设备 {device_id} 执行返回导航步骤时发生错误: {str(e)}")
return False
def click_back_button(self, device_id):
"""点击手机系统返回按钮"""
try:
self.driver.back()
logging.info("已点击手机系统返回按钮")
return True
except Exception as e:
logging.error(f"点击手机系统返回按钮失败: {str(e)}")
return False
def handle_confirmation_dialog(self, device_id, timeout=2):
"""
处理确认弹窗,点击""按钮
Args:
device_id: 设备ID
timeout: 等待弹窗的超时时间
Returns:
bool: 是否成功处理弹窗
"""
# 等待弹窗出现最多等待2秒
try:
max_attempts = 2
for attempt in range(max_attempts):
try:
dialog_message = WebDriverWait(self.driver, timeout).until(
EC.presence_of_element_located((AppiumBy.XPATH, "//android.widget.TextView[@text='是否退出测量界面?']"))
)
logging.info(f"设备 {device_id} 检测到确认弹窗 (第 {attempt + 1} 次)")
# 查找并点击"是"按钮
confirm_button = self.driver.find_element(
AppiumBy.XPATH,
"//android.widget.Button[@text='' and @resource-id='android:id/button1']"
)
if confirm_button.is_displayed() and confirm_button.is_enabled():
logging.info(f"设备 {device_id} 点击确认弹窗的''按钮 (第 {attempt + 1} 次)")
confirm_button.click()
time.sleep(0.5)
# 如果是第一次尝试,继续检查是否还有弹窗
if attempt < max_attempts - 1:
logging.info(f"设备 {device_id} 等待 1 秒后检查是否还有弹窗")
time.sleep(0.5)
continue
return True
else:
logging.error(f"设备 {device_id} ''按钮不可点击")
return False
except TimeoutException:
# 超时未找到弹窗,认为没有弹窗,返回成功
logging.info(f"设备 {device_id} 等待 {timeout} 秒未发现确认弹窗,可能没有弹窗,返回成功")
return True
except Exception as e:
logging.error(f"设备 {device_id} 处理确认弹窗时出错: {str(e)}")
return False
def scroll_list_to_bottom(self, device_id, max_swipes=60):
"""
下滑列表到最底端
Args:
device_id: 设备ID
max_swipes: 最大下滑次数
Returns:
bool: 是否滑动到底部
"""
try:
logging.info(f"设备 {device_id} 开始下滑列表到底部")
# 获取列表元素
list_view = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/auto_data_list")
logging.info(f"时间戳1: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")
same_content_count = 0
# 初始化第一次的子元素文本
initial_child_elements = list_view.find_elements(AppiumBy.CLASS_NAME, "android.widget.TextView")
logging.info(f"时间戳2: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")
# current_child_texts = "|".join([
# elem.text.strip() for elem in initial_child_elements
# if elem.text and elem.text.strip()
# ])
current_child_texts = "|".join(
elem_text.strip() for elem in initial_child_elements
if (elem_text := elem.text) and elem_text.strip()
)
logging.info(f"时间戳3: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")
for i in range(max_swipes):
# 执行下滑操作
self.driver.execute_script("mobile: scrollGesture", {
'elementId': list_view.id,
'direction': 'down',
'percent': 0.8,
'duration': 500
})
# 获取滑动后的子元素文本
new_child_elements = list_view.find_elements(AppiumBy.CLASS_NAME, "android.widget.TextView")
new_child_texts = "|".join(
elem_text.strip() for elem in new_child_elements
if (elem_text := elem.text) and elem_text.strip()
)
# 判断内容是否变化若连续3次相同认为到达底部
if new_child_texts == current_child_texts:
same_content_count += 1
if same_content_count >= 2:
logging.info(f"设备 {device_id} 列表已滑动到底部,共滑动 {i+1}")
return True
else:
same_content_count = 0 # 内容变化,重置计数
current_child_texts = new_child_texts # 更新上一次内容
logging.debug(f"设备 {device_id}{i+1} 次下滑完成,当前子元素文本: {new_child_texts[:50]}...") # 打印部分文本
logging.warning(f"设备 {device_id} 达到最大下滑次数 {max_swipes},可能未完全到底部")
return True
except Exception as e:
logging.error(f"设备 {device_id} 下滑列表时发生错误: {str(e)}")
return False
def click_last_spinner_with_retry(self, device_id, max_retries=2):
"""带重试机制的点击方法"""
for attempt in range(max_retries):
try:
if self.click_last_spinner(device_id):
return True
logging.warning(f"设备 {device_id}{attempt + 1}次点击失败,准备重试")
time.sleep(0.5) # 重试前等待
except Exception as e:
logging.error(f"设备 {device_id}{attempt + 1}次尝试失败: {str(e)}")
logging.error(f"设备 {device_id} 所有重试次数已用尽")
return False
def click_last_spinner(self, device_id):
"""
点击最后一个spinner
Args:
device_id: 设备ID
Returns:
bool: 是否成功点击
"""
try:
logging.info(f"设备 {device_id} 查找最后一个spinner")
# 查找所有的spinner元素
spinners = self.driver.find_elements(AppiumBy.ID, "com.bjjw.cjgc:id/spinner")
if not spinners:
logging.error(f"设备 {device_id} 未找到任何spinner元素")
return False
# 获取最后一个spinner
last_spinner = spinners[-1]
if not (last_spinner.is_displayed() and last_spinner.is_enabled()):
logging.error(f"设备 {device_id} 最后一个spinner不可点击")
return False
# 点击操作
logging.info(f"设备 {device_id} 点击最后一个spinner")
last_spinner.click()
# 执行额外一次下滑操作
self.scroll_down_once(device_id)
max_retries = 3 # 最大重试次数
retry_count = 0
wait_timeout = 5 # 增加等待时间到5秒
while retry_count < max_retries:
try:
# 确保device_id正确设置使用全局变量作为备用
if not hasattr(self, 'device_id') or not self.device_id:
# 优先使用传入的device_id其次使用全局变量
self.device_id = device_id if device_id else global_variable.get_device_id()
# 使用self.device_id确保有默认值
actual_device_id = self.device_id if self.device_id else global_variable.get_device_id()
if not driver_utils.check_session_valid(self.driver, actual_device_id):
logging.warning(f"设备 {actual_device_id} 会话无效,尝试重新连接驱动...")
try:
# 使用正确的设备ID进行重连
new_driver, new_wait = reconnect_driver(actual_device_id, self.driver)
if new_driver:
self.driver = new_driver
self.wait = new_wait
logging.info(f"设备 {actual_device_id} 驱动重连成功")
else:
logging.error(f"设备 {actual_device_id} 驱动重连失败")
retry_count += 1
continue
except Exception as e:
logging.error(f"设备 {actual_device_id} 驱动重连异常: {str(e)}")
retry_count += 1
continue
# 点击spinner如果是重试需要重新获取元素
if retry_count > 0:
spinners = self.driver.find_elements(AppiumBy.CLASS_NAME, "android.widget.Spinner")
if not spinners:
logging.error(f"设备 {device_id} 未找到spinner元素")
retry_count += 1
continue
last_spinner = spinners[-1]
if not (last_spinner.is_displayed() and last_spinner.is_enabled()):
logging.error(f"设备 {device_id} spinner不可点击")
retry_count += 1
continue
logging.info(f"设备 {device_id} 重新点击spinner")
last_spinner.click()
# 重试时也执行下滑操作
self.scroll_down_once(device_id)
# 等待下拉菜单出现增加等待时间到5秒
wait = WebDriverWait(self.driver, wait_timeout)
detail_show = wait.until(
EC.presence_of_element_located((AppiumBy.ID, "com.bjjw.cjgc:id/detailshow"))
)
if detail_show.is_displayed():
logging.info(f"设备 {device_id} spinner点击成功下拉菜单已展开")
return True
else:
logging.error(f"设备 {device_id} 下拉菜单未显示")
retry_count += 1
continue
except Exception as wait_error:
error_msg = str(wait_error)
logging.error(f"设备 {device_id} 等待下拉菜单超时 (第{retry_count+1}次尝试): {error_msg}")
# 检查是否是连接断开相关的错误
if not driver_utils.check_session_valid(self.driver, self.device_id):
logging.warning(f"设备 {self.device_id} 会话无效,尝试重新连接驱动...")
# if any(keyword in error_msg for keyword in ['socket hang up', 'Could not proxy command']):
# logging.warning(f"设备 {device_id} 检测到连接相关错误,尝试重连...")
if not reconnect_driver(self.device_id, self.driver):
logging.error(f"设备 {device_id} 驱动重连失败")
retry_count += 1
if retry_count < max_retries:
logging.info(f"设备 {device_id} 将在1秒后进行第{retry_count+1}次重试")
time.sleep(1) # 等待1秒后重试
logging.error(f"设备 {device_id} 经过{max_retries}次重试后仍无法展开下拉菜单")
return False
except Exception as e:
logging.error(f"设备 {device_id} 点击最后一个spinner时发生错误: {str(e)}")
return False
def scroll_down_once(self, device_id):
"""
再次下滑一次
Args:
device_id: 设备ID
Returns:
bool: 是否成功下滑
"""
try:
logging.info(f"设备 {device_id} 执行额外一次下滑")
# 获取列表元素
list_view = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/auto_data_list")
# 执行下滑操作
self.driver.execute_script("mobile: scrollGesture", {
'elementId': list_view.id,
'direction': 'down',
'percent': 0.5
})
time.sleep(0.2)
logging.info(f"设备 {device_id} 额外下滑完成")
return True
except Exception as e:
logging.error(f"设备 {device_id} 额外下滑时发生错误: {str(e)}")
return False
def take_screenshot(self):
"""
通过Appium驱动截取设备屏幕
参数:
filename_prefix: 断点名称
返回:
bool: 操作是否成功
"""
try:
# 获取项目名称
project_name = global_variable.GLOBAL_USERNAME or "用户名"
filename_prefix = global_variable.GLOBAL_CURRENT_PROJECT_NAME or "平差页面截图"
# 获取当前日期
date_str = datetime.now().strftime("%Y%m%d")
# if not date_str:
# date_str = datetime.now().strftime("%Y%m%d")
# 获取当前时间(如果没有提供),并确保格式合法(不含冒号)
time_str = datetime.now().strftime("%H%M%S")
# 创建D盘下的截图目录结构D:\uploadInfo\picture\项目名\年月日
screenshots_dir = os.path.join("D:\\", "uploadInfo", "picture", project_name, date_str)
# 确保目录存在
try:
os.makedirs(screenshots_dir, exist_ok=True)
logging.info(f"截图目录: {screenshots_dir}")
except Exception as dir_error:
logging.error(f"创建截图目录失败: {str(dir_error)}")
return False
line_code = global_variable.GLOBAL_LINE_NUM
if not line_code:
logging.error(f"未找到与断点名称 {filename_prefix} 对应的线路编码")
line_code = "unknown"
# 截图保存
screenshot_file = os.path.join(
screenshots_dir,
f"{line_code}_{filename_prefix}_{time_str}.png"
)
# 尝试保存截图
try:
success = self.driver.save_screenshot(screenshot_file)
if success:
logging.info(f"截图已保存: {screenshot_file}")
# 验证文件是否真的存在
if os.path.exists(screenshot_file):
logging.info(f"截图文件验证存在: {screenshot_file}")
else:
logging.warning(f"截图文件保存成功但验证不存在: {screenshot_file}")
return True
else:
logging.error(f"Appium截图保存失败: {screenshot_file}")
return False
except Exception as save_error:
logging.error(f"保存截图时发生错误: {str(save_error)}")
return False
except Exception as e:
logging.error(f"截图时发生错误: {str(e)}")
return False
def get_excel_from_url(url):
"""
从URL获取Excel文件并解析为字典
Excel只有一列数据A列每行是站点值
Args:
url: Excel文件的URL地址
Returns:
dict: 解析后的站点数据字典 {行号: 值}失败返回None
"""
try:
print(f"正在从URL获取数据: {url}")
response = requests.get(url, timeout=30)
response.raise_for_status() # 检查请求是否成功
# 使用pandas读取Excel数据指定没有表头只读第一个sheet
excel_data = pd.read_excel(
BytesIO(response.content),
header=None, # 没有表头
sheet_name=0, # 只读取第一个sheet
dtype=str # 全部作为字符串读取
)
station_dict = {}
# 解析Excel数据使用行号+1作为站点编号A列的值作为站点值
print("解析Excel数据使用行号作为站点编号...")
for index, row in excel_data.iterrows():
station_num = index + 1 # 行号从1开始作为站点编号
station_value = str(row[0]).strip() if pd.notna(row[0]) else ""
if station_value: # 只保存非空值
station_dict[station_num] = station_value
print(f"成功解析Excel{len(station_dict)}条数据")
return station_dict
except requests.exceptions.RequestException as e:
print(f"请求URL失败: {e}")
return None
except Exception as e:
print(f"解析Excel失败: {e}")
return None
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='check_upload 脚本 - 执行上传检查流程')
parser.add_argument('--user_name', type=str, default='', help='用户名')
parser.add_argument('--line_name', type=str, default='', help='项目/线路名称')
parser.add_argument('--line_num', type=str, default='', help='线路编码')
parser.add_argument('--account_id', type=int, default=0, help='账号ID')
args = parser.parse_args()
# 设置全局变量
global_variable.GLOBAL_USERNAME = args.user_name
global_variable.GLOBAL_CURRENT_PROJECT_NAME = args.line_name
global_variable.GLOBAL_LINE_NUM = args.line_num
global_variable.GLOBAL_ACCOUNT_ID = args.account_id
logging.info(f"全局变量已设置: user_name={global_variable.GLOBAL_USERNAME}, "
f"line_name={global_variable.GLOBAL_CURRENT_PROJECT_NAME}, "
f"line_num={global_variable.GLOBAL_LINE_NUM}, "
f"account_id={global_variable.GLOBAL_ACCOUNT_ID}")
check_upload = checkUpload()
success = check_upload.run()
if success:
print("上传检查执行成功")
else:
print("上传检查执行失败")