初始化

This commit is contained in:
lhx
2026-06-08 15:17:52 +08:00
commit a40feb04ee
5 changed files with 1717 additions and 0 deletions

206
README.md Normal file
View File

@@ -0,0 +1,206 @@
# app_v4_分离版 — DiNi 水准仪 & 手机通信 远程控制
两个独立 FastAPI 服务,分别管理 PC 端与水准仪(NiNi)、手机(蓝牙模块)的蓝牙连接。
## 目录结构
```
app_v4_分离版/
├── send.py # 共享客户端 — 统一命令行发送脚本
├── shuizhunyi/
│ └── main.py # 水准仪服务 — 连接 Trimble-1780 控制 DiNi
├── shouji/
│ └── main.py # 手机通信服务 — 模拟水准仪向手机APP发测量数据
└── README.md # 本文件
```
---
## 1. 安装依赖
```bash
pip install fastapi uvicorn pyserial
```
`send.py` 额外依赖 `requests`(通常已自带)。
---
## 2. 水准仪控制服务 (`shuizhunyi/`)
通过 Trimble-1780 蓝牙适配器与天宝 DiNi 水准仪通信,发送 DiNi 协议命令。
### 启动
```bash
cd shuizhunyi
python main.py # 默认 0.0.0.0:58000
python main.py --port 8080 # 指定 HTTP 端口
python main.py --dini-port COM6 # 启动时自动连接水准仪
python main.py --baudrate 19200 # 指定波特率
```
### API 接口
| 方法 | 路径 | 说明 |
|------|------|------|
| `GET` | `/` | 服务状态 + COM 口列表 |
| `GET/POST` | `/test` | 测试未连接→COM列表已连接→仪器标识 + 可选测量 |
| `POST` | `/connect` | `{"port":"COM6","baudrate":9600}` 连接水准仪 |
| `POST` | `/disconnect` | 断开连接 |
| `POST` | `/command` | `{"cmd":"..."}` 发送 DiNi 命令 |
### send.py 调用示例
```bash
# 从上级目录执行
send.py --type test --port 58000
send.py --type connect --data COM6 --port 58000
send.py --type command --data "?0000" --port 58000 # 读取仪器标识
send.py --type command --data "?0100" --port 58000 # 读取仪器编号
send.py --type command --data "?KSND" --port 58000 # 读取提示音状态
send.py --type command --data "!KnM5" --port 58000 # 设置最大测量次数=5
send.py --type command --data "FML" --port 58000 # 触发测量
send.py --type command --data "SEO" --port 58000 # 远程关机
send.py --type disconnect --port 58000
```
### 支持的命令
**读取参数**: `?0000` `?0100` `?KT30` `?Krk` `?KLx` `?KOf` `?KmL` `?KmR` `?KnM` `?KEKR` `?KREF` `?KRAD` `?KFIR` `?KSND` `?KAPO`
**设置参数**: `!KnM<N>` `!KREF<0|1>` `!KEKR<0|1>` `!KFIR<0|1>` `!KSND<0|1>` `!KAPO<0|1>` `!KRAD<0|1|2>` `!Krk<浮点>` `!KLx<浮点>` `!KOf<浮点>` `!KmL<浮点>` `!KmR<浮点>`
> **注意**: 设置命令参数名与值之间无需空格,直接拼接,如 `!KnM5`、`!KSND0`。
> 文档标明 DiNi 设置命令"无需输入 Δ(空格)"。
**功能命令**: `FML` 触发测量 | `SEO` 远程关机
---
## 3. 手机通信服务 (`shouji/`)
PC 端蓝牙 SPP 作为服务端,等待手机 APP 连接后,模拟 Trimble DiNi 03 水准仪向手机发送 M5 格式测量数据。
### 工作原理
```
手机APP ──蓝牙──▶ 蓝牙模块 ──SPP串口──▶ PC (本服务)
▲ │
│ ?0100 轮询 │ 测量数据 (M5)
└────────────────────────────────┘
```
- 手机通过蓝牙连接到 PC 的蓝牙模块
- 手机 APP 周期性发送 `?0100` 轮询水准仪
- 本服务响应协议握手/控制码,并在收到轮询时发送排队测量数据
- 手机似乎只接收不回复确认
### 启动
```bash
cd shouji
python main.py # 默认 0.0.0.0:58100
python main.py --port 8081 # 指定 HTTP 端口
python main.py --bt-port COM3 # 启动时自动连接蓝牙 COM 口
```
### API 接口
| 方法 | 路径 | 说明 |
|------|------|------|
| `GET` | `/` | 服务状态 + COM 口列表 |
| `GET/POST` | `/test` | 测试未连接→COM列表已连接→手机连接状态+统计 |
| `GET` | `/status` | 详细状态:手机连接、待发送数、收发字节数 |
| `POST` | `/connect` | `{"port":"COM3"}` 打开蓝牙 COM 口 |
| `POST` | `/disconnect` | 断开 |
| `POST` | `/command` | `{"cmd":"..."}` 发送命令 |
### send.py 调用示例
```bash
# 从上级目录执行
send.py --type test --port 58100
send.py --type connect --data COM3 --port 58100 # 连接蓝牙 COM 口
send.py --type command --data "send 0.89182 3.323" --port 58100 # 添加测量到队列
send.py --type command --data "send 1.50000 5.000" --port 58100 # 再添加一条
send.py --type command --data "force" --port 58100 # 强制发送 (不等轮询)
send.py --type command --data "clear" --port 58100 # 清空队列
send.py --type disconnect --port 58100
```
### /command 支持的命令
| 命令 | 格式 | 说明 |
|------|------|------|
| `send` | `send <R> <HD>` | 添加测量到发送队列 (R=标尺读数, HD=距离) |
| `force` | `force` | 强制立即发送队列中第一条 |
| `clear` | `clear` | 清空待发送队列 |
| `disconnect` | `disconnect` | 断开蓝牙连接 |
| 原始 | `hex:AABBCC` | 发送原始 hex 字节 |
---
## 4. 共享客户端 (`send.py`)
统一的命令行发送脚本,通过 `--type` / `--port` 参数调用不同服务。
### 参数
```
--type {test,connect,disconnect,command} 操作类型
--data DATA 操作数据 (COM口 / 命令 / 参数)
--host HOST 服务地址 (默认 127.0.0.1)
--port PORT 服务端口 (水准仪 58000 / 手机 58100)
```
### 示例
```bash
# 水准仪服务 (port 58000)
python send.py --type test --port 58000
python send.py --type connect --data COM6 --port 58000
python send.py --type command --data "?0000" --port 58000
python send.py --type command --data "FML" --port 58000
# 手机通信服务 (port 58100)
python send.py --type test --port 58100
python send.py --type connect --data COM3 --port 58100
python send.py --type command --data "send 0.89182 3.323" --port 58100
python send.py --type command --data "force" --port 58100
```
> **提示**: 两个 `send.py` 是同一个文件,通过 `--port` 参数区分调用哪个服务。
---
## 5. 典型工作流程
### 场景: 电脑读取水准仪数据 → 发送到手机 APP
```bash
# 终端1: 启动水准仪服务
cd shuizhunyi
python main.py --dini-port COM6 # 端口 58000
# 终端2: 启动手机通信服务
cd shouji
python main.py --bt-port COM3 # 端口 58100
# 终端3: 操作
python send.py --type command --data "FML" --port 58000 # 从水准仪获取测量
# → 假设返回 staff_reading=0.89182, distance=3.323
python send.py --type command --data "send 0.89182 3.323" --port 58100 # 排队发给手机
# 手机APP发送 ?0100 轮询时自动收到数据
```
---
## 6. 注意事项
1. **串口独占**: 每个 COM 口只能有一个程序打开,两个服务需使用不同的 COM 口
2. **蓝牙配对**: 使用前确保 Trimble-1780 / 蓝牙模块已在 Windows 中配对,生成虚拟 COM 口
3. **波特率**: DiNi 水准仪默认 9600需与仪器内菜单设置一致
4. **手机通信方向**: 手机通信服务使用 **incoming** SPP 端口PC 作为服务端),启动时标 ★ 推荐的即为 incoming 端口
5. **DiNi 设置命令格式**: 参数名与值之间不加空格(如 `!KSND0` 而非 `!KSND 0`),服务端已自动处理

133
send.py Normal file
View File

@@ -0,0 +1,133 @@
# -*- coding: utf-8 -*-
"""
DiNi 蓝牙服务 — 命令行发送脚本
===============================
通过传入参数请求 FastAPI 服务。
用法:
python send.py --type test # 测试接口
python send.py --type test --data "measure=false" # 测试 (不触发测量)
python send.py --type connect --data "COM6" # 连接 COM6
python send.py --type disconnect # 断开连接
python send.py --type command --data "?0000" # 发送 DiNi 命令
python send.py --type command --data "FML" # 触发测量
python send.py --type command --data "!KnM 5" # 设置参数
python send.py --type command --data "disconnect" # 断开 (命令方式)
python send.py --host 192.168.1.100 --port 58000 # 连接远程服务
"""
import sys
import json
import argparse
import requests
# 默认服务地址
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 58000
def main():
parser = argparse.ArgumentParser(
description="DiNi 蓝牙服务 — 命令行发送脚本",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
%(prog)s --type test
%(prog)s --type connect --data COM6
%(prog)s --type command --data "?0000"
%(prog)s --type command --data "FML"
%(prog)s --type disconnect
""",
)
parser.add_argument(
"--type", required=True,
choices=["test", "connect", "disconnect", "command"],
help="操作类型"
)
parser.add_argument(
"--data", default="",
help="操作数据 (COM口名 / DiNi命令 / test参数)"
)
parser.add_argument(
"--host", default=DEFAULT_HOST,
help=f"FastAPI 服务地址 (默认 {DEFAULT_HOST})"
)
parser.add_argument(
"--port", type=int, default=DEFAULT_PORT,
help=f"FastAPI 服务端口 (默认 {DEFAULT_PORT})"
)
args = parser.parse_args()
base_url = f"http://{args.host}:{args.port}"
try:
# ── test ──
if args.type == "test":
# 支持 --data 传入查询参数, 如 "measure=false"
params = {}
if args.data:
for kv in args.data.split("&"):
if "=" in kv:
k, v = kv.split("=", 1)
params[k.strip()] = v.strip()
resp = requests.get(f"{base_url}/test", params=params, timeout=15)
# ── connect ──
elif args.type == "connect":
if not args.data:
print("✗ --type connect 需要 --data 指定 COM 口, 如: --data COM6")
sys.exit(1)
# 也支持 --data "COM6 19200" 指定波特率
parts = args.data.strip().split()
port = parts[0]
baudrate = int(parts[1]) if len(parts) > 1 else 9600
resp = requests.post(
f"{base_url}/connect",
json={"port": port, "baudrate": baudrate},
timeout=10,
)
# ── disconnect ──
elif args.type == "disconnect":
resp = requests.post(f"{base_url}/disconnect", timeout=10)
# ── command ──
elif args.type == "command":
if not args.data:
print("✗ --type command 需要 --data 指定命令, 如: --data \"?0000\"")
sys.exit(1)
resp = requests.post(
f"{base_url}/command",
json={"cmd": args.data},
timeout=30,
)
# ── 输出结果 ──
print(f"HTTP {resp.status_code} {resp.reason}")
print("-" * 50)
try:
data = resp.json()
print(json.dumps(data, indent=2, ensure_ascii=False))
except json.JSONDecodeError:
print(resp.text)
if not resp.ok:
sys.exit(1)
except requests.exceptions.ConnectionError:
print(f"✗ 无法连接到服务 {base_url}")
print(" 请确认服务已启动: python start_service.py")
sys.exit(1)
except requests.exceptions.Timeout:
print(f"✗ 请求超时")
sys.exit(1)
except KeyboardInterrupt:
print("\n中断")
sys.exit(1)
if __name__ == "__main__":
main()

Binary file not shown.

761
shouji/main.py Normal file
View File

@@ -0,0 +1,761 @@
# -*- coding: utf-8 -*-
"""
手机蓝牙通信 — FastAPI 服务 (单文件版)
=======================================
PC端通过蓝牙 SPP 模拟 Trimble DiNi 水准仪等待手机APP连接。
当手机发送 ?0100 轮询时,自动将排队的测量数据以 M5 格式发送给手机。
用法:
python main.py # 默认 0.0.0.0:58100
python main.py --port 8080 # 指定 HTTP 端口
python main.py --bt-port COM3 # 启动时自动连接蓝牙 COM 口
API 接口:
GET / 服务信息 + 连接状态 + COM 口列表
GET /test 测试接口 (未连接→COM列表, 已连接→手机状态)
POST /connect {port} 打开蓝牙 COM 口, 等待手机连接
POST /disconnect 断开
POST /command {cmd} 命令: send <r> <hd> 发送测量 | disconnect 断开
GET /status 手机连接状态 + 待发送队列
协议说明:
手机→PC: 0x02F0... 握手 | ?0100 轮询 | 0x050B8D 控制码 | KENC 编码
PC→手机: 0x02E0... 握手 | M5 测量数据行
依赖: pip install fastapi uvicorn pyserial
"""
import sys
import time
import datetime
import hashlib
import threading
import asyncio
import argparse
from contextlib import asynccontextmanager
from typing import Optional
import serial
import serial.tools.list_ports
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
# ════════════════════════════════════════════════════════════════
# 常量
# ════════════════════════════════════════════════════════════════
DEFAULT_HTTP_PORT = 58100
DEFAULT_BAUDRATE = 9600
# ── 协议常量 ──
HANDSHAKE_PHONE = bytes([0x02, 0xF0, 0x00, 0x00, 0xDE, 0x03, 0x00, 0x07])
HANDSHAKE_PC = bytes([0x02, 0xE0, 0x00, 0x00, 0x7F, 0x00, 0x00, 0x07])
CONTROL_CODE = bytes([0x05, 0x0B, 0x8D])
QUERY_MARKER = b"?0100"
KENC_ON_MARKER = b"KENC | 1 bit"
KENC_OFF_MARKER = b"KENC | 0 bit"
END_ACK = b"\r\n@"
# ════════════════════════════════════════════════════════════════
# COM 口扫描
# ════════════════════════════════════════════════════════════════
def list_com_ports() -> list[dict]:
"""列出所有 COM 口并标注蓝牙类型"""
result = []
for p in serial.tools.list_ports.comports():
hwid = (p.hwid or "").upper()
desc_raw = p.description or ""
is_bt_spp = "00001101" in hwid
is_incoming = is_bt_spp and "000000000000" in hwid
is_outgoing = is_bt_spp and not is_incoming
is_bluetooth = "BTHENUM" in hwid or is_bt_spp or \
any(kw in desc_raw.lower() for kw in
["bluetooth", "spp", "rfcomm", "蓝牙"])
result.append({
"device": p.device,
"description": desc_raw,
"hwid": p.hwid or "",
"is_bluetooth": bool(is_bluetooth),
"is_spp_incoming": is_incoming,
"is_spp_outgoing": is_outgoing,
})
return result
def find_bt_spp_ports() -> tuple[list[str], list[str]]:
"""返回 (incoming, outgoing) — incoming是手机接入端口, 推荐"""
incoming, outgoing = [], []
for p in serial.tools.list_ports.comports():
hwid = (p.hwid or "").upper()
if "BTHENUM" in hwid and "00001101" in hwid:
if "000000000000" in hwid:
incoming.append(p.device)
else:
outgoing.append(p.device)
return incoming, outgoing
def print_com_list():
"""打印可连接 COM 口到控制台"""
ports = list_com_ports()
incoming, outgoing = find_bt_spp_ports()
print("=" * 60)
print(" 可连接 COM 口列表 (手机通信)")
print("-" * 60)
if not ports:
print(" (未检测到任何 COM 口)")
else:
rec_ports = set(incoming)
bt = [p for p in ports if p.get("is_bluetooth")]
other = [p for p in ports if not p.get("is_bluetooth")]
if bt:
print(f"\n 蓝牙端口 ({len(bt)} 个):")
for p in bt:
rec = " ★ 推荐(手机接入)" if p["device"] in rec_ports else ""
print(f" {p['device']:<8}{p['description']}{rec}")
if other:
print(f"\n 其他 COM 口 ({len(other)} 个):")
for p in other:
print(f" {p['device']:<8}{p['description']}")
print("=" * 60)
return ports
# ════════════════════════════════════════════════════════════════
# 协议处理器 — 解析手机发来的二进制/文本数据
# ════════════════════════════════════════════════════════════════
class ProtocolHandler:
"""处理手机 APP 通过蓝牙模块发来的数据,提取握手/轮询/控制命令"""
def __init__(self):
self.buf = b""
def feed(self, data: bytes) -> tuple[list[bytes], bool]:
"""
喂入数据 → (待发送响应列表, 是否有 ?0100 轮询)
"""
self.buf += data
responses = []
has_query = False
while self.buf:
first = self.buf[:1]
# ── 二进制握手 (8字节) ──
if first == b"\x02":
if len(self.buf) >= 8 and self.buf[:8] == HANDSHAKE_PHONE:
responses.append(HANDSHAKE_PC)
self.buf = self.buf[8:]
continue
break
# ── 短控制码 (3字节) ──
elif first == b"\x05":
if len(self.buf) >= 3 and self.buf[:3] == CONTROL_CODE:
responses.append(CONTROL_CODE)
self.buf = self.buf[3:]
continue
break
# ── ?0100 轮询 ──
elif first in (b"\x3f", b"?"):
idx = self.buf.find(QUERY_MARKER)
if idx >= 0:
if idx > 0:
self.buf = self.buf[idx:]
has_query = True
self.buf = self.buf[len(QUERY_MARKER):]
if self.buf[:2] == b"\r\n":
self.buf = self.buf[2:]
continue
break
# ── KENC 编码设置 ──
elif KENC_ON_MARKER in self.buf:
idx = self.buf.find(KENC_ON_MARKER)
self.buf = self.buf[idx + len(KENC_ON_MARKER):]
if self.buf[:2] == b"\r\n":
self.buf = self.buf[2:]
continue
elif KENC_OFF_MARKER in self.buf:
idx = self.buf.find(KENC_OFF_MARKER)
responses.append(END_ACK)
self.buf = self.buf[idx + len(KENC_OFF_MARKER):]
if self.buf[:2] == b"\r\n":
self.buf = self.buf[2:]
continue
# ── 非打印字节 → 跳过 (RFCOMM Credit 残留) ──
elif self.buf[0] < 0x20:
self.buf = self.buf[1:]
continue
else:
break
# 缓冲区保护
if len(self.buf) > 4096:
self.buf = b""
return responses, has_query
# ════════════════════════════════════════════════════════════════
# 测量数据生成 — 模拟 DiNi M5 格式
# ════════════════════════════════════════════════════════════════
class MeasurementBuilder:
"""生成 M5 格式测量数据 (模拟 Trimble DiNi 03 输出)"""
@staticmethod
def build(staff_reading: float, distance: float) -> tuple[bytes, bytes]:
"""返回 (line1_bytes, line2_bytes)"""
now = datetime.datetime.now()
ts = now.strftime("%H:%M:%S") + str(now.microsecond // 100000)
r, hd = staff_reading, distance
h = hashlib.md5(
f"Trimble DiNi 03|{ts}|{r:.5f}|{hd:.3f}|salt".encode()
).hexdigest()
line1 = (
f"For M5|Adr |KD1 {ts} "
f"|R {r:.5f} m "
f"|HD {hd:.3f} m "
f"| @"
)
line2 = f" | <Trimble DiNi 03>{h}\r\n@"
return line1.encode("ascii"), line2.encode("ascii")
# ════════════════════════════════════════════════════════════════
# 蓝牙串口 — 后台线程读写
# ════════════════════════════════════════════════════════════════
class BtSerial:
"""蓝牙虚拟串口 — 非阻塞读, 后台线程持续收数据"""
def __init__(self, port: str):
self.port = port
self.ser: Optional[serial.Serial] = None
self.running = False
self._phone_connected = False
self._rx_queue = b""
self._read_thread: Optional[threading.Thread] = None
self._lock = threading.Lock()
self._log_cb = None
self._last_data_time = 0.0
self._total_bytes_rx = 0
self._total_bytes_tx = 0
def _log(self, msg: str):
if self._log_cb:
self._log_cb(msg)
def open(self) -> bool:
try:
self.ser = serial.Serial(
self.port, baudrate=DEFAULT_BAUDRATE,
timeout=0, # 非阻塞
write_timeout=1.0,
)
except Exception as e:
self._log(f"✗ 打开 {self.port} 失败: {e}")
return False
self.running = True
self._log(f"{self.port} 已打开, 等待手机连接...")
self._read_thread = threading.Thread(target=self._read_loop, daemon=True)
self._read_thread.start()
return True
def close(self):
self.running = False
if self._read_thread and self._read_thread.is_alive():
self._read_thread.join(timeout=2)
if self.ser:
try:
self.ser.close()
except Exception:
pass
self.ser = None
self._phone_connected = False
@property
def phone_connected(self) -> bool:
return self._phone_connected
@property
def total_rx(self) -> int:
return self._total_bytes_rx
@property
def total_tx(self) -> int:
return self._total_bytes_tx
def _read_loop(self):
"""后台读线程 — 非阻塞轮询"""
buf = b""
while self.running:
try:
if not self.ser or not self.ser.is_open:
time.sleep(0.1)
continue
chunk = self.ser.read(4096)
if chunk:
buf += chunk
self._total_bytes_rx += len(chunk)
self._last_data_time = time.time()
if not self._phone_connected:
self._phone_connected = True
self._log("*** 手机已连接 ***")
# 按已知协议标记对齐数据
while True:
idx = -1
for marker in [b"\x02", b"\x05", b"?", b"\x3f",
b"KENC", b"!KENC"]:
pos = buf.find(marker)
if pos >= 0 and (idx < 0 or pos < idx):
idx = pos
if idx > 0:
buf = buf[idx:]
elif idx < 0:
if len(buf) > 512:
buf = b""
break
# 将对齐后的数据放入队列
with self._lock:
self._rx_queue += buf
buf = b""
break
# 超时断开检测 (30秒无数据)
if self._phone_connected and \
(time.time() - self._last_data_time) > 30:
self._phone_connected = False
self._log("*** 手机已断开 ***")
time.sleep(0.05)
except OSError:
self._phone_connected = False
time.sleep(0.5)
except Exception:
time.sleep(0.1)
def read_all(self) -> bytes:
"""主线程读取累积数据"""
with self._lock:
if self._rx_queue:
data = self._rx_queue
self._rx_queue = b""
return data
return b""
def write(self, data: bytes) -> bool:
if not self.ser or not self.ser.is_open:
return False
try:
with self._lock:
self.ser.write(data)
self.ser.flush()
self._total_bytes_tx += len(data)
return True
except Exception as e:
self._log(f"发送失败: {e}")
return False
# ════════════════════════════════════════════════════════════════
# 手机通信服务 — 串口 + 协议 + 测量队列
# ════════════════════════════════════════════════════════════════
class MobileService:
"""
手机通信主服务。
- 后台线程处理串口读写 + 协议解析
- 手机发 ?0100 轮询时自动发送排队测量数据
"""
def __init__(self):
self.serial: Optional[BtSerial] = None
self.proto = ProtocolHandler()
self._pending: list[tuple[bytes, bytes]] = [] # 待发送测量数据
self._sent_count = 0
self._poll_thread: Optional[threading.Thread] = None
self._log_cb = None
def _log(self, msg: str):
if self._log_cb:
self._log_cb(msg)
@property
def connected(self) -> bool:
return self.serial is not None and self.serial.running
@property
def phone_connected(self) -> bool:
return self.serial is not None and self.serial.phone_connected
@property
def sent_count(self) -> int:
return self._sent_count
@property
def pending_count(self) -> int:
return len(self._pending)
# ── 连接 ──
def connect(self, port: str) -> bool:
if self.serial:
self.disconnect()
self.serial = BtSerial(port)
self.serial._log_cb = self._log
if not self.serial.open():
self.serial = None
return False
# 启动轮询线程
self._start_poll()
return True
def disconnect(self):
self._stop_poll()
if self.serial:
self.serial.close()
self.serial = None
# ── 后台轮询 ──
def _start_poll(self):
self._poll_thread = threading.Thread(
target=self._poll_loop, daemon=True)
self._poll_thread.start()
def _stop_poll(self):
if self._poll_thread and self._poll_thread.is_alive():
self._poll_thread.join(timeout=2)
self._poll_thread = None
def _poll_loop(self):
"""后台轮询 — 处理收数据 + 自动响应"""
while self.serial and self.serial.running:
try:
data = self.serial.read_all()
if data:
responses, has_query = self.proto.feed(data)
# 发送协议响应 (握手/控制码)
for rsp in responses:
self.serial.write(rsp)
# 手机轮询 (±0100) + 有待发送数据 → 发送
if has_query and self._pending:
self._send_data()
time.sleep(0.05)
except Exception:
time.sleep(0.1)
# ── 数据发送 ──
def queue_measurement(self, staff_reading: float, distance: float):
"""将测量数据加入发送队列"""
line1, line2 = MeasurementBuilder.build(staff_reading, distance)
self._pending.append((line1, line2))
self._log(f"+ 队列: R={staff_reading:.5f}m HD={distance:.3f}m "
f"(共{len(self._pending)}条)")
def _send_data(self):
"""发送队列中第一条 (由轮询触发)"""
if not self._pending or not self.serial:
return
line1, line2 = self._pending.pop(0)
self.serial.write(line1)
time.sleep(0.03) # 两帧间隔 (模拟水准仪)
self.serial.write(line2)
self._sent_count += 1
self._log(f"↑ 发送 #{self._sent_count}")
def force_send(self, ignore_phone: bool = False) -> bool:
"""强制发送 (不等轮询)"""
if not self.serial or not self.serial.running:
self._log("⚠ 串口未打开")
return False
if not self.serial.phone_connected and not ignore_phone:
self._log("⚠ 手机未连接")
return False
if not self._pending:
self._log("⚠ 无待发送数据")
return False
self._send_data()
return True
def clear_queue(self):
self._pending.clear()
# ════════════════════════════════════════════════════════════════
# FastAPI — 全局状态
# ════════════════════════════════════════════════════════════════
svc = MobileService()
svc_lock = threading.Lock()
class ConnectRequest(BaseModel):
port: str
class CommandRequest(BaseModel):
cmd: str
# ════════════════════════════════════════════════════════════════
@asynccontextmanager
async def lifespan(app: FastAPI):
print("[mobile] 手机蓝牙通信服务启动")
yield
with svc_lock:
svc.disconnect()
print("[mobile] 手机蓝牙通信服务已停止")
app = FastAPI(
title="DiNi Mobile Bluetooth Service",
description="PC蓝牙SPP服务端 — 模拟水准仪, 向手机APP发送M5测量数据",
version="4.0",
lifespan=lifespan,
)
# ── GET / ──
@app.get("/")
async def root():
return {
"service": "DiNi Mobile Bluetooth Service v4.0",
"connected": svc.connected,
"port": svc.serial.port if svc.serial else None,
"phone_connected": svc.phone_connected,
"com_ports": list_com_ports() if not svc.connected else None,
}
# ── GET /status ──
@app.get("/status")
async def status():
info = {
"connected": svc.connected,
"phone_connected": svc.phone_connected,
"pending_count": svc.pending_count,
"sent_count": svc.sent_count,
}
if svc.serial:
info["port"] = svc.serial.port
info["total_rx"] = svc.serial.total_rx
info["total_tx"] = svc.serial.total_tx
return info
# ── GET/POST /test ──
@app.api_route("/test", methods=["GET", "POST"])
async def test():
if not svc.connected:
return {
"connected": False,
"message": "未连接 — 返回可连接的 COM 口列表",
"com_ports": list_com_ports(),
}
return {
"connected": True,
"port": svc.serial.port,
"phone_connected": svc.phone_connected,
"pending_count": svc.pending_count,
"sent_count": svc.sent_count,
"total_rx": svc.serial.total_rx,
"total_tx": svc.serial.total_tx,
}
# ── POST /connect ──
@app.post("/connect")
async def connect(req: ConnectRequest):
def _do():
with svc_lock:
ok = svc.connect(req.port)
if not ok:
raise HTTPException(
status_code=500,
detail=f"无法打开 {req.port},请检查 COM 口是否存在或已被占用"
)
return {"status": "connected", "port": req.port}
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _do)
# ── POST /disconnect ──
@app.post("/disconnect")
async def disconnect():
def _do():
with svc_lock:
svc.disconnect()
return {"status": "disconnected"}
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _do)
# ── POST /command ──
@app.post("/command")
async def send_command(req: CommandRequest):
"""
发送命令。
支持的命令:
send <R> <HD> → 添加测量到队列 如: "send 0.89182 3.323"
force → 强制发送队列中数据 (不等轮询)
clear → 清空待发送队列
disconnect → 断开蓝牙连接
"""
cmd = req.cmd.strip()
parts = cmd.split()
action = parts[0].lower() if parts else ""
if action == "disconnect":
def _d():
with svc_lock:
svc.disconnect()
return {"status": "disconnected", "command": "disconnect"}
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _d)
if action == "send":
# 格式: send <staff_reading> <distance>
if len(parts) < 3:
raise HTTPException(
status_code=400,
detail="格式: send <标尺读数> <距离> 如: send 0.89182 3.323"
)
try:
r = float(parts[1])
hd = float(parts[2])
except ValueError:
raise HTTPException(status_code=400, detail="参数必须是数字")
def _queue():
with svc_lock:
if not svc.connected:
raise HTTPException(status_code=400,
detail="未连接, 请先 POST /connect")
svc.queue_measurement(r, hd)
return {
"command": "send",
"staff_reading": r,
"distance": hd,
"pending_count": svc.pending_count,
}
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _queue)
if action == "force":
def _force():
with svc_lock:
ok = svc.force_send(ignore_phone=False)
return {
"command": "force",
"status": "ok" if ok else "skipped",
"pending_count": svc.pending_count,
}
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _force)
if action == "clear":
with svc_lock:
svc.clear_queue()
return {"command": "clear", "pending_count": svc.pending_count}
# ── 原始数据发送 ──
if not svc.connected:
raise HTTPException(status_code=400,
detail="未连接, 请先 POST /connect")
def _raw():
# 支持 hex: "hex:AABBCC" 或纯文本直接发送
if cmd.startswith("hex:"):
raw = bytes.fromhex(cmd[4:].replace(" ", ""))
else:
raw = cmd.encode("ascii")
ok = svc.serial.write(raw)
return {
"command": "raw",
"status": "ok" if ok else "error",
"bytes_sent": len(raw) if ok else 0,
}
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _raw)
# ════════════════════════════════════════════════════════════════
# 启动入口
# ════════════════════════════════════════════════════════════════
def main():
parser = argparse.ArgumentParser(
description="手机蓝牙通信 — FastAPI 服务 (模拟 DiNi 水准仪)")
parser.add_argument("--host", default="0.0.0.0", help="监听地址 (默认 0.0.0.0)")
parser.add_argument("--port", type=int, default=DEFAULT_HTTP_PORT,
help=f"HTTP 服务端口 (默认 {DEFAULT_HTTP_PORT})")
parser.add_argument("--bt-port", default=None,
help="启动时自动连接的蓝牙 COM 口 (如 COM3)")
args = parser.parse_args()
# ── 打印可连接 COM 口 ──
print_com_list()
# ── 可选: 启动时自动连接 ──
if args.bt_port:
print(f"\n启动时自动连接 → {args.bt_port} ...")
try:
ok = svc.connect(args.bt_port)
if ok:
print(f"✓ 启动时已自动连接 {args.bt_port}")
else:
print(f"✗ 自动连接 {args.bt_port} 失败, 服务仍正常启动")
except Exception as e:
print(f"✗ 自动连接失败: {e}")
# ── 启动 uvicorn ──
import uvicorn
print(f"\n▶ 手机蓝牙通信服务启动中...")
print(f" HTTP 地址: http://{args.host}:{args.port}")
print(f" 测试接口: http://{args.host}:{args.port}/test")
print(f" 命令接口: http://{args.host}:{args.port}/command")
print(f" API 文档: http://{args.host}:{args.port}/docs")
print(f" 按 Ctrl+C 停止服务\n")
uvicorn.run(app, host=args.host, port=args.port, workers=1, log_level="info")
if __name__ == "__main__":
main()

617
shuizhunyi/main.py Normal file
View File

@@ -0,0 +1,617 @@
# -*- coding: utf-8 -*-
"""
DiNi 水准仪 — 蓝牙连接 FastAPI 服务 (单文件版)
===============================================
启动即列出可连接 COM 口并运行 HTTP 服务,通过 API 维护持久蓝牙连接。
用法:
python main.py # 默认 0.0.0.0:58000
python main.py --port 8080 # 指定 HTTP 端口
python main.py --dini-port COM6 # 启动时自动连接 COM6
python main.py --baudrate 19200 # 指定波特率
API 接口:
GET / 服务信息 + 当前连接状态
GET /test?measure=true 测试接口 (未连接→COM列表, 已连接→标识+测量)
POST /connect {port,baudrate} 连接 COM 口
POST /disconnect 断开
POST /command {cmd} 发送 DiNi 命令 (disconnect 断开)
客户端调用:
python send.py --type test
python send.py --type connect --data COM6
python send.py --type command --data "?0000"
python send.py --type command --data "FML"
python send.py --type disconnect
依赖: pip install fastapi uvicorn pyserial
"""
import sys
import time
import re
import asyncio
import threading
import argparse
from contextlib import asynccontextmanager
from typing import Optional, Any
import serial
import serial.tools.list_ports
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
# ════════════════════════════════════════════════════════════════
# 常量
# ════════════════════════════════════════════════════════════════
DEFAULT_BAUDRATE = 9600
DEFAULT_TIMEOUT = 3.0
DEFAULT_HTTP_PORT = 58000
BT_SPP_KEYWORDS = [
"bluetooth", "serial over bluetooth", "spp",
"rfcomm", "serial port", "standard serial", "蓝牙",
]
# ════════════════════════════════════════════════════════════════
# DiNi 命令定义
# ════════════════════════════════════════════════════════════════
class DiniCommands:
READ = {
"0000": "仪器标识和软件版本",
"0100": "仪器编号 (IM)",
"Kc ": "视准误差 (i角, DMS)",
"KT30": "30cm 测试状态",
"Krk": "折光系数",
"KLx": "标尺常数",
"KOf": "标尺偏移量",
"KmL": "重复测量最大标准差",
"KmR": "重复测量最大标准差(右)",
"KnM": "最大重复测量次数",
"KEKR": "地球曲率改正",
"KREF": "折光改正",
"KRAD": "记录附加信息",
"KFIR": "倒尺测量",
"KSND": "提示音",
"KAPO": "自动关机",
}
SET = {
"KnM": ("最大重复测量次数", "int"),
"KREF": ("折光改正", "bool"),
"KEKR": ("地球曲率改正", "bool"),
"KFIR": ("倒尺测量", "bool"),
"KSND": ("提示音", "bool"),
"KAPO": ("自动关机", "bool"),
"KRAD": ("记录附加信息", "int"),
"Krk": ("折光系数", "float"),
"KLx": ("标尺常数", "float"),
"KOf": ("标尺偏移量", "float"),
"KmL": ("重复测量最大标准差", "float"),
"KmR": ("重复测量最大标准差(右)", "float"),
}
MODELS = {
"701520": "DiNi (精度 0.3mm/km)",
"701510": "DiNi (精度 0.7mm/km)",
}
ERRORS = {
"E": "命令语法错误",
"E202": "补偿器超出范围 — 请重新整平仪器",
"E320": "运行时错误 — 请重新测量",
"E321": "亮度变化过大 — 请重新测量",
"E323": "无法读取标尺 — 检查环境条件",
"E327": "标尺截取段太小",
}
# ════════════════════════════════════════════════════════════════
# 响应解析
# ════════════════════════════════════════════════════════════════
def parse_dini_response(raw: bytes) -> dict:
result = {
"type": "unknown", "param": None, "raw_text": None,
"raw_hex": raw.hex(" ").upper() if raw else "",
"values": [], "error_code": None, "error_desc": None,
}
if not raw:
result["type"] = "empty"
return result
try:
text = raw.decode("ascii", errors="replace").strip()
except Exception:
text = ""
result["raw_text"] = text
if not text:
result["type"] = "empty"
return result
if re.match(r'^Q\s*$', text):
result["type"] = "ok_ack"; result["values"] = ["Q"]; return result
if re.match(r'^E\d{0,3}$', text):
result["type"] = "error"; result["error_code"] = text
result["error_desc"] = DiniCommands.ERRORS.get(text, f"未知错误: {text}")
return result
if text.startswith("!"):
result["type"] = "param_response"
rest = text[1:].strip()
if "|" in rest:
left, right = rest.split("|", 1)
result["param"] = left.strip()
result["values"] = right.strip().split()
else:
parts = rest.split()
if parts:
result["param"] = parts[0]
result["values"] = parts[1:]
return result
numbers = re.findall(r"[-+]?\d*\.?\d+", text)
if numbers:
result["type"] = "measurement"; result["values"] = numbers; return result
result["type"] = "text"; result["values"] = [text]
return result
# ════════════════════════════════════════════════════════════════
# COM 口扫描
# ════════════════════════════════════════════════════════════════
def list_com_ports() -> list[dict]:
"""列出所有 COM 口,标注蓝牙 SPP"""
result = []
for port in serial.tools.list_ports.comports():
desc = (port.description or "").lower()
hwid = port.hwid or ""
is_bt = any(kw in desc or kw in hwid.lower() for kw in BT_SPP_KEYWORDS)
result.append({
"device": port.device,
"description": port.description or "",
"hwid": hwid,
"vid": port.vid, "pid": port.pid,
"is_bluetooth": bool(is_bt),
})
return result
def find_bluetooth_ports() -> tuple:
"""返回 (incoming_ports, outgoing_ports)"""
incoming, outgoing = [], []
for p in serial.tools.list_ports.comports():
hwid = (p.hwid or "").upper()
if "BTHENUM" in hwid and "00001101" in hwid:
if "000000000000" in hwid:
incoming.append(p.device)
else:
outgoing.append(p.device)
return incoming, outgoing
def list_connectable_ports() -> list[dict]:
"""可连接 COM 口列表,标记推荐 (outgoing SPP)"""
_, outgoing = find_bluetooth_ports()
ports = list_com_ports()
for p in ports:
p["recommended"] = p["device"] in outgoing
return ports
def print_com_list():
"""打印可连接 COM 口到控制台"""
ports = list_connectable_ports()
print("=" * 60)
print(" 可连接 COM 口列表")
print("-" * 60)
if not ports:
print(" (未检测到任何 COM 口)")
else:
bt = [p for p in ports if p.get("is_bluetooth")]
other = [p for p in ports if not p.get("is_bluetooth")]
if bt:
print(f"\n 蓝牙 SPP 端口 ({len(bt)} 个):")
for p in bt:
rec = " ★ 推荐" if p.get("recommended") else ""
print(f" {p['device']:<8}{p['description']}{rec}")
if other:
print(f"\n 其他 COM 口 ({len(other)} 个):")
for p in other:
print(f" {p['device']:<8}{p['description']}")
print("=" * 60)
return ports
# ════════════════════════════════════════════════════════════════
# DiNi 客户端
# ════════════════════════════════════════════════════════════════
class DiniClient:
"""通过蓝牙 Trimble-1780 SPP 虚拟串口与 DiNi 水准仪通信"""
def __init__(self, port: str, baudrate: int = DEFAULT_BAUDRATE,
timeout: float = DEFAULT_TIMEOUT):
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.ser: Optional[serial.Serial] = None
self._log_cb = None
def _log(self, msg: str):
if self._log_cb:
self._log_cb(msg)
# ── 连接 ──
def connect(self) -> bool:
try:
self.ser = serial.Serial(
port=self.port, baudrate=self.baudrate,
bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE, timeout=self.timeout,
write_timeout=self.timeout,
)
self._log(f"✓ DiNi 已连接 {self.port} @ {self.baudrate}")
return True
except serial.SerialException as e:
self._log(f"✗ 无法打开 {self.port}: {e}")
return False
def disconnect(self):
if self.ser and self.ser.is_open:
self.ser.close()
self._log(f"✓ DiNi 已断开 {self.port}")
self.ser = None
def is_connected(self) -> bool:
return self.ser is not None and self.ser.is_open
# ── 通信 ──
def send_command(self, cmd: str, measure_mode: bool = False) -> dict:
if not self.is_connected():
return {"status": "disconnected", "type": "empty",
"error_desc": "未连接", "raw_hex": "", "raw_text": None,
"values": []}
cmd_ascii = cmd.strip().rstrip(".")
if not cmd_ascii.endswith("\r\n"):
cmd_ascii += "\r\n"
tx_bytes = cmd_ascii.encode("ascii")
self._log(f"TX: {tx_bytes!r}")
self.ser.reset_input_buffer()
self.ser.write(tx_bytes)
self.ser.flush()
time.sleep(2.0 if measure_mode else 0.25)
raw = self._read_response()
if raw:
self._log(f"RX ({len(raw)}B): {raw.hex(' ').upper()}")
else:
self._log("RX: (超时无数据)")
result = parse_dini_response(raw)
result["status"] = {
"param_response": "ok", "measurement": "ok", "text": "ok",
"ok_ack": "ok", "error": "error", "empty": "timeout",
}.get(result["type"], "ok")
result["tx"] = repr(tx_bytes)
return result
def _read_response(self) -> bytes:
response = bytearray()
deadline = time.time() + self.timeout
last_data_time = time.time()
got_data = False
while time.time() < deadline:
w = self.ser.in_waiting
if w > 0:
chunk = self.ser.read(w)
response.extend(chunk)
last_data_time = time.time()
got_data = True
if b"\n" in chunk or b"\r" in chunk:
time.sleep(0.08)
extra = self.ser.read(self.ser.in_waiting)
if extra:
response.extend(extra)
break
else:
if got_data and (time.time() - last_data_time) > 0.4:
break
time.sleep(0.05)
return bytes(response)
# ── 高层 API ──
def read_param(self, param: str) -> dict:
result = self.send_command(f"?{param}")
result["param_requested"] = param
if result["type"] == "param_response" and result["param"] is None:
result["param"] = param
return result
def set_param(self, param: str, value: Any) -> dict:
info = DiniCommands.SET.get(param)
if info:
_, vtype = info
s = str(value)
if vtype == "bool":
val = "1" if s in ("1", "true", "True", "on") else "0"
elif vtype == "float":
val = f"{float(value):.5f}"
elif vtype == "int":
val = str(int(value))
else:
val = s
else:
val = str(value)
result = self.send_command(f"!{param}{val}")
result["param_requested"] = param
result["value_sent"] = val
return result
def measure(self) -> dict:
result = self.send_command("FML", measure_mode=True)
if result["type"] == "measurement" and result["values"]:
vals = result["values"]
if len(vals) >= 1: result["staff_reading"] = float(vals[0])
if len(vals) >= 2: result["distance"] = float(vals[1])
if len(vals) >= 3: result["height_diff"] = float(vals[2])
if len(vals) >= 4: result["std_dev"] = float(vals[3])
return result
def identify(self) -> dict:
result = self.read_param("0000")
info = {"ok": False, "model": "", "model_name": "", "sw_version": "",
"raw_text": result.get("raw_text")}
if result["status"] == "ok" and result["values"]:
vals = result["values"]
model = vals[0] if vals else ""
info["ok"] = True
info["model"] = model
info["model_name"] = DiniCommands.MODELS.get(model, f"型号 {model}")
info["sw_version"] = vals[1] if len(vals) > 1 else ""
else:
info["error_desc"] = result.get("error_desc") or result.get("status")
return info
def shutdown(self) -> dict:
return self.send_command("SEO")
# ════════════════════════════════════════════════════════════════
# FastAPI — 全局状态
# ════════════════════════════════════════════════════════════════
dini: Optional[DiniClient] = None
dini_lock = threading.Lock()
class ConnectRequest(BaseModel):
port: str
baudrate: int = DEFAULT_BAUDRATE
class CommandRequest(BaseModel):
cmd: str
# ════════════════════════════════════════════════════════════════
# 串口操作 (在线程池中执行, 带锁)
# ════════════════════════════════════════════════════════════════
async def _run_serial(func, *args, **kwargs):
loop = asyncio.get_running_loop()
def _locked():
with dini_lock:
return func(*args, **kwargs)
return await loop.run_in_executor(None, _locked)
def _sync_connect(req: ConnectRequest) -> dict:
global dini
if dini and dini.is_connected():
dini.disconnect()
client = DiniClient(port=req.port, baudrate=req.baudrate)
if not client.connect():
dini = None
raise HTTPException(status_code=500,
detail=f"无法连接到 {req.port},请检查 COM 口是否存在或已被占用")
dini = client
return {"status": "connected", "port": client.port, "baudrate": client.baudrate}
def _sync_disconnect() -> dict:
global dini
if dini and dini.is_connected():
port = dini.port
dini.disconnect()
dini = None
return {"status": "disconnected", "port": port}
dini = None
return {"status": "not_connected"}
def _sync_test(client: DiniClient, do_measure: bool) -> dict:
result = {"connected": True, "port": client.port, "baudrate": client.baudrate}
# 1. 仪器标识
ident = client.identify()
result["instrument_info"] = {
"ok": ident.get("ok"),
"model": ident.get("model"),
"model_name": ident.get("model_name"),
"software_version": ident.get("sw_version"),
}
if not ident.get("ok"):
result["instrument_info"]["error"] = ident.get("error_desc", "无响应")
# 2. 测量
if do_measure:
m = client.measure()
result["measurement"] = {
"status": m.get("status"),
"staff_reading": m.get("staff_reading"),
"distance": m.get("distance"),
"height_diff": m.get("height_diff"),
"std_dev": m.get("std_dev"),
"raw_text": m.get("raw_text"),
}
if m.get("status") == "error":
result["measurement"]["error"] = m.get("error_desc")
return result
def _sync_send_command(client: DiniClient, cmd: str) -> dict:
c = cmd.strip()
cu = c.upper().rstrip(".")
if cu == "FML": return client.measure()
if cu == "SEO": return client.shutdown()
if c.startswith("?"): return client.read_param(c[1:].rstrip("."))
if c.startswith("!"):
parts = c[1:].rstrip(".").split(maxsplit=1)
return client.set_param(parts[0], parts[1] if len(parts) > 1 else "")
return client.send_command(c)
def _fmt_result(cmd: str, r: dict) -> dict:
return {
"command": cmd, "status": r.get("status"), "type": r.get("type"),
"param": r.get("param"), "param_requested": r.get("param_requested"),
"values": r.get("values"), "raw_text": r.get("raw_text"),
"raw_hex": r.get("raw_hex"), "error_code": r.get("error_code"),
"error_desc": r.get("error_desc"),
"staff_reading": r.get("staff_reading"),
"distance": r.get("distance"),
"height_diff": r.get("height_diff"),
"std_dev": r.get("std_dev"), "tx": r.get("tx"),
}
# ════════════════════════════════════════════════════════════════
# FastAPI 应用
# ════════════════════════════════════════════════════════════════
@asynccontextmanager
async def lifespan(app: FastAPI):
print("[bt_service] DiNi 蓝牙服务启动")
yield
global dini
with dini_lock:
if dini and dini.is_connected():
dini.disconnect()
print("[bt_service] DiNi 蓝牙服务已停止")
app = FastAPI(
title="DiNi Bluetooth Service",
description="通过 Trimble-1780 蓝牙适配器远程控制天宝 DiNi 水准仪",
version="4.0",
lifespan=lifespan,
)
# ── GET / ──
@app.get("/")
async def root():
client = dini
connected = client is not None and client.is_connected()
info = {"service": "DiNi Bluetooth Service v4.0", "connected": connected}
if connected:
info["port"] = client.port
info["baudrate"] = client.baudrate
else:
info["com_ports"] = list_connectable_ports()
return info
# ── POST /connect ──
@app.post("/connect")
async def connect(req: ConnectRequest):
return await _run_serial(_sync_connect, req)
# ── POST /disconnect ──
@app.post("/disconnect")
async def disconnect():
return await _run_serial(_sync_disconnect)
# ── GET/POST /test ──
@app.api_route("/test", methods=["GET", "POST"])
async def test(
measure: bool = Query(default=True,
description="已连接时是否同时触发测量 (FML)")
):
client = dini
connected = client is not None and client.is_connected()
if not connected:
return {
"connected": False,
"message": "未连接 — 返回可连接的 COM 口列表",
"com_ports": list_connectable_ports(),
}
return await _run_serial(_sync_test, client, measure)
# ── POST /command ──
@app.post("/command")
async def send_command(req: CommandRequest):
cmd = req.cmd.strip()
if cmd.lower() == "disconnect":
return await _run_serial(_sync_disconnect)
client = dini
if client is None or not client.is_connected():
raise HTTPException(status_code=400,
detail="未连接到仪器,请先调用 POST /connect")
result = await _run_serial(_sync_send_command, client, cmd)
return _fmt_result(cmd, result)
# ════════════════════════════════════════════════════════════════
# 启动入口
# ════════════════════════════════════════════════════════════════
def main():
parser = argparse.ArgumentParser(
description="DiNi 水准仪 — 蓝牙连接 FastAPI 服务")
parser.add_argument("--host", default="0.0.0.0", help="监听地址 (默认 0.0.0.0)")
parser.add_argument("--port", type=int, default=DEFAULT_HTTP_PORT,
help=f"HTTP 服务端口 (默认 {DEFAULT_HTTP_PORT})")
parser.add_argument("--dini-port", default=None,
help="启动时自动连接的 COM 口 (如 COM6)")
parser.add_argument("--baudrate", type=int, default=DEFAULT_BAUDRATE,
help=f"DiNi 波特率 (默认 {DEFAULT_BAUDRATE})")
args = parser.parse_args()
# ── 打印可连接 COM 口 ──
print_com_list()
# ── 可选: 启动时自动连接 ──
if args.dini_port:
print(f"\n启动时自动连接 → {args.dini_port} ...")
try:
global dini
client = DiniClient(port=args.dini_port, baudrate=args.baudrate)
if client.connect():
dini = client
print(f"✓ 启动时已自动连接 {args.dini_port}")
else:
print(f"✗ 自动连接 {args.dini_port} 失败, 服务仍正常启动")
except Exception as e:
print(f"✗ 自动连接失败: {e}")
# ── 启动 uvicorn ──
import uvicorn
print(f"\n▶ DiNi 蓝牙服务启动中...")
print(f" HTTP 地址: http://{args.host}:{args.port}")
print(f" 测试接口: http://{args.host}:{args.port}/test")
print(f" 命令接口: http://{args.host}:{args.port}/command")
print(f" API 文档: http://{args.host}:{args.port}/docs")
print(f" 按 Ctrl+C 停止服务\n")
uvicorn.run(app, host=args.host, port=args.port, workers=1, log_level="info")
if __name__ == "__main__":
main()