134 lines
4.2 KiB
Python
134 lines
4.2 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
DiNi 蓝牙服务 — 命令行发送脚本
|
|
===============================
|
|
通过传入参数请求 FastAPI 服务。
|
|
|
|
用法:
|
|
python send.py --type test # 测试接口
|
|
python send.py --type test --data "measure=false" # 测试 (不触发测量)
|
|
python send.py --type connect --data "COM6" # 连接 COM6
|
|
python send.py --type disconnect # 断开连接
|
|
python send.py --type command --data "?0000" # 发送 DiNi 命令
|
|
python send.py --type command --data "FML" # 触发测量
|
|
python send.py --type command --data "!KnM 5" # 设置参数
|
|
python send.py --type command --data "disconnect" # 断开 (命令方式)
|
|
|
|
python send.py --host 192.168.1.100 --port 58000 # 连接远程服务
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import argparse
|
|
|
|
import requests
|
|
|
|
# 默认服务地址
|
|
DEFAULT_HOST = "127.0.0.1"
|
|
DEFAULT_PORT = 58000
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="DiNi 蓝牙服务 — 命令行发送脚本",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
示例:
|
|
%(prog)s --type test
|
|
%(prog)s --type connect --data COM6
|
|
%(prog)s --type command --data "?0000"
|
|
%(prog)s --type command --data "FML"
|
|
%(prog)s --type disconnect
|
|
""",
|
|
)
|
|
parser.add_argument(
|
|
"--type", required=True,
|
|
choices=["test", "connect", "disconnect", "command"],
|
|
help="操作类型"
|
|
)
|
|
parser.add_argument(
|
|
"--data", default="",
|
|
help="操作数据 (COM口名 / DiNi命令 / test参数)"
|
|
)
|
|
parser.add_argument(
|
|
"--host", default=DEFAULT_HOST,
|
|
help=f"FastAPI 服务地址 (默认 {DEFAULT_HOST})"
|
|
)
|
|
parser.add_argument(
|
|
"--port", type=int, default=DEFAULT_PORT,
|
|
help=f"FastAPI 服务端口 (默认 {DEFAULT_PORT})"
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
base_url = f"http://{args.host}:{args.port}"
|
|
|
|
try:
|
|
# ── test ──
|
|
if args.type == "test":
|
|
# 支持 --data 传入查询参数, 如 "measure=false"
|
|
params = {}
|
|
if args.data:
|
|
for kv in args.data.split("&"):
|
|
if "=" in kv:
|
|
k, v = kv.split("=", 1)
|
|
params[k.strip()] = v.strip()
|
|
resp = requests.get(f"{base_url}/test", params=params, timeout=15)
|
|
|
|
# ── connect ──
|
|
elif args.type == "connect":
|
|
if not args.data:
|
|
print("✗ --type connect 需要 --data 指定 COM 口, 如: --data COM6")
|
|
sys.exit(1)
|
|
# 也支持 --data "COM6 19200" 指定波特率
|
|
parts = args.data.strip().split()
|
|
port = parts[0]
|
|
baudrate = int(parts[1]) if len(parts) > 1 else 9600
|
|
resp = requests.post(
|
|
f"{base_url}/connect",
|
|
json={"port": port, "baudrate": baudrate},
|
|
timeout=10,
|
|
)
|
|
|
|
# ── disconnect ──
|
|
elif args.type == "disconnect":
|
|
resp = requests.post(f"{base_url}/disconnect", timeout=10)
|
|
|
|
# ── command ──
|
|
elif args.type == "command":
|
|
if not args.data:
|
|
print("✗ --type command 需要 --data 指定命令, 如: --data \"?0000\"")
|
|
sys.exit(1)
|
|
resp = requests.post(
|
|
f"{base_url}/command",
|
|
json={"cmd": args.data},
|
|
timeout=30,
|
|
)
|
|
|
|
# ── 输出结果 ──
|
|
print(f"HTTP {resp.status_code} {resp.reason}")
|
|
print("-" * 50)
|
|
|
|
try:
|
|
data = resp.json()
|
|
print(json.dumps(data, indent=2, ensure_ascii=False))
|
|
except json.JSONDecodeError:
|
|
print(resp.text)
|
|
|
|
if not resp.ok:
|
|
sys.exit(1)
|
|
|
|
except requests.exceptions.ConnectionError:
|
|
print(f"✗ 无法连接到服务 {base_url}")
|
|
print(" 请确认服务已启动: python start_service.py")
|
|
sys.exit(1)
|
|
except requests.exceptions.Timeout:
|
|
print(f"✗ 请求超时")
|
|
sys.exit(1)
|
|
except KeyboardInterrupt:
|
|
print("\n中断")
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|