#上传配置页面 # \page_objects\test_upload_config_page.py from appium.webdriver.common.appiumby import AppiumBy from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException import logging import time import os import re import pandas as pd from datetime import datetime from typing import Dict, Optional, List from page_objects.more_download_page import MoreDownloadPage from globals.driver_utils import check_session_valid, reconnect_driver, go_main_click_tabber_button # 导入会话检查和重连函数 import globals.apis as apis import globals.global_variable as global_variable class UploadConfigPage: def __init__(self, driver, wait, device_id): self.driver = driver self.wait = wait self.logger = logging.getLogger(__name__) self.more_download_page = MoreDownloadPage(driver, wait,device_id) self.device_id = device_id def go_upload_config_page(self): """点击img_2_layout(上传页面按钮)""" try: # 在执行操作前检查会话是否有效 if not check_session_valid(self.driver, self.device_id): self.logger.warning("会话已失效,尝试重新连接...") self.driver, self.wait = reconnect_driver(self.device_id, self.driver) self.logger.info("重新连接成功") # 首先获取当前页面信息进行调试 try: current_activity = self.driver.current_activity self.logger.info(f"当前Activity: {current_activity}") except Exception as e: self.logger.error(f"获取当前activity时出错: {str(e)}") # 尝试返回到主页面(如果不在主页面) self.logger.info("尝试返回到主页面...") max_back_presses = 5 # 最多按返回键次数 back_press_count = 0 while back_press_count < max_back_presses: try: # 检查是否已经在主页面(通过检查主页面特征元素) # 先尝试查找上传页面按钮 try: main_page_indicator = self.driver.find_element( AppiumBy.ID, "com.bjjw.cjgc:id/img_2_layout" ) if main_page_indicator.is_displayed(): self.logger.info("已在主页面,找到上传按钮") break except: # 未找到主页面元素,继续返回 pass # 按返回键 self.driver.back() back_press_count += 1 self.logger.info(f"已按返回键 {back_press_count} 次") time.sleep(1) # 等待页面响应 except Exception as e: self.logger.error(f"按返回键时出错: {str(e)}") break # 现在尝试点击上传页面按钮 self.logger.info("尝试点击上传页面按钮") try: # 使用较短的等待时间,因为我们需要快速响应 upload_page_btn = WebDriverWait(self.driver, 5).until( EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/img_2_layout")) ) upload_page_btn.click() self.logger.info("已点击img_2_layout,进入上传页面") time.sleep(2) # 增加等待时间确保页面加载完成 return True except TimeoutException: self.logger.warning("快速查找上传按钮超时,尝试使用更长的等待时间") # 使用更长的等待时间再次尝试 upload_page_btn = self.wait.until( EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/img_2_layout")) ) upload_page_btn.click() self.logger.info("已点击img_2_layout,进入上传页面") time.sleep(2) # 增加等待时间确保页面加载完成 return True except TimeoutException: self.logger.error("等待上传页面按钮可点击超时") return False except Exception as e: self.logger.error(f"点击上传页面按钮时出错: {str(e)}") return False def check_change_amount_on_page(self): """直接检查上传配置页面中是否包含变化量属性""" try: # 查找页面中是否包含"变化量"文本 change_amount_elements = self.driver.find_elements( AppiumBy.XPATH, "//*[contains(@text, '变化量')]" ) # 如果找到包含"变化量"文本的元素,返回True if change_amount_elements: self.logger.info("页面中包含变化量属性") return True else: self.logger.info("页面中未找到变化量属性") return False except Exception as e: self.logger.error(f"检查变化量属性时出错: {str(e)}") return False def click_upload_by_breakpoint_name(self, breakpoint_name): """根据断点名称点击上传按钮""" logging.info(f"需要点击上传按钮,断点名称:{breakpoint_name}") try: search_text = "" # 提取关键部分进行模糊匹配 if breakpoint_name.endswith('平原') and '-' in breakpoint_name: # 从右边分割一次,取第一部分(去掉末尾的“-xxx”) search_text = breakpoint_name.rsplit('-', 1)[0] self.logger.info(f"处理后搜索文本:{search_text}") else: search_text = breakpoint_name # 找到包含指定断点名称的itemContainer item_container_xpath = f"//android.widget.LinearLayout[@resource-id='com.bjjw.cjgc:id/itemContainer']//android.widget.TextView[@resource-id='com.bjjw.cjgc:id/title' and @text='{search_text}']/ancestor::android.widget.LinearLayout[@resource-id='com.bjjw.cjgc:id/itemContainer']" # 等待itemContainer出现 item_container = self.wait.until( EC.presence_of_element_located((AppiumBy.XPATH, item_container_xpath)) ) # self.logger.info(f"找到包含断点 {breakpoint_name} 的itemContainer") # 在itemContainer中查找上传按钮 upload_btn = item_container.find_element( AppiumBy.ID, "com.bjjw.cjgc:id/upload_btn" ) # 点击上传按钮 upload_btn.click() self.logger.info(f"已点击断点 {breakpoint_name} 的上传按钮") # 等待上传操作开始 time.sleep(3) # # 检查上传是否开始 # try: # upload_indicator = WebDriverWait(self.driver, 20).until( # EC.presence_of_element_located((AppiumBy.XPATH, "//*[contains(@text, '上传') or contains(@text, 'Upload')]")) # ) # self.logger.info(f"上传操作已开始: {upload_indicator.text}") # except TimeoutException: # self.logger.warning("未检测到明确的上传开始提示,但按钮点击已完成") return True except TimeoutException: self.logger.error(f"等待断点 {breakpoint_name} 的上传按钮可点击超时") return False except Exception as e: self.logger.error(f"根据断点名称点击上传按钮时出错: {str(e)}") return False def handle_upload_dialog(self): """处理上传弹窗,点击已同步按钮""" try: # 等待弹窗出现 # time.sleep(2) # 检查弹窗是否出现 dialog_indicators = [ (AppiumBy.ID, "android:id/alertTitle"), (AppiumBy.XPATH, "//android.widget.TextView[@text='提示']"), (AppiumBy.ID, "android:id/message") ] dialog_appeared = False for by, value in dialog_indicators: try: element = self.driver.find_element(by, value) if element.is_displayed(): dialog_appeared = True self.logger.info("检测到上传确认弹窗") break except: continue if not dialog_appeared: self.logger.warning("未检测到上传确认弹窗,可能不需要确认") return True # 点击"已同步"按钮 synced_btn = self.wait.until( EC.element_to_be_clickable((AppiumBy.ID, "android:id/button2")) ) synced_btn.click() self.logger.info("已点击'已同步'按钮") # 等待弹窗消失 time.sleep(2) # 检查弹窗是否消失 try: # 检查弹窗是否还存在 dialog_still_exists = False for by, value in dialog_indicators: try: element = self.driver.find_element(by, value) if element.is_displayed(): dialog_still_exists = True break except: continue if not dialog_still_exists: self.logger.info("上传确认弹窗已消失") return True else: self.logger.warning("上传确认弹窗仍然存在") return False except Exception as e: self.logger.info("上传确认弹窗可能已消失") return True except TimeoutException: self.logger.error("等待上传对话框可点击超时") return False except Exception as e: self.logger.error(f"处理上传弹窗时出错: {str(e)}") return False def execute_download_operation(self): """执行下载操作:按返回键并点击img_5_layout""" try: # 按一次返回键 self.driver.back() self.logger.info("已按返回键") time.sleep(1) # 点击img_5_layout(更多下载按钮) more_download_btn = self.wait.until( EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/img_5_layout")) ) more_download_btn.click() self.logger.info("已点击更多下载按钮") # 等待页面加载 time.sleep(1) # 调用更多下载页面的方法对象 try: logging.info(f"设备开始执行更多下载页面测试") # 执行更多下载页面管理操作 success = self.more_download_page.more_download_page_manager() if success: logging.info(f"设备更多下载页面测试执行成功") # 按一次返回键 self.driver.back() self.logger.info("已按返回键,返回到更多下载页面,准备点击上传导航按钮") return True else: logging.error(f"设备更多下载页面测试执行失败") return False except Exception as e: logging.error(f"设备运行更多下载页面测试时出错: {str(e)}") # self.take_screenshot("more_download_test_error.png") return False except TimeoutException: self.logger.error("等待下载操作元素可点击超时") return False except Exception as e: self.logger.error(f"执行下载操作时出错: {str(e)}") return False def get_point_data(self): """ 获取三个测点的数据 """ point_data = [] try: # 等待页面加载 time.sleep(1) # 方法1: 通过resource-id获取所有测点名称 point_name_elements = self.driver.find_elements( AppiumBy.ID, 'com.bjjw.cjgc:id/improve_point_name' ) # 方法2: 通过text内容获取测点数据元素 point_value_elements = self.driver.find_elements( AppiumBy.ID, 'com.bjjw.cjgc:id/point_values' ) self.logger.info(f"找到 {len(point_name_elements)} 个测点") self.logger.info(f"找到 {len(point_value_elements)} 个数据元素") # 提取每个测点的数据 for i, (name_element, value_element) in enumerate(zip(point_name_elements, point_value_elements)): point_info = {} # 获取测点名称 point_name = name_element.text point_info['point_name'] = point_name # 获取测点数据 point_value = value_element.text point_info['point_value'] = point_value # 解析详细数据 try: # 解析数据格式: "本期:13679.07623m; \n上期:13679.07621m; \n变化量:0.02mm; \n测量时间:2025-10-10 15:47:25.049" data_parts = point_value.split(';') parsed_data = {} for part in data_parts: part = part.strip() if '本期:' in part: parsed_data['current_value'] = part.replace('本期:', '').strip() elif '上期:' in part: parsed_data['previous_value'] = part.replace('上期:', '').strip() elif '变化量:' in part: parsed_data['change_amount'] = part.replace('变化量:', '').strip() elif '测量时间:' in part: parsed_data['measurement_time'] = part.replace('测量时间:', '').strip() point_info['parsed_data'] = parsed_data except Exception as e: self.logger.warning(f"解析数据时出错: {e}") point_info['parsed_data'] = {} point_data.append(point_info) self.logger.info(f"测点 {i+1}: {point_name}") # self.logger.info(f"完整数据: {point_value}") except Exception as e: self.logger.error(f"获取数据时出错: {e}") return point_data def get_specific_point_data(self, point_name): """ 获取特定测点的数据 """ try: # 通过文本内容查找特定测点 point_name_element = self.driver.find_element( AppiumBy.XPATH, f'//android.widget.TextView[@resource-id="com.bjjw.cjgc:id/improve_point_name" and @text="{point_name}"]' ) # 找到对应的数据元素 - 可能需要根据实际结构调整XPath point_value_element = point_name_element.find_element( AppiumBy.XPATH, './following::android.widget.TextView[@resource-id="com.bjjw.cjgc:id/point_values"]' ) return point_value_element.text except Exception as e: self.logger.error(f"获取特定测点 {point_name} 数据时出错: {e}") return None def swipe_up(self, start_y=1500, end_y=300, duration=500): """ 从指定起始Y坐标滑动到结束Y坐标(向上滑动页面) 参数: start_y: 起始Y坐标 (默认1500) end_y: 结束Y坐标 (默认300) duration: 滑动持续时间(毫秒) (默认500) """ try: # 获取屏幕尺寸 window_size = self.driver.get_window_size() screen_width = window_size['width'] # 计算X坐标(屏幕中间) x = screen_width // 2 # 执行滑动操作 self.driver.swipe(x, start_y, x, end_y, duration) self.logger.info(f"页面已从Y:{start_y}滑动到Y:{end_y}") # 滑动后等待页面稳定 time.sleep(1) return True except Exception as e: self.logger.error(f"滑动页面时出错: {e}") return False def swipe_down(self, start_y=175, end_y=1310, duration=500): """ 从指定起始Y坐标滑动到结束Y坐标(向下滑动页面) 参数: start_y: 起始Y坐标 (默认407) end_y: 结束Y坐标 (默认1617) duration: 滑动持续时间(毫秒) (默认500) """ try: # 获取屏幕尺寸 window_size = self.driver.get_window_size() screen_width = window_size['width'] # 计算X坐标(屏幕中间) x = screen_width // 2 # 执行滑动操作(向下滑动) self.driver.swipe(x, start_y, x, end_y, duration) self.logger.info(f"页面已从Y:{start_y}滑动到Y:{end_y}(向下滑动)") # 滑动后等待页面稳定 time.sleep(1) return True except Exception as e: self.logger.error(f"向下滑动页面时出错: {e}") return False def is_on_upload_config_page(self): """通过"保存上传"按钮来确定是否在上传配置页面""" try: # 使用"保存上传"按钮的resource-id来检查 save_upload_btn_locator = (AppiumBy.ID, "com.bjjw.cjgc:id/improve_save_btn") self.wait.until(EC.presence_of_element_located(save_upload_btn_locator)) self.logger.info("已确认在上传配置页面") return True except TimeoutException: self.logger.warning("未找到保存上传按钮,不在上传配置页面") return False except Exception as e: self.logger.error(f"检查上传配置页面时发生意外错误: {str(e)}") return False def collect_all_point_data(self, results_dir): """循环滑动收集所有测点数据,直到没有新数据出现""" all_point_data = [] seen_point_names = set() # 用于跟踪已经见过的测点名称 max_scroll_attempts = 20 # 最大滑动次数,防止无限循环 scroll_attempt = 0 self.logger.info("开始循环滑动收集所有测点数据...") while scroll_attempt < max_scroll_attempts: scroll_attempt += 1 self.logger.info(f"第 {scroll_attempt} 次尝试获取数据...") # 获取当前屏幕的测点数据 current_point_data = self.get_point_data() if not current_point_data: self.logger.info("当前屏幕没有测点数据,停止滑动") break # 统计新发现的测点 new_points_count = 0 for point in current_point_data: point_name = point.get('point_name') if point_name and point_name not in seen_point_names: # 新测点,添加到结果集 all_point_data.append(point) seen_point_names.add(point_name) new_points_count += 1 self.logger.info(f"本次获取到 {len(current_point_data)} 个测点,其中 {new_points_count} 个是新测点") # 如果没有新数据,停止滑动 if new_points_count == 0: self.logger.info("没有发现新测点,停止滑动") break # 滑动到下一页 self.logger.info("滑动到下一页...") if not self.swipe_up(): self.logger.warning("滑动失败,停止收集") break # 等待页面稳定 time.sleep(1) self.logger.info(f"数据收集完成,共获取 {len(all_point_data)} 个测点数据") return all_point_data def collect_check_all_point_data(self, max_variation): """循环滑动收集所有测点数据,直到没有新数据出现""" all_point_data = [] seen_point_names = set() # 用于跟踪已经见过的测点名称 max_scroll_attempts = 100 # 最大滑动次数,防止无限循环 scroll_attempt = 0 self.logger.info("开始循环滑动收集所有测点数据...") while scroll_attempt < max_scroll_attempts: scroll_attempt += 1 self.logger.info(f"第 {scroll_attempt} 次尝试获取数据...") # 获取当前屏幕的测点数据 current_point_data = self.get_point_data() if not current_point_data: self.logger.info("当前屏幕没有测点数据,停止滑动") break # 统计新发现的测点 new_points_count = 0 for point in current_point_data: point_name = point.get('point_name') if point_name and point_name not in seen_point_names: # 新测点,添加到结果集 all_point_data.append(point) seen_point_names.add(point_name) new_points_count += 1 self.logger.info(f"本次获取到 {len(current_point_data)} 个测点,其中 {new_points_count} 个是新测点") # 如果没有新数据,停止滑动 if new_points_count == 0: self.logger.info("没有发现新测点,停止滑动") break # 滑动到下一页 self.logger.info("滑动到下一页...") if not self.swipe_up(): self.logger.warning("滑动失败,停止收集") break # 等待页面稳定 time.sleep(0.2) self.logger.info(f"数据收集完成,共获取 {len(all_point_data)} 个测点数据") # 直接比对每个测点的变化量 if max_variation is None: self.logger.error("获取用户最大变化量失败") max_variation = 2 # return False self.logger.info(f"开始比对测点变化量,最大允许变化量: {max_variation}mm") for i, point in enumerate(all_point_data, 1): point_name = point.get('point_name', '未知') point_value = point.get('point_value', '') # 从完整数据中提取变化量(格式如:变化量:-0.67mm;) change_amount_match = re.search(r'变化量:([-\d.]+)mm', point_value) if change_amount_match: try: change_amount = float(change_amount_match.group(1)) # self.logger.info(f"测点 {point_name} 变化量: {change_amount}mm, 最大允许变化量: {max_variation}mm") # 比较绝对值,因为变化量可能是负数 if abs(change_amount) > max_variation: self.logger.error(f"测点 {point_name} 变化量 {change_amount}mm 超过最大允许值 {max_variation}mm") return False except ValueError as e: self.logger.error(f"解析测点 {point_name} 的变化量失败: {str(e)},原始数据: {point_value}") return False else: self.logger.error(f"在测点 {point_name} 的数据中未找到变化量信息,原始数据: {point_value}") return False self.logger.info(f"所有测点变化量均在允许范围内(≤{max_variation}mm)") return True # def _load_user_data(self): # """加载用户数据从Excel文件""" # try: # # 默认路径:当前脚本的上一级目录下的"上传人员信息.xlsx" # current_dir = os.path.dirname(os.path.abspath(__file__)) # parent_dir = os.path.dirname(current_dir) # excel_path = os.path.join(parent_dir, "上传人员信息.xlsx") # if not os.path.exists(excel_path): # logging.error(f"Excel文件不存在: {excel_path}") # return False # # 读取Excel文件 # df = pd.read_excel(excel_path, sheet_name='Sheet1') # # 处理合并单元格 - 前向填充标段列 # df['标段'] = df['标段'].fillna(method='ffill') # # 清理数据:去除空行和无效数据 # df = df.dropna(subset=['测量人员信息']) # df = df[df['测量人员信息'].str.strip() != ''] # # 创建姓名到身份证的映射 # for _, row in df.iterrows(): # name = row['测量人员信息'] # id_card = str(row.iloc[4]).strip() # 第5列是身份证号 # # 处理身份证号格式(如果是浮点数转为整数) # if id_card.endswith('.0'): # id_card = id_card[:-2] # global_variable.GLOBAL_NAME_TO_ID_MAP[name] = id_card # global_variable.GLOBAL_NAME_TO_ID_MAP = df # logging.info(f"成功加载用户数据,共 {len(df)} 条记录,{len(global_variable.GLOBAL_NAME_TO_ID_MAP)} 个唯一姓名") # return True # except Exception as e: # logging.error(f"加载用户数据失败: {str(e)}") # return False def _load_user_data(self): """加载用户数据从Excel文件,只提取名字和身份证到字典""" try: # 默认路径:当前脚本的上一级目录下的"上传人员信息.xlsx" current_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(current_dir) excel_path = os.path.join(parent_dir, "上传人员信息.xlsx") if not os.path.exists(excel_path): logging.error(f"Excel文件不存在: {excel_path}") return False # 读取Excel文件 df = pd.read_excel(excel_path, sheet_name='Sheet1') # 处理合并单元格 - 前向填充标段列 # df['标段'] = df['标段'].fillna(method='ffill') df['标段'] = df['标段'].ffill() # 清理数据:去除空行和无效数据 df = df.dropna(subset=['测量人员信息']) df = df[df['测量人员信息'].str.strip() != ''] # 创建姓名到身份证的映射字典 name_id_map = {} for _, row in df.iterrows(): name = str(row['测量人员信息']).strip() # 第5列是身份证号(索引为4) id_card = str(row.iloc[4]).strip() if pd.notna(row.iloc[4]) else "" # 处理身份证号格式(如果是浮点数转为整数) if id_card.endswith('.0'): id_card = id_card[:-2] # 只将有效的姓名和身份证号添加到字典 if name and id_card and len(id_card) >= 15: # 身份证号至少15位 name_id_map[name] = id_card else: logging.warning(f"跳过无效数据: 姓名='{name}', 身份证='{id_card}'") # 将字典保存到全局变量 global_variable.GLOBAL_NAME_TO_ID_MAP = name_id_map logging.info(f"成功加载用户数据,共 {len(df)} 条记录,{len(name_id_map)} 个有效姓名-身份证映射") # 打印前几个映射用于调试 sample_names = list(name_id_map.keys())[:5] for name in sample_names: logging.debug(f"映射示例: {name} -> {name_id_map[name]}") return True except Exception as e: logging.error(f"加载用户数据失败: {str(e)}") return False def get_first_sjname_and_id(self, linecode: str, work_conditions: Dict) -> Optional[Dict]: """ 获取线路的第一个数据员姓名和身份证号 Args: linecode: 线路编码 Returns: 返回字典: {"name": 姓名, "id_card": 身份证号} """ if not work_conditions: logging.error(f"无法获取线路 {linecode} 的工况信息") return None # 直接取第一个测点的数据员 first_point_info = next(iter(work_conditions.values())) sjname = first_point_info.get('sjName') if not sjname: logging.error("第一个测点没有数据员信息") return None logging.info(f"使用第一个数据员: {sjname}") # 获取身份证号码 id_card = global_variable.GLOBAL_NAME_TO_ID_MAP.get(sjname) logging.info(f"id_card: {id_card}") if not id_card: logging.error(f"未找到数据员 {sjname} 对应的身份证号") return None return {"name": sjname, "id_card": id_card} # def set_all_points_and_fill_form(self, results_dir, name, user_id, condition_dict): # """点击设置所有测点按钮并填写表单""" # try: # self.logger.info("开始设置所有测点并填写表单...") # # 点击"设置所有测点"按钮 # set_all_points_btn = self.wait.until( # EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/display_setallpoint_btn")) # ) # set_all_points_btn.click() # self.logger.info("已点击'设置所有测点'按钮") # # 等待表单加载 # time.sleep(1) # # 填写司镜人员姓名 # name_input = self.wait.until( # EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/sname")) # ) # name_input.clear() # name_input.send_keys(name) # self.logger.info(f"已填写司镜人员姓名: {name}") # # 填写司镜人员身份证号 # id_card_input = self.wait.until( # EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/scard")) # ) # id_card_input.clear() # id_card_input.send_keys(user_id) # self.logger.info(f"已填写司镜人员身份证号: {user_id}") # # 选择测点状态 # if not self._select_point_status("正常"): # self.logger.error("选择测点状态失败") # return False # # 选择工况信息 # if not self._select_work_condition(condition_dict): # self.logger.error("选择工况信息失败") # return False # # 等待操作完成 # time.sleep(1) # # 检查操作结果 # try: # # 检查是否有成功提示或错误提示 # # 这里可以根据实际应用的上传反馈机制进行调整 # success_indicator = WebDriverWait(self.driver, 10).until( # EC.presence_of_element_located((AppiumBy.XPATH, "//*[contains(@text, '成功') or contains(@text, '完成')]")) # ) # self.logger.info(f"操作完成: {success_indicator.text}") # except TimeoutException: # self.logger.warning("未检测到明确的操作成功提示,但操作已完成") # return True # except TimeoutException: # self.logger.error("等待设置所有测点表单元素可点击超时") # return False # except Exception as e: # self.logger.error(f"设置所有测点并填写表单时出错: {str(e)}") # # 保存错误截图 # error_screenshot_file = os.path.join( # results_dir, # f"form_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" # ) # self.driver.save_screenshot(error_screenshot_file) # self.logger.info(f"错误截图已保存: {error_screenshot_file}") # return False def _select_point_status(self, status="正常"): logging.info(f"开始选择测点状态: {status}") """选择测点状态""" try: # 点击"选择测点状态"按钮 status_button = self.wait.until( EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/all_point_pstate_sp")) ) status_button.click() # 选择指定的状态 status_option = self.wait.until( EC.element_to_be_clickable((AppiumBy.XPATH, f"//android.widget.TextView[@resource-id='android:id/text1' and @text='{status}']")) ) status_option.click() return True except TimeoutException: self.logger.error("等待测点状态选择超时") return False except Exception as e: self.logger.error(f"选择测点状态时出错: {str(e)}") return False def set_all_points_and_fill_form(self, results_dir, name, user_id, main_condition_dict, minor_conditions_list): """点击设置所有测点按钮并填写表单""" try: self.logger.info("开始设置所有测点并填写表单...") # 点击"设置所有测点"按钮 set_all_points_btn = self.wait.until( EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/display_setallpoint_btn")) ) set_all_points_btn.click() self.logger.info("已点击'设置所有测点'按钮") # 等待表单加载 time.sleep(1) # 填写司镜人员姓名 name_input = self.wait.until( EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/sname")) ) name_input.clear() name_input.send_keys(name) self.logger.info(f"已填写司镜人员姓名: {name}") # 填写司镜人员身份证号 id_card_input = self.wait.until( EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/scard")) ) id_card_input.clear() id_card_input.send_keys(user_id) self.logger.info(f"已填写司镜人员身份证号: {user_id}") # 选择测点状态 if not self._select_point_status("正常"): self.logger.error("选择测点状态失败") return False # 选择工况信息 - 现在传入两个参数 if not self._select_work_condition(main_condition_dict, minor_conditions_list): self.logger.error("选择工况信息失败") return False # 等待操作完成 time.sleep(1) # 检查操作结果 try: success_indicator = WebDriverWait(self.driver, 10).until( EC.presence_of_element_located((AppiumBy.XPATH, "//*[contains(@text, '成功') or contains(@text, '完成')]")) ) self.logger.info(f"操作完成: {success_indicator.text}") except TimeoutException: self.logger.warning("未检测到明确的操作成功提示,但操作已完成") return True except TimeoutException: self.logger.error("等待设置所有测点表单元素可点击超时") return False except Exception as e: self.logger.error(f"设置所有测点并填写表单时出错: {str(e)}") return False def _select_work_condition(self, main_condition_dict: Dict[str, List[str]], minor_conditions_list: List[Dict]): """根据主要工况字典和次要工况列表选择工况信息""" self.logger.info("开始选择工况信息") self.logger.info(f"主要工况: {main_condition_dict}") self.logger.info(f"次要工况数量: {len(minor_conditions_list)}") # 工点类型编码与界面控件ID的映射 work_type_mapping = { "1": { # 隧道 "button_id": "com.bjjw.cjgc:id/all_point_workinfo_sp_suidao", "name": "隧道" }, "2": { # 区间路基 "button_id": "com.bjjw.cjgc:id/all_point_workinfo_sp_luji", "name": "路基" }, "3": { # 桥 "button_id": "com.bjjw.cjgc:id/all_point_workinfo_sp_qiaoliang", "name": "梁桥" }, "4": { # 涵洞 "button_id": "com.bjjw.cjgc:id/all_point_workinfo_sp_handong", "name": "涵洞" } } try: success_count = 0 # 第一步:为每个工点类型选择主要工况 for work_type, workinfo_names in main_condition_dict.items(): work_type_str = str(work_type).strip() if work_type_str not in work_type_mapping: self.logger.warning(f"未知的工点类型编码: {work_type_str},跳过") continue mapping = work_type_mapping[work_type_str] button_id = mapping["button_id"] work_type_name = mapping["name"] if workinfo_names: workinfo_name = workinfo_names[0] # 主要工况 self.logger.info(f"为{work_type_name}({work_type_str})选择主要工况: {workinfo_name}") try: # 点击工况选择按钮 condition_button = self.wait.until( EC.element_to_be_clickable((AppiumBy.ID, button_id)) ) condition_button.click() self.logger.info(f"成功点击{work_type_name}工况选择按钮") # 选择主要的工况选项 if self._select_condition_option(workinfo_name): self.logger.info(f"成功为{work_type_name}选择主要工况: {workinfo_name}") success_count += 1 else: self.logger.warning(f"未能为{work_type_name}选择主要工况: {workinfo_name}") except TimeoutException: self.logger.error(f"等待{work_type_name}工况选择按钮超时") except Exception as e: self.logger.error(f"点击{work_type_name}工况按钮时出错: {str(e)}") # 第二步:如果有次要工况,滑动页面处理 if minor_conditions_list: self.logger.info(f"开始处理 {len(minor_conditions_list)} 个次要工况") minor_success_count = self._handle_minor_work_conditions(minor_conditions_list) self.logger.info(f"次要工况处理完成: 成功 {minor_success_count}/{len(minor_conditions_list)} 个") else: self.logger.info("没有次要工况需要处理") self.logger.info(f"工况选择完成: 主要工况成功{success_count}/{len(main_condition_dict)}个") return success_count > 0 # 只要有一个主要工况成功就返回True except Exception as e: self.logger.error(f"选择工况信息时发生未知错误: {str(e)}") return False def _handle_minor_work_conditions(self, minor_conditions_list: List[Dict]) -> int: """处理次要工况:滑动页面,找到对应测点并设置工况""" success_count = 0 processed_points = set() # 记录已处理的测点,避免重复处理 logging.info(f"要处理的次要工况:{minor_conditions_list}") # 提取所有需要处理的测点ID target_point_ids = {condition['point_id'] for condition in minor_conditions_list} self.logger.info(f"目标测点ID: {list(target_point_ids)}") # 点击"关闭设置所有"按钮 close_set_all_points_btn = self.wait.until( EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/display_setallpoint_btn")) ) close_set_all_points_btn.click() self.logger.info("已点击'关闭设置所有测点'按钮") try: max_scroll_attempts = 20 # 最大滑动次数防止无限循环 scroll_attempt = 0 previous_points = set() # 记录上一页的测点 remaining_conditions = minor_conditions_list.copy() while scroll_attempt < max_scroll_attempts: scroll_attempt += 1 self.logger.info(f"第 {scroll_attempt} 次滑动处理次要工况...") # 获取当前页面的测点 current_points = set(self.collect_points_on_page()) self.logger.info(f"当前页面共找到 {len(current_points)} 个测点: {list(current_points)}") # 检查是否有新测点 if current_points == previous_points: self.logger.info("当前页面没有新的测点,停止滑动") break # 检查当前页面是否有目标测点 current_target_points = current_points & target_point_ids if not current_target_points: self.logger.info(f"当前页面没有目标测点,继续滑动查找...") # 滑动到下一页 if not self.swipe_down(): self.logger.warning("滑动失败,停止处理") break # 等待页面稳定 time.sleep(1) previous_points = current_points continue self.logger.info(f"当前页面找到目标测点: {list(current_target_points)}") # 如果当前测点列表的测点名,在minor_conditions_list中,就点击选择测点对应工况 # 处理当前页面的次要工况 page_success_count = self._process_minor_conditions_on_current_page( current_points, remaining_conditions, processed_points ) success_count += page_success_count # 更新剩余待处理的工况列表 remaining_conditions = [ condition for condition in remaining_conditions if condition['point_id'] not in processed_points ] self.logger.info(f"剩余待处理工况: {len(remaining_conditions)} 个") if remaining_conditions: remaining_points = [cond['point_id'] for cond in remaining_conditions] self.logger.info(f"剩余测点: {remaining_points}") # 更新已处理的测点记录 previous_points = current_points # 如果所有次要工况都已处理完成,提前退出 if len(processed_points) >= len(minor_conditions_list): self.logger.info(f"所有 {len(minor_conditions_list)} 个次要工况已处理完成") break # 滑动到下一页 self.logger.info("滑动到下一页继续查找...") if not self.swipe_down(): self.logger.warning("滑动失败,停止处理") break # 等待页面稳定 time.sleep(1) self.logger.info(f"次要工况处理完成: 成功 {success_count}/{len(minor_conditions_list)} 个") return success_count except Exception as e: self.logger.error(f"处理次要工况时出错: {str(e)}") return success_count def _process_minor_conditions_on_current_page(self, current_points: set, minor_conditions_list: List[Dict], processed_points: set) -> int: """处理当前页面上的次要工况""" page_success_count = 0 try: self.logger.info("开始处理当前页面上的次要工况...") # 1. 先定位所有测点的根容器 point_containers = self.driver.find_elements( AppiumBy.ID, "com.bjjw.cjgc:id/layout_popup_top" ) self.logger.info(f"当前页面找到 {len(point_containers)} 个测点容器") if not point_containers: self.logger.warning("当前页面未找到任何测点容器") return 0 # 2. 构建当前页面测点的映射关系:测点名称 -> 容器元素 point_container_map = {} for container in point_containers: try: name_element = container.find_element( AppiumBy.ID, "com.bjjw.cjgc:id/improve_point_name" ) point_name = name_element.text.strip() if point_name and point_name in current_points: point_container_map[point_name] = container self.logger.debug(f"映射测点: {point_name}") except NoSuchElementException: continue except Exception as e: self.logger.debug(f"解析测点容器时出错: {str(e)}") continue # 3. 遍历需要处理的次要工况列表 for minor_condition in minor_conditions_list: point_id = minor_condition['point_id'] workinfoname = minor_condition['workinfoname'] work_type = minor_condition['work_type'] # 检查是否已经处理过 if point_id in processed_points: self.logger.debug(f"测点 {point_id} 已处理过,跳过") continue # 检查是否在当前页面 if point_id not in current_points: self.logger.debug(f"测点 {point_id} 不在当前页面") continue # 检查是否有对应的容器 if point_id not in point_container_map: self.logger.warning(f"测点 {point_id} 在当前页面但未找到对应容器") continue try: self.logger.info(f"开始处理测点 {point_id} 的次要工况: {workinfoname}") # 4. 在当前测点的容器内查找工况选择按钮 container = point_container_map[point_id] workinfo_button = container.find_element( AppiumBy.ID, "com.bjjw.cjgc:id/point_workinfo_sp" ) # 验证按钮是否可见和可用 if workinfo_button.is_displayed() and workinfo_button.is_enabled(): workinfo_button.click() self.logger.info(f"已点击测点 {point_id} 的工况选择按钮") # 选择对应的工况选项 if self._select_minor_conditions_option(workinfoname, work_type): page_success_count += 1 processed_points.add(point_id) self.logger.info(f"成功为测点 {point_id} 设置次要工况: {workinfoname}") else: self.logger.warning(f"为测点 {point_id} 选择工况选项失败") else: self.logger.warning(f"测点 {point_id} 的工况选择按钮不可用") except NoSuchElementException: self.logger.warning(f"未找到测点 {point_id} 的工况选择按钮") except Exception as e: self.logger.error(f"处理测点 {point_id} 时出错: {str(e)}") self.logger.info(f"当前页面成功处理 {page_success_count} 个测点的次要工况") return page_success_count except Exception as e: self.logger.error(f"处理当前页面次要工况时发生异常: {str(e)}") return page_success_count def _select_minor_conditions_option(self, option_name: str, work_type_name: str) -> bool: """根据工况名称选择对应的下拉列表中的选项""" try: self.logger.info(f"开始选择次要工况选项: {option_name}") # 方法1: 通过文本精确匹配 try: option_xpath = f"//android.widget.TextView[@text='{option_name}']" option_element = WebDriverWait(self.driver, 5).until( EC.element_to_be_clickable((AppiumBy.XPATH, option_xpath)) ) option_element.click() self.logger.info(f"通过文本匹配成功选择工况: {option_name}") return True except TimeoutException: self.logger.debug(f"通过文本 '{option_name}' 未找到工况选项") # 方法2: 通过列表项ID查找 try: list_items = self.driver.find_elements(AppiumBy.ID, "android:id/text1") for item in list_items: if item.text == option_name: item.click() self.logger.info(f"通过列表项成功选择工况: {option_name}") return True except Exception as e: self.logger.debug(f"通过列表项查找失败: {str(e)}") # 方法3: 滑动查找 max_scroll_attempts = 3 for attempt in range(max_scroll_attempts): try: option_element = self.driver.find_element( AppiumBy.XPATH, f"//android.widget.TextView[@text='{option_name}']" ) option_element.click() self.logger.info(f"通过滑动后查找成功选择工况: {option_name}") return True except NoSuchElementException: self.logger.debug(f"第 {attempt + 1} 次滑动查找未找到选项: {option_name}") # 执行滑动 self._scroll_condition_options() except Exception as e: self.logger.debug(f"滑动查找时出错: {str(e)}") break self.logger.error(f"所有方法都无法找到工况选项: {option_name}") return False except TimeoutException: self.logger.error(f"等待工况选项可点击超时: {option_name}") return False except Exception as e: self.logger.error(f"选择工况选项时出错: {str(e)}") return False def _scroll_condition_options(self): """滑动工况选项列表""" try: # 获取屏幕尺寸 window_size = self.driver.get_window_size() start_x = window_size['width'] * 0.5 start_y = window_size['height'] * 0.7 end_y = window_size['height'] * 0.3 # 执行滑动 self.driver.swipe(start_x, start_y, start_x, end_y, 500) self.logger.debug("执行滑动操作查找更多工况选项") time.sleep(1) # 等待滑动完成 except Exception as e: self.logger.error(f"滑动工况选项列表时出错: {str(e)}") def collect_points_on_page(self) -> List[str]: """ 收集当前页面中存在的测点ID列表 要求:必须同时存在"com.bjjw.cjgc:id/improve_point_name"和"com.bjjw.cjgc:id/point_workinfo_sp"才返回 """ point_ids = [] try: self.logger.info("开始收集当前页面中的测点ID...") # 1. 先定位所有测点的根容器(RelativeLayout),确保在同一容器内查找子元素 point_containers = self.driver.find_elements( AppiumBy.ID, "com.bjjw.cjgc:id/layout_popup_top" # 测点根容器的resource-id ) self.logger.info(f"找到 {len(point_containers)} 个测点根容器") for container in point_containers: try: # 2. 在当前根容器内查找测点名称元素 name_element = container.find_element( AppiumBy.ID, "com.bjjw.cjgc:id/improve_point_name" ) point_name = name_element.text if not point_name: self.logger.debug("测点名称为空,跳过") continue # 3. 在当前根容器内查找工况按钮(按实际层级定位) # 层级:RelativeLayout -> LinearLayout -> LinearLayout -> Button workinfo_button = container.find_element( AppiumBy.XPATH, ".//android.widget.LinearLayout/android.widget.LinearLayout/android.widget.Button[@resource-id='com.bjjw.cjgc:id/point_workinfo_sp']" ) # 4. 验证按钮是否可见 if workinfo_button.is_displayed(): point_ids.append(point_name) self.logger.debug(f"找到有效测点: {point_name}") except NoSuchElementException as e: # 若名称或工况按钮不存在,跳过当前容器 self.logger.debug(f"测点容器中缺少必要元素: {str(e)}") continue except Exception as e: self.logger.debug(f"解析测点容器时出错: {str(e)}") continue self.logger.info(f"当前页面共找到 {len(point_ids)} 个有效测点: {point_ids}") return point_ids except Exception as e: self.logger.error(f"收集页面测点ID时出错: {str(e)}") return [] def collect_all_points_on_page(self) -> List[Dict]: """滑动收集页面中所有测点的信息""" all_points = [] seen_point_names = set() max_scroll_attempts = 50 scroll_attempt = 0 self.logger.info("开始滑动收集页面中所有测点...") while scroll_attempt < max_scroll_attempts: scroll_attempt += 1 self.logger.info(f"第 {scroll_attempt} 次滑动收集...") # 获取当前屏幕的测点 current_points = self._get_current_screen_points_detail() if not current_points: self.logger.info("当前屏幕没有测点数据") break # 添加新发现的测点 new_points_count = 0 for point in current_points: point_name = point.get('point_name') if point_name and point_name not in seen_point_names: all_points.append(point) seen_point_names.add(point_name) new_points_count += 1 self.logger.info(f"本次获取到 {len(current_points)} 个测点,其中 {new_points_count} 个是新测点") if new_points_count == 0: self.logger.info("没有发现新测点,停止滑动") break # 滑动到下一页 if not self.swipe_up(): self.logger.warning("滑动失败,停止收集") break time.sleep(0.5) self.logger.info(f"共收集到 {len(all_points)} 个测点的详细信息") return all_points def _get_current_screen_points_detail(self) -> List[Dict]: """获取当前屏幕测点的详细信息""" points = [] try: # 查找所有测点容器 point_containers = self.driver.find_elements( AppiumBy.XPATH, "//android.widget.LinearLayout[@resource-id='com.bjjw.cjgc:id/layout_popup_top']" ) for container in point_containers: try: point_info = {} # 获取测点名称(作为point_id的替代) name_element = container.find_element( AppiumBy.ID, "com.bjjw.cjgc:id/improve_point_name" ) point_name = name_element.text point_info['point_name'] = point_name point_info['point_id'] = point_name # 使用名称作为ID,或者根据实际情况调整 # 获取测点元素引用,用于后续操作 point_info['element'] = container # 获取当前工况信息 try: workinfo_element = container.find_element( AppiumBy.ID, "com.bjjw.cjgc:id/point_workinfo_sp" ) point_info['current_workinfo'] = workinfo_element.text point_info['workinfo_element'] = workinfo_element except NoSuchElementException: point_info['current_workinfo'] = "" point_info['workinfo_element'] = None points.append(point_info) except Exception as e: self.logger.debug(f"解析单个测点信息时出错: {str(e)}") continue except Exception as e: self.logger.error(f"获取当前屏幕测点详细信息时出错: {str(e)}") return points def _set_single_point_work_condition(self, point_data: Dict, workinfo_name: str, work_type: str) -> bool: """为单个测点设置工况信息""" try: point_name = point_data.get('point_name') self.logger.info(f"开始为测点 {point_name} 设置工况: {workinfo_name}") # 使用保存的元素引用点击工况选择按钮 workinfo_element = point_data.get('workinfo_element') if workinfo_element: workinfo_element.click() time.sleep(1) # 等待选项弹出 # 选择指定的工况 if self._select_condition_option(workinfo_name): self.logger.info(f"成功为测点 {point_name} 设置工况: {workinfo_name}") return True else: self.logger.warning(f"为测点 {point_name} 选择工况选项失败") return False else: self.logger.warning(f"未找到测点 {point_name} 的工况选择按钮") return False except Exception as e: self.logger.error(f"为测点 {point_name} 设置工况时出错: {str(e)}") return False def _select_condition_option(self, condition_name: str) -> bool: """选择具体的工况选项 Args: condition_name: 工况名称 Returns: bool: 是否选择成功 """ try: self.logger.info(f"开始选择工况选项: {condition_name}") # 方法1: 通过文本查找并点击 try: option_xpath = f"//android.widget.TextView[@text='{condition_name}']" option_element = WebDriverWait(self.driver, 5).until( EC.element_to_be_clickable((AppiumBy.XPATH, option_xpath)) ) option_element.click() self.logger.info(f"通过文本定位成功选择工况: {condition_name}") return True except TimeoutException: self.logger.debug(f"通过文本'{condition_name}'未找到工况选项") # 方法2: 通过列表项查找 try: # 假设工况选项在列表中,点击第一个可用的选项 list_item = WebDriverWait(self.driver, 5).until( EC.element_to_be_clickable((AppiumBy.ID, "android:id/text1")) ) list_item.click() self.logger.info("通过列表项选择工况") return True except TimeoutException: self.logger.debug("未找到列表项形式的工况选项") # 方法3: 尝试点击屏幕特定位置(备选方案) try: # 获取屏幕尺寸 window_size = self.driver.get_window_size() x = window_size['width'] // 2 y = window_size['height'] // 2 # 点击屏幕中央(假设选项在中间) self.driver.tap([(x, y)]) self.logger.info("通过点击屏幕中央选择工况") return True except Exception as e: self.logger.debug(f"点击屏幕中央失败: {str(e)}") self.logger.error(f"所有方法都无法选择工况选项: {condition_name}") return False except Exception as e: self.logger.error(f"选择工况选项时出错: {str(e)}") return False def _scroll_to_find_condition(self): """滑动查找工况选项的辅助方法""" try: # 获取屏幕尺寸 window_size = self.driver.get_window_size() start_x = window_size['width'] * 0.5 start_y = window_size['height'] * 0.7 end_y = window_size['height'] * 0.3 # 执行滑动 self.driver.swipe(start_x, start_y, start_x, end_y, 500) logging.debug("执行滑动操作") except Exception as e: logging.error(f"滑动操作时出错: {str(e)}") def aging_down_data(self, breakpoint_name=None, retry_count=0): """跳转上传配置页面,根据断点名称点击上传按钮,检查是否在上传配置页面 ->变化量属性,执行下载操作""" try: self.logger.info("开始执行上传配置页面操作aging") # # 跳转到上传配置页面 # if not self.go_upload_config_page(): # self.logger.error("跳转上传配置页面失败") # return False # 跳转到上传配置页面 if not go_main_click_tabber_button(self.driver, self.device_id, "com.bjjw.cjgc:id/img_2_layout"): logging.error(f"设备 {self.device_id} 跳转到上传配置页面失败") return False # 根据断点名称点击上传按钮 if not self.click_upload_by_breakpoint_name(breakpoint_name): self.logger.error("点击上传按钮失败") return False if not self.handle_upload_dialog(): self.logger.error("处理上传对话框失败") return False self.logger.info("开始执行上传配置页面测试") # 检查是否在上传配置页面 if not self.is_on_upload_config_page(): self.logger.info("不在上传配置页面,尝试导航...") return False # 检查是否有变化量属性 if not self.check_change_amount_on_page(): self.logger.info("页面中缺少变化量属性,执行下载操作") # 执行下载操作 if not self.execute_download_operation(): self.logger.error("下载操作执行失败") return False time.sleep(1) self.logger.info("已返回上传配置页面") # 如果是第一次重试,再次调用aging_down_data if retry_count < 1: self.logger.info(f"第{retry_count+1}次重试,再次执行aging_down_data流程") return self.aging_down_data(breakpoint_name, retry_count + 1) else: # 第二次仍然缺少变化量属性,返回错误 self.logger.error("已执行两次下载操作,页面仍然缺少变化量属性") return False else: self.logger.info("页面中包含变化量属性,继续执行后续操作") return True except Exception as e: self.logger.error(f"设备执行上传重复下载数据操作失败:{e}") return False def click_save_upload_and_handle_dialogs(self): """点击保存上传并处理弹窗""" try: self.logger.info("开始点击保存上传并处理弹窗") # 点击保存上传按钮 save_upload_btn = WebDriverWait(self.driver, 10).until( EC.element_to_be_clickable((AppiumBy.ID, "com.bjjw.cjgc:id/improve_save_btn")) ) save_upload_btn.click() self.logger.info("已点击保存上传按钮") # 处理警告弹窗 time.sleep(1) if not self.handle_warning_dialog(): self.logger.error("处理警告弹窗失败") return False return True except TimeoutException: self.logger.error("点击保存上传按钮超时") return False except Exception as e: self.logger.error(f"点击保存上传并处理弹窗时出错: {str(e)}") return False def handle_warning_dialog(self): """处理警告弹窗""" try: self.logger.info("检查并处理警告弹窗") # 等待弹窗出现 warning_dialog = WebDriverWait(self.driver, 10).until( EC.presence_of_element_located((AppiumBy.ID, "android:id/parentPanel")) ) # 获取弹窗文本确认是目标弹窗 alert_title = warning_dialog.find_element(AppiumBy.ID, "android:id/alertTitle") alert_message = warning_dialog.find_element(AppiumBy.ID, "android:id/message") self.logger.info(f"检测到弹窗 - 标题: {alert_title.text}, 消息: {alert_message.text}") # 根据业务逻辑选择"是"或"否" # 这里选择"是"来上传本次数据 yes_button = warning_dialog.find_element(AppiumBy.ID, "android:id/button1") yes_button.click() self.logger.info("已点击'是'按钮确认上传") # no_button = warning_dialog.find_element(AppiumBy.ID, "android:id/button3") # no_button.click() # self.driver.back() # self.logger.info("已点击'否'按钮取消上传和返回按钮") return True except TimeoutException: self.logger.info("未检测到警告弹窗,继续流程") return True # 没有弹窗也是正常情况 except Exception as e: self.logger.error(f"处理警告弹窗时出错: {str(e)}") return False def wait_for_upload_completion(self): """等待上传完成""" try: # time.sleep(2) self.logger.info("开始等待上传完成") # 等待弹窗显示 upload_list = WebDriverWait(self.driver, 10).until( EC.presence_of_element_located((AppiumBy.ID, "android:id/customPanel")) ) #等待弹窗消失 WebDriverWait(self.driver, 20).until( EC.invisibility_of_element_located((AppiumBy.ID, "android:id/customPanel")) ) self.logger.info("已返回到上传列表页面,上传已完成") return True except TimeoutException: self.logger.error("等待上传完成超时") return False except Exception as e: self.logger.error(f"等待上传完成时出错: {str(e)}") return False def parse_work_conditions(self, work_conditions): """ 解析工况信息,区分主要工况和次要工况 返回: (主要工况字典, 次要工况列表) """ if not work_conditions: logging.warning("工况数据为空") return {}, [] try: # 统计每个work_type下各个workinfoname的出现次数 work_type_stats = {} point_work_conditions = {} # 记录每个测点的工况信息 for point_id, condition_data in work_conditions.items(): work_type = str(condition_data.get('work_type', '')) workinfoname = condition_data.get('workinfoname', '') sjname = condition_data.get('sjName', '') # 记录测点工况信息 point_work_conditions[point_id] = { 'work_type': work_type, 'workinfoname': workinfoname, 'sjname': sjname } # 统计出现次数 if work_type and workinfoname: if work_type not in work_type_stats: work_type_stats[work_type] = {} if workinfoname in work_type_stats[work_type]: work_type_stats[work_type][workinfoname] += 1 else: work_type_stats[work_type][workinfoname] = 1 # 分离主要工况和次要工况 main_condition_dict = {} # 主要工况字典 {work_type: [主要workinfoname]} minor_conditions_list = [] # 次要工况列表 [{point_id, work_type, workinfoname}] # 定义阈值:出现次数少于这个值的认为是次要工况 minor_threshold = 3 # 可以根据实际情况调整 for work_type, workinfoname_counts in work_type_stats.items(): if workinfoname_counts: # 按出现次数从多到少排序 sorted_workinfonames = sorted(workinfoname_counts.items(), key=lambda x: x[1], reverse=True) # 主要工况:取出现次数最多的 if sorted_workinfonames: main_workinfoname = sorted_workinfonames[0][0] main_condition_dict[work_type] = [main_workinfoname] logging.info(f"work_type {work_type} 的主要工况: '{main_workinfoname}' (出现{sorted_workinfonames[0][1]}次)") # 次要工况:收集所有出现次数较少的工况及其对应测点 for workinfoname, count in sorted_workinfonames: if count <= minor_threshold: # 找到使用这个次要工况的所有测点 for point_id, point_info in point_work_conditions.items(): if (point_info['work_type'] == work_type and point_info['workinfoname'] == workinfoname): minor_conditions_list.append({ 'point_id': point_id, 'work_type': work_type, 'workinfoname': workinfoname, 'sjname': point_info['sjname'], 'count': count }) logging.info(f"解析结果: 主要工况 {len(main_condition_dict)} 种,次要工况 {len(minor_conditions_list)} 个测点") # 打印次要工况详情用于调试 for minor_condition in minor_conditions_list: logging.info(f"次要工况 - 测点 {minor_condition['point_id']}: work_type={minor_condition['work_type']}, workinfoname='{minor_condition['workinfoname']}'") return main_condition_dict, minor_conditions_list except Exception as e: logging.error(f"解析工况信息时发生错误: {str(e)}") return {}, [] def get_work_type_name(work_type: str) -> str: """ 根据工点类型编码获取类型名称 Args: work_type: 工点类型编码(1-隧道,2-区间路基,3-桥, 4-涵洞) Returns: 工点类型名称 """ work_type_mapping = { "1": "隧道", "2": "区间路基", "3": "桥", "4": "涵洞" } return work_type_mapping.get(work_type, f"未知类型({work_type})") def upload_config_page_manager(self, results_dir, breakpoint_name=None, line_num=None): """执行上传配置页面管理操作""" try: # 保存参数为实例属性 self.results_dir = results_dir self.breakpoint_name = breakpoint_name self.line_num = line_num self.logger.info("开始执行上传配置页面操作manager") # 跳转到上传配置页面 if not go_main_click_tabber_button(self.driver, self.device_id, "com.bjjw.cjgc:id/img_2_layout"): logging.error(f"设备 {self.device_id} 跳转到测量页面失败") return False # 根据断点名称点击上传按钮 if not self.click_upload_by_breakpoint_name(breakpoint_name): self.logger.error("点击上传按钮失败") return False if not self.handle_upload_dialog(): self.logger.error("处理上传对话框失败") return False self.logger.info("开始执行上传配置页面测试") # 检查是否在上传配置页面 if not self.is_on_upload_config_page(): self.logger.info("不在上传配置页面,尝试导航...") return False # 检查是否有变化量属性,有就执行下面代码,没有就执行重新下载函数 # 直接检查页面中是否有"变化量"属性 if not self.check_change_amount_on_page(): self.logger.info("页面中缺少变化量属性,执行下载操作") # 执行下载操作 if not self.execute_download_operation(): self.logger.error("下载操作执行失败") return False # 下载操作完成后,点击上传导航按钮跳转到上传页面执行更多下载操作并检查状态 if not self.aging_down_data(breakpoint_name): self.logger.error("三次下载都失败,属性不存在变量") return False else: self.logger.info("页面中包含变化量属性,继续执行后续操作") user_id = global_variable.GLOBAL_USERNAME if user_id is None: self.logger.error("获取用户ID失败") return False max_variation = apis.get_user_max_variation(user_id) if max_variation is None: self.logger.error("获取用户最大变化量失败") return False # # 循环滑动收集所有测点数据 # logging.info("准备循环滑动收集所有测点数据") # all_point_data = self.collect_all_point_data(results_dir) # # 保存测试结果 # result_file = os.path.join(results_dir, f"{self.line_num}_{datetime.now().strftime('%Y%m%d')}.txt") # with open(result_file, 'w', encoding='utf-8') as f: # f.write(f"测试时间: {datetime.now().strftime('%Y-%m-%d')}\n") # f.write(f"获取到的测点数: {len(all_point_data)}\n\n") # for i, point in enumerate(all_point_data, 1): # f.write(f"测点 {i}: {point.get('point_name', '未知')}\n") # parsed = point.get('parsed_data', {}) # # if parsed: # # f.write(f" 本期数值: {parsed.get('current_value', 'N/A')}\n") # # f.write(f" 上期数值: {parsed.get('previous_value', 'N/A')}\n") # # f.write(f" 变化量: {parsed.get('change_amount', 'N/A')}\n") # # f.write(f" 测量时间: {parsed.get('measurement_time', 'N/A')}\n") # f.write(f"完整数据:\n{point.get('point_value', 'N/A')}\n\n") # self.logger.info(f"测试结果已保存到: {result_file}") # 给ai接口发送文件,等待接口返回是否能上传数据 # 不能就点击手机返回按钮,能就继续填写表单 # if not self.check_ai_upload_permission(result_file): # self.logger.info("AI接口返回不允许上传,点击返回按钮") # self.driver.back() # return True # 返回True继续下一个断点的上传配置。 if not self.collect_check_all_point_data(max_variation): self.logger.error(f"断点 '{breakpoint_name}' 上传失败") self.driver.back() return False # 返回False继续下一个断点的上传配置。 # 获取线路的所有工况信息 work_conditions = apis.get_work_conditions_by_linecode(self.line_num) # work_conditions = {'1962527': {'sjName': '王顺', 'workinfoname': '轨道板(道床)铺设后,第1个月', 'work_type': 2}, # '0299815Z2': {'sjName': '王顺', 'workinfoname': '冬休', 'work_type': 2}, # '0299820H1': {'sjName': '王顺', 'workinfoname': '架桥机(运梁车) 首次通过后', 'work_type': 4}, # '0431248D1': {'sjName': '王顺', 'workinfoname': '轨道板(道床)铺设后,第1个月', 'work_type': 4}, # '0431248D2': {'sjName': '王顺', 'workinfoname': '轨道板(道床)铺设后,第1个月', 'work_type': 4}, # '0299815Z1': {'sjName': '王顺', 'workinfoname': '架桥机(运梁车) 首次通过前', 'work_type': 2}, # '0431289D2': {'sjName': '王顺', 'workinfoname': '轨道板(道床)铺设后,第1个月', 'work_type': 4}, # '0431330D1': {'sjName': '王顺', 'workinfoname': '轨道板(道床)铺设后,第1个月', 'work_type': 2}, # '0431330D2': {'sjName': '王顺', 'workinfoname': '轨道板(道床)铺设后,第1个月', 'work_type': 2}, # '0431370D1': {'sjName': '王顺', 'workinfoname': '轨道板(道床)铺设后,第1个月', 'work_type': 2}} self.logger.info(f"获取线路工况信息成功: {work_conditions}") if not work_conditions: self.logger.error("获取工况信息失败") return False # 提取人员姓名和身份证 if not self._load_user_data(): self.logger.error("加载用户数据失败") return False # 获取第一个数据员姓名和身份证号 user_info = self.get_first_sjname_and_id(self.line_num, work_conditions) self.logger.info(f"获取到的第一个数据员姓名和身份证号为:{user_info}") if not user_info: self.logger.error(f"无法获取线路 '{self.line_num}' 的数据员信息") return False # 解析工况信息,现在返回两个值:主要工况字典和次要工况列表 main_condition_dict, minor_conditions_list = self.parse_work_conditions(work_conditions) self.logger.info(f"主要工况: {main_condition_dict}") self.logger.info(f"次要工况数量: {len(minor_conditions_list)}") # 设置所有测点并填写表单 - 传入两个参数 if not self.set_all_points_and_fill_form(results_dir, user_info.get("name"), user_info.get("id_card"), main_condition_dict, minor_conditions_list): self.logger.error("设置所有测点并填写表单失败") return False # # 表达填写完成,点击"保存上传"并处理弹窗 # if not self.click_save_upload_and_handle_dialogs(): # self.logger.error("点击保存上传并处理弹窗失败") # return False # 暂不上传,使用返回按钮替代。 self.driver.back() # 等待上传,查看loading弹窗。没有就下一个 if not self.wait_for_upload_completion(): self.logger.error("等待上传完成失败") return False self.logger.info("上传配置页面操作执行完成") # 把上传成功的断点写入全局变量GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST global_variable.GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST.append(breakpoint_name) return True except Exception as e: self.logger.error(f"执行上传配置页面操作时出错: {str(e)}") # # 保存错误截图 # error_screenshot_file = os.path.join( # results_dir, # f"upload_config_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" # ) # self.driver.save_screenshot(error_screenshot_file) # self.logger.info(f"错误截图已保存: {error_screenshot_file}") return False