90 lines
3.6 KiB
Python
90 lines
3.6 KiB
Python
import json
|
||
import sys
|
||
import time
|
||
import serial
|
||
import os
|
||
|
||
# 添加项目根目录到Python搜索路径
|
||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||
|
||
from globals.driver_utils import DriverManager
|
||
import globals.driver_utils as driver_utils
|
||
import actions
|
||
import check_station
|
||
from check_station import get_excel_from_url
|
||
import globals.create_link as create_link
|
||
|
||
# Serial link settings passed to serial.Serial() in main().
PORT = "COM9"
BAUD = 115200

# JSON object written back on the serial port (serialized with json.dumps in
# main()). The "xxxx" values look like placeholders — TODO confirm the real
# payload expected by the peer device.
RESPONSE_PAYLOAD = {"1": "xxxx", "2": "xxxx", "3": "xxxx"}
|
||
|
||
|
||
def _extract_station(line: str):
    """Return the numeric ``station`` value carried by one serial line.

    ``json.loads`` can yield any JSON type (list, number, string, ...), so the
    payload is validated before use: only a JSON object whose ``station``
    field is a number is accepted. Anything else yields ``None`` instead of
    raising ``TypeError`` later in the membership test or the ``> 0``
    comparison.

    Args:
        line: One decoded, stripped line read from the serial port.

    Returns:
        The station number, or ``None`` when the line is not valid JSON, is
        not a JSON object, or has no numeric ``station`` field.
    """
    try:
        data = json.loads(line)
    except json.JSONDecodeError:
        print("Received non-JSON data")
        return None

    if not isinstance(data, dict):
        return None

    station_value = data.get("station")
    if not isinstance(station_value, (int, float)):
        return None
    return station_value


def main() -> int:
    """Listen on the serial port and trigger the matching automation flow.

    Sets up the wireless ADB link and the driver, downloads the station
    table, then reads lines from ``PORT`` in an endless loop. For every
    non-empty line:

    * ``station == 0`` runs ``actions.DeviceAutomation(...).run_automation()``.
    * ``station > 0`` runs ``check_station.CheckStation(...).main_run()``,
      unless that station has already completed or as many stations as the
      table contains have completed. A truthy result from ``main_run()``
      marks the station as completed.
    * ``RESPONSE_PAYLOAD`` is written back as one JSON line.

    Returns:
        1 when the serial port raises ``serial.SerialException``. The read
        loop has no normal exit, so no other value is returned; any other
        exception propagates to the caller after the driver is released.
    """
    create_link.setup_adb_wireless()
    driver, wait, device_id = DriverManager.get_driver()

    # Everything that runs once the driver exists sits inside try/finally so
    # the driver is released even when the download or the table fails.
    try:
        url = "https://database.yuxindazhineng.com/team-bucket/69378c5b4f42d83d9504560d/前测点表/20260309/CDWZQ-2标-龙家沟左线大桥-0-11号墩-平原.xlsx"
        station_data = get_excel_from_url(url)
        print(station_data)
        station_quantity = len(station_data)  # total number of stations
        # Station numbers whose check already succeeded; its size doubles as
        # the count of completed stations.
        finished_stations = set()

        # The reply never changes, so serialize it once instead of per line.
        reply = json.dumps(RESPONSE_PAYLOAD, ensure_ascii=False) + "\n"
        reply_bytes = reply.encode("utf-8")

        with serial.Serial(PORT, BAUD, timeout=1) as ser:
            print(f"Opened {PORT} at {BAUD} baud")
            while True:
                line = ser.readline().decode("utf-8", errors="ignore").strip()
                if line:
                    print(f"RX: {line}")
                    station_value = _extract_station(line)

                    if station_value is None:
                        # Nothing to dispatch; the line is still answered below.
                        pass
                    elif station_value == 0:
                        print("🚀 站点编号为0,启动 actions.py\n")
                        app = actions.DeviceAutomation(
                            driver=driver, wait=wait, device_id=device_id
                        )
                        app.run_automation()
                    elif (
                        station_value > 0
                        and len(finished_stations) < station_quantity
                        and station_value not in finished_stations
                    ):
                        print(f"🚀 站点编号为{station_value},启动 check_station.py\n")
                        app = check_station.CheckStation(driver, wait, device_id)
                        # A truthy result is treated as "station completed".
                        if app.main_run():
                            finished_stations.add(station_value)

                    # NOTE(review): the reply is assumed to be sent for every
                    # non-empty line received — confirm against the device
                    # protocol.
                    ser.write(reply_bytes)
                    print(f"TX: {reply.strip()}")
                # Short pause between reads of the port.
                time.sleep(0.05)
    except serial.SerialException as exc:
        print(f"Serial error: {exc}")
        return 1
    finally:
        if driver:
            DriverManager.quit_driver(device_id)  # shut the driver down safely
|
||
|
||
|
||
# Script entry point: run main() and use its return value as the process
# exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|