Files
cjgc_data/page_objects/upload_config_page.py
2026-03-12 17:03:56 +08:00

2227 lines
102 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#上传配置页面
# \page_objects\test_upload_config_page.py
from appium.webdriver.common.appiumby import AppiumBy
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
import logging
import time
import os
import re
import pandas as pd
from datetime import datetime
from typing import Dict, Optional, List
from page_objects.more_download_page import MoreDownloadPage
from globals.driver_utils import check_session_valid, reconnect_driver, go_main_click_tabber_button # 导入会话检查和重连函数
import globals.apis as apis
import globals.global_variable as global_variable
class UploadConfigPage:
def __init__(self, driver, wait, device_id):
self.driver = driver
self.wait = wait
self.logger = logging.getLogger(__name__)
self.more_download_page = MoreDownloadPage(driver, wait,device_id)
self.device_id = device_id
def go_upload_config_page(self):
"""点击img_2_layout上传页面按钮"""
try:
# 在执行操作前检查会话是否有效
if not check_session_valid(self.driver, self.device_id):
self.logger.warning("会话已失效,尝试重新连接...")
self.driver, self.wait = reconnect_driver(self.device_id, self.driver)
self.logger.info("重新连接成功")
# 首先获取当前页面信息进行调试
try:
current_activity = self.driver.current_activity
self.logger.info(f"当前Activity: {current_activity}")
except Exception as e:
self.logger.error(f"获取当前activity时出错: {str(e)}")
# 尝试返回到主页面(如果不在主页面)
self.logger.info("尝试返回到主页面...")
max_back_presses = 5 # 最多按返回键次数
back_press_count = 0
while back_press_count < max_back_presses:
try:
# 检查是否已经在主页面(通过检查主页面特征元素)
# 先尝试查找上传页面按钮
try:
main_page_indicator = self.driver.find_element(
AppiumBy.ID, "com.bjjw.cjgc:id/img_2_layout"
)
if main_page_indicator.is_displayed():
self.logger.info("已在主页面,找到上传按钮")
break
except:
# 未找到主页面元素,继续返回
pass
# 按返回键
self.driver.back()
back_press_count += 1
self.logger.info(f"已按返回键 {back_press_count}")
time.sleep(1) # 等待页面响应
except Exception as e:
self.logger.error(f"按返回键时出错: {str(e)}")
break
# 现在尝试点击上传页面按钮
self.logger.info("尝试点击上传页面按钮")
try:
# 使用较短的等待时间,因为我们需要快速响应
upload_page_btn = WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/img_2_layout"))
)
upload_page_btn.click()
self.logger.info("已点击img_2_layout进入上传页面")
time.sleep(2) # 增加等待时间确保页面加载完成
return True
except TimeoutException:
self.logger.warning("快速查找上传按钮超时,尝试使用更长的等待时间")
# 使用更长的等待时间再次尝试
upload_page_btn = self.wait.until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/img_2_layout"))
)
upload_page_btn.click()
self.logger.info("已点击img_2_layout进入上传页面")
time.sleep(2) # 增加等待时间确保页面加载完成
return True
except TimeoutException:
self.logger.error("等待上传页面按钮可点击超时")
return False
except Exception as e:
self.logger.error(f"点击上传页面按钮时出错: {str(e)}")
return False
def check_change_amount_on_page(self):
"""直接检查上传配置页面中是否包含变化量属性"""
try:
# 查找页面中是否包含"变化量"文本
change_amount_elements = self.driver.find_elements(
AppiumBy.XPATH,
"//*[contains(@text, '变化量')]"
)
# 如果找到包含"变化量"文本的元素返回True
if change_amount_elements:
self.logger.info("页面中包含变化量属性")
return True
else:
self.logger.info("页面中未找到变化量属性")
return False
except Exception as e:
self.logger.error(f"检查变化量属性时出错: {str(e)}")
return False
def click_upload_by_breakpoint_name(self, breakpoint_name):
"""根据断点名称点击上传按钮"""
logging.info(f"需要点击上传按钮,断点名称:{breakpoint_name}")
try:
search_text = ""
# 提取关键部分进行模糊匹配
if breakpoint_name.endswith('平原') and '-' in breakpoint_name:
# 从右边分割一次,取第一部分(去掉末尾的“-xxx”
search_text = breakpoint_name.rsplit('-', 1)[0]
self.logger.info(f"处理后搜索文本:{search_text}")
else:
search_text = breakpoint_name
# 找到包含指定断点名称的itemContainer
item_container_xpath = f"//android.widget.LinearLayout[@resource-id='com.bjjw.cjgc:id/itemContainer']//android.widget.TextView[@resource-id='com.bjjw.cjgc:id/title' and @text='{search_text}']/ancestor::android.widget.LinearLayout[@resource-id='com.bjjw.cjgc:id/itemContainer']"
# 等待itemContainer出现
item_container = self.wait.until(
EC.presence_of_element_located((AppiumBy.XPATH, item_container_xpath))
)
# self.logger.info(f"找到包含断点 {breakpoint_name} 的itemContainer")
# 在itemContainer中查找上传按钮
upload_btn = item_container.find_element(
AppiumBy.ID,
"com.bjjw.cjgc:id/upload_btn"
)
# 点击上传按钮
upload_btn.click()
self.logger.info(f"已点击断点 {breakpoint_name} 的上传按钮")
# 等待上传操作开始
time.sleep(3)
# # 检查上传是否开始
# try:
# upload_indicator = WebDriverWait(self.driver, 20).until(
# EC.presence_of_element_located((AppiumBy.XPATH, "//*[contains(@text, '上传') or contains(@text, 'Upload')]"))
# )
# self.logger.info(f"上传操作已开始: {upload_indicator.text}")
# except TimeoutException:
# self.logger.warning("未检测到明确的上传开始提示,但按钮点击已完成")
return True
except TimeoutException:
self.logger.error(f"等待断点 {breakpoint_name} 的上传按钮可点击超时")
return False
except Exception as e:
self.logger.error(f"根据断点名称点击上传按钮时出错: {str(e)}")
return False
def handle_upload_dialog(self):
"""处理上传弹窗,点击已同步按钮"""
try:
# 等待弹窗出现
# time.sleep(2)
# 检查弹窗是否出现
dialog_indicators = [
(AppiumBy.ID, "android:id/alertTitle"),
(AppiumBy.XPATH, "//android.widget.TextView[@text='提示']"),
(AppiumBy.ID, "android:id/message")
]
dialog_appeared = False
for by, value in dialog_indicators:
try:
element = self.driver.find_element(by, value)
if element.is_displayed():
dialog_appeared = True
self.logger.info("检测到上传确认弹窗")
break
except:
continue
if not dialog_appeared:
self.logger.warning("未检测到上传确认弹窗,可能不需要确认")
return True
# 点击"已同步"按钮
synced_btn = self.wait.until(
EC.element_to_be_clickable((AppiumBy.ID, "android:id/button2"))
)
synced_btn.click()
self.logger.info("已点击'已同步'按钮")
# 等待弹窗消失
time.sleep(2)
# 检查弹窗是否消失
try:
# 检查弹窗是否还存在
dialog_still_exists = False
for by, value in dialog_indicators:
try:
element = self.driver.find_element(by, value)
if element.is_displayed():
dialog_still_exists = True
break
except:
continue
if not dialog_still_exists:
self.logger.info("上传确认弹窗已消失")
return True
else:
self.logger.warning("上传确认弹窗仍然存在")
return False
except Exception as e:
self.logger.info("上传确认弹窗可能已消失")
return True
except TimeoutException:
self.logger.error("等待上传对话框可点击超时")
return False
except Exception as e:
self.logger.error(f"处理上传弹窗时出错: {str(e)}")
return False
def execute_download_operation(self):
"""执行下载操作按返回键并点击img_5_layout"""
try:
# 按一次返回键
self.driver.back()
self.logger.info("已按返回键")
time.sleep(1)
# 点击img_5_layout更多下载按钮
more_download_btn = self.wait.until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/img_5_layout"))
)
more_download_btn.click()
self.logger.info("已点击更多下载按钮")
# 等待页面加载
time.sleep(1)
# 调用更多下载页面的方法对象
try:
logging.info(f"设备开始执行更多下载页面测试")
# 执行更多下载页面管理操作
success = self.more_download_page.more_download_page_manager()
if success:
logging.info(f"设备更多下载页面测试执行成功")
# 按一次返回键
self.driver.back()
self.logger.info("已按返回键,返回到更多下载页面,准备点击上传导航按钮")
return True
else:
logging.error(f"设备更多下载页面测试执行失败")
return False
except Exception as e:
logging.error(f"设备运行更多下载页面测试时出错: {str(e)}")
# self.take_screenshot("more_download_test_error.png")
return False
except TimeoutException:
self.logger.error("等待下载操作元素可点击超时")
return False
except Exception as e:
self.logger.error(f"执行下载操作时出错: {str(e)}")
return False
def get_point_data(self):
"""
获取三个测点的数据
"""
point_data = []
try:
# 等待页面加载
time.sleep(1)
# 方法1: 通过resource-id获取所有测点名称
point_name_elements = self.driver.find_elements(
AppiumBy.ID,
'com.bjjw.cjgc:id/improve_point_name'
)
# 方法2: 通过text内容获取测点数据元素
point_value_elements = self.driver.find_elements(
AppiumBy.ID,
'com.bjjw.cjgc:id/point_values'
)
self.logger.info(f"找到 {len(point_name_elements)} 个测点")
self.logger.info(f"找到 {len(point_value_elements)} 个数据元素")
# 提取每个测点的数据
for i, (name_element, value_element) in enumerate(zip(point_name_elements, point_value_elements)):
point_info = {}
# 获取测点名称
point_name = name_element.text
point_info['point_name'] = point_name
# 获取测点数据
point_value = value_element.text
point_info['point_value'] = point_value
# 解析详细数据
try:
# 解析数据格式: "本期:13679.07623m; \n上期:13679.07621m; \n变化量:0.02mm; \n测量时间:2025-10-10 15:47:25.049"
data_parts = point_value.split(';')
parsed_data = {}
for part in data_parts:
part = part.strip()
if '本期:' in part:
parsed_data['current_value'] = part.replace('本期:', '').strip()
elif '上期:' in part:
parsed_data['previous_value'] = part.replace('上期:', '').strip()
elif '变化量:' in part:
parsed_data['change_amount'] = part.replace('变化量:', '').strip()
elif '测量时间:' in part:
parsed_data['measurement_time'] = part.replace('测量时间:', '').strip()
point_info['parsed_data'] = parsed_data
except Exception as e:
self.logger.warning(f"解析数据时出错: {e}")
point_info['parsed_data'] = {}
point_data.append(point_info)
self.logger.info(f"测点 {i+1}: {point_name}")
# self.logger.info(f"完整数据: {point_value}")
except Exception as e:
self.logger.error(f"获取数据时出错: {e}")
return point_data
def get_specific_point_data(self, point_name):
"""
获取特定测点的数据
"""
try:
# 通过文本内容查找特定测点
point_name_element = self.driver.find_element(
AppiumBy.XPATH,
f'//android.widget.TextView[@resource-id="com.bjjw.cjgc:id/improve_point_name" and @text="{point_name}"]'
)
# 找到对应的数据元素 - 可能需要根据实际结构调整XPath
point_value_element = point_name_element.find_element(
AppiumBy.XPATH,
'./following::android.widget.TextView[@resource-id="com.bjjw.cjgc:id/point_values"]'
)
return point_value_element.text
except Exception as e:
self.logger.error(f"获取特定测点 {point_name} 数据时出错: {e}")
return None
def swipe_up(self, start_y=1500, end_y=300, duration=500):
"""
从指定起始Y坐标滑动到结束Y坐标向上滑动页面
参数:
start_y: 起始Y坐标 (默认1500)
end_y: 结束Y坐标 (默认300)
duration: 滑动持续时间(毫秒) (默认500)
"""
try:
# 直接使用固定的X坐标屏幕中间
# 避免调用 get_window_size() 方法,防止 Appium 服务器崩溃
x = 540 # 假设屏幕宽度为 1080取中间值
# 执行滑动操作
self.driver.swipe(x, start_y, x, end_y, duration)
self.logger.info(f"页面已从Y:{start_y}滑动到Y:{end_y}")
# 滑动后等待页面稳定
time.sleep(1)
return True
except Exception as e:
self.logger.error(f"滑动页面时出错: {e}")
return False
def swipe_down(self, start_y=175, end_y=1310, duration=500):
"""
从指定起始Y坐标滑动到结束Y坐标向下滑动页面
参数:
start_y: 起始Y坐标 (默认407)
end_y: 结束Y坐标 (默认1617)
duration: 滑动持续时间(毫秒) (默认500)
"""
try:
# 直接使用固定的X坐标屏幕中间
# 避免调用 get_window_size() 方法,防止 Appium 服务器崩溃
x = 540 # 假设屏幕宽度为 1080取中间值
# 执行滑动操作(向下滑动)
self.driver.swipe(x, start_y, x, end_y, duration)
self.logger.info(f"页面已从Y:{start_y}滑动到Y:{end_y}(向下滑动)")
# 滑动后等待页面稳定
time.sleep(1)
return True
except Exception as e:
self.logger.error(f"向下滑动页面时出错: {e}")
return False
def is_on_upload_config_page(self):
"""通过"保存上传"按钮来确定是否在上传配置页面"""
try:
# 使用"保存上传"按钮的resource-id来检查
save_upload_btn_locator = (AppiumBy.ID, "com.bjjw.cjgc:id/improve_save_btn")
self.wait.until(EC.presence_of_element_located(save_upload_btn_locator))
self.logger.info("已确认在上传配置页面")
return True
except TimeoutException:
self.logger.warning("未找到保存上传按钮,不在上传配置页面")
return False
except Exception as e:
self.logger.error(f"检查上传配置页面时发生意外错误: {str(e)}")
return False
def collect_all_point_data(self, results_dir):
"""循环滑动收集所有测点数据,直到没有新数据出现"""
all_point_data = []
seen_point_names = set() # 用于跟踪已经见过的测点名称
max_scroll_attempts = 20 # 最大滑动次数,防止无限循环
scroll_attempt = 0
self.logger.info("开始循环滑动收集所有测点数据...")
while scroll_attempt < max_scroll_attempts:
scroll_attempt += 1
self.logger.info(f"{scroll_attempt} 次尝试获取数据...")
# 获取当前屏幕的测点数据
current_point_data = self.get_point_data()
if not current_point_data:
self.logger.info("当前屏幕没有测点数据,停止滑动")
break
# 统计新发现的测点
new_points_count = 0
for point in current_point_data:
point_name = point.get('point_name')
if point_name and point_name not in seen_point_names:
# 新测点,添加到结果集
all_point_data.append(point)
seen_point_names.add(point_name)
new_points_count += 1
self.logger.info(f"本次获取到 {len(current_point_data)} 个测点,其中 {new_points_count} 个是新测点")
# 如果没有新数据,停止滑动
if new_points_count == 0:
self.logger.info("没有发现新测点,停止滑动")
break
# 滑动到下一页
self.logger.info("滑动到下一页...")
if not self.swipe_up():
self.logger.warning("滑动失败,停止收集")
break
# 等待页面稳定
time.sleep(1)
self.logger.info(f"数据收集完成,共获取 {len(all_point_data)} 个测点数据")
return all_point_data
def collect_check_all_point_data(self, max_variation):
"""循环滑动收集所有测点数据,直到没有新数据出现"""
all_point_data = []
seen_point_names = set() # 用于跟踪已经见过的测点名称
max_scroll_attempts = 100 # 最大滑动次数,防止无限循环
scroll_attempt = 0
self.logger.info("开始循环滑动收集所有测点数据...")
while scroll_attempt < max_scroll_attempts:
scroll_attempt += 1
self.logger.info(f"{scroll_attempt} 次尝试获取数据...")
# 获取当前屏幕的测点数据
current_point_data = self.get_point_data()
if not current_point_data:
self.logger.info("当前屏幕没有测点数据,停止滑动")
break
# 统计新发现的测点
new_points_count = 0
for point in current_point_data:
point_name = point.get('point_name')
if point_name and point_name not in seen_point_names:
# 新测点,添加到结果集
all_point_data.append(point)
seen_point_names.add(point_name)
new_points_count += 1
self.logger.info(f"本次获取到 {len(current_point_data)} 个测点,其中 {new_points_count} 个是新测点")
# 如果没有新数据,停止滑动
if new_points_count == 0:
self.logger.info("没有发现新测点,停止滑动")
break
# 滑动到下一页
self.logger.info("滑动到下一页...")
if not self.swipe_up():
self.logger.warning("滑动失败,停止收集")
break
# 等待页面稳定
time.sleep(0.2)
self.logger.info(f"数据收集完成,共获取 {len(all_point_data)} 个测点数据")
# 直接比对每个测点的变化量
if max_variation is None:
self.logger.error("获取用户最大变化量失败")
max_variation = 2
# return False
self.logger.info(f"开始比对测点变化量,最大允许变化量: {max_variation}mm")
for i, point in enumerate(all_point_data, 1):
point_name = point.get('point_name', '未知')
point_value = point.get('point_value', '')
# 从完整数据中提取变化量(格式如:变化量:-0.67mm;
change_amount_match = re.search(r'变化量:([-\d.]+)mm', point_value)
if change_amount_match:
try:
change_amount = float(change_amount_match.group(1))
# self.logger.info(f"测点 {point_name} 变化量: {change_amount}mm, 最大允许变化量: {max_variation}mm")
# 比较绝对值,因为变化量可能是负数
if abs(change_amount) > max_variation:
self.logger.error(f"测点 {point_name} 变化量 {change_amount}mm 超过最大允许值 {max_variation}mm")
return False
except ValueError as e:
self.logger.error(f"解析测点 {point_name} 的变化量失败: {str(e)},原始数据: {point_value}")
return False
else:
self.logger.error(f"在测点 {point_name} 的数据中未找到变化量信息,原始数据: {point_value}")
return False
self.logger.info(f"所有测点变化量均在允许范围内(≤{max_variation}mm")
return True
# def _load_user_data(self):
# """加载用户数据从Excel文件"""
# try:
# # 默认路径:当前脚本的上一级目录下的"上传人员信息.xlsx"
# current_dir = os.path.dirname(os.path.abspath(__file__))
# parent_dir = os.path.dirname(current_dir)
# excel_path = os.path.join(parent_dir, "上传人员信息.xlsx")
# if not os.path.exists(excel_path):
# logging.error(f"Excel文件不存在: {excel_path}")
# return False
# # 读取Excel文件
# df = pd.read_excel(excel_path, sheet_name='Sheet1')
# # 处理合并单元格 - 前向填充标段列
# df['标段'] = df['标段'].fillna(method='ffill')
# # 清理数据:去除空行和无效数据
# df = df.dropna(subset=['测量人员信息'])
# df = df[df['测量人员信息'].str.strip() != '']
# # 创建姓名到身份证的映射
# for _, row in df.iterrows():
# name = row['测量人员信息']
# id_card = str(row.iloc[4]).strip() # 第5列是身份证号
# # 处理身份证号格式(如果是浮点数转为整数)
# if id_card.endswith('.0'):
# id_card = id_card[:-2]
# global_variable.GLOBAL_NAME_TO_ID_MAP[name] = id_card
# global_variable.GLOBAL_NAME_TO_ID_MAP = df
# logging.info(f"成功加载用户数据,共 {len(df)} 条记录,{len(global_variable.GLOBAL_NAME_TO_ID_MAP)} 个唯一姓名")
# return True
# except Exception as e:
# logging.error(f"加载用户数据失败: {str(e)}")
# return False
def _load_user_data(self):
"""加载用户数据从Excel文件只提取名字和身份证到字典"""
try:
# 默认路径:当前脚本的上一级目录下的"上传人员信息.xlsx"
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
excel_path = os.path.join(parent_dir, "上传人员信息.xlsx")
if not os.path.exists(excel_path):
logging.error(f"Excel文件不存在: {excel_path}")
return False
# 读取Excel文件
df = pd.read_excel(excel_path, sheet_name='Sheet1')
# 处理合并单元格 - 前向填充标段列
# df['标段'] = df['标段'].fillna(method='ffill')
df['标段'] = df['标段'].ffill()
# 清理数据:去除空行和无效数据
df = df.dropna(subset=['测量人员信息'])
df = df[df['测量人员信息'].str.strip() != '']
# 创建姓名到身份证的映射字典
name_id_map = {}
for _, row in df.iterrows():
name = str(row['测量人员信息']).strip()
# 第5列是身份证号索引为4
id_card = str(row.iloc[4]).strip() if pd.notna(row.iloc[4]) else ""
# 处理身份证号格式(如果是浮点数转为整数)
if id_card.endswith('.0'):
id_card = id_card[:-2]
# 只将有效的姓名和身份证号添加到字典
if name and id_card and len(id_card) >= 15: # 身份证号至少15位
name_id_map[name] = id_card
else:
logging.warning(f"跳过无效数据: 姓名='{name}', 身份证='{id_card}'")
# 将字典保存到全局变量
global_variable.GLOBAL_NAME_TO_ID_MAP = name_id_map
logging.info(f"成功加载用户数据,共 {len(df)} 条记录,{len(name_id_map)} 个有效姓名-身份证映射")
# 打印前几个映射用于调试
sample_names = list(name_id_map.keys())[:5]
for name in sample_names:
logging.debug(f"映射示例: {name} -> {name_id_map[name]}")
return True
except Exception as e:
logging.error(f"加载用户数据失败: {str(e)}")
return False
def get_first_sjname_and_id(self, linecode: str, work_conditions: Dict) -> Optional[Dict]:
"""
获取线路的第一个数据员姓名和身份证号
Args:
linecode: 线路编码
Returns:
返回字典: {"name": 姓名, "id_card": 身份证号}
"""
if not work_conditions:
logging.error(f"无法获取线路 {linecode} 的工况信息")
return None
# 直接取第一个测点的数据员
first_point_info = next(iter(work_conditions.values()))
sjname = first_point_info.get('sjName')
if not sjname:
logging.error("第一个测点没有数据员信息")
return None
logging.info(f"使用第一个数据员: {sjname}")
# 获取身份证号码
id_card = global_variable.GLOBAL_NAME_TO_ID_MAP.get(sjname)
logging.info(f"id_card: {id_card}")
if not id_card:
logging.error(f"未找到数据员 {sjname} 对应的身份证号")
return None
return {"name": sjname, "id_card": id_card}
# def set_all_points_and_fill_form(self, results_dir, name, user_id, condition_dict):
# """点击设置所有测点按钮并填写表单"""
# try:
# self.logger.info("开始设置所有测点并填写表单...")
# # 点击"设置所有测点"按钮
# set_all_points_btn = self.wait.until(
# EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/display_setallpoint_btn"))
# )
# set_all_points_btn.click()
# self.logger.info("已点击'设置所有测点'按钮")
# # 等待表单加载
# time.sleep(1)
# # 填写司镜人员姓名
# name_input = self.wait.until(
# EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/sname"))
# )
# name_input.clear()
# name_input.send_keys(name)
# self.logger.info(f"已填写司镜人员姓名: {name}")
# # 填写司镜人员身份证号
# id_card_input = self.wait.until(
# EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/scard"))
# )
# id_card_input.clear()
# id_card_input.send_keys(user_id)
# self.logger.info(f"已填写司镜人员身份证号: {user_id}")
# # 选择测点状态
# if not self._select_point_status("正常"):
# self.logger.error("选择测点状态失败")
# return False
# # 选择工况信息
# if not self._select_work_condition(condition_dict):
# self.logger.error("选择工况信息失败")
# return False
# # 等待操作完成
# time.sleep(1)
# # 检查操作结果
# try:
# # 检查是否有成功提示或错误提示
# # 这里可以根据实际应用的上传反馈机制进行调整
# success_indicator = WebDriverWait(self.driver, 10).until(
# EC.presence_of_element_located((AppiumBy.XPATH, "//*[contains(@text, '成功') or contains(@text, '完成')]"))
# )
# self.logger.info(f"操作完成: {success_indicator.text}")
# except TimeoutException:
# self.logger.warning("未检测到明确的操作成功提示,但操作已完成")
# return True
# except TimeoutException:
# self.logger.error("等待设置所有测点表单元素可点击超时")
# return False
# except Exception as e:
# self.logger.error(f"设置所有测点并填写表单时出错: {str(e)}")
# # 保存错误截图
# error_screenshot_file = os.path.join(
# results_dir,
# f"form_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
# )
# self.driver.save_screenshot(error_screenshot_file)
# self.logger.info(f"错误截图已保存: {error_screenshot_file}")
# return False
def _select_point_status(self, status="正常"):
logging.info(f"开始选择测点状态: {status}")
"""选择测点状态"""
try:
# 点击"选择测点状态"按钮
status_button = self.wait.until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/all_point_pstate_sp"))
)
status_button.click()
# 选择指定的状态
status_option = self.wait.until(
EC.element_to_be_clickable((AppiumBy.XPATH, f"//android.widget.TextView[@resource-id='android:id/text1' and @text='{status}']"))
)
status_option.click()
return True
except TimeoutException:
self.logger.error("等待测点状态选择超时")
return False
except Exception as e:
self.logger.error(f"选择测点状态时出错: {str(e)}")
return False
# def _select_work_condition(self, condition_dict: Dict[str, List[str]]):
# logging.info(f"开始选择工况信息: {condition_dict}")
# """选择工况信息"""
# # 所有可能的工况选择按钮ID
# condition_button_ids = [
# "com.bjjw.cjgc:id/all_point_workinfo_sp_qiaoliang", # 梁桥
# "com.bjjw.cjgc:id/all_point_workinfo_sp_luji", # 路基
# "com.bjjw.cjgc:id/all_point_workinfo_sp_handong", # 涵洞
# "com.bjjw.cjgc:id/all_point_workinfo_sp_suidao" # 隧道
# ]
# try:
# # 遍历所有可能的工况选择按钮
# for button_id in condition_button_ids:
# try:
# # 点击"选择工况信息"按钮
# condition_button = self.wait.until(
# EC.element_to_be_clickable((AppiumBy.ID, button_id))
# )
# condition_button.click()
# logging.info(f"成功点击工况选择按钮: {button_id}")
# # 选择指定的工况选项
# if self._select_condition_option(condition):
# logging.info(f"成功选择工况: {condition} 对于按钮: {button_id}")
# else:
# logging.warning(f"未能选择工况: {condition} 对于按钮: {button_id}")
# # 继续尝试其他按钮,不立即返回失败
# except TimeoutException:
# logging.debug(f"未找到或无法点击工况选择按钮: {button_id},继续尝试下一个")
# continue
# except Exception as e:
# logging.warning(f"点击按钮 {button_id} 时出错: {str(e)},继续尝试下一个")
# continue
# return True
# except TimeoutException:
# self.logger.error("等待工况信息选择超时")
# return False
# except Exception as e:
# self.logger.error(f"选择工况信息时出错: {str(e)}")
# return False
# def _select_condition_option(self, condition, max_scroll_attempts=5):
# """选择具体的工况选项"""
# try:
# # 滑动查找指定的工况选项
# for attempt in range(max_scroll_attempts):
# try:
# # 尝试查找并点击指定的工况选项
# condition_option = self.wait.until(
# EC.element_to_be_clickable((AppiumBy.XPATH,
# f"//android.widget.TextView[@resource-id='android:id/text1' and @text='{condition}']"))
# )
# condition_option.click()
# logging.info(f"成功选择工况选项: {condition}")
# return True
# except TimeoutException:
# logging.debug(f"第 {attempt + 1} 次查找未找到工况选项: {condition},尝试滑动")
# # 执行滑动操作
# self._scroll_to_find_condition()
# logging.error(f"经过 {max_scroll_attempts} 次滑动仍未找到工况选项: {condition}")
# return False
# except Exception as e:
# logging.error(f"选择工况选项时出错: {str(e)}")
# return False
# def set_all_points_and_fill_form(self, results_dir, name, user_id, condition_dict):
# """点击设置所有测点按钮并填写表单"""
# try:
# self.logger.info("开始设置所有测点并填写表单...")
# # 点击"设置所有测点"按钮
# set_all_points_btn = self.wait.until(
# EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/display_setallpoint_btn"))
# )
# set_all_points_btn.click()
# self.logger.info("已点击'设置所有测点'按钮")
# # 等待表单加载
# time.sleep(1)
# # 填写司镜人员姓名
# name_input = self.wait.until(
# EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/sname"))
# )
# name_input.clear()
# name_input.send_keys(name)
# self.logger.info(f"已填写司镜人员姓名: {name}")
# # 填写司镜人员身份证号
# id_card_input = self.wait.until(
# EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/scard"))
# )
# id_card_input.clear()
# id_card_input.send_keys(user_id)
# self.logger.info(f"已填写司镜人员身份证号: {user_id}")
# # 选择测点状态
# if not self._select_point_status("正常"):
# self.logger.error("选择测点状态失败")
# return False
# # 选择工况信息 - 传入condition_dict
# if not self._select_work_condition(condition_dict):
# self.logger.error("选择工况信息失败")
# return False
# # 等待操作完成
# time.sleep(1)
# # 检查操作结果
# try:
# # 检查是否有成功提示或错误提示
# success_indicator = WebDriverWait(self.driver, 10).until(
# EC.presence_of_element_located((AppiumBy.XPATH, "//*[contains(@text, '成功') or contains(@text, '完成')]"))
# )
# self.logger.info(f"操作完成: {success_indicator.text}")
# except TimeoutException:
# self.logger.warning("未检测到明确的操作成功提示,但操作已完成")
# return True
# except TimeoutException:
# self.logger.error("等待设置所有测点表单元素可点击超时")
# return False
# except Exception as e:
# self.logger.error(f"设置所有测点并填写表单时出错: {str(e)}")
# return False
def set_all_points_and_fill_form(self, results_dir, name, user_id, main_condition_dict, minor_conditions_list):
"""点击设置所有测点按钮并填写表单"""
try:
self.logger.info("开始设置所有测点并填写表单...")
# 点击"设置所有测点"按钮
set_all_points_btn = self.wait.until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/display_setallpoint_btn"))
)
set_all_points_btn.click()
self.logger.info("已点击'设置所有测点'按钮")
# 等待表单加载
time.sleep(1)
# 填写司镜人员姓名
name_input = self.wait.until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/sname"))
)
name_input.clear()
name_input.send_keys(name)
self.logger.info(f"已填写司镜人员姓名: {name}")
# 填写司镜人员身份证号
id_card_input = self.wait.until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/scard"))
)
id_card_input.clear()
id_card_input.send_keys(user_id)
self.logger.info(f"已填写司镜人员身份证号: {user_id}")
# 选择测点状态
if not self._select_point_status("正常"):
self.logger.error("选择测点状态失败")
return False
# 选择工况信息 - 现在传入两个参数
if not self._select_work_condition(main_condition_dict, minor_conditions_list):
self.logger.error("选择工况信息失败")
return False
# 等待操作完成
time.sleep(1)
# 检查操作结果
try:
success_indicator = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((AppiumBy.XPATH, "//*[contains(@text, '成功') or contains(@text, '完成')]"))
)
self.logger.info(f"操作完成: {success_indicator.text}")
except TimeoutException:
self.logger.warning("未检测到明确的操作成功提示,但操作已完成")
return True
except TimeoutException:
self.logger.error("等待设置所有测点表单元素可点击超时")
return False
except Exception as e:
self.logger.error(f"设置所有测点并填写表单时出错: {str(e)}")
return False
# def _select_work_condition(self, condition_dict: Dict[str, List[str]]):
# """根据工况字典选择工况信息
# Args:
# condition_dict: 工况字典,格式为 {work_type: [workinfoname1, workinfoname2, ...]}
# work_type: 工点类型编码1-隧道2-区间路基3-桥, 4-涵洞)
# """
# self.logger.info(f"开始选择工况信息: {condition_dict}")
# # 工点类型编码与界面控件ID的映射
# work_type_mapping = {
# "1": { # 隧道
# "button_id": "com.bjjw.cjgc:id/all_point_workinfo_sp_suidao",
# "name": "隧道"
# },
# "2": { # 区间路基
# "button_id": "com.bjjw.cjgc:id/all_point_workinfo_sp_luji",
# "name": "路基"
# },
# "3": { # 桥
# "button_id": "com.bjjw.cjgc:id/all_point_workinfo_sp_qiaoliang",
# "name": "梁桥"
# },
# "4": { # 涵洞
# "button_id": "com.bjjw.cjgc:id/all_point_workinfo_sp_handong",
# "name": "涵洞"
# }
# }
# try:
# success_count = 0
# total_count = len(condition_dict)
# # 遍历condition_dict中的每个工点类型
# for work_type, workinfo_names in condition_dict.items():
# # 确保work_type是字符串类型与映射表匹配
# work_type_str = str(work_type).strip()
# if work_type_str not in work_type_mapping:
# self.logger.warning(f"未知的工点类型编码: {work_type_str},跳过")
# continue
# mapping = work_type_mapping[work_type_str]
# button_id = mapping["button_id"]
# work_type_name = mapping["name"]
# # 获取该工点类型的第一个工况名称(主要工况)
# if workinfo_names:
# workinfo_name = workinfo_names[0] # 使用第一个工况名称
# self.logger.info(f"为{work_type_name}({work_type_str})选择工况: {workinfo_name}")
# try:
# # 点击工况选择按钮
# condition_button = self.wait.until(
# EC.element_to_be_clickable((AppiumBy.ID, button_id))
# )
# condition_button.click()
# self.logger.info(f"成功点击{work_type_name}工况选择按钮")
# # 选择具体的工况选项
# if self._select_condition_option(workinfo_name):
# self.logger.info(f"成功为{work_type_name}选择工况: {workinfo_name}")
# success_count += 1
# else:
# self.logger.warning(f"未能为{work_type_name}选择工况: {workinfo_name}")
# except TimeoutException:
# self.logger.error(f"等待{work_type_name}工况选择按钮超时")
# except Exception as e:
# self.logger.error(f"点击{work_type_name}工况按钮时出错: {str(e)}")
# # 如果有多个工况名称,处理次要工况
# if len(workinfo_names) > 1:
# self.logger.info(f"{work_type_name}有{len(workinfo_names)-1}个次要工况需要处理: {workinfo_names[1:]}")
# self._handle_minor_work_conditions(work_type_str, workinfo_names[1:])
# else:
# self.logger.warning(f"工点类型{work_type_name}({work_type_str})没有可用的工况名称")
# self.logger.info(f"工况选择完成: 成功{success_count}/{total_count}个工点类型")
# return success_count > 0 # 只要有一个成功就返回True
# except Exception as e:
# self.logger.error(f"选择工况信息时发生未知错误: {str(e)}")
# return False
def _select_work_condition(self, main_condition_dict: Dict[str, List[str]], minor_conditions_list: List[Dict]):
"""根据主要工况字典和次要工况列表选择工况信息"""
self.logger.info("开始选择工况信息")
self.logger.info(f"主要工况: {main_condition_dict}")
self.logger.info(f"次要工况数量: {len(minor_conditions_list)}")
# 工点类型编码与界面控件ID的映射
work_type_mapping = {
"1": { # 隧道
"button_id": "com.bjjw.cjgc:id/all_point_workinfo_sp_suidao",
"name": "隧道"
},
"2": { # 区间路基
"button_id": "com.bjjw.cjgc:id/all_point_workinfo_sp_luji",
"name": "路基"
},
"3": { # 桥
"button_id": "com.bjjw.cjgc:id/all_point_workinfo_sp_qiaoliang",
"name": "梁桥"
},
"4": { # 涵洞
"button_id": "com.bjjw.cjgc:id/all_point_workinfo_sp_handong",
"name": "涵洞"
}
}
try:
success_count = 0
# 第一步:为每个工点类型选择主要工况
for work_type, workinfo_names in main_condition_dict.items():
work_type_str = str(work_type).strip()
if work_type_str not in work_type_mapping:
self.logger.warning(f"未知的工点类型编码: {work_type_str},跳过")
continue
mapping = work_type_mapping[work_type_str]
button_id = mapping["button_id"]
work_type_name = mapping["name"]
if workinfo_names:
workinfo_name = workinfo_names[0] # 主要工况
self.logger.info(f"{work_type_name}({work_type_str})选择主要工况: {workinfo_name}")
try:
# 点击工况选择按钮
condition_button = self.wait.until(
EC.element_to_be_clickable((AppiumBy.ID, button_id))
)
condition_button.click()
self.logger.info(f"成功点击{work_type_name}工况选择按钮")
# 选择主要的工况选项
if self._select_condition_option(workinfo_name):
self.logger.info(f"成功为{work_type_name}选择主要工况: {workinfo_name}")
success_count += 1
else:
self.logger.warning(f"未能为{work_type_name}选择主要工况: {workinfo_name}")
except TimeoutException:
self.logger.error(f"等待{work_type_name}工况选择按钮超时")
except Exception as e:
self.logger.error(f"点击{work_type_name}工况按钮时出错: {str(e)}")
# 第二步:如果有次要工况,滑动页面处理
if minor_conditions_list:
self.logger.info(f"开始处理 {len(minor_conditions_list)} 个次要工况")
minor_success_count = self._handle_minor_work_conditions(minor_conditions_list)
self.logger.info(f"次要工况处理完成: 成功 {minor_success_count}/{len(minor_conditions_list)}")
else:
self.logger.info("没有次要工况需要处理")
self.logger.info(f"工况选择完成: 主要工况成功{success_count}/{len(main_condition_dict)}")
return success_count > 0 # 只要有一个主要工况成功就返回True
except Exception as e:
self.logger.error(f"选择工况信息时发生未知错误: {str(e)}")
return False
# # 选择次要工况信息
# def _select_minor_conditions_option(self, option_name: str, work_type_name: str) -> bool:
# """根据测点名字选择对应的下拉列表中的选项"""
# try:
# # 找到选项元素
# option = self.wait.until(
# EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/point_pstate_sp"))
# )
# option.click()
# # 选择次要的工况选项
# if self._select_condition_option(option_name):
# self.logger.info(f"成功为{option_name}选择次要工况: {work_type_name}")
# success_count += 1
# return True
# except TimeoutException:
# self.logger.error(f"未找到选项: {option_name}")
# return False
# except Exception as e:
# self.logger.error(f"选择选项时出错: {str(e)}")
# return False
def _handle_minor_work_conditions(self, minor_conditions_list: List[Dict]) -> int:
"""处理次要工况:滑动页面,找到对应测点并设置工况"""
success_count = 0
processed_points = set() # 记录已处理的测点,避免重复处理
logging.info(f"要处理的次要工况:{minor_conditions_list}")
# 提取所有需要处理的测点ID
target_point_ids = {condition['point_id'] for condition in minor_conditions_list}
self.logger.info(f"目标测点ID: {list(target_point_ids)}")
# 点击"关闭设置所有"按钮
close_set_all_points_btn = self.wait.until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/display_setallpoint_btn"))
)
close_set_all_points_btn.click()
self.logger.info("已点击'关闭设置所有测点'按钮")
try:
max_scroll_attempts = 20 # 最大滑动次数防止无限循环
scroll_attempt = 0
previous_points = set() # 记录上一页的测点
remaining_conditions = minor_conditions_list.copy()
while scroll_attempt < max_scroll_attempts:
scroll_attempt += 1
self.logger.info(f"{scroll_attempt} 次滑动处理次要工况...")
# 获取当前页面的测点
current_points = set(self.collect_points_on_page())
self.logger.info(f"当前页面共找到 {len(current_points)} 个测点: {list(current_points)}")
# 检查是否有新测点
if current_points == previous_points:
self.logger.info("当前页面没有新的测点,停止滑动")
break
# 检查当前页面是否有目标测点
current_target_points = current_points & target_point_ids
if not current_target_points:
self.logger.info(f"当前页面没有目标测点,继续滑动查找...")
# 滑动到下一页
if not self.swipe_down():
self.logger.warning("滑动失败,停止处理")
break
# 等待页面稳定
time.sleep(1)
previous_points = current_points
continue
self.logger.info(f"当前页面找到目标测点: {list(current_target_points)}")
# 如果当前测点列表的测点名在minor_conditions_list中就点击选择测点对应工况
# 处理当前页面的次要工况
page_success_count = self._process_minor_conditions_on_current_page(
current_points, remaining_conditions, processed_points
)
success_count += page_success_count
# 更新剩余待处理的工况列表
remaining_conditions = [
condition for condition in remaining_conditions
if condition['point_id'] not in processed_points
]
self.logger.info(f"剩余待处理工况: {len(remaining_conditions)}")
if remaining_conditions:
remaining_points = [cond['point_id'] for cond in remaining_conditions]
self.logger.info(f"剩余测点: {remaining_points}")
# 更新已处理的测点记录
previous_points = current_points
# 如果所有次要工况都已处理完成,提前退出
if len(processed_points) >= len(minor_conditions_list):
self.logger.info(f"所有 {len(minor_conditions_list)} 个次要工况已处理完成")
break
# 滑动到下一页
self.logger.info("滑动到下一页继续查找...")
if not self.swipe_down():
self.logger.warning("滑动失败,停止处理")
break
# 等待页面稳定
time.sleep(1)
self.logger.info(f"次要工况处理完成: 成功 {success_count}/{len(minor_conditions_list)}")
return success_count
except Exception as e:
self.logger.error(f"处理次要工况时出错: {str(e)}")
return success_count
def _process_minor_conditions_on_current_page(self, current_points: set, minor_conditions_list: List[Dict], processed_points: set) -> int:
"""处理当前页面上的次要工况"""
page_success_count = 0
try:
self.logger.info("开始处理当前页面上的次要工况...")
# 1. 先定位所有测点的根容器
point_containers = self.driver.find_elements(
AppiumBy.ID,
"com.bjjw.cjgc:id/layout_popup_top"
)
self.logger.info(f"当前页面找到 {len(point_containers)} 个测点容器")
if not point_containers:
self.logger.warning("当前页面未找到任何测点容器")
return 0
# 2. 构建当前页面测点的映射关系:测点名称 -> 容器元素
point_container_map = {}
for container in point_containers:
try:
name_element = container.find_element(
AppiumBy.ID,
"com.bjjw.cjgc:id/improve_point_name"
)
point_name = name_element.text.strip()
if point_name and point_name in current_points:
point_container_map[point_name] = container
self.logger.debug(f"映射测点: {point_name}")
except NoSuchElementException:
continue
except Exception as e:
self.logger.debug(f"解析测点容器时出错: {str(e)}")
continue
# 3. 遍历需要处理的次要工况列表
for minor_condition in minor_conditions_list:
point_id = minor_condition['point_id']
workinfoname = minor_condition['workinfoname']
work_type = minor_condition['work_type']
# 检查是否已经处理过
if point_id in processed_points:
self.logger.debug(f"测点 {point_id} 已处理过,跳过")
continue
# 检查是否在当前页面
if point_id not in current_points:
self.logger.debug(f"测点 {point_id} 不在当前页面")
continue
# 检查是否有对应的容器
if point_id not in point_container_map:
self.logger.warning(f"测点 {point_id} 在当前页面但未找到对应容器")
continue
try:
self.logger.info(f"开始处理测点 {point_id} 的次要工况: {workinfoname}")
# 4. 在当前测点的容器内查找工况选择按钮
container = point_container_map[point_id]
workinfo_button = container.find_element(
AppiumBy.ID,
"com.bjjw.cjgc:id/point_workinfo_sp"
)
# 验证按钮是否可见和可用
if workinfo_button.is_displayed() and workinfo_button.is_enabled():
workinfo_button.click()
self.logger.info(f"已点击测点 {point_id} 的工况选择按钮")
# 选择对应的工况选项
if self._select_minor_conditions_option(workinfoname, work_type):
page_success_count += 1
processed_points.add(point_id)
self.logger.info(f"成功为测点 {point_id} 设置次要工况: {workinfoname}")
else:
self.logger.warning(f"为测点 {point_id} 选择工况选项失败")
else:
self.logger.warning(f"测点 {point_id} 的工况选择按钮不可用")
except NoSuchElementException:
self.logger.warning(f"未找到测点 {point_id} 的工况选择按钮")
except Exception as e:
self.logger.error(f"处理测点 {point_id} 时出错: {str(e)}")
self.logger.info(f"当前页面成功处理 {page_success_count} 个测点的次要工况")
return page_success_count
except Exception as e:
self.logger.error(f"处理当前页面次要工况时发生异常: {str(e)}")
return page_success_count
def _select_minor_conditions_option(self, option_name: str, work_type_name: str) -> bool:
"""根据工况名称选择对应的下拉列表中的选项"""
try:
self.logger.info(f"开始选择次要工况选项: {option_name}")
# 方法1: 通过文本精确匹配
try:
option_xpath = f"//android.widget.TextView[@text='{option_name}']"
option_element = WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable((AppiumBy.XPATH, option_xpath))
)
option_element.click()
self.logger.info(f"通过文本匹配成功选择工况: {option_name}")
return True
except TimeoutException:
self.logger.debug(f"通过文本 '{option_name}' 未找到工况选项")
# 方法2: 通过列表项ID查找
try:
list_items = self.driver.find_elements(AppiumBy.ID, "android:id/text1")
for item in list_items:
if item.text == option_name:
item.click()
self.logger.info(f"通过列表项成功选择工况: {option_name}")
return True
except Exception as e:
self.logger.debug(f"通过列表项查找失败: {str(e)}")
# 方法3: 滑动查找
max_scroll_attempts = 3
for attempt in range(max_scroll_attempts):
try:
option_element = self.driver.find_element(
AppiumBy.XPATH,
f"//android.widget.TextView[@text='{option_name}']"
)
option_element.click()
self.logger.info(f"通过滑动后查找成功选择工况: {option_name}")
return True
except NoSuchElementException:
self.logger.debug(f"{attempt + 1} 次滑动查找未找到选项: {option_name}")
# 执行滑动
self._scroll_condition_options()
except Exception as e:
self.logger.debug(f"滑动查找时出错: {str(e)}")
break
self.logger.error(f"所有方法都无法找到工况选项: {option_name}")
return False
except TimeoutException:
self.logger.error(f"等待工况选项可点击超时: {option_name}")
return False
except Exception as e:
self.logger.error(f"选择工况选项时出错: {str(e)}")
return False
def _scroll_condition_options(self):
"""滑动工况选项列表"""
try:
# 直接使用固定的坐标值
# 避免调用 get_window_size() 方法,防止 Appium 服务器崩溃
start_x = 540 # 假设屏幕宽度为 1080取中间值
start_y = 1400 # 假设屏幕高度为 2000取 70% 位置
end_y = 600 # 假设屏幕高度为 2000取 30% 位置
# 执行滑动
self.driver.swipe(start_x, start_y, start_x, end_y, 500)
self.logger.debug("执行滑动操作查找更多工况选项")
time.sleep(1) # 等待滑动完成
except Exception as e:
self.logger.error(f"滑动工况选项列表时出错: {str(e)}")
# def _handle_minor_work_conditions(self, minor_conditions_list: List[Dict]) -> int:
# """处理次要工况:滑动页面,找到对应测点并设置工况"""
# success_count = 0
# processed_points = set() # 记录已处理的测点,避免重复处理
# logging.info(f"要处理的次要工况:{minor_conditions_list}")
# try:
# while True:
# points_on_page_before = []
# # 获取当前页面的测点
# points_on_page_after = self.collect_points_on_page()
# self.logger.info(f"当前页面共找到 {len(points_on_page_after)} 个测点")
# if points_on_page_after == points_on_page_before:
# self.logger.info("当前页面没有新的测点,停止滑动")
# break
# else:
# points_on_page_before = points_on_page_after
# # 如果当前页面测点在次要工况字典中,则进行点击"com.bjjw.cjgc:id/point_workinfo_sp",选择对应的工况
# _select_minor_conditions_option
# self.swipe_down()
# # # 滑动收集页面中的所有测点
# # all_points_on_page = self.collect_all_points_on_page()
# # self.logger.info(f"页面中共找到 {len(all_points_on_page)} 个测点")
# # # 为每个次要工况找到对应的测点并设置工况
# # for minor_condition in minor_conditions_list:
# # point_id = minor_condition['point_id']
# # target_workinfoname = minor_condition['workinfoname']
# # work_type = minor_condition['work_type']
# # if point_id in processed_points:
# # continue
# # # 在页面中查找对应的测点
# # target_point = None
# # for point_data in all_points_on_page:
# # if point_data.get('point_id') == point_id:
# # target_point = point_data
# # break
# # if target_point:
# # # 找到测点,设置工况
# # if self._set_single_point_work_condition(target_point, target_workinfoname, work_type):
# # success_count += 1
# # processed_points.add(point_id)
# # self.logger.info(f"成功为测点 {point_id} 设置次要工况: {target_workinfoname}")
# # else:
# # self.logger.warning(f"为测点 {point_id} 设置次要工况失败")
# # else:
# # self.logger.warning(f"未在页面中找到测点 {point_id},可能不在当前视图内")
# # return success_count
# except Exception as e:
# self.logger.error(f"处理次要工况时出错: {str(e)}")
# return success_count
def collect_points_on_page(self) -> List[str]:
"""
收集当前页面中存在的测点ID列表
要求:必须同时存在"com.bjjw.cjgc:id/improve_point_name""com.bjjw.cjgc:id/point_workinfo_sp"才返回
"""
point_ids = []
try:
self.logger.info("开始收集当前页面中的测点ID...")
# 1. 先定位所有测点的根容器RelativeLayout确保在同一容器内查找子元素
point_containers = self.driver.find_elements(
AppiumBy.ID,
"com.bjjw.cjgc:id/layout_popup_top" # 测点根容器的resource-id
)
self.logger.info(f"找到 {len(point_containers)} 个测点根容器")
for container in point_containers:
try:
# 2. 在当前根容器内查找测点名称元素
name_element = container.find_element(
AppiumBy.ID,
"com.bjjw.cjgc:id/improve_point_name"
)
point_name = name_element.text
if not point_name:
self.logger.debug("测点名称为空,跳过")
continue
# 3. 在当前根容器内查找工况按钮(按实际层级定位)
# 层级RelativeLayout -> LinearLayout -> LinearLayout -> Button
workinfo_button = container.find_element(
AppiumBy.XPATH,
".//android.widget.LinearLayout/android.widget.LinearLayout/android.widget.Button[@resource-id='com.bjjw.cjgc:id/point_workinfo_sp']"
)
# 4. 验证按钮是否可见
if workinfo_button.is_displayed():
point_ids.append(point_name)
self.logger.debug(f"找到有效测点: {point_name}")
except NoSuchElementException as e:
# 若名称或工况按钮不存在,跳过当前容器
self.logger.debug(f"测点容器中缺少必要元素: {str(e)}")
continue
except Exception as e:
self.logger.debug(f"解析测点容器时出错: {str(e)}")
continue
self.logger.info(f"当前页面共找到 {len(point_ids)} 个有效测点: {point_ids}")
return point_ids
except Exception as e:
self.logger.error(f"收集页面测点ID时出错: {str(e)}")
return []
def collect_all_points_on_page(self) -> List[Dict]:
"""滑动收集页面中所有测点的信息"""
all_points = []
seen_point_names = set()
max_scroll_attempts = 50
scroll_attempt = 0
self.logger.info("开始滑动收集页面中所有测点...")
while scroll_attempt < max_scroll_attempts:
scroll_attempt += 1
self.logger.info(f"{scroll_attempt} 次滑动收集...")
# 获取当前屏幕的测点
current_points = self._get_current_screen_points_detail()
if not current_points:
self.logger.info("当前屏幕没有测点数据")
break
# 添加新发现的测点
new_points_count = 0
for point in current_points:
point_name = point.get('point_name')
if point_name and point_name not in seen_point_names:
all_points.append(point)
seen_point_names.add(point_name)
new_points_count += 1
self.logger.info(f"本次获取到 {len(current_points)} 个测点,其中 {new_points_count} 个是新测点")
if new_points_count == 0:
self.logger.info("没有发现新测点,停止滑动")
break
# 滑动到下一页
if not self.swipe_up():
self.logger.warning("滑动失败,停止收集")
break
time.sleep(0.5)
self.logger.info(f"共收集到 {len(all_points)} 个测点的详细信息")
return all_points
def _get_current_screen_points_detail(self) -> List[Dict]:
"""获取当前屏幕测点的详细信息"""
points = []
try:
# 查找所有测点容器
point_containers = self.driver.find_elements(
AppiumBy.XPATH,
"//android.widget.LinearLayout[@resource-id='com.bjjw.cjgc:id/layout_popup_top']"
)
for container in point_containers:
try:
point_info = {}
# 获取测点名称作为point_id的替代
name_element = container.find_element(
AppiumBy.ID, "com.bjjw.cjgc:id/improve_point_name"
)
point_name = name_element.text
point_info['point_name'] = point_name
point_info['point_id'] = point_name # 使用名称作为ID或者根据实际情况调整
# 获取测点元素引用,用于后续操作
point_info['element'] = container
# 获取当前工况信息
try:
workinfo_element = container.find_element(
AppiumBy.ID, "com.bjjw.cjgc:id/point_workinfo_sp"
)
point_info['current_workinfo'] = workinfo_element.text
point_info['workinfo_element'] = workinfo_element
except NoSuchElementException:
point_info['current_workinfo'] = ""
point_info['workinfo_element'] = None
points.append(point_info)
except Exception as e:
self.logger.debug(f"解析单个测点信息时出错: {str(e)}")
continue
except Exception as e:
self.logger.error(f"获取当前屏幕测点详细信息时出错: {str(e)}")
return points
def _set_single_point_work_condition(self, point_data: Dict, workinfo_name: str, work_type: str) -> bool:
"""为单个测点设置工况信息"""
try:
point_name = point_data.get('point_name')
self.logger.info(f"开始为测点 {point_name} 设置工况: {workinfo_name}")
# 使用保存的元素引用点击工况选择按钮
workinfo_element = point_data.get('workinfo_element')
if workinfo_element:
workinfo_element.click()
time.sleep(1) # 等待选项弹出
# 选择指定的工况
if self._select_condition_option(workinfo_name):
self.logger.info(f"成功为测点 {point_name} 设置工况: {workinfo_name}")
return True
else:
self.logger.warning(f"为测点 {point_name} 选择工况选项失败")
return False
else:
self.logger.warning(f"未找到测点 {point_name} 的工况选择按钮")
return False
except Exception as e:
self.logger.error(f"为测点 {point_name} 设置工况时出错: {str(e)}")
return False
def _select_condition_option(self, condition_name: str) -> bool:
"""选择具体的工况选项
Args:
condition_name: 工况名称
Returns:
bool: 是否选择成功
"""
try:
self.logger.info(f"开始选择工况选项: {condition_name}")
# 方法1: 通过文本查找并点击
try:
option_xpath = f"//android.widget.TextView[@text='{condition_name}']"
option_element = WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable((AppiumBy.XPATH, option_xpath))
)
option_element.click()
self.logger.info(f"通过文本定位成功选择工况: {condition_name}")
return True
except TimeoutException:
self.logger.debug(f"通过文本'{condition_name}'未找到工况选项")
# 方法2: 通过列表项查找
try:
# 假设工况选项在列表中,点击第一个可用的选项
list_item = WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable((AppiumBy.ID, "android:id/text1"))
)
list_item.click()
self.logger.info("通过列表项选择工况")
return True
except TimeoutException:
self.logger.debug("未找到列表项形式的工况选项")
# 方法3: 尝试点击屏幕特定位置(备选方案)
try:
# 直接使用固定的屏幕中央坐标
# 避免调用 get_window_size() 方法,防止 Appium 服务器崩溃
x = 540 # 假设屏幕宽度为 1080取中间值
y = 1000 # 假设屏幕高度为 2000取中间值
# 点击屏幕中央(假设选项在中间)
self.driver.tap([(x, y)])
self.logger.info("通过点击屏幕中央选择工况")
return True
except Exception as e:
self.logger.debug(f"点击屏幕中央失败: {str(e)}")
self.logger.error(f"所有方法都无法选择工况选项: {condition_name}")
return False
except Exception as e:
self.logger.error(f"选择工况选项时出错: {str(e)}")
return False
def _scroll_to_find_condition(self):
"""滑动查找工况选项的辅助方法"""
try:
# 直接使用固定的坐标值
# 避免调用 get_window_size() 方法,防止 Appium 服务器崩溃
start_x = 540 # 假设屏幕宽度为 1080取中间值
start_y = 1400 # 假设屏幕高度为 2000取 70% 位置
end_y = 600 # 假设屏幕高度为 2000取 30% 位置
# 执行滑动
self.driver.swipe(start_x, start_y, start_x, end_y, 500)
logging.debug("执行滑动操作")
except Exception as e:
logging.error(f"滑动操作时出错: {str(e)}")
def aging_down_data(self, breakpoint_name=None, retry_count=0):
"""跳转上传配置页面,根据断点名称点击上传按钮,检查是否在上传配置页面
->变化量属性,执行下载操作"""
try:
self.logger.info("开始执行上传配置页面操作aging")
# # 跳转到上传配置页面
# if not self.go_upload_config_page():
# self.logger.error("跳转上传配置页面失败")
# return False
# 跳转到上传配置页面
if not go_main_click_tabber_button(self.driver, self.device_id, "com.bjjw.cjgc:id/img_2_layout"):
logging.error(f"设备 {self.device_id} 跳转到上传配置页面失败")
return False
# 根据断点名称点击上传按钮
if not self.click_upload_by_breakpoint_name(breakpoint_name):
self.logger.error("点击上传按钮失败")
return False
if not self.handle_upload_dialog():
self.logger.error("处理上传对话框失败")
return False
self.logger.info("开始执行上传配置页面测试")
# 检查是否在上传配置页面
if not self.is_on_upload_config_page():
self.logger.info("不在上传配置页面,尝试导航...")
return False
# 检查是否有变化量属性
if not self.check_change_amount_on_page():
self.logger.info("页面中缺少变化量属性,执行下载操作")
# 执行下载操作
if not self.execute_download_operation():
self.logger.error("下载操作执行失败")
return False
time.sleep(1)
self.logger.info("已返回上传配置页面")
# 如果是第一次重试再次调用aging_down_data
if retry_count < 1:
self.logger.info(f"{retry_count+1}次重试再次执行aging_down_data流程")
return self.aging_down_data(breakpoint_name, retry_count + 1)
else:
# 第二次仍然缺少变化量属性,返回错误
self.logger.error("已执行两次下载操作,页面仍然缺少变化量属性")
return False
else:
self.logger.info("页面中包含变化量属性,继续执行后续操作")
return True
except Exception as e:
self.logger.error(f"设备执行上传重复下载数据操作失败:{e}")
return False
def click_save_upload_and_handle_dialogs(self):
"""点击保存上传并处理弹窗"""
try:
self.logger.info("开始点击保存上传并处理弹窗")
# 点击保存上传按钮
save_upload_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/improve_save_btn"))
)
save_upload_btn.click()
self.logger.info("已点击保存上传按钮")
# 处理警告弹窗
time.sleep(1)
if not self.handle_warning_dialog():
self.logger.error("处理警告弹窗失败")
return False
return True
except TimeoutException:
self.logger.error("点击保存上传按钮超时")
return False
except Exception as e:
self.logger.error(f"点击保存上传并处理弹窗时出错: {str(e)}")
return False
def handle_warning_dialog(self):
"""处理警告弹窗"""
try:
self.logger.info("检查并处理警告弹窗")
# 等待弹窗出现
warning_dialog = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((AppiumBy.ID, "android:id/parentPanel"))
)
# 获取弹窗文本确认是目标弹窗
alert_title = warning_dialog.find_element(AppiumBy.ID, "android:id/alertTitle")
alert_message = warning_dialog.find_element(AppiumBy.ID, "android:id/message")
self.logger.info(f"检测到弹窗 - 标题: {alert_title.text}, 消息: {alert_message.text}")
# 根据业务逻辑选择"是"或"否"
# 这里选择"是"来上传本次数据
yes_button = warning_dialog.find_element(AppiumBy.ID, "android:id/button1")
yes_button.click()
self.logger.info("已点击''按钮确认上传")
# no_button = warning_dialog.find_element(AppiumBy.ID, "android:id/button3")
# no_button.click()
# self.driver.back()
# self.logger.info("已点击'否'按钮取消上传和返回按钮")
return True
except TimeoutException:
self.logger.info("未检测到警告弹窗,继续流程")
return True # 没有弹窗也是正常情况
except Exception as e:
self.logger.error(f"处理警告弹窗时出错: {str(e)}")
return False
def wait_for_upload_completion(self):
"""等待上传完成"""
try:
# time.sleep(2)
self.logger.info("开始等待上传完成")
# 等待弹窗显示
upload_list = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((AppiumBy.ID, "android:id/customPanel"))
)
#等待弹窗消失
WebDriverWait(self.driver, 20).until(
EC.invisibility_of_element_located((AppiumBy.ID, "android:id/customPanel"))
)
self.logger.info("已返回到上传列表页面,上传已完成")
return True
except TimeoutException:
self.logger.error("等待上传完成超时")
return False
except Exception as e:
self.logger.error(f"等待上传完成时出错: {str(e)}")
return False
# def parse_work_conditions(self, work_conditions: Optional[Dict[str, Dict]]) -> List[Dict[str, str]]:
# """
# 解析工况信息获取所有work_type和对应的workinfoname
# Args:
# work_conditions: 从get_work_conditions_by_linecode获取的工况数据
# Returns:
# 返回工况列表,格式为 [{"work_type": "1", "workinfoname": "墩台混凝土施工"}, ...]
# """
# condition_list = []
# if not work_conditions:
# logging.warning("工况数据为空")
# return condition_list
# try:
# # 使用集合来去重,避免重复的工况组合
# unique_conditions = set()
# for point_id, condition_data in work_conditions.items():
# work_type = condition_data.get('work_type', '')
# workinfoname = condition_data.get('workinfoname', '')
# # 过滤空值
# if work_type and workinfoname:
# # 创建唯一标识符
# condition_key = f"{work_type}_{workinfoname}"
# if condition_key not in unique_conditions:
# unique_conditions.add(condition_key)
# condition_list.append({
# "work_type": work_type,
# "workinfoname": workinfoname
# })
# logging.info(f"解析出 {len(condition_list)} 种不同的工况组合")
# return condition_list
# except Exception as e:
# logging.error(f"解析工况信息时发生错误: {str(e)}")
# return []
def parse_work_conditions(self, work_conditions):
"""
解析工况信息,区分主要工况和次要工况
返回: (主要工况字典, 次要工况列表)
"""
if not work_conditions:
logging.warning("工况数据为空")
return {}, []
try:
# 统计每个work_type下各个workinfoname的出现次数
work_type_stats = {}
point_work_conditions = {} # 记录每个测点的工况信息
for point_id, condition_data in work_conditions.items():
work_type = str(condition_data.get('work_type', ''))
workinfoname = condition_data.get('workinfoname', '')
sjname = condition_data.get('sjName', '')
# 记录测点工况信息
point_work_conditions[point_id] = {
'work_type': work_type,
'workinfoname': workinfoname,
'sjname': sjname
}
# 统计出现次数
if work_type and workinfoname:
if work_type not in work_type_stats:
work_type_stats[work_type] = {}
if workinfoname in work_type_stats[work_type]:
work_type_stats[work_type][workinfoname] += 1
else:
work_type_stats[work_type][workinfoname] = 1
# 分离主要工况和次要工况
main_condition_dict = {} # 主要工况字典 {work_type: [主要workinfoname]}
minor_conditions_list = [] # 次要工况列表 [{point_id, work_type, workinfoname}]
# 定义阈值:出现次数少于这个值的认为是次要工况
minor_threshold = 3 # 可以根据实际情况调整
for work_type, workinfoname_counts in work_type_stats.items():
if workinfoname_counts:
# 按出现次数从多到少排序
sorted_workinfonames = sorted(workinfoname_counts.items(), key=lambda x: x[1], reverse=True)
# 主要工况:取出现次数最多的
if sorted_workinfonames:
main_workinfoname = sorted_workinfonames[0][0]
main_condition_dict[work_type] = [main_workinfoname]
logging.info(f"work_type {work_type} 的主要工况: '{main_workinfoname}' (出现{sorted_workinfonames[0][1]}次)")
# 次要工况:收集所有出现次数较少的工况及其对应测点
for workinfoname, count in sorted_workinfonames:
if count <= minor_threshold:
# 找到使用这个次要工况的所有测点
for point_id, point_info in point_work_conditions.items():
if (point_info['work_type'] == work_type and
point_info['workinfoname'] == workinfoname):
minor_conditions_list.append({
'point_id': point_id,
'work_type': work_type,
'workinfoname': workinfoname,
'sjname': point_info['sjname'],
'count': count
})
logging.info(f"解析结果: 主要工况 {len(main_condition_dict)} 种,次要工况 {len(minor_conditions_list)} 个测点")
# 打印次要工况详情用于调试
for minor_condition in minor_conditions_list:
logging.info(f"次要工况 - 测点 {minor_condition['point_id']}: work_type={minor_condition['work_type']}, workinfoname='{minor_condition['workinfoname']}'")
return main_condition_dict, minor_conditions_list
except Exception as e:
logging.error(f"解析工况信息时发生错误: {str(e)}")
return {}, []
def get_work_type_name(work_type: str) -> str:
"""
根据工点类型编码获取类型名称
Args:
work_type: 工点类型编码1-隧道2-区间路基3-桥, 4-涵洞)
Returns:
工点类型名称
"""
work_type_mapping = {
"1": "隧道",
"2": "区间路基",
"3": "",
"4": "涵洞"
}
return work_type_mapping.get(work_type, f"未知类型({work_type})")
def upload_config_page_manager(self, results_dir, breakpoint_name=None, line_num=None):
"""执行上传配置页面管理操作"""
try:
# 保存参数为实例属性
self.results_dir = results_dir
self.breakpoint_name = breakpoint_name
self.line_num = line_num
self.logger.info("开始执行上传配置页面操作manager")
# 跳转到上传配置页面
# if not self.go_upload_config_page():
# self.logger.error("跳转上传配置页面失败")
# return False
# 跳转到上传配置页面
if not go_main_click_tabber_button(self.driver, self.device_id, "com.bjjw.cjgc:id/img_2_layout"):
logging.error(f"设备 {self.device_id} 跳转到测量页面失败")
return False
# 根据断点名称点击上传按钮
if not self.click_upload_by_breakpoint_name(breakpoint_name):
self.logger.error("点击上传按钮失败")
return False
if not self.handle_upload_dialog():
self.logger.error("处理上传对话框失败")
return False
self.logger.info("开始执行上传配置页面测试")
# 检查是否在上传配置页面
if not self.is_on_upload_config_page():
self.logger.info("不在上传配置页面,尝试导航...")
return False
# 检查是否有变化量属性,有就执行下面代码,没有就执行重新下载函数
# 直接检查页面中是否有"变化量"属性
if not self.check_change_amount_on_page():
self.logger.info("页面中缺少变化量属性,执行下载操作")
# 执行下载操作
if not self.execute_download_operation():
self.logger.error("下载操作执行失败")
return False
# 下载操作完成后,点击上传导航按钮跳转到上传页面执行更多下载操作并检查状态
if not self.aging_down_data(breakpoint_name):
self.logger.error("三次下载都失败,属性不存在变量")
return False
else:
self.logger.info("页面中包含变化量属性,继续执行后续操作")
user_id = global_variable.GLOBAL_USERNAME
if user_id is None:
self.logger.error("获取用户ID失败")
return False
max_variation = apis.get_user_max_variation(user_id)
if max_variation is None:
self.logger.error("获取用户最大变化量失败")
return False
# # 循环滑动收集所有测点数据
# logging.info("准备循环滑动收集所有测点数据")
# all_point_data = self.collect_all_point_data(results_dir)
# # 保存测试结果
# result_file = os.path.join(results_dir, f"{self.line_num}_{datetime.now().strftime('%Y%m%d')}.txt")
# with open(result_file, 'w', encoding='utf-8') as f:
# f.write(f"测试时间: {datetime.now().strftime('%Y-%m-%d')}\n")
# f.write(f"获取到的测点数: {len(all_point_data)}\n\n")
# for i, point in enumerate(all_point_data, 1):
# f.write(f"测点 {i}: {point.get('point_name', '未知')}\n")
# parsed = point.get('parsed_data', {})
# # if parsed:
# # f.write(f" 本期数值: {parsed.get('current_value', 'N/A')}\n")
# # f.write(f" 上期数值: {parsed.get('previous_value', 'N/A')}\n")
# # f.write(f" 变化量: {parsed.get('change_amount', 'N/A')}\n")
# # f.write(f" 测量时间: {parsed.get('measurement_time', 'N/A')}\n")
# f.write(f"完整数据:\n{point.get('point_value', 'N/A')}\n\n")
# self.logger.info(f"测试结果已保存到: {result_file}")
# 给ai接口发送文件等待接口返回是否能上传数据
# 不能就点击手机返回按钮,能就继续填写表单
# if not self.check_ai_upload_permission(result_file):
# self.logger.info("AI接口返回不允许上传点击返回按钮")
# self.driver.back()
# return True # 返回True继续下一个断点的上传配置。
if not self.collect_check_all_point_data(max_variation):
self.logger.error(f"断点 '{breakpoint_name}' 上传失败")
self.driver.back()
return False # 返回False继续下一个断点的上传配置。
# 获取线路的所有工况信息
work_conditions = apis.get_work_conditions_by_linecode(self.line_num)
# work_conditions = {'1962527': {'sjName': '王顺', 'workinfoname': '轨道板(道床)铺设后第1个月', 'work_type': 2},
# '0299815Z2': {'sjName': '王顺', 'workinfoname': '冬休', 'work_type': 2},
# '0299820H1': {'sjName': '王顺', 'workinfoname': '架桥机(运梁车) 首次通过后', 'work_type': 4},
# '0431248D1': {'sjName': '王顺', 'workinfoname': '轨道板(道床)铺设后第1个月', 'work_type': 4},
# '0431248D2': {'sjName': '王顺', 'workinfoname': '轨道板(道床)铺设后第1个月', 'work_type': 4},
# '0299815Z1': {'sjName': '王顺', 'workinfoname': '架桥机(运梁车) 首次通过前', 'work_type': 2},
# '0431289D2': {'sjName': '王顺', 'workinfoname': '轨道板(道床)铺设后第1个月', 'work_type': 4},
# '0431330D1': {'sjName': '王顺', 'workinfoname': '轨道板(道床)铺设后第1个月', 'work_type': 2},
# '0431330D2': {'sjName': '王顺', 'workinfoname': '轨道板(道床)铺设后第1个月', 'work_type': 2},
# '0431370D1': {'sjName': '王顺', 'workinfoname': '轨道板(道床)铺设后第1个月', 'work_type': 2}}
self.logger.info(f"获取线路工况信息成功: {work_conditions}")
if not work_conditions:
self.logger.error("获取工况信息失败")
return False
# 提取人员姓名和身份证
if not self._load_user_data():
self.logger.error("加载用户数据失败")
return False
# 获取第一个数据员姓名和身份证号
user_info = self.get_first_sjname_and_id(self.line_num, work_conditions)
self.logger.info(f"获取到的第一个数据员姓名和身份证号为:{user_info}")
if not user_info:
self.logger.error(f"无法获取线路 '{self.line_num}' 的数据员信息")
return False
# # 解析为需要的列表格式
# condition_list = self.parse_work_conditions(work_conditions)
# self.logger.info(f"设备获取到的工作条件列表为:{condition_list}")
# # 转换为condition_dict格式如果需要
# condition_dict = {}
# for condition in condition_list:
# work_type = condition['work_type']
# workinfoname = condition['workinfoname']
# if work_type not in condition_dict:
# condition_dict[work_type] = []
# if workinfoname not in condition_dict[work_type]:
# condition_dict[work_type].append(workinfoname)
# self.logger.info(f"设备获取到的工作条件字典为:{condition_dict}")
# # 设置所有测点并填写表单
# if not self.set_all_points_and_fill_form(results_dir, user_info.get("name"), user_info.get("id_card"), condition_dict):
# self.logger.error("设置所有测点并填写表单失败")
# return False
# 解析工况信息,现在返回两个值:主要工况字典和次要工况列表
main_condition_dict, minor_conditions_list = self.parse_work_conditions(work_conditions)
self.logger.info(f"主要工况: {main_condition_dict}")
self.logger.info(f"次要工况数量: {len(minor_conditions_list)}")
# 设置所有测点并填写表单 - 传入两个参数
if not self.set_all_points_and_fill_form(results_dir, user_info.get("name"), user_info.get("id_card"), main_condition_dict, minor_conditions_list):
self.logger.error("设置所有测点并填写表单失败")
return False
# # 表达填写完成,点击"保存上传"并处理弹窗
# if not self.click_save_upload_and_handle_dialogs():
# self.logger.error("点击保存上传并处理弹窗失败")
# return False
# 等待上传查看loading弹窗。没有就下一个
if not self.wait_for_upload_completion():
self.logger.error("等待上传完成失败")
return False
self.logger.info("上传配置页面操作执行完成")
# 把上传成功的断点写入全局变量GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST
global_variable.GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST.append(breakpoint_name)
return True
except Exception as e:
self.logger.error(f"执行上传配置页面操作时出错: {str(e)}")
# # 保存错误截图
# error_screenshot_file = os.path.join(
# results_dir,
# f"upload_config_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
# )
# self.driver.save_screenshot(error_screenshot_file)
# self.logger.info(f"错误截图已保存: {error_screenshot_file}")
return False