992 lines
32 KiB
Python
992 lines
32 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
ThermoPro Bluetooth Thermometer CLI
|
|
Reverse-engineered protocol implementation for Linux
|
|
|
|
Usage:
|
|
thermopro_cli.py [--debug] scan
|
|
thermopro_cli.py [--debug] connect [--addr ADDRESS] [--probe-names NAMES]
|
|
thermopro_cli.py [--debug] temps [--addr ADDRESS] [--json] [--unit F|C] [--probe-names NAMES]
|
|
thermopro_cli.py [--debug] monitor [--addr ADDRESS] [--interval 1] [--json] [--unit F|C] [--probe-names NAMES]
|
|
thermopro_cli.py [--debug] mqtt --addr ADDRESS [--interval 5] [--unit F|C] [--broker HOST] [--probe-names NAMES]
|
|
|
|
If --addr is omitted (except for mqtt), the tool auto-scans for ThermoPro
|
|
devices and prompts for selection if multiple are found.
|
|
|
|
--probe-names accepts comma-separated labels (e.g. 'Brisket,Ambient,,Ribs').
|
|
Empty slots keep the default 'Probe N' naming.
|
|
|
|
Environment variables for MQTT (also read from .env file in script directory):
|
|
MQTT_BROKER - MQTT broker address
|
|
MQTT_PORT - MQTT broker port (default: 1883)
|
|
MQTT_USERNAME - MQTT username
|
|
MQTT_PASSWORD - MQTT password
|
|
"""
|
|
|
|
import asyncio
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import re
|
|
import signal
|
|
import sys
|
|
import time
|
|
import os
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from typing import Optional, List
|
|
|
|
# bleak is a hard requirement: every sub-command talks to the BLE stack.
try:
    from bleak import BleakClient, BleakScanner
except ImportError:
    # Report on stderr like every other error in this tool, so stdout stays
    # clean for consumers of the --json output.
    print(
        "Error: bleak library not installed. Run: pip install bleak",
        file=sys.stderr,
    )
    sys.exit(1)
|
|
|
|
# paho-mqtt is optional: only the "mqtt" sub-command needs it.
# Both flags are defined on every path so later code can test them safely.
try:
    import paho.mqtt.client as mqtt
except ImportError:
    MQTT_AVAILABLE = False
    MQTT_V2 = False
else:
    MQTT_AVAILABLE = True
    try:
        # Importable only on paho-mqtt releases that ship the enums module;
        # used to pick the callback API version when creating the client.
        from paho.mqtt.enums import CallbackAPIVersion
    except ImportError:
        MQTT_V2 = False
    else:
        MQTT_V2 = True
|
|
|
|
# BLE UUIDs
# NOTIFY_UUID is the characteristic subscribed to for device notifications and
# WRITE_UUID is the characteristic the handshake command is written to.
# SERVICE_UUID is not referenced by the code visible in this file.
SERVICE_UUID = "1086FFF0-3343-4817-8BB2-B32206336CE8"
NOTIFY_UUID = "1086FFF2-3343-4817-8BB2-B32206336CE8"
WRITE_UUID = "1086FFF1-3343-4817-8BB2-B32206336CE8"

# Sentinel values from decode_temperature() that are not real readings
# (-999.0 for bytes FF FF, -100.0 for DD DD, 666.0 for EE EE).
TEMP_SENTINELS = {-999.0, -100.0, 666.0}
# Number of probe slots kept in ThermoproState.temperatures and the maximum
# number of names accepted by --probe-names.
MAX_PROBES = 4

# Module-wide logger; its level and handler are configured in main().
log = logging.getLogger("thermopro")
|
|
|
|
|
|
def is_valid_temp(t: float) -> bool:
    """Return True when *t* is a real reading rather than a decoder sentinel."""
    is_sentinel = t in TEMP_SENTINELS
    return not is_sentinel
|
|
|
|
|
|
def parse_probe_names(
    raw: Optional[str], max_probes: int = MAX_PROBES
) -> List[Optional[str]]:
    """Split a comma-separated name list into exactly *max_probes* slots.

    Blank entries (and slots beyond the supplied names) become None. Names
    past *max_probes* are dropped after a warning on stderr.
    """
    if not raw:
        return [None] * max_probes

    tokens = raw.split(",")
    supplied = len(tokens)
    if supplied > max_probes:
        print(
            f"Warning: {supplied} names provided but max is {max_probes}, "
            f"ignoring extras",
            file=sys.stderr,
        )
        tokens = tokens[:max_probes]

    labels: List[Optional[str]] = []
    for token in tokens:
        cleaned = token.strip()
        labels.append(cleaned if cleaned else None)

    # Pad the tail so callers can always index 0..max_probes-1.
    missing = max_probes - len(labels)
    if missing > 0:
        labels += [None] * missing
    return labels
|
|
|
|
|
|
def get_probe_label(
    probe_names: List[Optional[str]], index: int, compact: bool = False
) -> str:
    """Return the label to show for probe *index* (0-based).

    A non-empty custom name wins (cut to 8 characters in compact mode);
    otherwise the default is "P<n>" (compact) or "Probe <n>", with n 1-based.
    """
    custom = None
    if index < len(probe_names):
        custom = probe_names[index]

    if not custom:
        number = index + 1
        if compact:
            return f"P{number}"
        return f"Probe {number}"

    if compact:
        return custom[:8]
    return custom
|
|
|
|
|
|
@dataclass
class RunConfig:
    """Resolved runtime settings shared by all sub-commands.

    Built from the parsed argparse namespace by from_args(). Options that a
    given sub-command does not define fall back to the defaults below.
    """

    address: Optional[str] = None  # BLE address; None means auto-scan
    unit: str = "F"  # display unit, "F" or "C"
    interval: int = 1  # seconds slept between updates
    as_json: bool = False  # emit JSON instead of text
    probe_names: List[Optional[str]] = field(
        default_factory=lambda: [None] * MAX_PROBES
    )
    broker: Optional[str] = None  # MQTT broker host
    port: int = 1883  # MQTT broker port
    username: Optional[str] = None  # MQTT username
    password: Optional[str] = None  # MQTT password
    device_name: str = "thermopro"  # used in MQTT topic names

    @staticmethod
    def _resolve_port(args) -> int:
        """Pick the MQTT port from --port, then MQTT_PORT, then 1883.

        argparse cannot tell an explicit "--port 1883" from the default, so
        MQTT_PORT is honoured whenever the CLI value is missing or equal to
        the default 1883. An MQTT_PORT that is not an integer is ignored with
        a warning on stderr.
        """
        default_port = 1883
        cli_port = getattr(args, "port", None)
        fallback = default_port if cli_port is None else cli_port

        if cli_port not in (None, default_port):
            return cli_port

        env_port = os.environ.get("MQTT_PORT")
        if not env_port:
            return fallback
        try:
            return int(env_port)
        except ValueError:
            print(
                f"Warning: ignoring invalid MQTT_PORT value {env_port!r}",
                file=sys.stderr,
            )
            return fallback

    @classmethod
    def from_args(cls, args) -> "RunConfig":
        """Build a RunConfig from an argparse namespace.

        Missing attributes use defaults. The broker, port, username and
        password fall back to the MQTT_BROKER, MQTT_PORT, MQTT_USERNAME and
        MQTT_PASSWORD environment variables.
        """
        return cls(
            address=getattr(args, "addr", None),
            unit=(getattr(args, "unit", None) or "F").upper(),
            interval=getattr(args, "interval", 1),
            as_json=getattr(args, "json", False),
            probe_names=parse_probe_names(getattr(args, "probe_names", None)),
            broker=getattr(args, "broker", None) or os.environ.get("MQTT_BROKER"),
            port=cls._resolve_port(args),
            username=getattr(args, "username", None) or os.environ.get("MQTT_USERNAME"),
            password=getattr(args, "password", None) or os.environ.get("MQTT_PASSWORD"),
            device_name=getattr(args, "device_name", "thermopro"),
        )
|
|
|
|
|
|
class ThermoproState:
    """Latest values decoded from the thermometer's notifications."""

    def __init__(self):
        self.battery = 0
        self.device_unit = "C"
        self.probe_count = 4
        # All probe slots start at the -999.0 sentinel.
        self.temperatures = [-999.0] * MAX_PROBES
        self.last_update = None
        self.connected = False

    def get_temperatures(self, unit: str) -> list:
        """Return a new list of readings expressed in *unit* ("C" or "F").

        Sentinel values pass through unconverted, as does every value when
        the unit pair is not C->F or F->C.
        """
        source_unit = self.device_unit
        if source_unit == unit:
            return list(self.temperatures)

        def convert(value):
            if not is_valid_temp(value):
                return value
            if source_unit == "C" and unit == "F":
                return value * 9.0 / 5.0 + 32.0
            if source_unit == "F" and unit == "C":
                return (value - 32.0) * 5.0 / 9.0
            return value

        return [convert(value) for value in self.temperatures]

    def to_dict(self, unit: str):
        """Return a JSON-serialisable snapshot with readings in *unit*.

        Valid readings are rounded to one decimal; sentinels are kept as is.
        """
        rounded = []
        for reading in self.get_temperatures(unit):
            rounded.append(round(reading, 1) if is_valid_temp(reading) else reading)

        stamp = None
        if self.last_update:
            stamp = self.last_update.isoformat()

        return {
            "battery": self.battery,
            "unit": unit,
            "probe_count": self.probe_count,
            "temperatures": rounded,
            "last_update": stamp,
            "connected": self.connected,
        }
|
|
|
|
|
|
def decode_temperature(byte1: int, byte2: int) -> float:
    """
    Decode a temperature from the 2-byte BCD format.
    Based on q/c.java:88-99

    The byte pairs FF FF, DD DD and EE EE map to the sentinels -999.0,
    -100.0 and 666.0. Otherwise bit 7 of byte1 is the sign, bits 4-6 the
    hundreds digit, the low nibble of byte1 the tens digit, and the high and
    low nibbles of byte2 the ones and tenths digits.
    """
    sentinel_by_marker = {0xFF: -999.0, 0xDD: -100.0, 0xEE: 666.0}
    if byte1 == byte2 and byte1 in sentinel_by_marker:
        return sentinel_by_marker[byte1]

    hundreds_digit = (byte1 & 0x70) >> 4
    tens_digit = byte1 & 0x0F
    ones_digit = (byte2 & 0xF0) >> 4
    tenths_digit = byte2 & 0x0F

    # Integer parts are summed first, then the fractional part is added.
    magnitude = hundreds_digit * 100 + tens_digit * 10 + ones_digit + tenths_digit * 0.1

    if byte1 & 0x80:
        return -magnitude
    return magnitude
|
|
|
|
|
|
def calculate_checksum(data: List[int]) -> int:
    """Return the low 8 bits of the sum of all command bytes."""
    total = 0
    for value in data:
        total += value
    return total & 0xFF
|
|
|
|
|
|
def create_handshake_command() -> bytes:
    """
    Build the handshake/init command (0x01).
    Based on p/b.java:124-163

    For TP25W this is a fixed, known-good handshake taken from an Android app
    capture; the exact random bytes don't seem critical for the protocol.
    """
    captured = (
        0x01, 0x09,
        0x8A, 0x7A, 0x13, 0xB7, 0x3E, 0xD6, 0x8B, 0x67, 0xC2,
        0xA0,
    )
    return bytes(captured)
|
|
|
|
|
|
def create_timestamp_sync_command() -> bytes:
    """
    Build the timestamp sync command (0x28).
    Based on p/b.java:165-169

    Layout: 0x28, 0x04, the current time minus 1577808000 seconds as four
    big-endian bytes, then the checksum byte.
    """
    # NOTE(review): 1577808000 looks like a device epoch around 2020-01-01;
    # confirm against the protocol source.
    seconds = int(time.time()) - 1577808000
    # Mask to 32 bits so the four bytes match per-byte shift-and-mask.
    payload = (seconds & 0xFFFFFFFF).to_bytes(4, "big")
    frame = [0x28, 0x04]
    frame.extend(payload)
    frame.append(calculate_checksum(frame))
    return bytes(frame)
|
|
|
|
|
|
class ThermoproClient:
    """BLE client for one thermometer.

    Owns the bleak connection, decodes incoming notifications into
    ``self.state`` and sets ``self.notification_event`` so callers can await
    the next packet with wait_for_update().
    """

    def __init__(self, address: str):
        self.address = address
        self.client: Optional[BleakClient] = None
        self.state = ThermoproState()
        self.notification_event = asyncio.Event()

    def parse_temperature_packet(self, data: bytearray):
        """
        Parse temperature data from command 0x30.
        Handles both TP920 and TP25W formats.

        Packets shorter than 6 bytes are ignored. Byte 2 is the battery
        level, byte 4 the probe count (0 means 4), and probe readings follow
        from byte 5 as 2-byte BCD pairs.
        """
        if len(data) < 6:
            return

        self.state.battery = data[2]
        self.state.device_unit = "C"

        log.debug("0x30 raw: %s", " ".join(f"{b:02X}" for b in data))
        log.debug("byte[3] (unit?): 0x%02X", data[3])

        # Clamp the device-reported count: only MAX_PROBES readings are
        # stored, and the formatters build one label per probe_count.
        reported_count = data[4] if data[4] != 0x00 else 4
        self.state.probe_count = min(reported_count, MAX_PROBES)
        probe_offset = 5

        temps = []
        for i in range(self.state.probe_count):
            offset = probe_offset + i * 2
            # Skip probes whose two bytes are not fully inside the packet.
            if offset + 1 < len(data):
                temps.append(decode_temperature(data[offset], data[offset + 1]))

        # Pad missing probes with the -999.0 sentinel.
        temps.extend([-999.0] * (MAX_PROBES - len(temps)))

        self.state.temperatures = temps[:MAX_PROBES]
        self.state.last_update = datetime.now()
        self.notification_event.set()

    async def notification_handler(self, sender, data: bytearray):
        """
        Handle notifications from the device.
        Based on q/c.java protocol parsing

        NOTE(review): every packet type except 0xE0 sets the notification
        event, so wait_for_update() can return True for a non-temperature
        packet; confirm this is intended.
        """
        if len(data) < 2:
            return

        cmd = data[0]

        if cmd == 0x30:
            self.parse_temperature_packet(data)
        elif cmd == 0x01:
            if len(data) >= 4:
                log.debug("Device model code: 0x%02X", data[3])
            self.notification_event.set()
        elif cmd == 0x41:
            if len(data) >= 3:
                version_byte = data[2]
                # High nibble is the major part, low nibble the tenths.
                version = (version_byte // 16) + ((version_byte % 16) / 10.0)
                log.debug("Device version: %s", version)
            self.notification_event.set()
        elif cmd == 0xE0:
            pass
        else:
            self.notification_event.set()

    async def connect(self) -> bool:
        """Connect to the thermometer and initialize.

        Subscribes to notifications, writes the handshake command and waits
        briefly for data. Returns True on success; on any exception prints
        the error to stderr and returns False.
        """
        try:
            log.debug("Connecting to %s...", self.address)
            self.client = BleakClient(self.address)
            await self.client.connect()
            self.state.connected = True
            log.debug("Connected!")

            log.debug("Enabling notifications...")
            await self.client.start_notify(NOTIFY_UUID, self.notification_handler)
            await asyncio.sleep(0.5)

            log.debug("Sending handshake...")
            handshake = create_handshake_command()
            await self.client.write_gatt_char(WRITE_UUID, handshake)

            log.debug("Waiting for temperature data...")
            await asyncio.sleep(2.0)

            log.debug("Initialization complete!")
            return True

        except Exception as e:
            print(f"Connection failed: {e}", file=sys.stderr)
            self.state.connected = False
            return False

    async def disconnect(self):
        """Disconnect from the thermometer if a connection is open."""
        if self.client and self.client.is_connected:
            await self.client.disconnect()
        self.state.connected = False

    async def wait_for_update(self, timeout: float = 5.0) -> bool:
        """Wait up to *timeout* seconds for the next notification.

        Clears the event first, so only packets that arrive after this call
        count. Returns True if the event was set, False on timeout.
        """
        self.notification_event.clear()
        try:
            await asyncio.wait_for(self.notification_event.wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False
|
|
|
|
|
|
def is_thermopro_device(name: str) -> bool:
    """Return True if *name* contains "thermo" (any case) or starts with "TP" plus a digit."""
    if "thermo" in name.lower():
        return True
    return re.match(r"^TP\d", name) is not None
|
|
|
|
|
|
async def scan_thermopro_devices(timeout: float = 3.0) -> List:
    """Scan for ThermoPro BLE devices, keeping the first hit per address.

    Exits the process with status 1 if the BLE scan itself fails.
    """
    print("Scanning for ThermoPro devices...", file=sys.stderr)
    try:
        discovered = await BleakScanner.discover(timeout=timeout)
    except Exception as e:
        print(
            f"Error: BLE scan failed. Is Bluetooth enabled?\nDetails: {e}",
            file=sys.stderr,
        )
        sys.exit(1)

    by_address = {}
    for dev in discovered:
        if not is_thermopro_device(dev.name or ""):
            continue
        # setdefault keeps the first device seen for an address.
        by_address.setdefault(dev.address, dev)
    return list(by_address.values())
|
|
|
|
|
|
async def resolve_address(addr: Optional[str]) -> str:
    """Return the device address to use.

    A provided *addr* is returned as is. Otherwise a scan runs: a single
    match is auto-selected, and several matches prompt on stderr. The process
    exits with status 1 when nothing is found, when several devices are found
    but stdin is not a TTY, or when the prompt is aborted.
    """
    if addr:
        return addr

    candidates = await scan_thermopro_devices()
    total = len(candidates)

    if total == 0:
        print(
            "Error: No ThermoPro devices found. Specify --addr manually.",
            file=sys.stderr,
        )
        sys.exit(1)

    if total == 1:
        only = candidates[0]
        print(f"Auto-selected: {only.name} ({only.address})", file=sys.stderr)
        return only.address

    if not sys.stdin.isatty():
        print(
            "Error: Multiple ThermoPro devices found but stdin is not interactive.\n"
            "Specify --addr to select a device:",
            file=sys.stderr,
        )
        for dev in candidates:
            print(f" {dev.name} ({dev.address})", file=sys.stderr)
        sys.exit(1)

    print(f"\nFound {total} ThermoPro devices:\n", file=sys.stderr)
    for number, dev in enumerate(candidates, 1):
        print(f" [{number}] {dev.name} ({dev.address})", file=sys.stderr)
    print(file=sys.stderr)

    while True:
        try:
            # Prompt on stderr so stdout carries only command output.
            sys.stderr.write(f"Select device [1-{total}]: ")
            sys.stderr.flush()
            answer = input()
        except (EOFError, KeyboardInterrupt):
            print("\nAborted.", file=sys.stderr)
            sys.exit(1)

        try:
            position = int(answer) - 1
        except ValueError:
            position = -1  # non-numeric input is treated as out of range

        if 0 <= position < total:
            chosen = candidates[position]
            print(
                f"Selected: {chosen.name} ({chosen.address})",
                file=sys.stderr,
            )
            return chosen.address

        print(
            f"Invalid choice. Enter a number between 1 and {total}.",
            file=sys.stderr,
        )
|
|
|
|
|
|
def format_connect_output(
    state: ThermoproState, probe_names: List[Optional[str]], unit: str
) -> str:
    """Build the multi-line diagnostic text printed by the 'connect' command."""
    header = [
        "\nReceived temperature update!",
        f"Battery: {state.battery}%",
        f"Unit: {unit}",
        f"Probes: {state.probe_count}",
    ]

    readings = state.get_temperatures(unit)[: state.probe_count]
    probe_lines = []
    for slot, reading in enumerate(readings):
        label = get_probe_label(probe_names, slot)
        if not is_valid_temp(reading):
            probe_lines.append(f"{label}: not connected")
            continue
        probe_lines.append(f"{label}: {reading:.1f}°{unit} (connected)")

    return "\n".join(header + probe_lines)
|
|
|
|
|
|
def format_temps_output(
    state: ThermoproState, probe_names: List[Optional[str]], unit: str
) -> str:
    """Build the multi-line text printed by the 'temps' command."""
    out = [f"Battery: {state.battery}%", f"Unit: {unit}"]

    readings = state.get_temperatures(unit)[: state.probe_count]
    for slot, reading in enumerate(readings):
        label = get_probe_label(probe_names, slot)
        if not is_valid_temp(reading):
            out.append(f"{label}: not connected")
            continue
        out.append(f"{label}: {reading:.1f}°{unit}")

    return "\n".join(out)
|
|
|
|
|
|
def format_monitor_line(
    state: ThermoproState, probe_names: List[Optional[str]], unit: str
) -> str:
    """Build the compact, timestamped one-line status used while monitoring."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    readings = state.get_temperatures(unit)[: state.probe_count]

    cells = []
    for slot, reading in enumerate(readings):
        label = get_probe_label(probe_names, slot, compact=True)
        # Sentinel readings are shown as dashes.
        value = f"{reading:.1f}°{unit}" if is_valid_temp(reading) else "---"
        cells.append(f"{label}:{value}")

    joined = " | ".join(cells)
    return f"[{timestamp}] Battery:{state.battery:3d}% | {joined}"
|
|
|
|
|
|
def format_json(
    state: ThermoproState,
    probe_names: List[Optional[str]],
    unit: str,
    extra: dict = None,
    compact: bool = False,
) -> str:
    """Serialise the state to JSON, adding a 'probe_names' list.

    Keys from *extra* (if given) are merged over the state dictionary.
    *compact* selects single-line output instead of 2-space indentation.
    """
    payload = state.to_dict(unit)

    labels = []
    for slot in range(state.probe_count):
        labels.append(get_probe_label(probe_names, slot))
    payload["probe_names"] = labels

    if extra:
        payload.update(extra)

    if compact:
        return json.dumps(payload)
    return json.dumps(payload, indent=2)
|
|
|
|
|
|
async def cmd_scan(cfg: RunConfig) -> int:
    """Scan for ThermoPro devices and list them on stdout.

    Returns 0 when at least one ThermoPro device is found. Otherwise lists
    every discovered device and returns 1; a failed scan also returns 1.
    """
    print("Scanning for Bluetooth devices...", file=sys.stderr)
    try:
        discovered = await BleakScanner.discover(timeout=5.0)
    except Exception as e:
        print(
            f"Error: BLE scan failed. Is Bluetooth enabled?\nDetails: {e}",
            file=sys.stderr,
        )
        return 1

    # First advertisement per address wins.
    matches = {}
    for dev in discovered:
        if dev.address in matches:
            continue
        if is_thermopro_device(dev.name or ""):
            matches[dev.address] = dev

    if matches:
        for dev in matches.values():
            print(f"Found: {dev.name} ({dev.address})")
        return 0

    print("\nNo ThermoPro devices found.", file=sys.stderr)
    print("Showing all devices:", file=sys.stderr)
    for dev in discovered:
        print(f" {dev.name or 'Unknown'} ({dev.address})")
    return 1
|
|
|
|
|
|
async def cmd_connect(cfg: RunConfig) -> int:
    """Connect once, print a diagnostic report, and disconnect.

    Returns 0 after printing the report, or 1 if connecting fails or no
    notification arrives within 10 seconds.
    """
    address = await resolve_address(cfg.address)
    client = ThermoproClient(address)

    try:
        if not await client.connect():
            return 1

        got_update = await client.wait_for_update(timeout=10.0)
        if not got_update:
            print("Timeout waiting for temperature data", file=sys.stderr)
            return 1

        print(format_connect_output(client.state, cfg.probe_names, cfg.unit))
        return 0
    finally:
        # Always release the BLE link, including on early returns.
        await client.disconnect()
|
|
|
|
|
|
async def cmd_temps(cfg: RunConfig) -> int:
    """Read the current temperatures once and print them as text or JSON.

    Returns 0 on success, or 1 if connecting fails or no notification arrives
    within 10 seconds.
    """
    address = await resolve_address(cfg.address)
    client = ThermoproClient(address)

    try:
        if not await client.connect():
            return 1

        got_update = await client.wait_for_update(timeout=10.0)
        if not got_update:
            print("Error: Timeout waiting for temperature data", file=sys.stderr)
            return 1

        render = format_json if cfg.as_json else format_temps_output
        print(render(client.state, cfg.probe_names, cfg.unit))
        return 0
    finally:
        # Always release the BLE link, including on early returns.
        await client.disconnect()
|
|
|
|
|
|
async def connection_loop(address: str):
    """Async generator that yields a connected ThermoproClient, forever.

    Each iteration builds a fresh client. After a failed connect (or an
    exception) it sleeps 5s, 10s, 20s, ... capped at 300s before retrying.
    When the consumer asks for the next item, the previous client is
    disconnected and a new connection is made.
    """
    backoff_cap = 300
    consecutive_failures = 0

    while True:
        client = ThermoproClient(address)
        try:
            connected = await client.connect()
            if connected:
                consecutive_failures = 0
                yield client
            else:
                consecutive_failures += 1
                pause = min(5 * (2 ** (consecutive_failures - 1)), backoff_cap)
                log.warning("Connection failed, retry in %ds", pause)
                await asyncio.sleep(pause)
        except Exception as e:
            consecutive_failures += 1
            pause = min(5 * (2 ** (consecutive_failures - 1)), backoff_cap)
            log.warning("%s, reconnecting in %ds", e, pause)
            await asyncio.sleep(pause)
        finally:
            # Best-effort cleanup; a failing disconnect must not stop the loop.
            try:
                await client.disconnect()
            except Exception:
                pass
|
|
|
|
|
|
async def cmd_monitor(cfg: RunConfig) -> int:
    """Print readings continuously until interrupted.

    Emits one text line or compact JSON object per update, then sleeps
    cfg.interval seconds. If no notification arrives within 10 seconds, the
    connection loop reconnects. Always returns 0.
    """
    address = await resolve_address(cfg.address)

    if not cfg.as_json:
        print("Monitoring temperatures (Ctrl+C to stop)...\n", file=sys.stderr)

    gen = connection_loop(address)
    try:
        async for client in gen:
            while True:
                if not await client.wait_for_update(timeout=10.0):
                    log.warning("No update received, reconnecting...")
                    break

                if cfg.as_json:
                    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    line = format_json(
                        client.state,
                        cfg.probe_names,
                        cfg.unit,
                        extra={"timestamp": timestamp},
                        compact=True,
                    )
                else:
                    line = format_monitor_line(
                        client.state, cfg.probe_names, cfg.unit
                    )
                print(line)
                # Flush so piped consumers see each reading immediately.
                sys.stdout.flush()

                await asyncio.sleep(cfg.interval)
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass
    finally:
        # Closing the generator runs its cleanup (disconnect).
        await gen.aclose()

    return 0
|
|
|
|
|
|
class MQTTPublisher:
    """MQTT publisher for Home Assistant integration.

    Wraps a paho-mqtt client. Discovery configs are published retained under
    ``<discovery_prefix>/sensor/<device_name>_.../config`` and states under
    the matching ``.../state`` topics.
    """

    def __init__(
        self,
        broker: str,
        port: int = 1883,
        username: Optional[str] = None,
        password: Optional[str] = None,
        device_name: str = "thermopro",
        discovery_prefix: str = "homeassistant",
    ):
        self.broker = broker
        self.port = port
        self.username = username
        self.password = password
        self.device_name = device_name
        self.discovery_prefix = discovery_prefix
        if MQTT_V2:
            self.client = mqtt.Client(CallbackAPIVersion.VERSION2)
        else:
            self.client = mqtt.Client()
        self.connected = False

        # A username without a password is valid for MQTT, so credentials
        # are configured whenever a username is present.
        if username:
            self.client.username_pw_set(username, password)

        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect

    def _on_connect(self, client, userdata, flags, reason_code, properties=None):
        """paho connect callback; a reason code of 0 marks us connected."""
        if reason_code == 0:
            self.connected = True
            print("Connected to MQTT broker", file=sys.stderr)
        else:
            print(f"Failed to connect to MQTT broker: {reason_code}", file=sys.stderr)

    def _on_disconnect(self, client, userdata, *args):
        """paho disconnect callback, tolerant of both callback API versions."""
        # v1: (client, userdata, rc) — rc is args[0]
        # v2: (client, userdata, flags, reason_code, properties) — rc is args[1]
        rc = args[1] if len(args) > 1 else args[0] if args else 0
        self.connected = False
        if rc != 0:
            print("Disconnected from MQTT broker", file=sys.stderr)

    def connect(self):
        """Connect to the broker and wait up to ~5 seconds for confirmation.

        Returns True once the connect callback has fired successfully, and
        False on error or timeout. On timeout the background network loop is
        stopped so no thread is left running.
        """
        try:
            self.client.connect(self.broker, self.port, 60)
            self.client.loop_start()
        except Exception as e:
            print(f"MQTT connection error: {e}", file=sys.stderr)
            return False

        # Poll the flag set by _on_connect: 50 x 0.1s.
        for _ in range(50):
            if self.connected:
                return True
            time.sleep(0.1)

        self.client.loop_stop()
        return False

    def disconnect(self):
        """Stop the network loop and disconnect from the broker."""
        self.client.loop_stop()
        self.client.disconnect()

    def _device_info(self, device_id: str) -> dict:
        """Return the Home Assistant 'device' section shared by all sensors."""
        return {
            "identifiers": [device_id],
            "name": f"ThermoPro {self.device_name.upper()}",
            "model": "TP25W",
            "manufacturer": "ThermoPro",
        }

    def publish_discovery(
        self,
        probe_num: int,
        mac_address: str,
        unit: str = "F",
        probe_name: Optional[str] = None,
    ):
        """Publish the retained discovery config for one probe's temperature sensor.

        *probe_num* is 1-based. *mac_address*, with colons removed and
        lower-cased, becomes the device identifier. *probe_name* overrides
        the default "ThermoPro Probe <n>" display name.
        """
        device_id = mac_address.replace(":", "").lower()
        probe_id = f"{self.device_name}_probe{probe_num}"
        display_name = probe_name or f"ThermoPro Probe {probe_num}"

        temp_config = {
            "name": display_name,
            "unique_id": f"{device_id}_probe{probe_num}_temp",
            "state_topic": f"{self.discovery_prefix}/sensor/{probe_id}/state",
            "unit_of_measurement": f"°{unit}",
            "device_class": "temperature",
            "value_template": "{{ value_json.temperature }}",
            "device": self._device_info(device_id),
        }

        config_topic = f"{self.discovery_prefix}/sensor/{probe_id}/config"
        self.client.publish(config_topic, json.dumps(temp_config), retain=True)

    def publish_battery_discovery(self, mac_address: str):
        """Publish the retained discovery config for the battery sensor."""
        device_id = mac_address.replace(":", "").lower()
        sensor_id = f"{self.device_name}_battery"

        battery_config = {
            "name": "ThermoPro Battery",
            "unique_id": f"{device_id}_battery",
            "state_topic": f"{self.discovery_prefix}/sensor/{sensor_id}/state",
            "unit_of_measurement": "%",
            "device_class": "battery",
            "value_template": "{{ value_json.battery }}",
            "device": self._device_info(device_id),
        }

        config_topic = f"{self.discovery_prefix}/sensor/{sensor_id}/config"
        self.client.publish(config_topic, json.dumps(battery_config), retain=True)

    def publish_state(self, probe_num: int, temperature: float, connected: bool = True):
        """Publish one probe's temperature, or null when it is not connected."""
        probe_id = f"{self.device_name}_probe{probe_num}"
        state_topic = f"{self.discovery_prefix}/sensor/{probe_id}/state"

        value = round(temperature, 1) if connected else None
        self.client.publish(state_topic, json.dumps({"temperature": value}))

    def publish_battery_state(self, battery_percent: int):
        """Publish the battery level."""
        sensor_id = f"{self.device_name}_battery"
        state_topic = f"{self.discovery_prefix}/sensor/{sensor_id}/state"
        self.client.publish(state_topic, json.dumps({"battery": battery_percent}))
|
|
|
|
|
|
async def cmd_mqtt(cfg: RunConfig) -> int:
    """Publish readings to MQTT for Home Assistant, reconnecting automatically.

    Returns 1 if paho-mqtt is missing, no broker is configured, or the broker
    connection fails. Otherwise runs until interrupted and returns 0.
    """
    if not MQTT_AVAILABLE:
        print(
            "Error: paho-mqtt library not installed. Run: pip install paho-mqtt",
            file=sys.stderr,
        )
        return 1

    if not cfg.broker:
        print(
            "Error: MQTT broker not specified. Use --broker or set MQTT_BROKER environment variable",
            file=sys.stderr,
        )
        return 1

    mqtt_pub = MQTTPublisher(
        cfg.broker, cfg.port, cfg.username, cfg.password, cfg.device_name
    )
    if not mqtt_pub.connect():
        print("Error: Failed to connect to MQTT broker", file=sys.stderr)
        return 1

    gen = connection_loop(cfg.address)
    try:
        print(
            f"Publishing to MQTT broker {cfg.broker}:{cfg.port} (Ctrl+C to stop)...\n",
            file=sys.stderr,
        )

        async for client in gen:
            # Discovery configs are re-published on every (re)connect.
            log.debug("Publishing Home Assistant discovery configs...")
            for slot in range(MAX_PROBES):
                mqtt_pub.publish_discovery(
                    slot + 1, cfg.address, cfg.unit, probe_name=cfg.probe_names[slot]
                )
            mqtt_pub.publish_battery_discovery(cfg.address)
            print("Connected to thermometer successfully!", file=sys.stderr)

            while True:
                if not await client.wait_for_update(timeout=15.0):
                    log.warning("No update from device, reconnecting...")
                    break

                readings = client.state.get_temperatures(cfg.unit)[:MAX_PROBES]
                for probe_num, reading in enumerate(readings, start=1):
                    connected = is_valid_temp(reading)
                    mqtt_pub.publish_state(probe_num, reading, connected)
                    log.debug(
                        "Published Probe %d: %s",
                        probe_num,
                        f"{reading:.1f}°{cfg.unit}" if connected else "disconnected",
                    )

                battery = client.state.battery
                mqtt_pub.publish_battery_state(battery)
                log.debug("Published Battery: %d%%", battery)

                print(
                    format_monitor_line(client.state, cfg.probe_names, cfg.unit),
                    file=sys.stderr,
                )

                await asyncio.sleep(cfg.interval)

    except (KeyboardInterrupt, asyncio.CancelledError):
        pass
    finally:
        # Close the BLE generator first, then the MQTT connection.
        await gen.aclose()
        mqtt_pub.disconnect()

    return 0
|
|
|
|
|
|
def _add_probe_names_argument(subparser) -> None:
    """Add the shared --probe-names option to *subparser*."""
    subparser.add_argument(
        "--probe-names",
        help="Comma-separated probe names (e.g. 'Brisket,Ambient,,Ribs')",
    )


def _add_unit_argument(subparser) -> None:
    """Add the shared --unit option, accepting C/F in either case."""
    subparser.add_argument(
        "--unit",
        # Upper-case before the choices check so "c" and "f" are accepted.
        type=str.upper,
        choices=["C", "F"],
        help="Temperature unit",
    )


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with one sub-command per cmd_* coroutine.

    Each sub-parser stores its handler in ``func`` and the chosen sub-command
    name in ``command``.
    """
    parser = argparse.ArgumentParser(
        description="ThermoPro Bluetooth Thermometer CLI",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    parser.add_argument("--debug", action="store_true", help="Enable debug output")

    subparsers = parser.add_subparsers(dest="command", help="Command to execute")

    # Scan
    p = subparsers.add_parser("scan", help="Scan for ThermoPro devices")
    p.set_defaults(func=cmd_scan)

    # Connect
    p = subparsers.add_parser("connect", help="Test connection to device")
    p.add_argument("--addr", help="Bluetooth address (auto-scans if not provided)")
    _add_probe_names_argument(p)
    p.set_defaults(func=cmd_connect)

    # Temps
    p = subparsers.add_parser("temps", help="Get current temperatures")
    p.add_argument("--addr", help="Bluetooth address (auto-scans if not provided)")
    p.add_argument("--json", action="store_true", help="Output as JSON")
    _add_unit_argument(p)
    _add_probe_names_argument(p)
    p.set_defaults(func=cmd_temps)

    # Monitor
    p = subparsers.add_parser("monitor", help="Monitor temperatures continuously")
    p.add_argument("--addr", help="Bluetooth address (auto-scans if not provided)")
    p.add_argument("--interval", type=int, default=1, help="Update interval in seconds")
    p.add_argument("--json", action="store_true", help="Output as JSON")
    _add_unit_argument(p)
    _add_probe_names_argument(p)
    p.set_defaults(func=cmd_monitor)

    # MQTT
    p = subparsers.add_parser(
        "mqtt", help="Publish temperatures to MQTT for Home Assistant"
    )
    p.add_argument("--addr", required=True, help="Bluetooth address")
    p.add_argument(
        "--interval",
        type=int,
        default=5,
        help="Update interval in seconds (default: 5)",
    )
    _add_unit_argument(p)
    p.add_argument("--broker", help="MQTT broker address (or set MQTT_BROKER env var)")
    p.add_argument(
        "--port", type=int, default=1883, help="MQTT broker port (default: 1883)"
    )
    p.add_argument("--username", help="MQTT username (or set MQTT_USERNAME env var)")
    p.add_argument("--password", help="MQTT password (or set MQTT_PASSWORD env var)")
    p.add_argument(
        "--device-name",
        default="thermopro",
        help="Device name for MQTT topics (default: thermopro)",
    )
    _add_probe_names_argument(p)
    p.set_defaults(func=cmd_mqtt)

    return parser
|
|
|
|
|
|
def load_dotenv():
    """Load .env from the script's directory into os.environ (no override).

    Accepts ``KEY=value`` lines with an optional leading ``export``. Blank
    lines, ``#`` comments and lines without ``=`` are skipped. One matching
    pair of surrounding quotes is removed from the value. Variables already
    in the environment are never overwritten. A missing file is ignored; any
    other read failure is reported on stderr and then ignored.
    """
    env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env")
    try:
        with open(env_path, encoding="utf-8") as f:
            raw_lines = f.read().splitlines()
    except FileNotFoundError:
        return  # the .env file is optional
    except (OSError, UnicodeDecodeError) as e:
        print(f"Warning: could not read {env_path}: {e}", file=sys.stderr)
        return

    for raw_line in raw_lines:
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        if line.startswith("export "):
            line = line[len("export "):].lstrip()

        key, _, value = line.partition("=")
        key = key.strip()
        value = value.strip()
        # Remove only a matching pair of quotes, so a value that merely ends
        # with a quote character (e.g. a password) is left intact.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]

        if key and key not in os.environ:
            os.environ[key] = value
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, configure logging, run the command.

    Returns the process exit status: 1 when no sub-command is given, 0 when
    interrupted with Ctrl+C, otherwise the command's own return value.
    """
    load_dotenv()
    parser = build_parser()
    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        return 1

    log_level = logging.DEBUG if args.debug else logging.WARNING
    logging.basicConfig(
        level=log_level,
        format="[%(levelname)s] %(message)s",
        stream=sys.stderr,
    )

    cfg = RunConfig.from_args(args)

    def _sigint_handler(sig, frame):
        print("\nStopping...", file=sys.stderr)
        # Restore the default handler so a second Ctrl+C kills the process.
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        raise KeyboardInterrupt

    signal.signal(signal.SIGINT, _sigint_handler)

    try:
        exit_code = asyncio.run(args.func(cfg))
    except KeyboardInterrupt:
        exit_code = 0
    return exit_code
|
|
|
|
|
|
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|