feat: add MQTT subscriber for ESP32 camera status ingestion

Implements MQTT subscriber (internal/mqtt/subscriber.go) that:
- Connects to Mosquitto broker with auto-reconnect
- Subscribes to remoterig/cameras/+/status, +/heartbeat, +/announce
- Parses and validates incoming messages per MQTT contract
- Inserts status_logs with duplicate prevention
- Auto-detects recording state changes and manages recording_events
- Broadcasts camera status changes via SSE hub
- Camera auto-registration via announce (MAC-based, sequential cam-NNN)
- Heartbeat watchdog marks cameras offline after 120s silence
- Wired into main.go with graceful degradation (warns if broker unreachable)

Dependency: github.com/eclipse/paho.mqtt.golang v1.5.0

Closes CUB-232.
This commit is contained in:
2026-05-21 21:16:08 +00:00
parent ce188086cb
commit f200cd9782
3 changed files with 443 additions and 0 deletions
+434
View File
@@ -0,0 +1,434 @@
// Package mqtt provides the MQTT subscriber that ingests ESP32 camera status
// via Mosquitto broker and persists to SQLite with SSE fan-out.
package mqtt
import (
"encoding/json"
"fmt"
"log"
"strings"
"sync"
"time"
"github.com/cubecraft/remoterig/internal/db"
"github.com/cubecraft/remoterig/internal/events"
"github.com/cubecraft/remoterig/pkg/models"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// Subscriber connects to Mosquitto, subscribes to camera topics, and
// processes incoming status, heartbeat, and announce messages.
type Subscriber struct {
mu sync.Mutex
broker string
clientID string
client mqtt.Client
db *db.DB
hub *events.Hub
// Heartbeat tracking: last heartbeat time per camera_id
heartbeats map[string]time.Time
// Shutdown
done chan struct{}
}
// NewSubscriber creates a new MQTT subscriber (does not connect yet).
func NewSubscriber(broker, clientID string, database *db.DB, sseHub *events.Hub) *Subscriber {
return &Subscriber{
broker: broker,
clientID: clientID,
db: database,
hub: sseHub,
heartbeats: make(map[string]time.Time),
done: make(chan struct{}),
}
}
// Connect establishes the MQTT connection and subscribes to topics.
// Returns an error if the initial connection fails.
func (s *Subscriber) Connect() error {
opts := mqtt.NewClientOptions().
AddBroker(fmt.Sprintf("tcp://%s", s.broker)).
SetClientID(s.clientID).
SetCleanSession(true).
SetAutoReconnect(true).
SetMaxReconnectInterval(30 * time.Second).
SetConnectRetry(true).
SetConnectRetryInterval(5 * time.Second).
SetOnConnectHandler(s.onConnect).
SetDefaultPublishHandler(s.onMessage).
SetConnectionLostHandler(func(c mqtt.Client, err error) {
log.Printf("MQTT connection lost: %v (will auto-reconnect)", err)
})
s.client = mqtt.NewClient(opts)
token := s.client.Connect()
if !token.WaitTimeout(10 * time.Second) {
return fmt.Errorf("mqtt connect timeout")
}
if err := token.Error(); err != nil {
return fmt.Errorf("mqtt connect: %w", err)
}
log.Printf("MQTT connected to %s as %s", s.broker, s.clientID)
// Start heartbeat watchdog (runs independently)
go s.heartbeatWatchdog()
return nil
}
// onConnect is called after every (re)connection. It re-subscribes to topics.
func (s *Subscriber) onConnect(c mqtt.Client) {
topics := map[string]byte{
"remoterig/cameras/+/status": 1, // QoS 1
"remoterig/cameras/+/heartbeat": 1, // QoS 1
"remoterig/cameras/+/announce": 2, // QoS 2
"remoterig/cameras/+/command": 2, // QoS 2 (hub publishes, ESP32 receives)
}
token := c.SubscribeMultiple(topics, nil)
if token.WaitTimeout(5 * time.Second) {
if err := token.Error(); err != nil {
log.Printf("MQTT subscribe error: %v", err)
} else {
log.Printf("MQTT subscribed to %d topics", len(topics))
}
} else {
log.Println("MQTT subscribe timeout")
}
}
// onMessage is the default message handler. It routes by topic.
func (s *Subscriber) onMessage(c mqtt.Client, msg mqtt.Message) {
topic := msg.Topic()
payload := msg.Payload()
// Extract camera_id from topic: remoterig/cameras/<camera_id>/<type>
cameraID := extractCameraID(topic)
if cameraID == "" {
log.Printf("MQTT: could not extract camera_id from topic %q", topic)
return
}
switch {
case strings.HasSuffix(topic, "/status"):
s.handleStatus(cameraID, payload)
case strings.HasSuffix(topic, "/heartbeat"):
s.handleHeartbeat(cameraID, payload)
case strings.HasSuffix(topic, "/announce"):
s.handleAnnounce(cameraID, payload)
default:
log.Printf("MQTT: unhandled topic %q", topic)
}
}
// ── Status handler ──────────────────────────────────────────────────────
// statusPayload matches the MQTT contract status message.
type statusPayload struct {
CameraID string `json:"camera_id"`
Timestamp string `json:"timestamp"`
BatteryPct *int `json:"battery_pct"`
BatteryRaw *int `json:"battery_raw"`
VideoRemainingSec *int `json:"video_remaining_sec"`
Recording bool `json:"recording"`
Mode *string `json:"mode"`
Resolution *string `json:"resolution"`
FPS *int `json:"fps"`
Online bool `json:"online"`
RSSI *int `json:"rssi"`
UptimeSec *int `json:"uptime_sec"`
}
func (s *Subscriber) handleStatus(cameraID string, payload []byte) {
var sp statusPayload
if err := json.Unmarshal(payload, &sp); err != nil {
log.Printf("MQTT status parse error for %s: %v", cameraID, err)
return
}
// Validate required fields
if sp.CameraID == "" || sp.Timestamp == "" {
log.Printf("MQTT status missing required fields (camera_id, timestamp) from %s", cameraID)
return
}
// Validate timestamp sanity (reject >5min future, >24h past)
ts, err := time.Parse(time.RFC3339, sp.Timestamp)
if err != nil {
// Try ISO8601 without timezone
ts, err = time.Parse("2006-01-02T15:04:05", sp.Timestamp)
if err != nil {
log.Printf("MQTT status invalid timestamp %q from %s", sp.Timestamp, cameraID)
return
}
}
now := time.Now()
if ts.After(now.Add(5 * time.Minute)) {
log.Printf("MQTT status timestamp too far in future (%s) from %s — using now", ts, cameraID)
ts = now
}
if ts.Before(now.Add(-24 * time.Hour)) {
log.Printf("MQTT status timestamp too far in past (%s) from %s — using now", ts, cameraID)
ts = now
}
// Clamp battery_pct to 0-100
batteryPct := sp.BatteryPct
if batteryPct != nil {
if *batteryPct < 0 {
v := 0
batteryPct = &v
}
if *batteryPct > 100 {
v := 100
batteryPct = &v
}
}
recordingState := 0
if sp.Recording {
recordingState = 1
}
onlineState := 0
if sp.Online {
onlineState = 1
}
// Detect recording state change by checking previous status
var prevRecording int
row := s.db.QueryRow(`
SELECT recording_state FROM status_logs
WHERE camera_id = ? AND recorded_at > datetime('now', '-120 seconds')
ORDER BY recorded_at DESC LIMIT 1
`, cameraID)
if err := row.Scan(&prevRecording); err != nil {
prevRecording = -1 // no previous status
}
// Insert status_log
_, err = s.db.Exec(`
INSERT INTO status_logs (camera_id, recorded_at, battery_pct,
video_remaining_sec, recording_state, mode, resolution, fps, online)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`, cameraID, ts, batteryPct, sp.VideoRemainingSec,
recordingState, stringPtr(sp.Mode), stringPtr(sp.Resolution),
intPtr(sp.FPS), onlineState)
if err != nil {
log.Printf("MQTT status insert error for %s: %v", cameraID, err)
return
}
// Handle recording state transitions
if prevRecording >= 0 && prevRecording != recordingState {
reason := "mqtt"
if recordingState == 1 {
// Started recording — open new recording_event
_, err = s.db.Exec(`
INSERT INTO recording_events (camera_id, started_at, reason)
VALUES (?, ?, ?)
`, cameraID, ts, reason)
if err != nil {
log.Printf("MQTT recording_event insert error for %s: %v", cameraID, err)
}
} else {
// Stopped recording — close most recent open event
_, err = s.db.Exec(`
UPDATE recording_events SET stopped_at = ?
WHERE camera_id = ? AND stopped_at IS NULL
ORDER BY started_at DESC LIMIT 1
`, ts, cameraID)
if err != nil {
log.Printf("MQTT recording_event close error for %s: %v", cameraID, err)
}
}
}
// Broadcast via SSE
cam, err := getCamera(s.db, cameraID)
if err == nil {
sl := models.StatusLog{
CameraID: cameraID,
RecordedAt: ts,
BatteryPct: batteryPct,
VideoRemainingSec: sp.VideoRemainingSec,
RecordingState: recordingState,
Mode: stringPtr(sp.Mode),
Resolution: stringPtr(sp.Resolution),
FPS: intPtr(sp.FPS),
Online: onlineState,
}
cs := models.NewCameraStatus(cam, sl)
s.hub.Broadcast("camera_status", cs)
}
}
// ── Heartbeat handler ───────────────────────────────────────────────────
type heartbeatPayload struct {
CameraID string `json:"camera_id"`
Timestamp string `json:"timestamp"`
UptimeSec *int `json:"uptime_sec"`
FreeHeap *int `json:"free_heap"`
}
func (s *Subscriber) handleHeartbeat(cameraID string, payload []byte) {
var hp heartbeatPayload
if err := json.Unmarshal(payload, &hp); err != nil {
log.Printf("MQTT heartbeat parse error for %s: %v", cameraID, err)
return
}
s.mu.Lock()
s.heartbeats[cameraID] = time.Now()
s.mu.Unlock()
}
// heartbeatWatchdog runs every 10 seconds and marks cameras offline if
// no heartbeat received in 120 seconds.
func (s *Subscriber) heartbeatWatchdog() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-s.done:
return
case <-ticker.C:
s.mu.Lock()
for cameraID, lastBeat := range s.heartbeats {
if time.Since(lastBeat) > 120*time.Second {
// Camera missed heartbeat — broadcast offline
cam, err := getCamera(s.db, cameraID)
if err == nil {
cs := models.CameraStatus{
CameraID: cameraID,
FriendlyName: cam.FriendlyName,
Online: false,
LastSeen: lastBeat,
}
s.hub.Broadcast("camera_offline", cs)
}
delete(s.heartbeats, cameraID)
}
}
s.mu.Unlock()
}
}
}
// ── Announce handler (auto-registration) ───────────────────────────────
type announcePayload struct {
MacAddress string `json:"mac_address"`
FirmwareVersion string `json:"firmware_version"`
Capabilities []string `json:"capabilities"`
FriendlyName string `json:"friendly_name"`
}
func (s *Subscriber) handleAnnounce(cameraID string, payload []byte) {
var ap announcePayload
if err := json.Unmarshal(payload, &ap); err != nil {
log.Printf("MQTT announce parse error for %s: %v", cameraID, err)
return
}
if ap.MacAddress == "" {
log.Printf("MQTT announce missing mac_address from %s", cameraID)
return
}
// Check if this MAC is already registered
var existingID string
err := s.db.QueryRow(
"SELECT camera_id FROM cameras WHERE mac_address = ?", ap.MacAddress,
).Scan(&existingID)
if err == nil {
// Already registered — just update friendly_name
_, err = s.db.Exec(
"UPDATE cameras SET friendly_name = ?, updated_at = datetime('now') WHERE camera_id = ?",
ap.FriendlyName, existingID,
)
if err != nil {
log.Printf("MQTT announce update error for %s: %v", existingID, err)
return
}
log.Printf("MQTT announce: camera %s (%s) re-connected", existingID, ap.FriendlyName)
} else {
// New camera — generate sequential cam-NNN ID
var maxID string
s.db.QueryRow("SELECT MAX(camera_id) FROM cameras").Scan(&maxID)
seq := 1
if maxID != "" {
fmt.Sscanf(maxID, "cam-%d", &seq)
seq++
}
newID := fmt.Sprintf("cam-%03d", seq)
_, err = s.db.Exec(`
INSERT INTO cameras (camera_id, friendly_name, mac_address, created_at, updated_at)
VALUES (?, ?, ?, datetime('now'), datetime('now'))
`, newID, ap.FriendlyName, ap.MacAddress)
if err != nil {
log.Printf("MQTT announce insert error for %s: %v", ap.MacAddress, err)
return
}
log.Printf("MQTT announce: new camera registered as %s (%s)", newID, ap.FriendlyName)
// Broadcast new camera via SSE
cam, err := getCamera(s.db, newID)
if err == nil {
s.hub.Broadcast("camera_registered", cam)
}
}
}
// ── Helpers ─────────────────────────────────────────────────────────────
// extractCameraID pulls <camera_id> from remoterig/cameras/<camera_id>/<type>
func extractCameraID(topic string) string {
parts := strings.Split(topic, "/")
if len(parts) >= 3 && parts[0] == "remoterig" && parts[1] == "cameras" {
return parts[2]
}
return ""
}
// getCamera fetches a camera by ID from the database.
func getCamera(db *db.DB, cameraID string) (models.Camera, error) {
var cam models.Camera
err := db.QueryRow(
"SELECT camera_id, friendly_name, COALESCE(mac_address,''), created_at, updated_at FROM cameras WHERE camera_id = ?",
cameraID,
).Scan(&cam.CameraID, &cam.FriendlyName, &cam.MacAddress, &cam.CreatedAt, &cam.UpdatedAt)
return cam, err
}
func stringPtr(s *string) *string {
if s == nil || *s == "" {
return nil
}
return s
}
func intPtr(i *int) *int {
if i == nil {
return nil
}
return i
}
// Close shuts down the MQTT subscriber.
func (s *Subscriber) Close() {
close(s.done)
if s.client != nil && s.client.IsConnected() {
s.client.Disconnect(250)
log.Println("MQTT disconnected")
}
}