Files
2026-06-06 10:23:49 -05:00

202 lines
5.1 KiB
Python

import argparse
import json
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib import request, error
def getUtcTimestamp():
    """Return the current time in UTC as an ISO 8601 string.

    The datetime is timezone-aware, so the returned string carries an
    explicit ``+00:00`` offset.
    """
    nowUtc = datetime.now(tz=timezone.utc)
    return nowUtc.isoformat()
def extractJson(output):
    """Extract the first JSON object embedded in a block of text.

    The text may contain arbitrary non-JSON content (log lines, warnings)
    before and after the object. Each ``{`` is tried in turn as the start
    of an object, and the first one that decodes cleanly is returned. This
    avoids slicing from the first ``{`` to the last ``}``, which fails as
    soon as any surrounding text contains a brace of its own.

    Args:
        output: Text to search.

    Returns:
        The decoded JSON object (a ``dict``).

    Raises:
        ValueError: If no position in ``output`` starts a decodable JSON
            object.
    """
    decoder = json.JSONDecoder()
    start = output.find("{")
    while start != -1:
        try:
            # raw_decode tolerates trailing data after the object, so text
            # that follows the payload cannot break the parse.
            parsed, _ = decoder.raw_decode(output[start:])
        except json.JSONDecodeError:
            # This brace did not open a valid object; try the next one.
            start = output.find("{", start + 1)
        else:
            return parsed
    raise ValueError(f"No JSON object found in CLI output:\n{output}")
def runThermoproCli(cliPath, timeout=None):
    """Run the ThermoPro CLI script and return the JSON object it prints.

    The script is launched with the current Python interpreter as
    ``<python> <cliPath> temps --json``.

    Args:
        cliPath: Path to the CLI script.
        timeout: Optional maximum number of seconds to wait for the
            script. ``None`` (the default) waits indefinitely.

    Returns:
        The JSON object decoded from the script's output.

    Raises:
        RuntimeError: If the script exits with a non-zero status or does
            not finish within ``timeout``.
        ValueError: If no JSON object can be found in the output.
    """
    command = [
        sys.executable,
        str(cliPath),
        "temps",
        "--json",
    ]
    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=False,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as e:
        raise RuntimeError(
            f"thermopro_cli.py timed out after {timeout} seconds"
        ) from e

    combinedOutput = "\n".join(
        part for part in (result.stdout, result.stderr) if part.strip()
    )
    if result.returncode != 0:
        raise RuntimeError(
            f"thermopro_cli.py exited with code {result.returncode}:\n{combinedOutput}"
        )

    # Prefer stdout on its own: diagnostics on stderr may contain braces
    # that would otherwise be mistaken for part of the JSON payload.
    if result.stdout.strip():
        try:
            return extractJson(result.stdout)
        except ValueError:
            # No usable JSON on stdout; fall back to the combined text
            # in case the payload was written to stderr instead.
            pass
    return extractJson(combinedOutput)
def normalizeReading(rawReading, deviceId, macAddress, bridgeTime=None):
    """Convert a raw CLI reading into the payload posted to the API.

    A probe is reported as disconnected, with ``temperature`` set to
    ``None``, when its raw value is missing, ``None``, or the ``-999.0``
    sentinel. Probes without an entry in ``probe_names`` are named
    ``"Probe <n>"``.

    Args:
        rawReading: Mapping decoded from the CLI's JSON output. The keys
            read are ``temperatures``, ``probe_names``, ``probe_count``,
            ``connected``, ``battery``, ``unit`` and ``last_update``; all
            are optional.
        deviceId: Identifier copied into the payload as ``deviceId``.
        macAddress: Address copied into the payload as ``mac``.
        bridgeTime: Optional timestamp for the payload's ``bridgeTime``
            field. Defaults to the current UTC time.

    Returns:
        A JSON-serialisable ``dict`` describing the device and its probes.
    """
    disconnectedSentinel = -999.0

    # ``or []`` also covers keys that are present but null in the JSON.
    temperatures = rawReading.get("temperatures") or []
    probeNames = rawReading.get("probe_names") or []
    probeCount = rawReading.get("probe_count")
    if probeCount is None:
        probeCount = len(temperatures)

    probes = []
    for index in range(probeCount):
        rawTemperature = temperatures[index] if index < len(temperatures) else None
        name = probeNames[index] if index < len(probeNames) else f"Probe {index + 1}"
        # A null reading carries no temperature, so it must not be
        # reported as a connected probe.
        isConnected = (
            rawTemperature is not None
            and rawTemperature != disconnectedSentinel
        )
        probes.append({
            "id": index + 1,
            "name": name,
            "temperature": rawTemperature if isConnected else None,
            "connected": isConnected,
        })

    return {
        "deviceId": deviceId,
        "device": "ThermoPro TP930",
        "mac": macAddress,
        "connected": bool(rawReading.get("connected", False)),
        "battery": rawReading.get("battery"),
        "unit": rawReading.get("unit"),
        "probeCount": probeCount,
        "probes": probes,
        "readingTime": rawReading.get("last_update"),
        "bridgeTime": getUtcTimestamp() if bridgeTime is None else bridgeTime,
        "source": "thermopro_cli",
    }
def postReading(endpoint, token, payload, timeout=10):
    """POST a reading to the API endpoint as JSON.

    Args:
        endpoint: URL to send the request to.
        token: Shared secret sent in the ``X-Bridge-Token`` header. The
            header is omitted when this is empty or ``None``.
        payload: JSON-serialisable object used as the request body.
        timeout: Seconds to wait on the connection before giving up.

    Returns:
        A ``(status, body)`` tuple with the HTTP status code and the
        response body decoded as text.

    Raises:
        RuntimeError: If the server answers with an HTTP error status, or
            the request fails at the network level.
    """
    body = json.dumps(payload).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "thermopro-bridge-poc/0.1",
    }
    if token:
        headers["X-Bridge-Token"] = token

    req = request.Request(
        endpoint,
        data=body,
        headers=headers,
        method="POST",
    )
    try:
        with request.urlopen(req, timeout=timeout) as response:
            # errors="replace" keeps a non-UTF-8 body from raising and
            # hiding an otherwise successful response.
            responseBody = response.read().decode("utf-8", errors="replace")
            return response.status, responseBody
    except error.HTTPError as e:
        # Same reasoning: a badly encoded error page must not mask the
        # HTTP status being reported.
        responseBody = e.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"POST failed with HTTP {e.code}: {responseBody}") from e
    except OSError as e:
        # URLError is an OSError subclass; catching OSError also wraps
        # failures raised while reading the response, such as a timeout
        # or a reset connection.
        raise RuntimeError(f"POST failed: {e}") from e
def main(argv=None):
    """Poll the ThermoPro CLI and POST each reading to the API.

    Each cycle runs the CLI, normalises its output, posts the payload and
    prints a summary. A failure inside a cycle is reported on stderr and,
    unless ``--once`` is given, the loop continues after the interval.

    Args:
        argv: Optional list of command-line arguments. ``None`` (the
            default) makes argparse read ``sys.argv``.

    Exits with status 1 when the CLI script cannot be found or when a
    ``--once`` run fails, and with status 2 on invalid arguments.
    """
    parser = argparse.ArgumentParser(
        description="ThermoPro TP930 proof-of-concept bridge"
    )
    parser.add_argument(
        "--cli",
        default="thermopro_cli.py",
        help="Path to thermopro_cli.py",
    )
    parser.add_argument(
        "--endpoint",
        default="http://127.0.0.1:8000/api/thermopro/readings",
        help="API endpoint to POST readings to",
    )
    parser.add_argument(
        "--token",
        default="dev-secret",
        help="Shared token sent as X-Bridge-Token",
    )
    parser.add_argument(
        "--device-id",
        default="tp930-smoker",
        help="Stable ID for this thermometer/bridge",
    )
    parser.add_argument(
        "--mac",
        default="C9:48:1D:B1:E1:E7",
        help="ThermoPro BLE MAC address",
    )
    parser.add_argument(
        "--interval",
        type=int,
        default=15,
        help="Polling interval in seconds",
    )
    parser.add_argument(
        "--once",
        action="store_true",
        help="Read and POST once, then exit",
    )
    args = parser.parse_args(argv)

    # time.sleep() rejects negative values; fail up front with a usage
    # error instead of crashing after the first cycle.
    if args.interval < 0:
        parser.error("--interval must not be negative")

    cliPath = Path(args.cli)
    # is_file() also rejects a directory, which could never be run.
    if not cliPath.is_file():
        print(f"Could not find {cliPath}", file=sys.stderr)
        sys.exit(1)

    while True:
        posted = False
        try:
            rawReading = runThermoproCli(cliPath)
            payload = normalizeReading(rawReading, args.device_id, args.mac)
            status, responseBody = postReading(args.endpoint, args.token, payload)
            posted = True

            connectedProbes = [
                probe for probe in payload["probes"] if probe["connected"]
            ]
            print(
                f"[{payload['bridgeTime']}] POST {status} - "
                f"battery={payload['battery']}%, "
                f"connected_probes={len(connectedProbes)}"
            )
            print(json.dumps(payload, indent=2))
            if responseBody.strip():
                print(responseBody)
        except Exception as e:
            # Top-level boundary: report the failure and keep polling so
            # one bad cycle does not stop the bridge.
            print(f"[{getUtcTimestamp()}] ERROR: {e}", file=sys.stderr)

        if args.once:
            # Let callers of a one-shot run detect failure from the
            # exit status.
            if not posted:
                sys.exit(1)
            break
        time.sleep(args.interval)
# Start the bridge only when this file is executed as a script, so that
# importing it has no side effects.
if __name__ == "__main__":
    main()