# -*- coding: utf-8 -*- """ DiNi 蓝牙服务 — 命令行发送脚本 =============================== 通过传入参数请求 FastAPI 服务。 用法: python send.py --type test # 测试接口 python send.py --type test --data "measure=false" # 测试 (不触发测量) python send.py --type connect --data "COM6" # 连接 COM6 python send.py --type disconnect # 断开连接 python send.py --type command --data "?0000" # 发送 DiNi 命令 python send.py --type command --data "FML" # 触发测量 python send.py --type command --data "!KnM 5" # 设置参数 python send.py --type command --data "disconnect" # 断开 (命令方式) python send.py --host 192.168.1.100 --port 58000 # 连接远程服务 """ import sys import json import argparse import requests # 默认服务地址 DEFAULT_HOST = "127.0.0.1" DEFAULT_PORT = 58000 def main(): parser = argparse.ArgumentParser( description="DiNi 蓝牙服务 — 命令行发送脚本", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: %(prog)s --type test %(prog)s --type connect --data COM6 %(prog)s --type command --data "?0000" %(prog)s --type command --data "FML" %(prog)s --type disconnect """, ) parser.add_argument( "--type", required=True, choices=["test", "connect", "disconnect", "command"], help="操作类型" ) parser.add_argument( "--data", default="", help="操作数据 (COM口名 / DiNi命令 / test参数)" ) parser.add_argument( "--host", default=DEFAULT_HOST, help=f"FastAPI 服务地址 (默认 {DEFAULT_HOST})" ) parser.add_argument( "--port", type=int, default=DEFAULT_PORT, help=f"FastAPI 服务端口 (默认 {DEFAULT_PORT})" ) args = parser.parse_args() base_url = f"http://{args.host}:{args.port}" try: # ── test ── if args.type == "test": # 支持 --data 传入查询参数, 如 "measure=false" params = {} if args.data: for kv in args.data.split("&"): if "=" in kv: k, v = kv.split("=", 1) params[k.strip()] = v.strip() resp = requests.get(f"{base_url}/test", params=params, timeout=15) # ── connect ── elif args.type == "connect": if not args.data: print("✗ --type connect 需要 --data 指定 COM 口, 如: --data COM6") sys.exit(1) # 也支持 --data "COM6 19200" 指定波特率 parts = args.data.strip().split() port = parts[0] baudrate = int(parts[1]) if len(parts) > 1 else 9600 resp = requests.post( f"{base_url}/connect", json={"port": port, "baudrate": baudrate}, timeout=10, ) # ── disconnect ── elif args.type == "disconnect": resp = requests.post(f"{base_url}/disconnect", timeout=10) # ── command ── elif args.type == "command": if not args.data: print("✗ --type command 需要 --data 指定命令, 如: --data \"?0000\"") sys.exit(1) resp = requests.post( f"{base_url}/command", json={"cmd": args.data}, timeout=30, ) # ── 输出结果 ── print(f"HTTP {resp.status_code} {resp.reason}") print("-" * 50) try: data = resp.json() print(json.dumps(data, indent=2, ensure_ascii=False)) except json.JSONDecodeError: print(resp.text) if not resp.ok: sys.exit(1) except requests.exceptions.ConnectionError: print(f"✗ 无法连接到服务 {base_url}") print(" 请确认服务已启动: python start_service.py") sys.exit(1) except requests.exceptions.Timeout: print(f"✗ 请求超时") sys.exit(1) except KeyboardInterrupt: print("\n中断") sys.exit(1) if __name__ == "__main__": main()