Initial proof-of-concept commit

This commit is contained in:
Keith Solomon
2026-06-06 10:23:49 -05:00
commit 122358286d
6 changed files with 1420 additions and 0 deletions
+201
View File
@@ -0,0 +1,201 @@
import argparse
import json
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib import request, error
def getUtcTimestamp():
return datetime.now(timezone.utc).isoformat()
def extractJson(output):
start = output.find("{")
end = output.rfind("}")
if start == -1 or end == -1 or end <= start:
raise ValueError(f"No JSON object found in CLI output:\n{output}")
return json.loads(output[start:end + 1])
def runThermoproCli(cliPath):
command = [
sys.executable,
str(cliPath),
"temps",
"--json",
]
result = subprocess.run(
command,
capture_output=True,
text=True,
check=False,
)
combinedOutput = "\n".join(
part for part in [result.stdout, result.stderr] if part.strip()
)
if result.returncode != 0:
raise RuntimeError(
f"thermopro_cli.py exited with code {result.returncode}:\n{combinedOutput}"
)
return extractJson(combinedOutput)
def normalizeReading(rawReading, deviceId, macAddress):
temperatures = rawReading.get("temperatures", [])
probeNames = rawReading.get("probe_names", [])
probeCount = rawReading.get("probe_count", len(temperatures))
probes = []
for index in range(probeCount):
rawTemperature = temperatures[index] if index < len(temperatures) else -999.0
name = probeNames[index] if index < len(probeNames) else f"Probe {index + 1}"
isConnected = rawTemperature != -999.0
probes.append({
"id": index + 1,
"name": name,
"temperature": rawTemperature if isConnected else None,
"connected": isConnected,
})
return {
"deviceId": deviceId,
"device": "ThermoPro TP930",
"mac": macAddress,
"connected": bool(rawReading.get("connected", False)),
"battery": rawReading.get("battery"),
"unit": rawReading.get("unit"),
"probeCount": probeCount,
"probes": probes,
"readingTime": rawReading.get("last_update"),
"bridgeTime": getUtcTimestamp(),
"source": "thermopro_cli",
}
def postReading(endpoint, token, payload):
body = json.dumps(payload).encode("utf-8")
headers = {
"Content-Type": "application/json",
"User-Agent": "thermopro-bridge-poc/0.1",
}
if token:
headers["X-Bridge-Token"] = token
req = request.Request(
endpoint,
data=body,
headers=headers,
method="POST",
)
try:
with request.urlopen(req, timeout=10) as response:
responseBody = response.read().decode("utf-8")
return response.status, responseBody
except error.HTTPError as e:
responseBody = e.read().decode("utf-8")
raise RuntimeError(f"POST failed with HTTP {e.code}: {responseBody}") from e
except error.URLError as e:
raise RuntimeError(f"POST failed: {e}") from e
def main():
parser = argparse.ArgumentParser(
description="ThermoPro TP930 proof-of-concept bridge"
)
parser.add_argument(
"--cli",
default="thermopro_cli.py",
help="Path to thermopro_cli.py",
)
parser.add_argument(
"--endpoint",
default="http://127.0.0.1:8000/api/thermopro/readings",
help="API endpoint to POST readings to",
)
parser.add_argument(
"--token",
default="dev-secret",
help="Shared token sent as X-Bridge-Token",
)
parser.add_argument(
"--device-id",
default="tp930-smoker",
help="Stable ID for this thermometer/bridge",
)
parser.add_argument(
"--mac",
default="C9:48:1D:B1:E1:E7",
help="ThermoPro BLE MAC address",
)
parser.add_argument(
"--interval",
type=int,
default=15,
help="Polling interval in seconds",
)
parser.add_argument(
"--once",
action="store_true",
help="Read and POST once, then exit",
)
args = parser.parse_args()
cliPath = Path(args.cli)
if not cliPath.exists():
print(f"Could not find {cliPath}", file=sys.stderr)
sys.exit(1)
while True:
try:
rawReading = runThermoproCli(cliPath)
payload = normalizeReading(rawReading, args.device_id, args.mac)
status, responseBody = postReading(args.endpoint, args.token, payload)
connectedProbes = [
probe for probe in payload["probes"] if probe["connected"]
]
print(
f"[{payload['bridgeTime']}] POST {status} - "
f"battery={payload['battery']}%, "
f"connected_probes={len(connectedProbes)}"
)
print(json.dumps(payload, indent=2))
if responseBody.strip():
print(responseBody)
except Exception as e:
print(f"[{getUtcTimestamp()}] ERROR: {e}", file=sys.stderr)
if args.once:
break
time.sleep(args.interval)
if __name__ == "__main__":
main()