Files
2026-06-06 10:23:49 -05:00

992 lines
32 KiB
Python

#!/usr/bin/env python3
"""
ThermoPro Bluetooth Thermometer CLI
Reverse-engineered protocol implementation for Linux
Usage:
thermopro_cli.py [--debug] scan
thermopro_cli.py [--debug] connect [--addr ADDRESS] [--probe-names NAMES]
thermopro_cli.py [--debug] temps [--addr ADDRESS] [--json] [--unit F|C] [--probe-names NAMES]
thermopro_cli.py [--debug] monitor [--addr ADDRESS] [--interval 1] [--json] [--unit F|C] [--probe-names NAMES]
thermopro_cli.py [--debug] mqtt --addr ADDRESS [--interval 5] [--unit F|C] [--broker HOST] [--probe-names NAMES]
If --addr is omitted (except for mqtt), the tool auto-scans for ThermoPro
devices and prompts for selection if multiple are found.
--probe-names accepts comma-separated labels (e.g. 'Brisket,Ambient,,Ribs').
Empty slots keep the default 'Probe N' naming.
Environment variables for MQTT (also read from .env file in script directory):
MQTT_BROKER - MQTT broker address
MQTT_PORT - MQTT broker port (default: 1883)
MQTT_USERNAME - MQTT username
MQTT_PASSWORD - MQTT password
"""
import asyncio
import argparse
import json
import logging
import re
import signal
import sys
import time
import os
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List
try:
from bleak import BleakClient, BleakScanner
except ImportError:
print("Error: bleak library not installed. Run: pip install bleak")
sys.exit(1)
try:
import paho.mqtt.client as mqtt
try:
from paho.mqtt.enums import CallbackAPIVersion
MQTT_V2 = True
except ImportError:
MQTT_V2 = False
MQTT_AVAILABLE = True
except ImportError:
MQTT_AVAILABLE = False
# BLE UUIDs
SERVICE_UUID = "1086FFF0-3343-4817-8BB2-B32206336CE8"
NOTIFY_UUID = "1086FFF2-3343-4817-8BB2-B32206336CE8"
WRITE_UUID = "1086FFF1-3343-4817-8BB2-B32206336CE8"
# Sentinel values from decode_temperature() that are not real readings
TEMP_SENTINELS = {-999.0, -100.0, 666.0}
MAX_PROBES = 4
log = logging.getLogger("thermopro")
def is_valid_temp(t: float) -> bool:
return t not in TEMP_SENTINELS
def parse_probe_names(
raw: Optional[str], max_probes: int = MAX_PROBES
) -> List[Optional[str]]:
"""Parse comma-separated probe names. Empty slots become None."""
if not raw:
return [None] * max_probes
parts = raw.split(",")
if len(parts) > max_probes:
print(
f"Warning: {len(parts)} names provided but max is {max_probes}, "
f"ignoring extras",
file=sys.stderr,
)
parts = parts[:max_probes]
names = [(p.strip() or None) for p in parts]
names.extend([None] * (max_probes - len(names)))
return names
def get_probe_label(
probe_names: List[Optional[str]], index: int, compact: bool = False
) -> str:
"""Return display label for a probe — custom name or default."""
name = probe_names[index] if index < len(probe_names) else None
if name:
return name[:8] if compact else name
return f"P{index+1}" if compact else f"Probe {index+1}"
@dataclass
class RunConfig:
address: Optional[str] = None
unit: str = "F"
interval: int = 1
as_json: bool = False
probe_names: List[Optional[str]] = field(
default_factory=lambda: [None] * MAX_PROBES
)
broker: Optional[str] = None
port: int = 1883
username: Optional[str] = None
password: Optional[str] = None
device_name: str = "thermopro"
@classmethod
def from_args(cls, args) -> "RunConfig":
return cls(
address=getattr(args, "addr", None),
unit=(getattr(args, "unit", None) or "F").upper(),
interval=getattr(args, "interval", 1),
as_json=getattr(args, "json", False),
probe_names=parse_probe_names(getattr(args, "probe_names", None)),
broker=getattr(args, "broker", None) or os.environ.get("MQTT_BROKER"),
port=getattr(args, "port", 1883),
username=getattr(args, "username", None) or os.environ.get("MQTT_USERNAME"),
password=getattr(args, "password", None) or os.environ.get("MQTT_PASSWORD"),
device_name=getattr(args, "device_name", "thermopro"),
)
class ThermoproState:
def __init__(self):
self.battery = 0
self.device_unit = "C"
self.probe_count = 4
self.temperatures = [-999.0] * MAX_PROBES
self.last_update = None
self.connected = False
def get_temperatures(self, unit: str) -> list:
"""Return temperatures converted to the requested display unit."""
if self.device_unit == unit:
return self.temperatures[:]
temps = []
for t in self.temperatures:
if not is_valid_temp(t):
temps.append(t)
elif self.device_unit == "C" and unit == "F":
temps.append(t * 9.0 / 5.0 + 32.0)
elif self.device_unit == "F" and unit == "C":
temps.append((t - 32.0) * 5.0 / 9.0)
else:
temps.append(t)
return temps
def to_dict(self, unit: str):
temps = self.get_temperatures(unit)
return {
"battery": self.battery,
"unit": unit,
"probe_count": self.probe_count,
"temperatures": [round(t, 1) if is_valid_temp(t) else t for t in temps],
"last_update": self.last_update.isoformat() if self.last_update else None,
"connected": self.connected,
}
def decode_temperature(byte1: int, byte2: int) -> float:
"""
Decode temperature from 2-byte BCD format.
Based on q/c.java:88-99
"""
if byte1 == 0xFF and byte2 == 0xFF:
return -999.0
if byte1 == 0xDD and byte2 == 0xDD:
return -100.0
if byte1 == 0xEE and byte2 == 0xEE:
return 666.0
is_negative = (byte1 & 0x80) != 0
hundreds = ((byte1 & 0x70) // 16) * 100
tens = (byte1 & 0x0F) * 10
ones = (byte2 & 0xF0) // 16
decimal = (byte2 & 0x0F) * 0.1
temp = hundreds + tens + ones + decimal
if is_negative:
temp = -temp
return temp
def calculate_checksum(data: List[int]) -> int:
"""Calculate checksum for command (sum of all bytes & 0xFF)"""
return sum(data) & 0xFF
def create_handshake_command() -> bytes:
"""
Create handshake/init command (0x01).
Based on p/b.java:124-163
For TP25W, using a known-good handshake from Android app capture.
The exact random bytes don't seem critical for the protocol.
"""
return bytes.fromhex("01098a7a13b73ed68b67c2a0")
def create_timestamp_sync_command() -> bytes:
"""
Create timestamp sync command (0x28).
Based on p/b.java:165-169
"""
timestamp = int(time.time()) - 1577808000
cmd = [
0x28,
0x04,
(timestamp >> 24) & 0xFF,
(timestamp >> 16) & 0xFF,
(timestamp >> 8) & 0xFF,
timestamp & 0xFF,
]
checksum = calculate_checksum(cmd)
return bytes(cmd + [checksum])
class ThermoproClient:
def __init__(self, address: str):
self.address = address
self.client: Optional[BleakClient] = None
self.state = ThermoproState()
self.notification_event = asyncio.Event()
def parse_temperature_packet(self, data: bytearray):
"""
Parse temperature data from command 0x30.
Handles both TP920 and TP25W formats.
"""
if len(data) < 6:
return
self.state.battery = data[2]
self.state.device_unit = "C"
log.debug("0x30 raw: %s", " ".join(f"{b:02X}" for b in data))
log.debug("byte[3] (unit?): 0x%02X", data[3])
if data[4] == 0x00:
self.state.probe_count = 4
probe_offset = 5
else:
self.state.probe_count = data[4]
probe_offset = 5
temps = []
for i in range(min(self.state.probe_count, MAX_PROBES)):
offset = probe_offset + i * 2
if offset + 1 < len(data):
t1 = data[offset]
t2 = data[offset + 1]
temp = decode_temperature(t1, t2)
temps.append(temp)
while len(temps) < MAX_PROBES:
temps.append(-999.0)
self.state.temperatures = temps[:MAX_PROBES]
self.state.last_update = datetime.now()
self.notification_event.set()
async def notification_handler(self, sender, data: bytearray):
"""
Handle notifications from the device.
Based on q/c.java protocol parsing
"""
if len(data) < 2:
return
cmd = data[0]
if cmd == 0x30:
self.parse_temperature_packet(data)
elif cmd == 0x01:
if len(data) >= 4:
model_code = data[3] if len(data) > 3 else 0
log.debug("Device model code: 0x%02X", model_code)
self.notification_event.set()
elif cmd == 0x41:
if len(data) >= 3:
version_byte = data[2]
version = (version_byte // 16) + ((version_byte % 16) / 10.0)
log.debug("Device version: %s", version)
self.notification_event.set()
elif cmd == 0xE0:
pass
else:
self.notification_event.set()
async def connect(self) -> bool:
"""Connect to the thermometer and initialize."""
try:
log.debug("Connecting to %s...", self.address)
self.client = BleakClient(self.address)
await self.client.connect()
self.state.connected = True
log.debug("Connected!")
log.debug("Enabling notifications...")
await self.client.start_notify(NOTIFY_UUID, self.notification_handler)
await asyncio.sleep(0.5)
log.debug("Sending handshake...")
handshake = create_handshake_command()
await self.client.write_gatt_char(WRITE_UUID, handshake)
log.debug("Waiting for temperature data...")
await asyncio.sleep(2.0)
log.debug("Initialization complete!")
return True
except Exception as e:
print(f"Connection failed: {e}", file=sys.stderr)
self.state.connected = False
return False
async def disconnect(self):
"""Disconnect from the thermometer"""
if self.client and self.client.is_connected:
await self.client.disconnect()
self.state.connected = False
async def wait_for_update(self, timeout: float = 5.0) -> bool:
"""Wait for a temperature update via notification"""
self.notification_event.clear()
try:
await asyncio.wait_for(self.notification_event.wait(), timeout)
return True
except asyncio.TimeoutError:
return False
def is_thermopro_device(name: str) -> bool:
return "thermo" in name.lower() or bool(re.match(r"^TP\d", name))
async def scan_thermopro_devices(timeout: float = 3.0) -> List:
"""Scan for ThermoPro BLE devices, deduplicated by address."""
print("Scanning for ThermoPro devices...", file=sys.stderr)
try:
devices = await BleakScanner.discover(timeout=timeout)
except Exception as e:
print(
f"Error: BLE scan failed. Is Bluetooth enabled?\nDetails: {e}",
file=sys.stderr,
)
sys.exit(1)
seen = {}
for d in devices:
if is_thermopro_device(d.name or "") and d.address not in seen:
seen[d.address] = d
return list(seen.values())
async def resolve_address(addr: Optional[str]) -> str:
"""Resolve device address: use provided value or auto-discover via scan."""
if addr:
return addr
thermopro_devices = await scan_thermopro_devices()
if not thermopro_devices:
print(
"Error: No ThermoPro devices found. Specify --addr manually.",
file=sys.stderr,
)
sys.exit(1)
if len(thermopro_devices) == 1:
device = thermopro_devices[0]
print(f"Auto-selected: {device.name} ({device.address})", file=sys.stderr)
return device.address
if not sys.stdin.isatty():
print(
"Error: Multiple ThermoPro devices found but stdin is not interactive.\n"
"Specify --addr to select a device:",
file=sys.stderr,
)
for d in thermopro_devices:
print(f" {d.name} ({d.address})", file=sys.stderr)
sys.exit(1)
print(f"\nFound {len(thermopro_devices)} ThermoPro devices:\n", file=sys.stderr)
for i, device in enumerate(thermopro_devices, 1):
print(f" [{i}] {device.name} ({device.address})", file=sys.stderr)
print(file=sys.stderr)
while True:
try:
sys.stderr.write(f"Select device [1-{len(thermopro_devices)}]: ")
sys.stderr.flush()
choice = input()
except (EOFError, KeyboardInterrupt):
print("\nAborted.", file=sys.stderr)
sys.exit(1)
try:
index = int(choice) - 1
if 0 <= index < len(thermopro_devices):
selected = thermopro_devices[index]
print(
f"Selected: {selected.name} ({selected.address})",
file=sys.stderr,
)
return selected.address
except ValueError:
pass
print(
f"Invalid choice. Enter a number between 1 and {len(thermopro_devices)}.",
file=sys.stderr,
)
def format_connect_output(
state: ThermoproState, probe_names: List[Optional[str]], unit: str
) -> str:
"""Multi-line diagnostic output for 'connect' command."""
lines = [
"\nReceived temperature update!",
f"Battery: {state.battery}%",
f"Unit: {unit}",
f"Probes: {state.probe_count}",
]
temps = state.get_temperatures(unit)
for i, temp in enumerate(temps[: state.probe_count]):
label = get_probe_label(probe_names, i)
if is_valid_temp(temp):
lines.append(f"{label}: {temp:.1f}°{unit} (connected)")
else:
lines.append(f"{label}: not connected")
return "\n".join(lines)
def format_temps_output(
state: ThermoproState, probe_names: List[Optional[str]], unit: str
) -> str:
"""Multi-line output for 'temps' command."""
lines = [
f"Battery: {state.battery}%",
f"Unit: {unit}",
]
temps = state.get_temperatures(unit)
for i, temp in enumerate(temps[: state.probe_count]):
label = get_probe_label(probe_names, i)
if is_valid_temp(temp):
lines.append(f"{label}: {temp:.1f}°{unit}")
else:
lines.append(f"{label}: not connected")
return "\n".join(lines)
def format_monitor_line(
state: ThermoproState, probe_names: List[Optional[str]], unit: str
) -> str:
"""Compact one-liner for monitor command."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
temps_str = []
temps = state.get_temperatures(unit)
for i, temp in enumerate(temps[: state.probe_count]):
label = get_probe_label(probe_names, i, compact=True)
if is_valid_temp(temp):
temps_str.append(f"{label}:{temp:.1f}°{unit}")
else:
temps_str.append(f"{label}:---")
return f"[{timestamp}] Battery:{state.battery:3d}% | {' | '.join(temps_str)}"
def format_json(
state: ThermoproState,
probe_names: List[Optional[str]],
unit: str,
extra: dict = None,
compact: bool = False,
) -> str:
"""JSON output with probe_names included."""
output = state.to_dict(unit)
output["probe_names"] = [
get_probe_label(probe_names, i) for i in range(state.probe_count)
]
if extra:
output.update(extra)
return json.dumps(output) if compact else json.dumps(output, indent=2)
async def cmd_scan(cfg: RunConfig) -> int:
"""Scan for ThermoPro devices"""
print("Scanning for Bluetooth devices...", file=sys.stderr)
try:
devices = await BleakScanner.discover(timeout=5.0)
except Exception as e:
print(
f"Error: BLE scan failed. Is Bluetooth enabled?\nDetails: {e}",
file=sys.stderr,
)
return 1
seen = {}
for d in devices:
if is_thermopro_device(d.name or "") and d.address not in seen:
seen[d.address] = d
thermopro_devices = list(seen.values())
for device in thermopro_devices:
print(f"Found: {device.name} ({device.address})")
if not thermopro_devices:
print("\nNo ThermoPro devices found.", file=sys.stderr)
print("Showing all devices:", file=sys.stderr)
for device in devices:
print(f" {device.name or 'Unknown'} ({device.address})")
return 0 if thermopro_devices else 1
async def cmd_connect(cfg: RunConfig) -> int:
"""Test connection to a device"""
address = await resolve_address(cfg.address)
client = ThermoproClient(address)
try:
if not await client.connect():
return 1
if await client.wait_for_update(timeout=10.0):
print(format_connect_output(client.state, cfg.probe_names, cfg.unit))
return 0
else:
print("Timeout waiting for temperature data", file=sys.stderr)
return 1
finally:
await client.disconnect()
async def cmd_temps(cfg: RunConfig) -> int:
"""Get current temperatures"""
address = await resolve_address(cfg.address)
client = ThermoproClient(address)
try:
if not await client.connect():
return 1
if await client.wait_for_update(timeout=10.0):
if cfg.as_json:
print(format_json(client.state, cfg.probe_names, cfg.unit))
else:
print(format_temps_output(client.state, cfg.probe_names, cfg.unit))
return 0
else:
print("Error: Timeout waiting for temperature data", file=sys.stderr)
return 1
finally:
await client.disconnect()
async def connection_loop(address: str):
"""Yields connected ThermoproClient instances. Reconnects on failure with backoff."""
max_retry = 300
delay = 5
failures = 0
while True:
client = ThermoproClient(address)
try:
if not await client.connect():
failures += 1
delay = min(5 * (2 ** (failures - 1)), max_retry)
log.warning("Connection failed, retry in %ds", delay)
await asyncio.sleep(delay)
continue
failures = 0
delay = 5
yield client
except Exception as e:
failures += 1
delay = min(5 * (2 ** (failures - 1)), max_retry)
log.warning("%s, reconnecting in %ds", e, delay)
await asyncio.sleep(delay)
finally:
try:
await client.disconnect()
except Exception:
pass
async def cmd_monitor(cfg: RunConfig) -> int:
"""Monitor temperatures continuously"""
address = await resolve_address(cfg.address)
if not cfg.as_json:
print("Monitoring temperatures (Ctrl+C to stop)...\n", file=sys.stderr)
gen = connection_loop(address)
try:
async for client in gen:
while True:
if await client.wait_for_update(timeout=10.0):
if cfg.as_json:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(
format_json(
client.state,
cfg.probe_names,
cfg.unit,
extra={"timestamp": timestamp},
compact=True,
)
)
else:
print(
format_monitor_line(client.state, cfg.probe_names, cfg.unit)
)
sys.stdout.flush()
else:
log.warning("No update received, reconnecting...")
break
await asyncio.sleep(cfg.interval)
except (KeyboardInterrupt, asyncio.CancelledError):
pass
finally:
await gen.aclose()
return 0
class MQTTPublisher:
"""MQTT publisher for Home Assistant integration"""
def __init__(
self,
broker: str,
port: int = 1883,
username: Optional[str] = None,
password: Optional[str] = None,
device_name: str = "thermopro",
discovery_prefix: str = "homeassistant",
):
self.broker = broker
self.port = port
self.username = username
self.password = password
self.device_name = device_name
self.discovery_prefix = discovery_prefix
if MQTT_V2:
self.client = mqtt.Client(CallbackAPIVersion.VERSION2)
else:
self.client = mqtt.Client()
self.connected = False
if username and password:
self.client.username_pw_set(username, password)
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
def _on_connect(self, client, userdata, flags, reason_code, properties=None):
if reason_code == 0:
self.connected = True
print("Connected to MQTT broker", file=sys.stderr)
else:
print(f"Failed to connect to MQTT broker: {reason_code}", file=sys.stderr)
def _on_disconnect(self, client, userdata, *args):
# v1: (client, userdata, rc) — rc is args[0]
# v2: (client, userdata, flags, reason_code, properties) — rc is args[1]
rc = args[1] if len(args) > 1 else args[0] if args else 0
self.connected = False
if rc != 0:
print("Disconnected from MQTT broker", file=sys.stderr)
def connect(self):
"""Connect to MQTT broker"""
try:
self.client.connect(self.broker, self.port, 60)
self.client.loop_start()
for _ in range(50):
if self.connected:
return True
time.sleep(0.1)
return False
except Exception as e:
print(f"MQTT connection error: {e}", file=sys.stderr)
return False
def disconnect(self):
"""Disconnect from MQTT broker"""
self.client.loop_stop()
self.client.disconnect()
def publish_discovery(
self,
probe_num: int,
mac_address: str,
unit: str = "F",
probe_name: Optional[str] = None,
):
"""Publish Home Assistant MQTT discovery config for a probe"""
device_id = mac_address.replace(":", "").lower()
probe_id = f"{self.device_name}_probe{probe_num}"
display_name = probe_name or f"ThermoPro Probe {probe_num}"
device_info = {
"identifiers": [device_id],
"name": f"ThermoPro {self.device_name.upper()}",
"model": "TP25W",
"manufacturer": "ThermoPro",
}
temp_config = {
"name": display_name,
"unique_id": f"{device_id}_probe{probe_num}_temp",
"state_topic": f"{self.discovery_prefix}/sensor/{probe_id}/state",
"unit_of_measurement": f"°{unit}",
"device_class": "temperature",
"value_template": "{{ value_json.temperature }}",
"device": device_info,
}
config_topic = f"{self.discovery_prefix}/sensor/{probe_id}/config"
self.client.publish(config_topic, json.dumps(temp_config), retain=True)
def publish_battery_discovery(self, mac_address: str):
"""Publish Home Assistant MQTT discovery config for battery"""
device_id = mac_address.replace(":", "").lower()
sensor_id = f"{self.device_name}_battery"
device_info = {
"identifiers": [device_id],
"name": f"ThermoPro {self.device_name.upper()}",
"model": "TP25W",
"manufacturer": "ThermoPro",
}
battery_config = {
"name": f"ThermoPro Battery",
"unique_id": f"{device_id}_battery",
"state_topic": f"{self.discovery_prefix}/sensor/{sensor_id}/state",
"unit_of_measurement": "%",
"device_class": "battery",
"value_template": "{{ value_json.battery }}",
"device": device_info,
}
config_topic = f"{self.discovery_prefix}/sensor/{sensor_id}/config"
self.client.publish(config_topic, json.dumps(battery_config), retain=True)
def publish_state(self, probe_num: int, temperature: float, connected: bool = True):
"""Publish temperature state for a probe"""
probe_id = f"{self.device_name}_probe{probe_num}"
state_topic = f"{self.discovery_prefix}/sensor/{probe_id}/state"
if not connected:
self.client.publish(state_topic, json.dumps({"temperature": None}))
else:
state = {
"temperature": round(temperature, 1),
}
self.client.publish(state_topic, json.dumps(state))
def publish_battery_state(self, battery_percent: int):
"""Publish battery state"""
sensor_id = f"{self.device_name}_battery"
state_topic = f"{self.discovery_prefix}/sensor/{sensor_id}/state"
state = {
"battery": battery_percent,
}
self.client.publish(state_topic, json.dumps(state))
async def cmd_mqtt(cfg: RunConfig) -> int:
"""Monitor temperatures and publish to MQTT for Home Assistant with automatic reconnection"""
if not MQTT_AVAILABLE:
print(
"Error: paho-mqtt library not installed. Run: pip install paho-mqtt",
file=sys.stderr,
)
return 1
if not cfg.broker:
print(
"Error: MQTT broker not specified. Use --broker or set MQTT_BROKER environment variable",
file=sys.stderr,
)
return 1
mqtt_pub = MQTTPublisher(
cfg.broker, cfg.port, cfg.username, cfg.password, cfg.device_name
)
if not mqtt_pub.connect():
print("Error: Failed to connect to MQTT broker", file=sys.stderr)
return 1
gen = connection_loop(cfg.address)
try:
print(
f"Publishing to MQTT broker {cfg.broker}:{cfg.port} (Ctrl+C to stop)...\n",
file=sys.stderr,
)
async for client in gen:
log.debug("Publishing Home Assistant discovery configs...")
for probe_num in range(1, MAX_PROBES + 1):
name = cfg.probe_names[probe_num - 1]
mqtt_pub.publish_discovery(
probe_num, cfg.address, cfg.unit, probe_name=name
)
mqtt_pub.publish_battery_discovery(cfg.address)
print("Connected to thermometer successfully!", file=sys.stderr)
while True:
if await client.wait_for_update(timeout=15.0):
temps = client.state.get_temperatures(cfg.unit)
for i, temp in enumerate(temps[:MAX_PROBES]):
probe_num = i + 1
connected = is_valid_temp(temp)
mqtt_pub.publish_state(probe_num, temp, connected)
log.debug(
"Published Probe %d: %s",
probe_num,
f"{temp:.1f}°{cfg.unit}" if connected else "disconnected",
)
mqtt_pub.publish_battery_state(client.state.battery)
log.debug("Published Battery: %d%%", client.state.battery)
print(
format_monitor_line(client.state, cfg.probe_names, cfg.unit),
file=sys.stderr,
)
else:
log.warning("No update from device, reconnecting...")
break
await asyncio.sleep(cfg.interval)
except (KeyboardInterrupt, asyncio.CancelledError):
pass
finally:
await gen.aclose()
mqtt_pub.disconnect()
return 0
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="ThermoPro Bluetooth Thermometer CLI",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--debug", action="store_true", help="Enable debug output")
subparsers = parser.add_subparsers(dest="command", help="Command to execute")
# Scan
p = subparsers.add_parser("scan", help="Scan for ThermoPro devices")
p.set_defaults(func=cmd_scan)
# Connect
p = subparsers.add_parser("connect", help="Test connection to device")
p.add_argument("--addr", help="Bluetooth address (auto-scans if not provided)")
p.add_argument(
"--probe-names",
help="Comma-separated probe names (e.g. 'Brisket,Ambient,,Ribs')",
)
p.set_defaults(func=cmd_connect)
# Temps
p = subparsers.add_parser("temps", help="Get current temperatures")
p.add_argument("--addr", help="Bluetooth address (auto-scans if not provided)")
p.add_argument("--json", action="store_true", help="Output as JSON")
p.add_argument("--unit", choices=["C", "F"], help="Temperature unit")
p.add_argument(
"--probe-names",
help="Comma-separated probe names (e.g. 'Brisket,Ambient,,Ribs')",
)
p.set_defaults(func=cmd_temps)
# Monitor
p = subparsers.add_parser("monitor", help="Monitor temperatures continuously")
p.add_argument("--addr", help="Bluetooth address (auto-scans if not provided)")
p.add_argument("--interval", type=int, default=1, help="Update interval in seconds")
p.add_argument("--json", action="store_true", help="Output as JSON")
p.add_argument("--unit", choices=["C", "F"], help="Temperature unit")
p.add_argument(
"--probe-names",
help="Comma-separated probe names (e.g. 'Brisket,Ambient,,Ribs')",
)
p.set_defaults(func=cmd_monitor)
# MQTT
p = subparsers.add_parser(
"mqtt", help="Publish temperatures to MQTT for Home Assistant"
)
p.add_argument("--addr", required=True, help="Bluetooth address")
p.add_argument(
"--interval",
type=int,
default=5,
help="Update interval in seconds (default: 5)",
)
p.add_argument("--unit", choices=["C", "F"], help="Temperature unit")
p.add_argument("--broker", help="MQTT broker address (or set MQTT_BROKER env var)")
p.add_argument(
"--port", type=int, default=1883, help="MQTT broker port (default: 1883)"
)
p.add_argument("--username", help="MQTT username (or set MQTT_USERNAME env var)")
p.add_argument("--password", help="MQTT password (or set MQTT_PASSWORD env var)")
p.add_argument(
"--device-name",
default="thermopro",
help="Device name for MQTT topics (default: thermopro)",
)
p.add_argument(
"--probe-names",
help="Comma-separated probe names (e.g. 'Brisket,Ambient,,Ribs')",
)
p.set_defaults(func=cmd_mqtt)
return parser
def load_dotenv():
"""Load .env file from the script's directory into os.environ (no override)."""
env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env")
try:
with open(env_path) as f:
for line in f:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, value = line.partition("=")
key = key.strip()
value = value.strip().strip("\"'")
if key and key not in os.environ:
os.environ[key] = value
except FileNotFoundError:
pass
def main():
load_dotenv()
parser = build_parser()
args = parser.parse_args()
if not args.command:
parser.print_help()
return 1
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.WARNING,
format="[%(levelname)s] %(message)s",
stream=sys.stderr,
)
cfg = RunConfig.from_args(args)
def _sigint_handler(sig, frame):
print("\nStopping...", file=sys.stderr)
signal.signal(signal.SIGINT, signal.SIG_DFL)
raise KeyboardInterrupt
signal.signal(signal.SIGINT, _sigint_handler)
try:
return asyncio.run(args.func(cfg))
except KeyboardInterrupt:
return 0
if __name__ == "__main__":
sys.exit(main())