初始化
This commit is contained in:
529
ir_send.py
Normal file
529
ir_send.py
Normal file
@@ -0,0 +1,529 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
红外收发板 — 发送脚本 (投影仪遥控)
|
||||
===================================
|
||||
通过 USB-TTL 连接红外收发板,发送红外信号控制投影仪。
|
||||
|
||||
协议说明 (来自板子文档):
|
||||
发送红外信号: {A1, F1, <红外码3字节>}
|
||||
修改通信地址: {A1, F2, <新地址>, 00, 00}
|
||||
修改波特率: {A1, F3, <速率编号>, 00, 00}
|
||||
|
||||
其中:
|
||||
A1 = 帧头
|
||||
F1 = 发射红外指令
|
||||
F2 = 修改通信地址
|
||||
F3 = 修改波特率
|
||||
|
||||
用法:
|
||||
python ir_send.py -p COM8 up # 上
|
||||
python ir_send.py -p COM8 down # 下
|
||||
python ir_send.py -p COM8 left # 左
|
||||
python ir_send.py -p COM8 right # 右
|
||||
python ir_send.py -p COM8 --raw 3BC40D # 发送原始红外码
|
||||
python ir_send.py -p COM8 -i # 交互模式 (w/s/a/d 方向键控制)
|
||||
|
||||
依赖: pip install pyserial
|
||||
"""
|
||||
|
||||
import sys
|
||||
import time
|
||||
import argparse
|
||||
import serial
|
||||
import serial.tools.list_ports
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# 红外板协议常量
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
FRAME_HEADER = 0xA1 # 帧头
|
||||
CMD_SEND_IR = 0xF1 # 发射红外信号
|
||||
CMD_SET_ADDR = 0xF2 # 修改通信地址
|
||||
CMD_SET_BAUD = 0xF3 # 修改波特率
|
||||
|
||||
DEFAULT_BAUDRATE = 9600
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# 遥控器按键码 (从接收中采集到的红外原始码, 3字节)
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
IR_CODES = {
|
||||
# 方向键
|
||||
"up": bytes([0x3B, 0xC4, 0x0D]),
|
||||
"left": bytes([0x3B, 0xC4, 0x10]),
|
||||
"down": bytes([0x3B, 0xC4, 0x15]),
|
||||
"right": bytes([0x3B, 0xC4, 0x12]),
|
||||
# 继续通过接收测试添加...
|
||||
# "ok": bytes([0x3B, 0xC4, 0x??]),
|
||||
# "menu": bytes([0x3B, 0xC4, 0x??]),
|
||||
# "power": bytes([0x3B, 0xC4, 0x??]),
|
||||
}
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# 帧构建
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
|
||||
def build_send_frame(ir_code: bytes) -> bytes:
|
||||
"""
|
||||
构建红外发射帧: {A1, F1, <3字节红外码>}
|
||||
|
||||
参数:
|
||||
ir_code: 3 字节红外原始码, 如 b'\x3B\xC4\x0D'
|
||||
返回:
|
||||
5 字节完整发送帧
|
||||
"""
|
||||
if len(ir_code) != 3:
|
||||
raise ValueError(f"红外码必须是 3 字节, 实际 {len(ir_code)} 字节")
|
||||
|
||||
return bytes([FRAME_HEADER, CMD_SEND_IR]) + ir_code
|
||||
|
||||
|
||||
def build_set_addr_frame(new_addr: int) -> bytes:
|
||||
"""构建修改地址帧: {A1, F2, <新地址>, 00, 00}"""
|
||||
return bytes([FRAME_HEADER, CMD_SET_ADDR, new_addr & 0xFF, 0x00, 0x00])
|
||||
|
||||
|
||||
def build_set_baud_frame(baud_code: int) -> bytes:
|
||||
"""
|
||||
构建修改波特率帧: {A1, F3, <速率编号>, 00, 00}
|
||||
|
||||
已知波特率编号 (根据文档):
|
||||
01 = 4800bps
|
||||
(其他待补充, 默认 9600 可能对应 00 或其他值)
|
||||
"""
|
||||
return bytes([FRAME_HEADER, CMD_SET_BAUD, baud_code & 0xFF, 0x00, 0x00])
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# 串口扫描
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
|
||||
USB_TTL_KEYWORDS = [
|
||||
"ch343", "ch340", "ch341", "ch342", "ch344",
|
||||
"usb-enhanced-serial", "usb serial", "usb-serial",
|
||||
"ft232", "ftdi", "cp210", "pl2303", "wch",
|
||||
]
|
||||
|
||||
|
||||
def find_port():
|
||||
"""自动查找 USB-to-TTL 串口"""
|
||||
ports = serial.tools.list_ports.comports()
|
||||
for port in ports:
|
||||
combined = f"{port.description or ''} {port.hwid or ''}".lower()
|
||||
if any(kw in combined for kw in USB_TTL_KEYWORDS):
|
||||
print(f"→ 自动选择: {port.device} — {port.description}")
|
||||
return port.device
|
||||
all_ports = list(ports)
|
||||
if all_ports:
|
||||
print(f"→ 回退使用: {all_ports[0].device}")
|
||||
return all_ports[0].device
|
||||
return None
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# 发送核心
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
|
||||
def send_ir(ser: serial.Serial, ir_code: bytes, repeat: int = 1):
|
||||
"""
|
||||
|
||||
|
||||
参数:
|
||||
ser: 已打开的串口
|
||||
ir_code: 3字节红外原始码
|
||||
repeat: 重复发送次数
|
||||
"""
|
||||
frame = build_send_frame(ir_code)
|
||||
|
||||
|
||||
|
||||
for i in range(repeat):
|
||||
ser.write(frame)
|
||||
ser.flush()
|
||||
hex_str = ir_code.hex(" ").upper()
|
||||
if repeat > 1:
|
||||
print(f" 发送 [{i+1}/{repeat}]: {hex_str}")
|
||||
else:
|
||||
print(f" 发送: {hex_str} → 帧 {frame.hex(' ').upper()}")
|
||||
|
||||
if i < repeat - 1:
|
||||
time.sleep(0.1)
|
||||
|
||||
|
||||
def send_raw_ir(port: str, hex_str: str, repeat: int = 1):
|
||||
"""发送原始 3 字节红外码 (hex 字符串)"""
|
||||
hex_str = hex_str.replace(" ", "").replace("-", "")
|
||||
if len(hex_str) != 6:
|
||||
print(f"✗ 红外码应为 6 位 hex (3字节), 实际 {len(hex_str)} 位")
|
||||
return
|
||||
|
||||
try:
|
||||
ir_code = bytes.fromhex(hex_str)
|
||||
except ValueError as e:
|
||||
print(f"✗ HEX 格式错误: {e}")
|
||||
return
|
||||
|
||||
ser = serial.Serial(port, baudrate=DEFAULT_BAUDRATE, timeout=0.5)
|
||||
try:
|
||||
send_ir(ser, ir_code, repeat)
|
||||
finally:
|
||||
ser.close()
|
||||
|
||||
|
||||
def send_named(port: str, name: str, repeat: int = 1):
|
||||
"""发送命名按键"""
|
||||
name = name.lower()
|
||||
ir_code = IR_CODES.get(name)
|
||||
if ir_code is None:
|
||||
print(f"✗ 未知按键: {name}")
|
||||
print(f" 已知按键: {', '.join(IR_CODES.keys())}")
|
||||
return
|
||||
|
||||
ser = serial.Serial(port, baudrate=DEFAULT_BAUDRATE, timeout=0.5)
|
||||
try:
|
||||
send_ir(ser, ir_code, repeat)
|
||||
finally:
|
||||
ser.close()
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# 交互模式 (同时收发)
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
|
||||
INTERACTIVE_HELP = """
|
||||
╔══════════════════════════════════════════════╗
|
||||
║ 投影仪红外遥控 — 收发一体交互模式 ║
|
||||
╠══════════════════════════════════════════════╣
|
||||
║ w/↑=上 s/↓=下 a/←=左 d/→=右 ║
|
||||
║ Enter = 重复上次按键 ║
|
||||
╠══════════════════════════════════════════════╣
|
||||
║ :raw <hex> 发送自定义红外码 ║
|
||||
║ :learn <名> <hex> 学习新按键 ║
|
||||
║ :keys 列出已存按键 ║
|
||||
║ :h 帮助 ║
|
||||
║ :q 退出 ║
|
||||
╠══════════════════════════════════════════════╣
|
||||
║ 接收到的红外信号会实时显示在屏幕上 ║
|
||||
╚══════════════════════════════════════════════╝
|
||||
"""
|
||||
|
||||
KEY_MAP = {
|
||||
'w': 'up', 's': 'down', 'a': 'left', 'd': 'right',
|
||||
'up': 'up', 'down': 'down', 'left': 'left', 'right': 'right',
|
||||
}
|
||||
|
||||
# 接收到的新信号 → 显示的标签
|
||||
RX_LABELS = {
|
||||
0x0D: "↑ 上",
|
||||
0x15: "↓ 下",
|
||||
0x10: "← 左",
|
||||
0x12: "→ 右",
|
||||
}
|
||||
|
||||
|
||||
def _kbhit():
|
||||
"""检查是否有按键 (非阻塞)"""
|
||||
try:
|
||||
import msvcrt
|
||||
return msvcrt.kbhit()
|
||||
except ImportError:
|
||||
return False
|
||||
|
||||
|
||||
def _getch():
|
||||
"""读取单个按键 (阻塞, 仅在 kbhit() 返回 True 后调用)"""
|
||||
try:
|
||||
import msvcrt
|
||||
ch = msvcrt.getch()
|
||||
if ch == b'\xe0':
|
||||
ch2 = msvcrt.getch()
|
||||
return {
|
||||
b'H': 'up', b'P': 'down', b'K': 'left', b'M': 'right',
|
||||
}.get(ch2, '')
|
||||
return ch.decode("utf-8", errors="replace").lower()
|
||||
except ImportError:
|
||||
return ""
|
||||
|
||||
|
||||
def run_interactive(port: str, repeat: int = 1):
|
||||
"""收发一体交互模式 — 同时接收红外信号 + 按键发送"""
|
||||
import msvcrt
|
||||
|
||||
codes = dict(IR_CODES)
|
||||
cfg = {"repeat": repeat} # 可变容器, 允许命令中修改
|
||||
ser = serial.Serial(port, baudrate=DEFAULT_BAUDRATE, timeout=0)
|
||||
print(f"✓ 已连接 {port} @ {DEFAULT_BAUDRATE} baud")
|
||||
print(INTERACTIVE_HELP)
|
||||
|
||||
last_key = None # 上一个按键名 (用于 Enter 重复)
|
||||
rx_buffer = bytearray()
|
||||
rx_last_byte_time = 0
|
||||
|
||||
print("┌──────────────────────────────────────────┐")
|
||||
print("│ 等待按键或红外信号... │")
|
||||
print("└──────────────────────────────────────────┘")
|
||||
|
||||
try:
|
||||
while True:
|
||||
got_input = False
|
||||
|
||||
# ── 1. 检查串口接收 ──
|
||||
try:
|
||||
n = ser.in_waiting
|
||||
except (serial.SerialException, OSError):
|
||||
print("\n✗ 串口断开")
|
||||
break
|
||||
|
||||
if n > 0:
|
||||
data = ser.read(n)
|
||||
now = time.time()
|
||||
|
||||
# 判断是否为新信号 (距上次字节 > 200ms)
|
||||
if rx_buffer and (now - rx_last_byte_time) > 0.2:
|
||||
# 打印上一个信号
|
||||
_print_rx(bytes(rx_buffer))
|
||||
rx_buffer.clear()
|
||||
|
||||
rx_buffer.extend(data)
|
||||
rx_last_byte_time = now
|
||||
got_input = True
|
||||
|
||||
# 缓冲区有数据但已空闲 → 打印
|
||||
if rx_buffer and (time.time() - rx_last_byte_time) > 0.2:
|
||||
_print_rx(bytes(rx_buffer))
|
||||
rx_buffer.clear()
|
||||
got_input = True
|
||||
|
||||
# ── 2. 检查键盘输入 (非阻塞) ──
|
||||
if msvcrt.kbhit():
|
||||
ch = _getch()
|
||||
got_input = True
|
||||
|
||||
# 方向键
|
||||
name = KEY_MAP.get(ch)
|
||||
if name and name in codes:
|
||||
send_ir(ser, codes[name], cfg["repeat"])
|
||||
last_key = name
|
||||
continue
|
||||
|
||||
# 回车 = 重复上次
|
||||
if ch in ('\r', '\n'):
|
||||
if last_key and last_key in codes:
|
||||
send_ir(ser, codes[last_key], cfg["repeat"])
|
||||
continue
|
||||
|
||||
# 命令模式
|
||||
if ch in (':', '/', '\\'):
|
||||
# 读取整行命令
|
||||
print(f"\nIR> :", end="", flush=True)
|
||||
cmd_line = _read_line()
|
||||
_handle_command(ser, codes, cmd_line, cfg, last_key)
|
||||
print("┌──────────────────────────────────────────┐")
|
||||
print("│ 等待按键或红外信号... │")
|
||||
print("└──────────────────────────────────────────┘")
|
||||
continue
|
||||
|
||||
# 退出
|
||||
if ch in ('q', '\x1b', 'esc'):
|
||||
print("\n退出")
|
||||
break
|
||||
|
||||
# 帮助
|
||||
if ch in ('h', '?'):
|
||||
print("\n" + INTERACTIVE_HELP)
|
||||
continue
|
||||
|
||||
# ── 3. 空闲时短暂休眠 ──
|
||||
if not got_input:
|
||||
time.sleep(0.02)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n\n用户中断")
|
||||
finally:
|
||||
ser.close()
|
||||
print(f"✓ 已关闭 {port}")
|
||||
|
||||
|
||||
def _print_rx(data: bytes):
|
||||
"""格式化打印接收到的红外信号"""
|
||||
|
||||
ts = time.strftime("%H:%M:%S")
|
||||
hex_str = data.hex(" ").upper()
|
||||
|
||||
# 尝试识别
|
||||
label = ""
|
||||
if len(data) == 3 and data[0] == 0x3B and data[1] == 0xC4:
|
||||
label = RX_LABELS.get(data[2], f"键值 0x{data[2]:02X}")
|
||||
|
||||
if label:
|
||||
print(f"\n ← [{ts}] 收到: {hex_str} {label}")
|
||||
else:
|
||||
print(f"\n ← [{ts}] 收到: {hex_str} ({len(data)}字节)")
|
||||
|
||||
|
||||
def _read_line() -> str:
|
||||
"""读取一行输入 (在 kbhit 之后调用)"""
|
||||
import msvcrt
|
||||
line = ""
|
||||
while True:
|
||||
ch = msvcrt.getch()
|
||||
if ch in (b'\r', b'\n'):
|
||||
print()
|
||||
return line
|
||||
if ch == b'\x08': # Backspace
|
||||
if line:
|
||||
line = line[:-1]
|
||||
print('\b \b', end='', flush=True)
|
||||
continue
|
||||
if ch == b'\x1b': # ESC 取消
|
||||
print(" [取消]")
|
||||
return ""
|
||||
try:
|
||||
c = ch.decode("utf-8")
|
||||
line += c
|
||||
print(c, end='', flush=True)
|
||||
except UnicodeDecodeError:
|
||||
pass
|
||||
|
||||
|
||||
def _handle_command(ser, codes, line, cfg, last_key):
|
||||
"""处理命令行输入"""
|
||||
if not line:
|
||||
return
|
||||
|
||||
parts = line.strip().split()
|
||||
if not parts:
|
||||
return
|
||||
|
||||
cmd = parts[0].lower()
|
||||
|
||||
if cmd in ('q', 'quit', 'exit'):
|
||||
raise KeyboardInterrupt()
|
||||
|
||||
if cmd in ('h', 'help', '?'):
|
||||
print(INTERACTIVE_HELP)
|
||||
return
|
||||
|
||||
if cmd == 'keys':
|
||||
print("已存按键:")
|
||||
for k, v in codes.items():
|
||||
marker = " ← 上次" if k == last_key else ""
|
||||
print(f" {k:<12} → {v.hex(' ').upper()}{marker}")
|
||||
return
|
||||
|
||||
if cmd == 'learn' and len(parts) >= 3:
|
||||
try:
|
||||
data = bytes.fromhex(parts[2].replace(" ", ""))
|
||||
codes[parts[1]] = data
|
||||
print(f" ✓ {parts[1]} → {data.hex(' ').upper()}")
|
||||
except ValueError:
|
||||
print(" ✗ HEX 格式错误")
|
||||
return
|
||||
|
||||
if cmd == 'raw' and len(parts) >= 2:
|
||||
hex_str = parts[1].replace(" ", "")
|
||||
if len(hex_str) != 6:
|
||||
print(f" ✗ 红外码应为 6 位 hex (3字节)")
|
||||
return
|
||||
try:
|
||||
data = bytes.fromhex(hex_str)
|
||||
send_ir(ser, data, cfg["repeat"])
|
||||
except ValueError:
|
||||
print(" ✗ HEX 格式错误")
|
||||
return
|
||||
|
||||
if cmd == 'repeat' and len(parts) >= 2:
|
||||
try:
|
||||
cfg["repeat"] = int(parts[1])
|
||||
print(f" ✓ 重复次数 = {cfg['repeat']}")
|
||||
except ValueError:
|
||||
print(" 用法: repeat <次数>")
|
||||
return
|
||||
|
||||
# 命名按键
|
||||
if cmd in codes:
|
||||
send_ir(ser, codes[cmd], cfg["repeat"])
|
||||
else:
|
||||
print(f" ✗ 未知命令: {cmd}")
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# 命令行入口
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="红外收发板 — 投影仪遥控 (A1 F1 协议)",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog="""
|
||||
示例:
|
||||
%(prog)s -p COM8 up 发送"上"键
|
||||
%(prog)s -p COM8 down 发送"下"键
|
||||
%(prog)s -p COM8 --raw 3BC40D 发送原始红外码
|
||||
%(prog)s -p COM8 -i 交互模式 (w/s/a/d 方向键控制)
|
||||
%(prog)s -p COM8 -i -n 3 交互模式, 每键重复3次
|
||||
""",
|
||||
)
|
||||
parser.add_argument("key", nargs="?", default=None,
|
||||
help="按键名 (up/down/left/right)")
|
||||
parser.add_argument("-p", "--port", default=None, help="串口 (如 COM8)")
|
||||
parser.add_argument("-b", "--baudrate", type=int, default=DEFAULT_BAUDRATE,
|
||||
help=f"波特率 (默认 {DEFAULT_BAUDRATE})")
|
||||
parser.add_argument("--raw", metavar="HEX", help="发送原始 3 字节红外码")
|
||||
parser.add_argument("-n", "--repeat", type=int, default=1,
|
||||
help="重复发送次数 (默认 1)")
|
||||
parser.add_argument("-i", "--interactive", action="store_true",
|
||||
help="交互模式 (w/s/a/d 方向键控制)")
|
||||
parser.add_argument("--list", action="store_true", help="列出所有已知按键")
|
||||
parser.add_argument("--set-addr", metavar="HEX", help="修改通信地址 (如 A5)")
|
||||
parser.add_argument("--set-baud", metavar="CODE", help="修改波特率编号")
|
||||
args = parser.parse_args()
|
||||
|
||||
# 确定端口
|
||||
port = args.port or find_port()
|
||||
if not port:
|
||||
print("✗ 未找到串口。请用 -p 指定。")
|
||||
return
|
||||
|
||||
# 列出按键
|
||||
if args.list:
|
||||
print("已知红外码:")
|
||||
for key, code in IR_CODES.items():
|
||||
print(f" {key:<12} → {code.hex(' ').upper()}")
|
||||
return
|
||||
|
||||
# 配置命令
|
||||
if args.set_addr:
|
||||
addr = int(args.set_addr, 16)
|
||||
frame = build_set_addr_frame(addr)
|
||||
ser = serial.Serial(port, baudrate=DEFAULT_BAUDRATE, timeout=0.5)
|
||||
ser.write(frame)
|
||||
ser.close()
|
||||
print(f"✓ 已发送修改地址: {frame.hex(' ').upper()}")
|
||||
return
|
||||
|
||||
if args.set_baud:
|
||||
code = int(args.set_baud)
|
||||
frame = build_set_baud_frame(code)
|
||||
ser = serial.Serial(port, baudrate=DEFAULT_BAUDRATE, timeout=0.5)
|
||||
ser.write(frame)
|
||||
ser.close()
|
||||
print(f"✓ 已发送修改波特率: {frame.hex(' ').upper()}")
|
||||
return
|
||||
|
||||
# 交互模式
|
||||
if args.interactive:
|
||||
run_interactive(port, args.repeat)
|
||||
return
|
||||
|
||||
# 单次发送
|
||||
if args.raw:
|
||||
send_raw_ir(port, args.raw, args.repeat)
|
||||
elif args.key:
|
||||
send_named(port, args.key, args.repeat)
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user