commit a40feb04eed201bcd91ba1b7deb65c56b7aaefd2 Author: lhx Date: Mon Jun 8 15:17:52 2026 +0800 初始化 diff --git a/README.md b/README.md new file mode 100644 index 0000000..574f576 --- /dev/null +++ b/README.md @@ -0,0 +1,206 @@ +# app_v4_分离版 — DiNi 水准仪 & 手机通信 远程控制 + +两个独立 FastAPI 服务,分别管理 PC 端与水准仪(NiNi)、手机(蓝牙模块)的蓝牙连接。 + +## 目录结构 + +``` +app_v4_分离版/ +├── send.py # 共享客户端 — 统一命令行发送脚本 +├── shuizhunyi/ +│ └── main.py # 水准仪服务 — 连接 Trimble-1780 控制 DiNi +├── shouji/ +│ └── main.py # 手机通信服务 — 模拟水准仪向手机APP发测量数据 +└── README.md # 本文件 +``` + +--- + +## 1. 安装依赖 + +```bash +pip install fastapi uvicorn pyserial +``` + +`send.py` 额外依赖 `requests`(通常已自带)。 + +--- + +## 2. 水准仪控制服务 (`shuizhunyi/`) + +通过 Trimble-1780 蓝牙适配器与天宝 DiNi 水准仪通信,发送 DiNi 协议命令。 + +### 启动 + +```bash +cd shuizhunyi +python main.py # 默认 0.0.0.0:58000 +python main.py --port 8080 # 指定 HTTP 端口 +python main.py --dini-port COM6 # 启动时自动连接水准仪 +python main.py --baudrate 19200 # 指定波特率 +``` + +### API 接口 + +| 方法 | 路径 | 说明 | +|------|------|------| +| `GET` | `/` | 服务状态 + COM 口列表 | +| `GET/POST` | `/test` | 测试:未连接→COM列表;已连接→仪器标识 + 可选测量 | +| `POST` | `/connect` | `{"port":"COM6","baudrate":9600}` 连接水准仪 | +| `POST` | `/disconnect` | 断开连接 | +| `POST` | `/command` | `{"cmd":"..."}` 发送 DiNi 命令 | + +### send.py 调用示例 + +```bash +# 从上级目录执行 +send.py --type test --port 58000 +send.py --type connect --data COM6 --port 58000 +send.py --type command --data "?0000" --port 58000 # 读取仪器标识 +send.py --type command --data "?0100" --port 58000 # 读取仪器编号 +send.py --type command --data "?KSND" --port 58000 # 读取提示音状态 +send.py --type command --data "!KnM5" --port 58000 # 设置最大测量次数=5 +send.py --type command --data "FML" --port 58000 # 触发测量 +send.py --type command --data "SEO" --port 58000 # 远程关机 +send.py --type disconnect --port 58000 +``` + +### 支持的命令 + +**读取参数**: `?0000` `?0100` `?KT30` `?Krk` `?KLx` `?KOf` `?KmL` `?KmR` `?KnM` `?KEKR` `?KREF` `?KRAD` `?KFIR` `?KSND` `?KAPO` + +**设置参数**: `!KnM` `!KREF<0|1>` `!KEKR<0|1>` `!KFIR<0|1>` `!KSND<0|1>` `!KAPO<0|1>` `!KRAD<0|1|2>` `!Krk<浮点>` `!KLx<浮点>` `!KOf<浮点>` `!KmL<浮点>` `!KmR<浮点>` + +> **注意**: 设置命令参数名与值之间无需空格,直接拼接,如 `!KnM5`、`!KSND0`。 +> 文档标明 DiNi 设置命令"无需输入 Δ(空格)"。 + +**功能命令**: `FML` 触发测量 | `SEO` 远程关机 + +--- + +## 3. 手机通信服务 (`shouji/`) + +PC 端蓝牙 SPP 作为服务端,等待手机 APP 连接后,模拟 Trimble DiNi 03 水准仪向手机发送 M5 格式测量数据。 + +### 工作原理 + +``` +手机APP ──蓝牙──▶ 蓝牙模块 ──SPP串口──▶ PC (本服务) + ▲ │ + │ ?0100 轮询 │ 测量数据 (M5) + └────────────────────────────────┘ +``` + +- 手机通过蓝牙连接到 PC 的蓝牙模块 +- 手机 APP 周期性发送 `?0100` 轮询水准仪 +- 本服务响应协议握手/控制码,并在收到轮询时发送排队测量数据 +- 手机似乎只接收不回复确认 + +### 启动 + +```bash +cd shouji +python main.py # 默认 0.0.0.0:58100 +python main.py --port 8081 # 指定 HTTP 端口 +python main.py --bt-port COM3 # 启动时自动连接蓝牙 COM 口 +``` + +### API 接口 + +| 方法 | 路径 | 说明 | +|------|------|------| +| `GET` | `/` | 服务状态 + COM 口列表 | +| `GET/POST` | `/test` | 测试:未连接→COM列表;已连接→手机连接状态+统计 | +| `GET` | `/status` | 详细状态:手机连接、待发送数、收发字节数 | +| `POST` | `/connect` | `{"port":"COM3"}` 打开蓝牙 COM 口 | +| `POST` | `/disconnect` | 断开 | +| `POST` | `/command` | `{"cmd":"..."}` 发送命令 | + +### send.py 调用示例 + +```bash +# 从上级目录执行 +send.py --type test --port 58100 +send.py --type connect --data COM3 --port 58100 # 连接蓝牙 COM 口 +send.py --type command --data "send 0.89182 3.323" --port 58100 # 添加测量到队列 +send.py --type command --data "send 1.50000 5.000" --port 58100 # 再添加一条 +send.py --type command --data "force" --port 58100 # 强制发送 (不等轮询) +send.py --type command --data "clear" --port 58100 # 清空队列 +send.py --type disconnect --port 58100 +``` + +### /command 支持的命令 + +| 命令 | 格式 | 说明 | +|------|------|------| +| `send` | `send ` | 添加测量到发送队列 (R=标尺读数, HD=距离) | +| `force` | `force` | 强制立即发送队列中第一条 | +| `clear` | `clear` | 清空待发送队列 | +| `disconnect` | `disconnect` | 断开蓝牙连接 | +| 原始 | `hex:AABBCC` | 发送原始 hex 字节 | + +--- + +## 4. 共享客户端 (`send.py`) + +统一的命令行发送脚本,通过 `--type` / `--port` 参数调用不同服务。 + +### 参数 + +``` +--type {test,connect,disconnect,command} 操作类型 +--data DATA 操作数据 (COM口 / 命令 / 参数) +--host HOST 服务地址 (默认 127.0.0.1) +--port PORT 服务端口 (水准仪 58000 / 手机 58100) +``` + +### 示例 + +```bash +# 水准仪服务 (port 58000) +python send.py --type test --port 58000 +python send.py --type connect --data COM6 --port 58000 +python send.py --type command --data "?0000" --port 58000 +python send.py --type command --data "FML" --port 58000 + +# 手机通信服务 (port 58100) +python send.py --type test --port 58100 +python send.py --type connect --data COM3 --port 58100 +python send.py --type command --data "send 0.89182 3.323" --port 58100 +python send.py --type command --data "force" --port 58100 +``` + +> **提示**: 两个 `send.py` 是同一个文件,通过 `--port` 参数区分调用哪个服务。 + +--- + +## 5. 典型工作流程 + +### 场景: 电脑读取水准仪数据 → 发送到手机 APP + +```bash +# 终端1: 启动水准仪服务 +cd shuizhunyi +python main.py --dini-port COM6 # 端口 58000 + +# 终端2: 启动手机通信服务 +cd shouji +python main.py --bt-port COM3 # 端口 58100 + +# 终端3: 操作 +python send.py --type command --data "FML" --port 58000 # 从水准仪获取测量 +# → 假设返回 staff_reading=0.89182, distance=3.323 + +python send.py --type command --data "send 0.89182 3.323" --port 58100 # 排队发给手机 +# 手机APP发送 ?0100 轮询时自动收到数据 +``` + +--- + +## 6. 注意事项 + +1. **串口独占**: 每个 COM 口只能有一个程序打开,两个服务需使用不同的 COM 口 +2. **蓝牙配对**: 使用前确保 Trimble-1780 / 蓝牙模块已在 Windows 中配对,生成虚拟 COM 口 +3. **波特率**: DiNi 水准仪默认 9600,需与仪器内菜单设置一致 +4. **手机通信方向**: 手机通信服务使用 **incoming** SPP 端口(PC 作为服务端),启动时标 ★ 推荐的即为 incoming 端口 +5. **DiNi 设置命令格式**: 参数名与值之间不加空格(如 `!KSND0` 而非 `!KSND 0`),服务端已自动处理 diff --git a/send.py b/send.py new file mode 100644 index 0000000..6776e35 --- /dev/null +++ b/send.py @@ -0,0 +1,133 @@ +# -*- coding: utf-8 -*- +""" +DiNi 蓝牙服务 — 命令行发送脚本 +=============================== +通过传入参数请求 FastAPI 服务。 + +用法: + python send.py --type test # 测试接口 + python send.py --type test --data "measure=false" # 测试 (不触发测量) + python send.py --type connect --data "COM6" # 连接 COM6 + python send.py --type disconnect # 断开连接 + python send.py --type command --data "?0000" # 发送 DiNi 命令 + python send.py --type command --data "FML" # 触发测量 + python send.py --type command --data "!KnM 5" # 设置参数 + python send.py --type command --data "disconnect" # 断开 (命令方式) + + python send.py --host 192.168.1.100 --port 58000 # 连接远程服务 +""" + +import sys +import json +import argparse + +import requests + +# 默认服务地址 +DEFAULT_HOST = "127.0.0.1" +DEFAULT_PORT = 58000 + + +def main(): + parser = argparse.ArgumentParser( + description="DiNi 蓝牙服务 — 命令行发送脚本", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +示例: + %(prog)s --type test + %(prog)s --type connect --data COM6 + %(prog)s --type command --data "?0000" + %(prog)s --type command --data "FML" + %(prog)s --type disconnect + """, + ) + parser.add_argument( + "--type", required=True, + choices=["test", "connect", "disconnect", "command"], + help="操作类型" + ) + parser.add_argument( + "--data", default="", + help="操作数据 (COM口名 / DiNi命令 / test参数)" + ) + parser.add_argument( + "--host", default=DEFAULT_HOST, + help=f"FastAPI 服务地址 (默认 {DEFAULT_HOST})" + ) + parser.add_argument( + "--port", type=int, default=DEFAULT_PORT, + help=f"FastAPI 服务端口 (默认 {DEFAULT_PORT})" + ) + + args = parser.parse_args() + base_url = f"http://{args.host}:{args.port}" + + try: + # ── test ── + if args.type == "test": + # 支持 --data 传入查询参数, 如 "measure=false" + params = {} + if args.data: + for kv in args.data.split("&"): + if "=" in kv: + k, v = kv.split("=", 1) + params[k.strip()] = v.strip() + resp = requests.get(f"{base_url}/test", params=params, timeout=15) + + # ── connect ── + elif args.type == "connect": + if not args.data: + print("✗ --type connect 需要 --data 指定 COM 口, 如: --data COM6") + sys.exit(1) + # 也支持 --data "COM6 19200" 指定波特率 + parts = args.data.strip().split() + port = parts[0] + baudrate = int(parts[1]) if len(parts) > 1 else 9600 + resp = requests.post( + f"{base_url}/connect", + json={"port": port, "baudrate": baudrate}, + timeout=10, + ) + + # ── disconnect ── + elif args.type == "disconnect": + resp = requests.post(f"{base_url}/disconnect", timeout=10) + + # ── command ── + elif args.type == "command": + if not args.data: + print("✗ --type command 需要 --data 指定命令, 如: --data \"?0000\"") + sys.exit(1) + resp = requests.post( + f"{base_url}/command", + json={"cmd": args.data}, + timeout=30, + ) + + # ── 输出结果 ── + print(f"HTTP {resp.status_code} {resp.reason}") + print("-" * 50) + + try: + data = resp.json() + print(json.dumps(data, indent=2, ensure_ascii=False)) + except json.JSONDecodeError: + print(resp.text) + + if not resp.ok: + sys.exit(1) + + except requests.exceptions.ConnectionError: + print(f"✗ 无法连接到服务 {base_url}") + print(" 请确认服务已启动: python start_service.py") + sys.exit(1) + except requests.exceptions.Timeout: + print(f"✗ 请求超时") + sys.exit(1) + except KeyboardInterrupt: + print("\n中断") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/shouji/__pycache__/main.cpython-313.pyc b/shouji/__pycache__/main.cpython-313.pyc new file mode 100644 index 0000000..553380a Binary files /dev/null and b/shouji/__pycache__/main.cpython-313.pyc differ diff --git a/shouji/main.py b/shouji/main.py new file mode 100644 index 0000000..c0ad893 --- /dev/null +++ b/shouji/main.py @@ -0,0 +1,761 @@ +# -*- coding: utf-8 -*- +""" +手机蓝牙通信 — FastAPI 服务 (单文件版) +======================================= +PC端通过蓝牙 SPP 模拟 Trimble DiNi 水准仪,等待手机APP连接。 +当手机发送 ?0100 轮询时,自动将排队的测量数据以 M5 格式发送给手机。 + +用法: + python main.py # 默认 0.0.0.0:58100 + python main.py --port 8080 # 指定 HTTP 端口 + python main.py --bt-port COM3 # 启动时自动连接蓝牙 COM 口 + +API 接口: + GET / 服务信息 + 连接状态 + COM 口列表 + GET /test 测试接口 (未连接→COM列表, 已连接→手机状态) + POST /connect {port} 打开蓝牙 COM 口, 等待手机连接 + POST /disconnect 断开 + POST /command {cmd} 命令: send 发送测量 | disconnect 断开 + GET /status 手机连接状态 + 待发送队列 + +协议说明: + 手机→PC: 0x02F0... 握手 | ?0100 轮询 | 0x050B8D 控制码 | KENC 编码 + PC→手机: 0x02E0... 握手 | M5 测量数据行 + +依赖: pip install fastapi uvicorn pyserial +""" + +import sys +import time +import datetime +import hashlib +import threading +import asyncio +import argparse +from contextlib import asynccontextmanager +from typing import Optional + +import serial +import serial.tools.list_ports +from fastapi import FastAPI, HTTPException, Query +from pydantic import BaseModel + +# ════════════════════════════════════════════════════════════════ +# 常量 +# ════════════════════════════════════════════════════════════════ + +DEFAULT_HTTP_PORT = 58100 +DEFAULT_BAUDRATE = 9600 + +# ── 协议常量 ── +HANDSHAKE_PHONE = bytes([0x02, 0xF0, 0x00, 0x00, 0xDE, 0x03, 0x00, 0x07]) +HANDSHAKE_PC = bytes([0x02, 0xE0, 0x00, 0x00, 0x7F, 0x00, 0x00, 0x07]) +CONTROL_CODE = bytes([0x05, 0x0B, 0x8D]) +QUERY_MARKER = b"?0100" +KENC_ON_MARKER = b"KENC | 1 bit" +KENC_OFF_MARKER = b"KENC | 0 bit" +END_ACK = b"\r\n@" + + +# ════════════════════════════════════════════════════════════════ +# COM 口扫描 +# ════════════════════════════════════════════════════════════════ + +def list_com_ports() -> list[dict]: + """列出所有 COM 口并标注蓝牙类型""" + result = [] + for p in serial.tools.list_ports.comports(): + hwid = (p.hwid or "").upper() + desc_raw = p.description or "" + + is_bt_spp = "00001101" in hwid + is_incoming = is_bt_spp and "000000000000" in hwid + is_outgoing = is_bt_spp and not is_incoming + is_bluetooth = "BTHENUM" in hwid or is_bt_spp or \ + any(kw in desc_raw.lower() for kw in + ["bluetooth", "spp", "rfcomm", "蓝牙"]) + + result.append({ + "device": p.device, + "description": desc_raw, + "hwid": p.hwid or "", + "is_bluetooth": bool(is_bluetooth), + "is_spp_incoming": is_incoming, + "is_spp_outgoing": is_outgoing, + }) + return result + + +def find_bt_spp_ports() -> tuple[list[str], list[str]]: + """返回 (incoming, outgoing) — incoming是手机接入端口, 推荐""" + incoming, outgoing = [], [] + for p in serial.tools.list_ports.comports(): + hwid = (p.hwid or "").upper() + if "BTHENUM" in hwid and "00001101" in hwid: + if "000000000000" in hwid: + incoming.append(p.device) + else: + outgoing.append(p.device) + return incoming, outgoing + + +def print_com_list(): + """打印可连接 COM 口到控制台""" + ports = list_com_ports() + incoming, outgoing = find_bt_spp_ports() + + print("=" * 60) + print(" 可连接 COM 口列表 (手机通信)") + print("-" * 60) + if not ports: + print(" (未检测到任何 COM 口)") + else: + rec_ports = set(incoming) + bt = [p for p in ports if p.get("is_bluetooth")] + other = [p for p in ports if not p.get("is_bluetooth")] + if bt: + print(f"\n 蓝牙端口 ({len(bt)} 个):") + for p in bt: + rec = " ★ 推荐(手机接入)" if p["device"] in rec_ports else "" + print(f" {p['device']:<8} — {p['description']}{rec}") + if other: + print(f"\n 其他 COM 口 ({len(other)} 个):") + for p in other: + print(f" {p['device']:<8} — {p['description']}") + print("=" * 60) + return ports + + +# ════════════════════════════════════════════════════════════════ +# 协议处理器 — 解析手机发来的二进制/文本数据 +# ════════════════════════════════════════════════════════════════ + +class ProtocolHandler: + """处理手机 APP 通过蓝牙模块发来的数据,提取握手/轮询/控制命令""" + + def __init__(self): + self.buf = b"" + + def feed(self, data: bytes) -> tuple[list[bytes], bool]: + """ + 喂入数据 → (待发送响应列表, 是否有 ?0100 轮询) + """ + self.buf += data + responses = [] + has_query = False + + while self.buf: + first = self.buf[:1] + + # ── 二进制握手 (8字节) ── + if first == b"\x02": + if len(self.buf) >= 8 and self.buf[:8] == HANDSHAKE_PHONE: + responses.append(HANDSHAKE_PC) + self.buf = self.buf[8:] + continue + break + + # ── 短控制码 (3字节) ── + elif first == b"\x05": + if len(self.buf) >= 3 and self.buf[:3] == CONTROL_CODE: + responses.append(CONTROL_CODE) + self.buf = self.buf[3:] + continue + break + + # ── ?0100 轮询 ── + elif first in (b"\x3f", b"?"): + idx = self.buf.find(QUERY_MARKER) + if idx >= 0: + if idx > 0: + self.buf = self.buf[idx:] + has_query = True + self.buf = self.buf[len(QUERY_MARKER):] + if self.buf[:2] == b"\r\n": + self.buf = self.buf[2:] + continue + break + + # ── KENC 编码设置 ── + elif KENC_ON_MARKER in self.buf: + idx = self.buf.find(KENC_ON_MARKER) + self.buf = self.buf[idx + len(KENC_ON_MARKER):] + if self.buf[:2] == b"\r\n": + self.buf = self.buf[2:] + continue + + elif KENC_OFF_MARKER in self.buf: + idx = self.buf.find(KENC_OFF_MARKER) + responses.append(END_ACK) + self.buf = self.buf[idx + len(KENC_OFF_MARKER):] + if self.buf[:2] == b"\r\n": + self.buf = self.buf[2:] + continue + + # ── 非打印字节 → 跳过 (RFCOMM Credit 残留) ── + elif self.buf[0] < 0x20: + self.buf = self.buf[1:] + continue + + else: + break + + # 缓冲区保护 + if len(self.buf) > 4096: + self.buf = b"" + + return responses, has_query + + +# ════════════════════════════════════════════════════════════════ +# 测量数据生成 — 模拟 DiNi M5 格式 +# ════════════════════════════════════════════════════════════════ + +class MeasurementBuilder: + """生成 M5 格式测量数据 (模拟 Trimble DiNi 03 输出)""" + + @staticmethod + def build(staff_reading: float, distance: float) -> tuple[bytes, bytes]: + """返回 (line1_bytes, line2_bytes)""" + now = datetime.datetime.now() + ts = now.strftime("%H:%M:%S") + str(now.microsecond // 100000) + r, hd = staff_reading, distance + + h = hashlib.md5( + f"Trimble DiNi 03|{ts}|{r:.5f}|{hd:.3f}|salt".encode() + ).hexdigest() + + line1 = ( + f"For M5|Adr |KD1 {ts} " + f"|R {r:.5f} m " + f"|HD {hd:.3f} m " + f"| @" + ) + line2 = f" | {h}\r\n@" + + return line1.encode("ascii"), line2.encode("ascii") + + +# ════════════════════════════════════════════════════════════════ +# 蓝牙串口 — 后台线程读写 +# ════════════════════════════════════════════════════════════════ + +class BtSerial: + """蓝牙虚拟串口 — 非阻塞读, 后台线程持续收数据""" + + def __init__(self, port: str): + self.port = port + self.ser: Optional[serial.Serial] = None + self.running = False + self._phone_connected = False + self._rx_queue = b"" + self._read_thread: Optional[threading.Thread] = None + self._lock = threading.Lock() + self._log_cb = None + self._last_data_time = 0.0 + self._total_bytes_rx = 0 + self._total_bytes_tx = 0 + + def _log(self, msg: str): + if self._log_cb: + self._log_cb(msg) + + def open(self) -> bool: + try: + self.ser = serial.Serial( + self.port, baudrate=DEFAULT_BAUDRATE, + timeout=0, # 非阻塞 + write_timeout=1.0, + ) + except Exception as e: + self._log(f"✗ 打开 {self.port} 失败: {e}") + return False + + self.running = True + self._log(f"✓ {self.port} 已打开, 等待手机连接...") + + self._read_thread = threading.Thread(target=self._read_loop, daemon=True) + self._read_thread.start() + return True + + def close(self): + self.running = False + if self._read_thread and self._read_thread.is_alive(): + self._read_thread.join(timeout=2) + if self.ser: + try: + self.ser.close() + except Exception: + pass + self.ser = None + self._phone_connected = False + + @property + def phone_connected(self) -> bool: + return self._phone_connected + + @property + def total_rx(self) -> int: + return self._total_bytes_rx + + @property + def total_tx(self) -> int: + return self._total_bytes_tx + + def _read_loop(self): + """后台读线程 — 非阻塞轮询""" + buf = b"" + while self.running: + try: + if not self.ser or not self.ser.is_open: + time.sleep(0.1) + continue + + chunk = self.ser.read(4096) + if chunk: + buf += chunk + self._total_bytes_rx += len(chunk) + self._last_data_time = time.time() + + if not self._phone_connected: + self._phone_connected = True + self._log("*** 手机已连接 ***") + + # 按已知协议标记对齐数据 + while True: + idx = -1 + for marker in [b"\x02", b"\x05", b"?", b"\x3f", + b"KENC", b"!KENC"]: + pos = buf.find(marker) + if pos >= 0 and (idx < 0 or pos < idx): + idx = pos + if idx > 0: + buf = buf[idx:] + elif idx < 0: + if len(buf) > 512: + buf = b"" + break + # 将对齐后的数据放入队列 + with self._lock: + self._rx_queue += buf + buf = b"" + break + + # 超时断开检测 (30秒无数据) + if self._phone_connected and \ + (time.time() - self._last_data_time) > 30: + self._phone_connected = False + self._log("*** 手机已断开 ***") + + time.sleep(0.05) + except OSError: + self._phone_connected = False + time.sleep(0.5) + except Exception: + time.sleep(0.1) + + def read_all(self) -> bytes: + """主线程读取累积数据""" + with self._lock: + if self._rx_queue: + data = self._rx_queue + self._rx_queue = b"" + return data + return b"" + + def write(self, data: bytes) -> bool: + if not self.ser or not self.ser.is_open: + return False + try: + with self._lock: + self.ser.write(data) + self.ser.flush() + self._total_bytes_tx += len(data) + return True + except Exception as e: + self._log(f"发送失败: {e}") + return False + + +# ════════════════════════════════════════════════════════════════ +# 手机通信服务 — 串口 + 协议 + 测量队列 +# ════════════════════════════════════════════════════════════════ + +class MobileService: + """ + 手机通信主服务。 + - 后台线程处理串口读写 + 协议解析 + - 手机发 ?0100 轮询时自动发送排队测量数据 + """ + + def __init__(self): + self.serial: Optional[BtSerial] = None + self.proto = ProtocolHandler() + self._pending: list[tuple[bytes, bytes]] = [] # 待发送测量数据 + self._sent_count = 0 + self._poll_thread: Optional[threading.Thread] = None + self._log_cb = None + + def _log(self, msg: str): + if self._log_cb: + self._log_cb(msg) + + @property + def connected(self) -> bool: + return self.serial is not None and self.serial.running + + @property + def phone_connected(self) -> bool: + return self.serial is not None and self.serial.phone_connected + + @property + def sent_count(self) -> int: + return self._sent_count + + @property + def pending_count(self) -> int: + return len(self._pending) + + # ── 连接 ── + + def connect(self, port: str) -> bool: + if self.serial: + self.disconnect() + self.serial = BtSerial(port) + self.serial._log_cb = self._log + if not self.serial.open(): + self.serial = None + return False + # 启动轮询线程 + self._start_poll() + return True + + def disconnect(self): + self._stop_poll() + if self.serial: + self.serial.close() + self.serial = None + + # ── 后台轮询 ── + + def _start_poll(self): + self._poll_thread = threading.Thread( + target=self._poll_loop, daemon=True) + self._poll_thread.start() + + def _stop_poll(self): + if self._poll_thread and self._poll_thread.is_alive(): + self._poll_thread.join(timeout=2) + self._poll_thread = None + + def _poll_loop(self): + """后台轮询 — 处理收数据 + 自动响应""" + while self.serial and self.serial.running: + try: + data = self.serial.read_all() + if data: + responses, has_query = self.proto.feed(data) + + # 发送协议响应 (握手/控制码) + for rsp in responses: + self.serial.write(rsp) + + # 手机轮询 (±0100) + 有待发送数据 → 发送 + if has_query and self._pending: + self._send_data() + + time.sleep(0.05) + except Exception: + time.sleep(0.1) + + # ── 数据发送 ── + + def queue_measurement(self, staff_reading: float, distance: float): + """将测量数据加入发送队列""" + line1, line2 = MeasurementBuilder.build(staff_reading, distance) + self._pending.append((line1, line2)) + self._log(f"+ 队列: R={staff_reading:.5f}m HD={distance:.3f}m " + f"(共{len(self._pending)}条)") + + def _send_data(self): + """发送队列中第一条 (由轮询触发)""" + if not self._pending or not self.serial: + return + line1, line2 = self._pending.pop(0) + + self.serial.write(line1) + time.sleep(0.03) # 两帧间隔 (模拟水准仪) + self.serial.write(line2) + + self._sent_count += 1 + self._log(f"↑ 发送 #{self._sent_count}") + + def force_send(self, ignore_phone: bool = False) -> bool: + """强制发送 (不等轮询)""" + if not self.serial or not self.serial.running: + self._log("⚠ 串口未打开") + return False + if not self.serial.phone_connected and not ignore_phone: + self._log("⚠ 手机未连接") + return False + if not self._pending: + self._log("⚠ 无待发送数据") + return False + self._send_data() + return True + + def clear_queue(self): + self._pending.clear() + + +# ════════════════════════════════════════════════════════════════ +# FastAPI — 全局状态 +# ════════════════════════════════════════════════════════════════ + +svc = MobileService() +svc_lock = threading.Lock() + + +class ConnectRequest(BaseModel): + port: str + + +class CommandRequest(BaseModel): + cmd: str + + +# ════════════════════════════════════════════════════════════════ + +@asynccontextmanager +async def lifespan(app: FastAPI): + print("[mobile] 手机蓝牙通信服务启动") + yield + with svc_lock: + svc.disconnect() + print("[mobile] 手机蓝牙通信服务已停止") + + +app = FastAPI( + title="DiNi Mobile Bluetooth Service", + description="PC蓝牙SPP服务端 — 模拟水准仪, 向手机APP发送M5测量数据", + version="4.0", + lifespan=lifespan, +) + + +# ── GET / ── + +@app.get("/") +async def root(): + return { + "service": "DiNi Mobile Bluetooth Service v4.0", + "connected": svc.connected, + "port": svc.serial.port if svc.serial else None, + "phone_connected": svc.phone_connected, + "com_ports": list_com_ports() if not svc.connected else None, + } + + +# ── GET /status ── + +@app.get("/status") +async def status(): + info = { + "connected": svc.connected, + "phone_connected": svc.phone_connected, + "pending_count": svc.pending_count, + "sent_count": svc.sent_count, + } + if svc.serial: + info["port"] = svc.serial.port + info["total_rx"] = svc.serial.total_rx + info["total_tx"] = svc.serial.total_tx + return info + + +# ── GET/POST /test ── + +@app.api_route("/test", methods=["GET", "POST"]) +async def test(): + if not svc.connected: + return { + "connected": False, + "message": "未连接 — 返回可连接的 COM 口列表", + "com_ports": list_com_ports(), + } + return { + "connected": True, + "port": svc.serial.port, + "phone_connected": svc.phone_connected, + "pending_count": svc.pending_count, + "sent_count": svc.sent_count, + "total_rx": svc.serial.total_rx, + "total_tx": svc.serial.total_tx, + } + + +# ── POST /connect ── + +@app.post("/connect") +async def connect(req: ConnectRequest): + def _do(): + with svc_lock: + ok = svc.connect(req.port) + if not ok: + raise HTTPException( + status_code=500, + detail=f"无法打开 {req.port},请检查 COM 口是否存在或已被占用" + ) + return {"status": "connected", "port": req.port} + + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, _do) + + +# ── POST /disconnect ── + +@app.post("/disconnect") +async def disconnect(): + def _do(): + with svc_lock: + svc.disconnect() + return {"status": "disconnected"} + + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, _do) + + +# ── POST /command ── + +@app.post("/command") +async def send_command(req: CommandRequest): + """ + 发送命令。 + + 支持的命令: + send → 添加测量到队列 如: "send 0.89182 3.323" + force → 强制发送队列中数据 (不等轮询) + clear → 清空待发送队列 + disconnect → 断开蓝牙连接 + """ + cmd = req.cmd.strip() + parts = cmd.split() + action = parts[0].lower() if parts else "" + + if action == "disconnect": + def _d(): + with svc_lock: + svc.disconnect() + return {"status": "disconnected", "command": "disconnect"} + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, _d) + + if action == "send": + # 格式: send + if len(parts) < 3: + raise HTTPException( + status_code=400, + detail="格式: send <标尺读数> <距离> 如: send 0.89182 3.323" + ) + try: + r = float(parts[1]) + hd = float(parts[2]) + except ValueError: + raise HTTPException(status_code=400, detail="参数必须是数字") + + def _queue(): + with svc_lock: + if not svc.connected: + raise HTTPException(status_code=400, + detail="未连接, 请先 POST /connect") + svc.queue_measurement(r, hd) + return { + "command": "send", + "staff_reading": r, + "distance": hd, + "pending_count": svc.pending_count, + } + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, _queue) + + if action == "force": + def _force(): + with svc_lock: + ok = svc.force_send(ignore_phone=False) + return { + "command": "force", + "status": "ok" if ok else "skipped", + "pending_count": svc.pending_count, + } + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, _force) + + if action == "clear": + with svc_lock: + svc.clear_queue() + return {"command": "clear", "pending_count": svc.pending_count} + + # ── 原始数据发送 ── + if not svc.connected: + raise HTTPException(status_code=400, + detail="未连接, 请先 POST /connect") + + def _raw(): + # 支持 hex: "hex:AABBCC" 或纯文本直接发送 + if cmd.startswith("hex:"): + raw = bytes.fromhex(cmd[4:].replace(" ", "")) + else: + raw = cmd.encode("ascii") + ok = svc.serial.write(raw) + return { + "command": "raw", + "status": "ok" if ok else "error", + "bytes_sent": len(raw) if ok else 0, + } + + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, _raw) + + +# ════════════════════════════════════════════════════════════════ +# 启动入口 +# ════════════════════════════════════════════════════════════════ + +def main(): + parser = argparse.ArgumentParser( + description="手机蓝牙通信 — FastAPI 服务 (模拟 DiNi 水准仪)") + parser.add_argument("--host", default="0.0.0.0", help="监听地址 (默认 0.0.0.0)") + parser.add_argument("--port", type=int, default=DEFAULT_HTTP_PORT, + help=f"HTTP 服务端口 (默认 {DEFAULT_HTTP_PORT})") + parser.add_argument("--bt-port", default=None, + help="启动时自动连接的蓝牙 COM 口 (如 COM3)") + args = parser.parse_args() + + # ── 打印可连接 COM 口 ── + print_com_list() + + # ── 可选: 启动时自动连接 ── + if args.bt_port: + print(f"\n启动时自动连接 → {args.bt_port} ...") + try: + ok = svc.connect(args.bt_port) + if ok: + print(f"✓ 启动时已自动连接 {args.bt_port}") + else: + print(f"✗ 自动连接 {args.bt_port} 失败, 服务仍正常启动") + except Exception as e: + print(f"✗ 自动连接失败: {e}") + + # ── 启动 uvicorn ── + import uvicorn + print(f"\n▶ 手机蓝牙通信服务启动中...") + print(f" HTTP 地址: http://{args.host}:{args.port}") + print(f" 测试接口: http://{args.host}:{args.port}/test") + print(f" 命令接口: http://{args.host}:{args.port}/command") + print(f" API 文档: http://{args.host}:{args.port}/docs") + print(f" 按 Ctrl+C 停止服务\n") + uvicorn.run(app, host=args.host, port=args.port, workers=1, log_level="info") + + +if __name__ == "__main__": + main() diff --git a/shuizhunyi/main.py b/shuizhunyi/main.py new file mode 100644 index 0000000..16df3af --- /dev/null +++ b/shuizhunyi/main.py @@ -0,0 +1,617 @@ +# -*- coding: utf-8 -*- +""" +DiNi 水准仪 — 蓝牙连接 FastAPI 服务 (单文件版) +=============================================== +启动即列出可连接 COM 口并运行 HTTP 服务,通过 API 维护持久蓝牙连接。 + +用法: + python main.py # 默认 0.0.0.0:58000 + python main.py --port 8080 # 指定 HTTP 端口 + python main.py --dini-port COM6 # 启动时自动连接 COM6 + python main.py --baudrate 19200 # 指定波特率 + +API 接口: + GET / 服务信息 + 当前连接状态 + GET /test?measure=true 测试接口 (未连接→COM列表, 已连接→标识+测量) + POST /connect {port,baudrate} 连接 COM 口 + POST /disconnect 断开 + POST /command {cmd} 发送 DiNi 命令 (disconnect 断开) + +客户端调用: + python send.py --type test + python send.py --type connect --data COM6 + python send.py --type command --data "?0000" + python send.py --type command --data "FML" + python send.py --type disconnect + +依赖: pip install fastapi uvicorn pyserial +""" + +import sys +import time +import re +import asyncio +import threading +import argparse +from contextlib import asynccontextmanager +from typing import Optional, Any + +import serial +import serial.tools.list_ports +from fastapi import FastAPI, HTTPException, Query +from pydantic import BaseModel + +# ════════════════════════════════════════════════════════════════ +# 常量 +# ════════════════════════════════════════════════════════════════ + +DEFAULT_BAUDRATE = 9600 +DEFAULT_TIMEOUT = 3.0 +DEFAULT_HTTP_PORT = 58000 + +BT_SPP_KEYWORDS = [ + "bluetooth", "serial over bluetooth", "spp", + "rfcomm", "serial port", "standard serial", "蓝牙", +] + + +# ════════════════════════════════════════════════════════════════ +# DiNi 命令定义 +# ════════════════════════════════════════════════════════════════ + +class DiniCommands: + READ = { + "0000": "仪器标识和软件版本", + "0100": "仪器编号 (IM)", + "Kc ": "视准误差 (i角, DMS)", + "KT30": "30cm 测试状态", + "Krk": "折光系数", + "KLx": "标尺常数", + "KOf": "标尺偏移量", + "KmL": "重复测量最大标准差", + "KmR": "重复测量最大标准差(右)", + "KnM": "最大重复测量次数", + "KEKR": "地球曲率改正", + "KREF": "折光改正", + "KRAD": "记录附加信息", + "KFIR": "倒尺测量", + "KSND": "提示音", + "KAPO": "自动关机", + } + SET = { + "KnM": ("最大重复测量次数", "int"), + "KREF": ("折光改正", "bool"), + "KEKR": ("地球曲率改正", "bool"), + "KFIR": ("倒尺测量", "bool"), + "KSND": ("提示音", "bool"), + "KAPO": ("自动关机", "bool"), + "KRAD": ("记录附加信息", "int"), + "Krk": ("折光系数", "float"), + "KLx": ("标尺常数", "float"), + "KOf": ("标尺偏移量", "float"), + "KmL": ("重复测量最大标准差", "float"), + "KmR": ("重复测量最大标准差(右)", "float"), + } + MODELS = { + "701520": "DiNi (精度 0.3mm/km)", + "701510": "DiNi (精度 0.7mm/km)", + } + ERRORS = { + "E": "命令语法错误", + "E202": "补偿器超出范围 — 请重新整平仪器", + "E320": "运行时错误 — 请重新测量", + "E321": "亮度变化过大 — 请重新测量", + "E323": "无法读取标尺 — 检查环境条件", + "E327": "标尺截取段太小", + } + + +# ════════════════════════════════════════════════════════════════ +# 响应解析 +# ════════════════════════════════════════════════════════════════ + +def parse_dini_response(raw: bytes) -> dict: + result = { + "type": "unknown", "param": None, "raw_text": None, + "raw_hex": raw.hex(" ").upper() if raw else "", + "values": [], "error_code": None, "error_desc": None, + } + if not raw: + result["type"] = "empty" + return result + try: + text = raw.decode("ascii", errors="replace").strip() + except Exception: + text = "" + result["raw_text"] = text + if not text: + result["type"] = "empty" + return result + if re.match(r'^Q\s*$', text): + result["type"] = "ok_ack"; result["values"] = ["Q"]; return result + if re.match(r'^E\d{0,3}$', text): + result["type"] = "error"; result["error_code"] = text + result["error_desc"] = DiniCommands.ERRORS.get(text, f"未知错误: {text}") + return result + if text.startswith("!"): + result["type"] = "param_response" + rest = text[1:].strip() + if "|" in rest: + left, right = rest.split("|", 1) + result["param"] = left.strip() + result["values"] = right.strip().split() + else: + parts = rest.split() + if parts: + result["param"] = parts[0] + result["values"] = parts[1:] + return result + numbers = re.findall(r"[-+]?\d*\.?\d+", text) + if numbers: + result["type"] = "measurement"; result["values"] = numbers; return result + result["type"] = "text"; result["values"] = [text] + return result + + +# ════════════════════════════════════════════════════════════════ +# COM 口扫描 +# ════════════════════════════════════════════════════════════════ + +def list_com_ports() -> list[dict]: + """列出所有 COM 口,标注蓝牙 SPP""" + result = [] + for port in serial.tools.list_ports.comports(): + desc = (port.description or "").lower() + hwid = port.hwid or "" + is_bt = any(kw in desc or kw in hwid.lower() for kw in BT_SPP_KEYWORDS) + result.append({ + "device": port.device, + "description": port.description or "", + "hwid": hwid, + "vid": port.vid, "pid": port.pid, + "is_bluetooth": bool(is_bt), + }) + return result + + +def find_bluetooth_ports() -> tuple: + """返回 (incoming_ports, outgoing_ports)""" + incoming, outgoing = [], [] + for p in serial.tools.list_ports.comports(): + hwid = (p.hwid or "").upper() + if "BTHENUM" in hwid and "00001101" in hwid: + if "000000000000" in hwid: + incoming.append(p.device) + else: + outgoing.append(p.device) + return incoming, outgoing + + +def list_connectable_ports() -> list[dict]: + """可连接 COM 口列表,标记推荐 (outgoing SPP)""" + _, outgoing = find_bluetooth_ports() + ports = list_com_ports() + for p in ports: + p["recommended"] = p["device"] in outgoing + return ports + + +def print_com_list(): + """打印可连接 COM 口到控制台""" + ports = list_connectable_ports() + print("=" * 60) + print(" 可连接 COM 口列表") + print("-" * 60) + if not ports: + print(" (未检测到任何 COM 口)") + else: + bt = [p for p in ports if p.get("is_bluetooth")] + other = [p for p in ports if not p.get("is_bluetooth")] + if bt: + print(f"\n 蓝牙 SPP 端口 ({len(bt)} 个):") + for p in bt: + rec = " ★ 推荐" if p.get("recommended") else "" + print(f" {p['device']:<8} — {p['description']}{rec}") + if other: + print(f"\n 其他 COM 口 ({len(other)} 个):") + for p in other: + print(f" {p['device']:<8} — {p['description']}") + print("=" * 60) + return ports + + +# ════════════════════════════════════════════════════════════════ +# DiNi 客户端 +# ════════════════════════════════════════════════════════════════ + +class DiniClient: + """通过蓝牙 Trimble-1780 SPP 虚拟串口与 DiNi 水准仪通信""" + + def __init__(self, port: str, baudrate: int = DEFAULT_BAUDRATE, + timeout: float = DEFAULT_TIMEOUT): + self.port = port + self.baudrate = baudrate + self.timeout = timeout + self.ser: Optional[serial.Serial] = None + self._log_cb = None + + def _log(self, msg: str): + if self._log_cb: + self._log_cb(msg) + + # ── 连接 ── + + def connect(self) -> bool: + try: + self.ser = serial.Serial( + port=self.port, baudrate=self.baudrate, + bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, timeout=self.timeout, + write_timeout=self.timeout, + ) + self._log(f"✓ DiNi 已连接 {self.port} @ {self.baudrate}") + return True + except serial.SerialException as e: + self._log(f"✗ 无法打开 {self.port}: {e}") + return False + + def disconnect(self): + if self.ser and self.ser.is_open: + self.ser.close() + self._log(f"✓ DiNi 已断开 {self.port}") + self.ser = None + + def is_connected(self) -> bool: + return self.ser is not None and self.ser.is_open + + # ── 通信 ── + + def send_command(self, cmd: str, measure_mode: bool = False) -> dict: + if not self.is_connected(): + return {"status": "disconnected", "type": "empty", + "error_desc": "未连接", "raw_hex": "", "raw_text": None, + "values": []} + + cmd_ascii = cmd.strip().rstrip(".") + if not cmd_ascii.endswith("\r\n"): + cmd_ascii += "\r\n" + tx_bytes = cmd_ascii.encode("ascii") + self._log(f"TX: {tx_bytes!r}") + + self.ser.reset_input_buffer() + self.ser.write(tx_bytes) + self.ser.flush() + + time.sleep(2.0 if measure_mode else 0.25) + + raw = self._read_response() + if raw: + self._log(f"RX ({len(raw)}B): {raw.hex(' ').upper()}") + else: + self._log("RX: (超时无数据)") + + result = parse_dini_response(raw) + result["status"] = { + "param_response": "ok", "measurement": "ok", "text": "ok", + "ok_ack": "ok", "error": "error", "empty": "timeout", + }.get(result["type"], "ok") + result["tx"] = repr(tx_bytes) + return result + + def _read_response(self) -> bytes: + response = bytearray() + deadline = time.time() + self.timeout + last_data_time = time.time() + got_data = False + while time.time() < deadline: + w = self.ser.in_waiting + if w > 0: + chunk = self.ser.read(w) + response.extend(chunk) + last_data_time = time.time() + got_data = True + if b"\n" in chunk or b"\r" in chunk: + time.sleep(0.08) + extra = self.ser.read(self.ser.in_waiting) + if extra: + response.extend(extra) + break + else: + if got_data and (time.time() - last_data_time) > 0.4: + break + time.sleep(0.05) + return bytes(response) + + # ── 高层 API ── + + def read_param(self, param: str) -> dict: + result = self.send_command(f"?{param}") + result["param_requested"] = param + if result["type"] == "param_response" and result["param"] is None: + result["param"] = param + return result + + def set_param(self, param: str, value: Any) -> dict: + info = DiniCommands.SET.get(param) + if info: + _, vtype = info + s = str(value) + if vtype == "bool": + val = "1" if s in ("1", "true", "True", "on") else "0" + elif vtype == "float": + val = f"{float(value):.5f}" + elif vtype == "int": + val = str(int(value)) + else: + val = s + else: + val = str(value) + result = self.send_command(f"!{param}{val}") + result["param_requested"] = param + result["value_sent"] = val + return result + + def measure(self) -> dict: + result = self.send_command("FML", measure_mode=True) + if result["type"] == "measurement" and result["values"]: + vals = result["values"] + if len(vals) >= 1: result["staff_reading"] = float(vals[0]) + if len(vals) >= 2: result["distance"] = float(vals[1]) + if len(vals) >= 3: result["height_diff"] = float(vals[2]) + if len(vals) >= 4: result["std_dev"] = float(vals[3]) + return result + + def identify(self) -> dict: + result = self.read_param("0000") + info = {"ok": False, "model": "", "model_name": "", "sw_version": "", + "raw_text": result.get("raw_text")} + if result["status"] == "ok" and result["values"]: + vals = result["values"] + model = vals[0] if vals else "" + info["ok"] = True + info["model"] = model + info["model_name"] = DiniCommands.MODELS.get(model, f"型号 {model}") + info["sw_version"] = vals[1] if len(vals) > 1 else "" + else: + info["error_desc"] = result.get("error_desc") or result.get("status") + return info + + def shutdown(self) -> dict: + return self.send_command("SEO") + + +# ════════════════════════════════════════════════════════════════ +# FastAPI — 全局状态 +# ════════════════════════════════════════════════════════════════ + +dini: Optional[DiniClient] = None +dini_lock = threading.Lock() + + +class ConnectRequest(BaseModel): + port: str + baudrate: int = DEFAULT_BAUDRATE + + +class CommandRequest(BaseModel): + cmd: str + + +# ════════════════════════════════════════════════════════════════ +# 串口操作 (在线程池中执行, 带锁) +# ════════════════════════════════════════════════════════════════ + +async def _run_serial(func, *args, **kwargs): + loop = asyncio.get_running_loop() + def _locked(): + with dini_lock: + return func(*args, **kwargs) + return await loop.run_in_executor(None, _locked) + + +def _sync_connect(req: ConnectRequest) -> dict: + global dini + if dini and dini.is_connected(): + dini.disconnect() + client = DiniClient(port=req.port, baudrate=req.baudrate) + if not client.connect(): + dini = None + raise HTTPException(status_code=500, + detail=f"无法连接到 {req.port},请检查 COM 口是否存在或已被占用") + dini = client + return {"status": "connected", "port": client.port, "baudrate": client.baudrate} + + +def _sync_disconnect() -> dict: + global dini + if dini and dini.is_connected(): + port = dini.port + dini.disconnect() + dini = None + return {"status": "disconnected", "port": port} + dini = None + return {"status": "not_connected"} + + +def _sync_test(client: DiniClient, do_measure: bool) -> dict: + result = {"connected": True, "port": client.port, "baudrate": client.baudrate} + # 1. 仪器标识 + ident = client.identify() + result["instrument_info"] = { + "ok": ident.get("ok"), + "model": ident.get("model"), + "model_name": ident.get("model_name"), + "software_version": ident.get("sw_version"), + } + if not ident.get("ok"): + result["instrument_info"]["error"] = ident.get("error_desc", "无响应") + # 2. 测量 + if do_measure: + m = client.measure() + result["measurement"] = { + "status": m.get("status"), + "staff_reading": m.get("staff_reading"), + "distance": m.get("distance"), + "height_diff": m.get("height_diff"), + "std_dev": m.get("std_dev"), + "raw_text": m.get("raw_text"), + } + if m.get("status") == "error": + result["measurement"]["error"] = m.get("error_desc") + return result + + +def _sync_send_command(client: DiniClient, cmd: str) -> dict: + c = cmd.strip() + cu = c.upper().rstrip(".") + if cu == "FML": return client.measure() + if cu == "SEO": return client.shutdown() + if c.startswith("?"): return client.read_param(c[1:].rstrip(".")) + if c.startswith("!"): + parts = c[1:].rstrip(".").split(maxsplit=1) + return client.set_param(parts[0], parts[1] if len(parts) > 1 else "") + return client.send_command(c) + + +def _fmt_result(cmd: str, r: dict) -> dict: + return { + "command": cmd, "status": r.get("status"), "type": r.get("type"), + "param": r.get("param"), "param_requested": r.get("param_requested"), + "values": r.get("values"), "raw_text": r.get("raw_text"), + "raw_hex": r.get("raw_hex"), "error_code": r.get("error_code"), + "error_desc": r.get("error_desc"), + "staff_reading": r.get("staff_reading"), + "distance": r.get("distance"), + "height_diff": r.get("height_diff"), + "std_dev": r.get("std_dev"), "tx": r.get("tx"), + } + + +# ════════════════════════════════════════════════════════════════ +# FastAPI 应用 +# ════════════════════════════════════════════════════════════════ + +@asynccontextmanager +async def lifespan(app: FastAPI): + print("[bt_service] DiNi 蓝牙服务启动") + yield + global dini + with dini_lock: + if dini and dini.is_connected(): + dini.disconnect() + print("[bt_service] DiNi 蓝牙服务已停止") + + +app = FastAPI( + title="DiNi Bluetooth Service", + description="通过 Trimble-1780 蓝牙适配器远程控制天宝 DiNi 水准仪", + version="4.0", + lifespan=lifespan, +) + +# ── GET / ── + +@app.get("/") +async def root(): + client = dini + connected = client is not None and client.is_connected() + info = {"service": "DiNi Bluetooth Service v4.0", "connected": connected} + if connected: + info["port"] = client.port + info["baudrate"] = client.baudrate + else: + info["com_ports"] = list_connectable_ports() + return info + +# ── POST /connect ── + +@app.post("/connect") +async def connect(req: ConnectRequest): + return await _run_serial(_sync_connect, req) + +# ── POST /disconnect ── + +@app.post("/disconnect") +async def disconnect(): + return await _run_serial(_sync_disconnect) + +# ── GET/POST /test ── + +@app.api_route("/test", methods=["GET", "POST"]) +async def test( + measure: bool = Query(default=True, + description="已连接时是否同时触发测量 (FML)") +): + client = dini + connected = client is not None and client.is_connected() + if not connected: + return { + "connected": False, + "message": "未连接 — 返回可连接的 COM 口列表", + "com_ports": list_connectable_ports(), + } + return await _run_serial(_sync_test, client, measure) + +# ── POST /command ── + +@app.post("/command") +async def send_command(req: CommandRequest): + cmd = req.cmd.strip() + if cmd.lower() == "disconnect": + return await _run_serial(_sync_disconnect) + + client = dini + if client is None or not client.is_connected(): + raise HTTPException(status_code=400, + detail="未连接到仪器,请先调用 POST /connect") + + result = await _run_serial(_sync_send_command, client, cmd) + return _fmt_result(cmd, result) + + +# ════════════════════════════════════════════════════════════════ +# 启动入口 +# ════════════════════════════════════════════════════════════════ + +def main(): + parser = argparse.ArgumentParser( + description="DiNi 水准仪 — 蓝牙连接 FastAPI 服务") + parser.add_argument("--host", default="0.0.0.0", help="监听地址 (默认 0.0.0.0)") + parser.add_argument("--port", type=int, default=DEFAULT_HTTP_PORT, + help=f"HTTP 服务端口 (默认 {DEFAULT_HTTP_PORT})") + parser.add_argument("--dini-port", default=None, + help="启动时自动连接的 COM 口 (如 COM6)") + parser.add_argument("--baudrate", type=int, default=DEFAULT_BAUDRATE, + help=f"DiNi 波特率 (默认 {DEFAULT_BAUDRATE})") + args = parser.parse_args() + + # ── 打印可连接 COM 口 ── + print_com_list() + + # ── 可选: 启动时自动连接 ── + if args.dini_port: + print(f"\n启动时自动连接 → {args.dini_port} ...") + try: + global dini + client = DiniClient(port=args.dini_port, baudrate=args.baudrate) + if client.connect(): + dini = client + print(f"✓ 启动时已自动连接 {args.dini_port}") + else: + print(f"✗ 自动连接 {args.dini_port} 失败, 服务仍正常启动") + except Exception as e: + print(f"✗ 自动连接失败: {e}") + + # ── 启动 uvicorn ── + import uvicorn + print(f"\n▶ DiNi 蓝牙服务启动中...") + print(f" HTTP 地址: http://{args.host}:{args.port}") + print(f" 测试接口: http://{args.host}:{args.port}/test") + print(f" 命令接口: http://{args.host}:{args.port}/command") + print(f" API 文档: http://{args.host}:{args.port}/docs") + print(f" 按 Ctrl+C 停止服务\n") + uvicorn.run(app, host=args.host, port=args.port, workers=1, log_level="info") + + +if __name__ == "__main__": + main()