Files
cjgc_data/ck/device_b.py
2026-03-14 17:53:14 +08:00

90 lines
3.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import sys
import time
import serial
import os
# Add the project root directory to the Python module search path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from globals.driver_utils import DriverManager
import globals.driver_utils as driver_utils
import actions
import check_station
from check_station import get_excel_from_url
import globals.create_link as create_link
# Serial port name and line speed used when opening the serial link.
PORT: str = "COM9"
BAUD: int = 115200

# JSON object written back over the serial link as the reply message.
# NOTE(review): the "xxxx" values look like placeholders -- confirm the
# real payload with the device on the other end.
RESPONSE_PAYLOAD: dict = {
    "1": "xxxx",
    "2": "xxxx",
    "3": "xxxx",
}
def _parse_station(line: str):
    """Extract the numeric ``station`` field from one received line.

    Returns the station number (int or float) when *line* is a JSON object
    with a numeric ``station`` member, otherwise ``None``. Malformed input
    is reported on stdout instead of raising, so one bad message cannot
    terminate the serial loop.
    """
    try:
        data = json.loads(line)
    except json.JSONDecodeError:
        print("Received non-JSON data")
        return None

    # json.loads may legally return a list, str, number, bool or None.
    if not isinstance(data, dict):
        print("Ignoring JSON message that is not an object")
        return None
    if "station" not in data:
        return None

    station_value = data["station"]
    if not isinstance(station_value, (int, float)):
        # Comparing e.g. a str or None with ``> 0`` would raise TypeError.
        print(f"Ignoring non-numeric station value: {station_value!r}")
        return None
    return station_value


def _run_station(station_value, driver, wait, device_id,
                 station_quantity, finished_stations):
    """Start the automation that matches a numeric station number.

    Station ``0`` runs ``actions.DeviceAutomation``. A positive station runs
    ``check_station.CheckStation`` unless that station has already completed
    or the number of completed stations has reached *station_quantity*.
    When ``main_run()`` returns a truthy value the station is added to
    *finished_stations*, which is mutated in place. Negative stations are
    ignored.
    """
    if station_value == 0:
        print("🚀 站点编号为0启动 actions.py\n")
        app = actions.DeviceAutomation(driver=driver, wait=wait, device_id=device_id)
        app.run_automation()
    elif (
        station_value > 0
        and len(finished_stations) < station_quantity
        and station_value not in finished_stations
    ):
        print(f"🚀 站点编号为{station_value},启动 check_station.py\n")
        app = check_station.CheckStation(driver, wait, device_id)
        if app.main_run():
            finished_stations.add(station_value)


def main() -> int:
    """Listen on the serial port and run device automation per station message.

    Sets up the wireless ADB link, obtains a driver, downloads the station
    table (its length is used as the total number of stations), then reads
    newline-terminated messages from ``PORT``. Each non-empty line is
    handled as a possible JSON station message and answered with
    ``RESPONSE_PAYLOAD`` encoded as one JSON line.

    Returns:
        1 if the serial port raises ``serial.SerialException``;
        0 if the loop is stopped with Ctrl-C.

    The driver is released on every exit path once it has been acquired.
    """
    create_link.setup_adb_wireless()
    driver, wait, device_id = DriverManager.get_driver()
    try:
        # Everything after driver acquisition runs under ``finally`` so that
        # a failed download or any later error cannot leak the driver.
        url = "https://database.yuxindazhineng.com/team-bucket/69378c5b4f42d83d9504560d/前测点表/20260309/CDWZQ-2标-龙家沟左线大桥-0-11号墩-平原.xlsx"
        station_data = get_excel_from_url(url)
        print(station_data)
        station_quantity = len(station_data)  # total number of stations
        finished_stations = set()  # station numbers whose check completed

        # The reply never changes, so serialise and encode it once.
        reply = json.dumps(RESPONSE_PAYLOAD, ensure_ascii=False) + "\n"
        reply_bytes = reply.encode("utf-8")

        with serial.Serial(PORT, BAUD, timeout=1) as ser:
            print(f"Opened {PORT} at {BAUD} baud")
            while True:
                line = ser.readline().decode("utf-8", errors="ignore").strip()
                if line:
                    print(f"RX: {line}")
                    station_value = _parse_station(line)
                    if station_value is not None:
                        _run_station(
                            station_value, driver, wait, device_id,
                            station_quantity, finished_stations,
                        )
                    # NOTE(review): one reply is sent per received line,
                    # JSON or not -- confirm this matches the intended
                    # protocol with the peer device.
                    ser.write(reply_bytes)
                    print(f"TX: {reply.strip()}")
                time.sleep(0.05)
    except serial.SerialException as exc:
        print(f"Serial error: {exc}")
        return 1
    except KeyboardInterrupt:
        # The loop has no other exit; treat Ctrl-C as a normal shutdown.
        print("Interrupted, shutting down")
        return 0
    finally:
        if driver:
            DriverManager.quit_driver(device_id)  # release the driver safely
# Script entry point: run main() and use its return value as the process
# exit status.
if __name__ == "__main__":
    raise SystemExit(main())