待补平差检查和上传逻辑

This commit is contained in:
2026-05-26 14:40:04 +08:00
parent 19afa3d19b
commit 10db652cbc
18 changed files with 217 additions and 29 deletions

Binary file not shown.

View File

@@ -48,6 +48,11 @@ class DeviceAutomation:
self.driver = driver
self.wait = wait
self.device_id = device_id
if not self.device_id:
logging.error("设备 ID 为空,无法初始化自动化")
return
# 初始化权限
if driver_utils.grant_appium_permissions(self.device_id):
logging.info(f"设备 {self.device_id} 授予Appium权限成功")
@@ -303,16 +308,19 @@ class DeviceAutomation:
# 主执行逻辑
if __name__ == "__main__":
create_link.setup_adb_wireless()
device_id = create_link.setup_adb_wireless()
automation = None
try:
automation = DeviceAutomation()
success = automation.run_automation()
if success:
logging.info(f"设备 {automation.device_id} 自动化流程执行成功")
if not device_id:
logging.error("未能获取设备 ID无法继续执行自动化流程")
else:
logging.error(f"设备 {automation.device_id} 自动化流程执行失败")
automation = DeviceAutomation(device_id=device_id)
success = automation.run_automation()
if success:
logging.info(f"设备 {automation.device_id} 自动化流程执行成功")
else:
logging.error(f"设备 {automation.device_id} 自动化流程执行失败")
except Exception as e:
logging.error(f"设备执行出错: {str(e)}")
finally:

View File

@@ -3,6 +3,8 @@ import time
import requests
import pandas as pd
from io import BytesIO
from datetime import datetime
import os
import subprocess
import globals.global_variable as global_variable
import globals.driver_utils as driver_utils # 导入驱动工具模块
@@ -84,6 +86,7 @@ class CheckStation:
# return device_id
def get_measure_data(self):
# 模拟获取测量数据
pass
@@ -188,7 +191,14 @@ class CheckStation:
return result
def main_run(self):
return self.add_transition_point()
if not self.add_transition_point():
logging.error("添加转点失败")
return False
if not self.take_screenshot():
logging.error("截图失败")
return False
logging.info("检查站点成功")
return True
def run(self, station_num: int):
# last_station_num = 0
@@ -247,9 +257,123 @@ class CheckStation:
# 错误处理,可以继续循环或退出
print(f"已处理{over_station_num}个站点")
# 截图
self.driver.save_screenshot("check_station.png")
# # 截图
# self.driver.save_screenshot("check_station.png")
if not self.take_screenshot():
logging.error(f"设备 {device_id} 截图失败")
# 打完数据,截图完毕,点击平差处理按钮
if not self.click_adjustment_button(device_id):
self.logger.error(f"设备 {device_id} 点击平差处理按钮失败")
return False
return True
def click_adjustment_button(self, device_id):
"""
点击平差处理按钮
Args:
device_id: 设备ID
Returns:
bool: 是否成功点击
"""
try:
self.logger.info(f"设备 {device_id} 查找平差处理按钮")
# 查找平差处理按钮
adjustment_button = self.driver.find_element(AppiumBy.ID, "com.bjjw.cjgc:id/point_measure_btn")
# 验证按钮文本
button_text = adjustment_button.text
if "平差处理" not in button_text:
self.logger.warning(f"设备 {device_id} 按钮文本不匹配,期望'平差处理',实际: {button_text}")
if adjustment_button.is_displayed() and adjustment_button.is_enabled():
self.logger.info(f"设备 {device_id} 点击平差处理按钮")
adjustment_button.click()
time.sleep(3) # 等待平差处理完成
return True
else:
self.logger.error(f"设备 {device_id} 平差处理按钮不可点击")
return False
except NoSuchElementException:
self.logger.error(f"设备 {device_id} 未找到平差处理按钮")
return False
except Exception as e:
self.logger.error(f"设备 {device_id} 点击平差处理按钮时发生错误: {str(e)}")
return False
def take_screenshot(self):
"""
通过Appium驱动截取设备屏幕
参数:
filename_prefix: 断点名称
返回:
bool: 操作是否成功
"""
try:
# 获取项目名称
project_name = global_variable.GLOBAL_USERNAME or "用户名"
filename_prefix = global_variable.GLOBAL_CURRENT_PROJECT_NAME or "平差页面截图"
# 获取当前日期
date_str = datetime.now().strftime("%Y%m%d")
# if not date_str:
# date_str = datetime.now().strftime("%Y%m%d")
# 获取当前时间(如果没有提供),并确保格式合法(不含冒号)
time_str = datetime.now().strftime("%H%M%S")
# 创建D盘下的截图目录结构D:\uploadInfo\picture\项目名\年月日
screenshots_dir = os.path.join("D:\\", "uploadInfo", "picture", project_name, date_str)
# 确保目录存在
try:
os.makedirs(screenshots_dir, exist_ok=True)
logging.info(f"截图目录: {screenshots_dir}")
except Exception as dir_error:
logging.error(f"创建截图目录失败: {str(dir_error)}")
return False
line_code = global_variable.GLOBAL_LINE_NUM
if not line_code:
logging.error(f"未找到与断点名称 {filename_prefix} 对应的线路编码")
line_code = "unknown"
# 截图保存
screenshot_file = os.path.join(
screenshots_dir,
f"{line_code}_{filename_prefix}_{time_str}.png"
)
# 尝试保存截图
try:
success = self.driver.save_screenshot(screenshot_file)
if success:
logging.info(f"截图已保存: {screenshot_file}")
# 验证文件是否真的存在
if os.path.exists(screenshot_file):
logging.info(f"截图文件验证存在: {screenshot_file}")
else:
logging.warning(f"截图文件保存成功但验证不存在: {screenshot_file}")
return True
else:
logging.error(f"Appium截图保存失败: {screenshot_file}")
return False
except Exception as save_error:
logging.error(f"保存截图时发生错误: {str(save_error)}")
return False
except Exception as e:
logging.error(f"截图时发生错误: {str(e)}")
return False
def get_excel_from_url(url):
"""
从URL获取Excel文件并解析为字典

View File

@@ -87,3 +87,4 @@ def main() -> int:
if __name__ == "__main__":
sys.exit(main())

View File

@@ -325,7 +325,8 @@ def get_work_conditions_by_linecode(linecode: str) -> Optional[Dict[str, Dict]]:
while retry_count < max_retries:
try:
# 准备请求参数
payload = {"linecode": linecode}
account_id = global_variable.GLOBAL_ACCOUNT_ID
payload = {"linecode": linecode, "account_id": account_id}
headers = {
'Content-Type': 'application/json',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
@@ -422,7 +423,8 @@ def get_user_max_variation(username: str) -> Optional[int]:
# 1. 准备请求参数与头部
# 接口要求的POST参数JSON格式
payload = {"username": username}
account_id = global_variable.GLOBAL_ACCOUNT_ID
payload = {"username": username, "account_id": account_id}
# 请求头部指定JSON格式模拟浏览器UA避免被接口拦截
headers = {
"Content-Type": "application/json",

View File

@@ -17,8 +17,12 @@ from urllib3.connection import port_by_scheme
def run_command(command):
"""执行系统命令并返回输出"""
result = subprocess.run(command, shell=True, capture_output=True, text=True)
return result.stdout.strip()
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True, encoding='utf-8', errors='ignore')
return result.stdout.strip()
except Exception:
result = subprocess.run(command, shell=True, capture_output=True, text=True, errors='ignore')
return result.stdout.strip()
# =======================
@@ -131,18 +135,39 @@ def cleanup_wireless_connections(target_device_ip=None, target_port=4723):
def start_appium():
appium_port = 4723
print(f"🚀 启动 Appium Server端口 {appium_port}...")
subprocess.Popen(
["appium.cmd", "-a", "127.0.0.1", "-p", str(appium_port)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
# 检查端口是否就绪替代固定sleep
# 先检查端口是否已被占用
try:
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex(("127.0.0.1", appium_port))
sock.close()
if result == 0:
print(f"✅ Appium Server 已在运行(端口 {appium_port}")
return True
except Exception:
pass
# 启动 Appium Server
try:
subprocess.Popen(
["appium.cmd", "-a", "127.0.0.1", "-p", str(appium_port)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
except FileNotFoundError:
print("❌ 未找到 appium.cmd请确认 Appium 已正确安装")
return False
except Exception as e:
print(f"❌ Appium Server 启动失败: {str(e)}")
return False
# 检查端口是否就绪
max_wait = 30 # 最大等待30秒
start_time = time.time()
while time.time() - start_time < max_wait:
try:
# 尝试连接Appium端口验证是否就绪
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex(("127.0.0.1", appium_port))
@@ -190,7 +215,7 @@ def setup_adb_wireless():
if not usb_devices:
print("❌ 未检测到 USB 设备")
return
return None
for serial in usb_devices:
print(f"\n🔎 处理设备: {serial}")
@@ -261,7 +286,10 @@ def setup_adb_wireless():
# print("🔄 关闭Appium连接...")
# driver.quit()
break # 处理完第一个设备后退出,如需处理多个设备可移除此行
return wireless_id # 返回无线连接的设备ID
print("❌ 未能成功建立无线连接")
return None
# =======================

View File

@@ -547,6 +547,10 @@ def grant_appium_permissions(device_id: str, require_all: bool = False) -> bool:
"""
修复版:为 Appium 授予权限(使用正确的方法)
"""
if not device_id:
logging.error("设备 ID 为空,无法授予 Appium 权限")
return False
logging.info(f"设备 {device_id}开始设置Appium权限")
# 1. 使用系统设置命令替代原来的pm grant尝试
@@ -1084,6 +1088,11 @@ def go_main_click_tabber_button(driver, device_id, tabber_button_text, max_retri
返回:
bool: 成功返回True失败返回False
"""
# 检查沉降观测应用是否已启动
if not is_app_launched(driver):
logging.warning(f"设备 {device_id} 沉降观测应用未启动")
return False
retry_count = 0
while retry_count < max_retries:

View File

@@ -1,11 +1,12 @@
# 全局变量
GLOBAL_DEVICE_ID = "" # 设备ID
GLOBAL_USERNAME = "czyuzongwen" # 用户名
GLOBAL_ACCOUNT_ID = "" # 账号ID
GLOBAL_CURRENT_PROJECT_NAME = "" # 当前测试项目名称
GLOBAL_LINE_NUM = "" # 线路编码
GLOBAL_BREAKPOINT_STATUS_CODES = [0,3] # 要获取的断点状态码列表
GLOBAL_UPLOAD_BREAKPOINT_LIST = []
GLOBAL_UPLOAD_BREAKPOINT_DICT = {}
GLOBAL_UPLOAD_BREAKPOINT_DICT = {}
GLOBAL_TESTED_BREAKPOINT_LIST = [] # 测量结束的断点列表
LINE_TIME_MAPPING_DICT = {} # 存储所有线路编码和对应的时间的全局字典
GLOBAL_BREAKPOINT_DICT = {} # 存储测量结束的断点名称和对应的线路编码的全局字典

View File

@@ -123,8 +123,21 @@ class LoginPage:
accounts = apis.get_accounts_from_server("68ef0e02b0138d25e2ac9918")
matches = [acc for acc in accounts if acc.get("username") == existing_username]
password = None
account_id = False
if matches:
password = matches[0].get("password")
# ✅ 关键:把 account_id 存入全局变量
account_id = matches[0].get("account_id", False)
# 只有 account_id 存在时才存全局
if account_id is not False:
global_variable.GLOBAL_ACCOUNT_ID = account_id
logging.info(f"匹配到账号信息username={existing_username}, account_id={account_id}")
else:
logging.warning(f"账号 {existing_username} 未返回 account_id已设为 False")
password_field.send_keys(password)

View File

@@ -120,8 +120,8 @@ class MeasureTabbarPage:
# 向上滑动
# 记录滑动前的项目,用于判断是否滑动到顶
before_scroll_items = self.get_current_items()
start_y = list_container.location['y'] + list_container.size['height'] * 0.05
end_y = list_container.location['y'] + list_container.size['height'] * 0.95
start_y = list_container.location['y'] + list_container.size['height'] * 0.2
end_y = list_container.location['y'] + list_container.size['height'] * 0.8
# 执行滑动
self.driver.swipe(start_x, start_y, start_x, end_y, 1000)

View File

@@ -192,8 +192,8 @@ class ScreenshotPage:
# 向上滑动
# 记录滑动前的项目,用于判断是否滑动到顶
before_scroll_items = self.get_current_items()
start_y = list_container.location['y'] + list_container.size['height'] * 0.05
end_y = list_container.location['y'] + list_container.size['height'] * 0.95
start_y = list_container.location['y'] + list_container.size['height'] * 0.2
end_y = list_container.location['y'] + list_container.size['height'] * 0.8
self.logger.info("向上滑动列表")
# 执行滑动

View File

@@ -1086,6 +1086,8 @@ class SectionMileageConfigPage:
if not self.click_start_measure_btn():
return False
# if not self.check_station_page.run():
# self.logger.error("检查站页面运行失败")
# return False