From f200cd9782f3ad15c1d23bed65618cdf095f352c Mon Sep 17 00:00:00 2001 From: Hermes Date: Thu, 21 May 2026 21:16:08 +0000 Subject: [PATCH] feat: add MQTT subscriber for ESP32 camera status ingestion Implements MQTT subscriber (internal/mqtt/subscriber.go) that: - Connects to Mosquitto broker with auto-reconnect - Subscribes to remoterig/cameras/+/status, +/heartbeat, +/announce - Parses and validates incoming messages per MQTT contract - Inserts status_logs with duplicate prevention - Auto-detects recording state changes and manages recording_events - Broadcasts camera status changes via SSE hub - Camera auto-registration via announce (MAC-based, sequential cam-NNN) - Heartbeat watchdog marks cameras offline after 120s silence - Wired into main.go with graceful degradation (warns if broker unreachable) Dependency: github.com/eclipse/paho.mqtt.golang v1.5.0 Closes CUB-232. --- cmd/server/main.go | 8 + go.mod | 1 + internal/mqtt/subscriber.go | 434 ++++++++++++++++++++++++++++++++++++ 3 files changed, 443 insertions(+) create mode 100644 internal/mqtt/subscriber.go diff --git a/cmd/server/main.go b/cmd/server/main.go index d10f663..7ccbbc0 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -15,6 +15,7 @@ import ( "github.com/cubecraft/remoterig/internal/auth" "github.com/cubecraft/remoterig/internal/db" "github.com/cubecraft/remoterig/internal/events" + "github.com/cubecraft/remoterig/internal/mqtt" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" @@ -59,6 +60,13 @@ func main() { // Create SSE hub for real-time updates sseHub := events.NewHub() + // Start MQTT subscriber for ESP32 camera status ingestion + mqttSub := mqtt.NewSubscriber(cfg.MQTT.Broker, cfg.MQTT.ClientID, sqlDB, sseHub) + if err := mqttSub.Connect(); err != nil { + log.Printf("WARNING: MQTT subscriber failed to connect: %v (running without MQTT)", err) + } + defer mqttSub.Close() + // Set up router r := chi.NewRouter() r.Use(middleware.RequestID) diff --git a/go.mod b/go.mod index 38b71cf..10dd764 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/cubecraft/remoterig go 1.25.0 require ( + github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/go-chi/chi/v5 v5.2.5 gopkg.in/yaml.v3 v3.0.1 modernc.org/sqlite v1.50.1 diff --git a/internal/mqtt/subscriber.go b/internal/mqtt/subscriber.go new file mode 100644 index 0000000..4ea0480 --- /dev/null +++ b/internal/mqtt/subscriber.go @@ -0,0 +1,434 @@ +// Package mqtt provides the MQTT subscriber that ingests ESP32 camera status +// via Mosquitto broker and persists to SQLite with SSE fan-out. +package mqtt + +import ( + "encoding/json" + "fmt" + "log" + "strings" + "sync" + "time" + + "github.com/cubecraft/remoterig/internal/db" + "github.com/cubecraft/remoterig/internal/events" + "github.com/cubecraft/remoterig/pkg/models" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +// Subscriber connects to Mosquitto, subscribes to camera topics, and +// processes incoming status, heartbeat, and announce messages. +type Subscriber struct { + mu sync.Mutex + broker string + clientID string + client mqtt.Client + db *db.DB + hub *events.Hub + + // Heartbeat tracking: last heartbeat time per camera_id + heartbeats map[string]time.Time + + // Shutdown + done chan struct{} +} + +// NewSubscriber creates a new MQTT subscriber (does not connect yet). +func NewSubscriber(broker, clientID string, database *db.DB, sseHub *events.Hub) *Subscriber { + return &Subscriber{ + broker: broker, + clientID: clientID, + db: database, + hub: sseHub, + heartbeats: make(map[string]time.Time), + done: make(chan struct{}), + } +} + +// Connect establishes the MQTT connection and subscribes to topics. +// Returns an error if the initial connection fails. +func (s *Subscriber) Connect() error { + opts := mqtt.NewClientOptions(). + AddBroker(fmt.Sprintf("tcp://%s", s.broker)). + SetClientID(s.clientID). + SetCleanSession(true). + SetAutoReconnect(true). + SetMaxReconnectInterval(30 * time.Second). + SetConnectRetry(true). + SetConnectRetryInterval(5 * time.Second). + SetOnConnectHandler(s.onConnect). + SetDefaultPublishHandler(s.onMessage). + SetConnectionLostHandler(func(c mqtt.Client, err error) { + log.Printf("MQTT connection lost: %v (will auto-reconnect)", err) + }) + + s.client = mqtt.NewClient(opts) + token := s.client.Connect() + if !token.WaitTimeout(10 * time.Second) { + return fmt.Errorf("mqtt connect timeout") + } + if err := token.Error(); err != nil { + return fmt.Errorf("mqtt connect: %w", err) + } + + log.Printf("MQTT connected to %s as %s", s.broker, s.clientID) + + // Start heartbeat watchdog (runs independently) + go s.heartbeatWatchdog() + + return nil +} + +// onConnect is called after every (re)connection. It re-subscribes to topics. +func (s *Subscriber) onConnect(c mqtt.Client) { + topics := map[string]byte{ + "remoterig/cameras/+/status": 1, // QoS 1 + "remoterig/cameras/+/heartbeat": 1, // QoS 1 + "remoterig/cameras/+/announce": 2, // QoS 2 + "remoterig/cameras/+/command": 2, // QoS 2 (hub publishes, ESP32 receives) + } + + token := c.SubscribeMultiple(topics, nil) + if token.WaitTimeout(5 * time.Second) { + if err := token.Error(); err != nil { + log.Printf("MQTT subscribe error: %v", err) + } else { + log.Printf("MQTT subscribed to %d topics", len(topics)) + } + } else { + log.Println("MQTT subscribe timeout") + } +} + +// onMessage is the default message handler. It routes by topic. +func (s *Subscriber) onMessage(c mqtt.Client, msg mqtt.Message) { + topic := msg.Topic() + payload := msg.Payload() + + // Extract camera_id from topic: remoterig/cameras// + cameraID := extractCameraID(topic) + if cameraID == "" { + log.Printf("MQTT: could not extract camera_id from topic %q", topic) + return + } + + switch { + case strings.HasSuffix(topic, "/status"): + s.handleStatus(cameraID, payload) + case strings.HasSuffix(topic, "/heartbeat"): + s.handleHeartbeat(cameraID, payload) + case strings.HasSuffix(topic, "/announce"): + s.handleAnnounce(cameraID, payload) + default: + log.Printf("MQTT: unhandled topic %q", topic) + } +} + +// ── Status handler ────────────────────────────────────────────────────── + +// statusPayload matches the MQTT contract status message. +type statusPayload struct { + CameraID string `json:"camera_id"` + Timestamp string `json:"timestamp"` + BatteryPct *int `json:"battery_pct"` + BatteryRaw *int `json:"battery_raw"` + VideoRemainingSec *int `json:"video_remaining_sec"` + Recording bool `json:"recording"` + Mode *string `json:"mode"` + Resolution *string `json:"resolution"` + FPS *int `json:"fps"` + Online bool `json:"online"` + RSSI *int `json:"rssi"` + UptimeSec *int `json:"uptime_sec"` +} + +func (s *Subscriber) handleStatus(cameraID string, payload []byte) { + var sp statusPayload + if err := json.Unmarshal(payload, &sp); err != nil { + log.Printf("MQTT status parse error for %s: %v", cameraID, err) + return + } + + // Validate required fields + if sp.CameraID == "" || sp.Timestamp == "" { + log.Printf("MQTT status missing required fields (camera_id, timestamp) from %s", cameraID) + return + } + + // Validate timestamp sanity (reject >5min future, >24h past) + ts, err := time.Parse(time.RFC3339, sp.Timestamp) + if err != nil { + // Try ISO8601 without timezone + ts, err = time.Parse("2006-01-02T15:04:05", sp.Timestamp) + if err != nil { + log.Printf("MQTT status invalid timestamp %q from %s", sp.Timestamp, cameraID) + return + } + } + now := time.Now() + if ts.After(now.Add(5 * time.Minute)) { + log.Printf("MQTT status timestamp too far in future (%s) from %s — using now", ts, cameraID) + ts = now + } + if ts.Before(now.Add(-24 * time.Hour)) { + log.Printf("MQTT status timestamp too far in past (%s) from %s — using now", ts, cameraID) + ts = now + } + + // Clamp battery_pct to 0-100 + batteryPct := sp.BatteryPct + if batteryPct != nil { + if *batteryPct < 0 { + v := 0 + batteryPct = &v + } + if *batteryPct > 100 { + v := 100 + batteryPct = &v + } + } + + recordingState := 0 + if sp.Recording { + recordingState = 1 + } + onlineState := 0 + if sp.Online { + onlineState = 1 + } + + // Detect recording state change by checking previous status + var prevRecording int + row := s.db.QueryRow(` + SELECT recording_state FROM status_logs + WHERE camera_id = ? AND recorded_at > datetime('now', '-120 seconds') + ORDER BY recorded_at DESC LIMIT 1 + `, cameraID) + if err := row.Scan(&prevRecording); err != nil { + prevRecording = -1 // no previous status + } + + // Insert status_log + _, err = s.db.Exec(` + INSERT INTO status_logs (camera_id, recorded_at, battery_pct, + video_remaining_sec, recording_state, mode, resolution, fps, online) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + `, cameraID, ts, batteryPct, sp.VideoRemainingSec, + recordingState, stringPtr(sp.Mode), stringPtr(sp.Resolution), + intPtr(sp.FPS), onlineState) + if err != nil { + log.Printf("MQTT status insert error for %s: %v", cameraID, err) + return + } + + // Handle recording state transitions + if prevRecording >= 0 && prevRecording != recordingState { + reason := "mqtt" + if recordingState == 1 { + // Started recording — open new recording_event + _, err = s.db.Exec(` + INSERT INTO recording_events (camera_id, started_at, reason) + VALUES (?, ?, ?) + `, cameraID, ts, reason) + if err != nil { + log.Printf("MQTT recording_event insert error for %s: %v", cameraID, err) + } + } else { + // Stopped recording — close most recent open event + _, err = s.db.Exec(` + UPDATE recording_events SET stopped_at = ? + WHERE camera_id = ? AND stopped_at IS NULL + ORDER BY started_at DESC LIMIT 1 + `, ts, cameraID) + if err != nil { + log.Printf("MQTT recording_event close error for %s: %v", cameraID, err) + } + } + } + + // Broadcast via SSE + cam, err := getCamera(s.db, cameraID) + if err == nil { + sl := models.StatusLog{ + CameraID: cameraID, + RecordedAt: ts, + BatteryPct: batteryPct, + VideoRemainingSec: sp.VideoRemainingSec, + RecordingState: recordingState, + Mode: stringPtr(sp.Mode), + Resolution: stringPtr(sp.Resolution), + FPS: intPtr(sp.FPS), + Online: onlineState, + } + cs := models.NewCameraStatus(cam, sl) + s.hub.Broadcast("camera_status", cs) + } +} + +// ── Heartbeat handler ─────────────────────────────────────────────────── + +type heartbeatPayload struct { + CameraID string `json:"camera_id"` + Timestamp string `json:"timestamp"` + UptimeSec *int `json:"uptime_sec"` + FreeHeap *int `json:"free_heap"` +} + +func (s *Subscriber) handleHeartbeat(cameraID string, payload []byte) { + var hp heartbeatPayload + if err := json.Unmarshal(payload, &hp); err != nil { + log.Printf("MQTT heartbeat parse error for %s: %v", cameraID, err) + return + } + + s.mu.Lock() + s.heartbeats[cameraID] = time.Now() + s.mu.Unlock() +} + +// heartbeatWatchdog runs every 10 seconds and marks cameras offline if +// no heartbeat received in 120 seconds. +func (s *Subscriber) heartbeatWatchdog() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-s.done: + return + case <-ticker.C: + s.mu.Lock() + for cameraID, lastBeat := range s.heartbeats { + if time.Since(lastBeat) > 120*time.Second { + // Camera missed heartbeat — broadcast offline + cam, err := getCamera(s.db, cameraID) + if err == nil { + cs := models.CameraStatus{ + CameraID: cameraID, + FriendlyName: cam.FriendlyName, + Online: false, + LastSeen: lastBeat, + } + s.hub.Broadcast("camera_offline", cs) + } + delete(s.heartbeats, cameraID) + } + } + s.mu.Unlock() + } + } +} + +// ── Announce handler (auto-registration) ─────────────────────────────── + +type announcePayload struct { + MacAddress string `json:"mac_address"` + FirmwareVersion string `json:"firmware_version"` + Capabilities []string `json:"capabilities"` + FriendlyName string `json:"friendly_name"` +} + +func (s *Subscriber) handleAnnounce(cameraID string, payload []byte) { + var ap announcePayload + if err := json.Unmarshal(payload, &ap); err != nil { + log.Printf("MQTT announce parse error for %s: %v", cameraID, err) + return + } + + if ap.MacAddress == "" { + log.Printf("MQTT announce missing mac_address from %s", cameraID) + return + } + + // Check if this MAC is already registered + var existingID string + err := s.db.QueryRow( + "SELECT camera_id FROM cameras WHERE mac_address = ?", ap.MacAddress, + ).Scan(&existingID) + + if err == nil { + // Already registered — just update friendly_name + _, err = s.db.Exec( + "UPDATE cameras SET friendly_name = ?, updated_at = datetime('now') WHERE camera_id = ?", + ap.FriendlyName, existingID, + ) + if err != nil { + log.Printf("MQTT announce update error for %s: %v", existingID, err) + return + } + log.Printf("MQTT announce: camera %s (%s) re-connected", existingID, ap.FriendlyName) + } else { + // New camera — generate sequential cam-NNN ID + var maxID string + s.db.QueryRow("SELECT MAX(camera_id) FROM cameras").Scan(&maxID) + + seq := 1 + if maxID != "" { + fmt.Sscanf(maxID, "cam-%d", &seq) + seq++ + } + + newID := fmt.Sprintf("cam-%03d", seq) + _, err = s.db.Exec(` + INSERT INTO cameras (camera_id, friendly_name, mac_address, created_at, updated_at) + VALUES (?, ?, ?, datetime('now'), datetime('now')) + `, newID, ap.FriendlyName, ap.MacAddress) + if err != nil { + log.Printf("MQTT announce insert error for %s: %v", ap.MacAddress, err) + return + } + + log.Printf("MQTT announce: new camera registered as %s (%s)", newID, ap.FriendlyName) + + // Broadcast new camera via SSE + cam, err := getCamera(s.db, newID) + if err == nil { + s.hub.Broadcast("camera_registered", cam) + } + } +} + +// ── Helpers ───────────────────────────────────────────────────────────── + +// extractCameraID pulls from remoterig/cameras// +func extractCameraID(topic string) string { + parts := strings.Split(topic, "/") + if len(parts) >= 3 && parts[0] == "remoterig" && parts[1] == "cameras" { + return parts[2] + } + return "" +} + +// getCamera fetches a camera by ID from the database. +func getCamera(db *db.DB, cameraID string) (models.Camera, error) { + var cam models.Camera + err := db.QueryRow( + "SELECT camera_id, friendly_name, COALESCE(mac_address,''), created_at, updated_at FROM cameras WHERE camera_id = ?", + cameraID, + ).Scan(&cam.CameraID, &cam.FriendlyName, &cam.MacAddress, &cam.CreatedAt, &cam.UpdatedAt) + return cam, err +} + +func stringPtr(s *string) *string { + if s == nil || *s == "" { + return nil + } + return s +} + +func intPtr(i *int) *int { + if i == nil { + return nil + } + return i +} + +// Close shuts down the MQTT subscriber. +func (s *Subscriber) Close() { + close(s.done) + if s.client != nil && s.client.IsConnected() { + s.client.Disconnect(250) + log.Println("MQTT disconnected") + } +}