// Package mqtt provides the MQTT subscriber that ingests ESP32 camera status // via Mosquitto broker and persists to SQLite with SSE fan-out. package mqtt import ( "encoding/json" "fmt" "log" "strings" "sync" "time" "github.com/cubecraft/remoterig/internal/db" "github.com/cubecraft/remoterig/internal/events" "github.com/cubecraft/remoterig/pkg/models" mqtt "github.com/eclipse/paho.mqtt.golang" ) // Subscriber connects to Mosquitto, subscribes to camera topics, and // processes incoming status, heartbeat, and announce messages. type Subscriber struct { mu sync.Mutex broker string clientID string client mqtt.Client db *db.DB hub *events.Hub // Heartbeat tracking: last heartbeat time per camera_id heartbeats map[string]time.Time // Shutdown done chan struct{} } // NewSubscriber creates a new MQTT subscriber (does not connect yet). func NewSubscriber(broker, clientID string, database *db.DB, sseHub *events.Hub) *Subscriber { return &Subscriber{ broker: broker, clientID: clientID, db: database, hub: sseHub, heartbeats: make(map[string]time.Time), done: make(chan struct{}), } } // Connect establishes the MQTT connection and subscribes to topics. // Returns an error if the initial connection fails. func (s *Subscriber) Connect() error { opts := mqtt.NewClientOptions(). AddBroker(fmt.Sprintf("tcp://%s", s.broker)). SetClientID(s.clientID). SetCleanSession(true). SetAutoReconnect(true). SetMaxReconnectInterval(30 * time.Second). SetConnectRetry(true). SetConnectRetryInterval(5 * time.Second). SetOnConnectHandler(s.onConnect). SetDefaultPublishHandler(s.onMessage). SetConnectionLostHandler(func(c mqtt.Client, err error) { log.Printf("MQTT connection lost: %v (will auto-reconnect)", err) }) s.client = mqtt.NewClient(opts) token := s.client.Connect() if !token.WaitTimeout(10 * time.Second) { return fmt.Errorf("mqtt connect timeout") } if err := token.Error(); err != nil { return fmt.Errorf("mqtt connect: %w", err) } log.Printf("MQTT connected to %s as %s", s.broker, s.clientID) // Start heartbeat watchdog (runs independently) go s.heartbeatWatchdog() return nil } // onConnect is called after every (re)connection. It re-subscribes to topics. func (s *Subscriber) onConnect(c mqtt.Client) { topics := map[string]byte{ "remoterig/cameras/+/status": 1, // QoS 1 "remoterig/cameras/+/heartbeat": 1, // QoS 1 "remoterig/cameras/+/announce": 2, // QoS 2 "remoterig/cameras/+/command": 2, // QoS 2 (hub publishes, ESP32 receives) } token := c.SubscribeMultiple(topics, nil) if token.WaitTimeout(5 * time.Second) { if err := token.Error(); err != nil { log.Printf("MQTT subscribe error: %v", err) } else { log.Printf("MQTT subscribed to %d topics", len(topics)) } } else { log.Println("MQTT subscribe timeout") } } // onMessage is the default message handler. It routes by topic. func (s *Subscriber) onMessage(c mqtt.Client, msg mqtt.Message) { topic := msg.Topic() payload := msg.Payload() // Extract camera_id from topic: remoterig/cameras// cameraID := extractCameraID(topic) if cameraID == "" { log.Printf("MQTT: could not extract camera_id from topic %q", topic) return } switch { case strings.HasSuffix(topic, "/status"): s.handleStatus(cameraID, payload) case strings.HasSuffix(topic, "/heartbeat"): s.handleHeartbeat(cameraID, payload) case strings.HasSuffix(topic, "/announce"): s.handleAnnounce(cameraID, payload) default: log.Printf("MQTT: unhandled topic %q", topic) } } // ── Status handler ────────────────────────────────────────────────────── // statusPayload matches the MQTT contract status message. type statusPayload struct { CameraID string `json:"camera_id"` Timestamp string `json:"timestamp"` BatteryPct *int `json:"battery_pct"` BatteryRaw *int `json:"battery_raw"` VideoRemainingSec *int `json:"video_remaining_sec"` Recording bool `json:"recording"` Mode *string `json:"mode"` Resolution *string `json:"resolution"` FPS *int `json:"fps"` Online bool `json:"online"` RSSI *int `json:"rssi"` UptimeSec *int `json:"uptime_sec"` } func (s *Subscriber) handleStatus(cameraID string, payload []byte) { var sp statusPayload if err := json.Unmarshal(payload, &sp); err != nil { log.Printf("MQTT status parse error for %s: %v", cameraID, err) return } // Validate required fields if sp.CameraID == "" || sp.Timestamp == "" { log.Printf("MQTT status missing required fields (camera_id, timestamp) from %s", cameraID) return } // Validate timestamp sanity (reject >5min future, >24h past) ts, err := time.Parse(time.RFC3339, sp.Timestamp) if err != nil { // Try ISO8601 without timezone ts, err = time.Parse("2006-01-02T15:04:05", sp.Timestamp) if err != nil { log.Printf("MQTT status invalid timestamp %q from %s", sp.Timestamp, cameraID) return } } now := time.Now() if ts.After(now.Add(5 * time.Minute)) { log.Printf("MQTT status timestamp too far in future (%s) from %s — using now", ts, cameraID) ts = now } if ts.Before(now.Add(-24 * time.Hour)) { log.Printf("MQTT status timestamp too far in past (%s) from %s — using now", ts, cameraID) ts = now } // Clamp battery_pct to 0-100 batteryPct := sp.BatteryPct if batteryPct != nil { if *batteryPct < 0 { v := 0 batteryPct = &v } if *batteryPct > 100 { v := 100 batteryPct = &v } } recordingState := 0 if sp.Recording { recordingState = 1 } onlineState := 0 if sp.Online { onlineState = 1 } // Detect recording state change by checking previous status var prevRecording int row := s.db.QueryRow(` SELECT recording_state FROM status_logs WHERE camera_id = ? AND recorded_at > datetime('now', '-120 seconds') ORDER BY recorded_at DESC LIMIT 1 `, cameraID) if err := row.Scan(&prevRecording); err != nil { prevRecording = -1 // no previous status } // Insert status_log _, err = s.db.Exec(` INSERT INTO status_logs (camera_id, recorded_at, battery_pct, video_remaining_sec, recording_state, mode, resolution, fps, online) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) `, cameraID, ts, batteryPct, sp.VideoRemainingSec, recordingState, stringPtr(sp.Mode), stringPtr(sp.Resolution), intPtr(sp.FPS), onlineState) if err != nil { log.Printf("MQTT status insert error for %s: %v", cameraID, err) return } // Handle recording state transitions if prevRecording >= 0 && prevRecording != recordingState { reason := "mqtt" if recordingState == 1 { // Started recording — open new recording_event _, err = s.db.Exec(` INSERT INTO recording_events (camera_id, started_at, reason) VALUES (?, ?, ?) `, cameraID, ts, reason) if err != nil { log.Printf("MQTT recording_event insert error for %s: %v", cameraID, err) } } else { // Stopped recording — close most recent open event _, err = s.db.Exec(` UPDATE recording_events SET stopped_at = ? WHERE camera_id = ? AND stopped_at IS NULL ORDER BY started_at DESC LIMIT 1 `, ts, cameraID) if err != nil { log.Printf("MQTT recording_event close error for %s: %v", cameraID, err) } } } // Broadcast via SSE cam, err := getCamera(s.db, cameraID) if err == nil { sl := models.StatusLog{ CameraID: cameraID, RecordedAt: ts, BatteryPct: batteryPct, VideoRemainingSec: sp.VideoRemainingSec, RecordingState: recordingState, Mode: stringPtr(sp.Mode), Resolution: stringPtr(sp.Resolution), FPS: intPtr(sp.FPS), Online: onlineState, } cs := models.NewCameraStatus(cam, sl) s.hub.Broadcast("camera_status", cs) } } // ── Heartbeat handler ─────────────────────────────────────────────────── type heartbeatPayload struct { CameraID string `json:"camera_id"` Timestamp string `json:"timestamp"` UptimeSec *int `json:"uptime_sec"` FreeHeap *int `json:"free_heap"` } func (s *Subscriber) handleHeartbeat(cameraID string, payload []byte) { var hp heartbeatPayload if err := json.Unmarshal(payload, &hp); err != nil { log.Printf("MQTT heartbeat parse error for %s: %v", cameraID, err) return } s.mu.Lock() s.heartbeats[cameraID] = time.Now() s.mu.Unlock() } // heartbeatWatchdog runs every 10 seconds and marks cameras offline if // no heartbeat received in 120 seconds. func (s *Subscriber) heartbeatWatchdog() { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { case <-s.done: return case <-ticker.C: s.mu.Lock() for cameraID, lastBeat := range s.heartbeats { if time.Since(lastBeat) > 120*time.Second { // Camera missed heartbeat — broadcast offline cam, err := getCamera(s.db, cameraID) if err == nil { cs := models.CameraStatus{ CameraID: cameraID, FriendlyName: cam.FriendlyName, Online: false, LastSeen: lastBeat, } s.hub.Broadcast("camera_offline", cs) } delete(s.heartbeats, cameraID) } } s.mu.Unlock() } } } // ── Announce handler (auto-registration) ─────────────────────────────── type announcePayload struct { MacAddress string `json:"mac_address"` FirmwareVersion string `json:"firmware_version"` Capabilities []string `json:"capabilities"` FriendlyName string `json:"friendly_name"` } func (s *Subscriber) handleAnnounce(cameraID string, payload []byte) { var ap announcePayload if err := json.Unmarshal(payload, &ap); err != nil { log.Printf("MQTT announce parse error for %s: %v", cameraID, err) return } if ap.MacAddress == "" { log.Printf("MQTT announce missing mac_address from %s", cameraID) return } // Check if this MAC is already registered var existingID string err := s.db.QueryRow( "SELECT camera_id FROM cameras WHERE mac_address = ?", ap.MacAddress, ).Scan(&existingID) if err == nil { // Already registered — just update friendly_name _, err = s.db.Exec( "UPDATE cameras SET friendly_name = ?, updated_at = datetime('now') WHERE camera_id = ?", ap.FriendlyName, existingID, ) if err != nil { log.Printf("MQTT announce update error for %s: %v", existingID, err) return } log.Printf("MQTT announce: camera %s (%s) re-connected", existingID, ap.FriendlyName) } else { // New camera — generate sequential cam-NNN ID var maxID string s.db.QueryRow("SELECT MAX(camera_id) FROM cameras").Scan(&maxID) seq := 1 if maxID != "" { fmt.Sscanf(maxID, "cam-%d", &seq) seq++ } newID := fmt.Sprintf("cam-%03d", seq) _, err = s.db.Exec(` INSERT INTO cameras (camera_id, friendly_name, mac_address, created_at, updated_at) VALUES (?, ?, ?, datetime('now'), datetime('now')) `, newID, ap.FriendlyName, ap.MacAddress) if err != nil { log.Printf("MQTT announce insert error for %s: %v", ap.MacAddress, err) return } log.Printf("MQTT announce: new camera registered as %s (%s)", newID, ap.FriendlyName) // Broadcast new camera via SSE cam, err := getCamera(s.db, newID) if err == nil { s.hub.Broadcast("camera_registered", cam) } } } // ── Helpers ───────────────────────────────────────────────────────────── // extractCameraID pulls from remoterig/cameras// func extractCameraID(topic string) string { parts := strings.Split(topic, "/") if len(parts) >= 3 && parts[0] == "remoterig" && parts[1] == "cameras" { return parts[2] } return "" } // getCamera fetches a camera by ID from the database. func getCamera(db *db.DB, cameraID string) (models.Camera, error) { var cam models.Camera err := db.QueryRow( "SELECT camera_id, friendly_name, COALESCE(mac_address,''), COALESCE(battery_calibration_offset, NULL), created_at, updated_at FROM cameras WHERE camera_id = ?", cameraID, ).Scan(&cam.CameraID, &cam.FriendlyName, &cam.MacAddress, &cam.BatteryCalibrationOffset, &cam.CreatedAt, &cam.UpdatedAt) return cam, err } func stringPtr(s *string) *string { if s == nil || *s == "" { return nil } return s } func intPtr(i *int) *int { if i == nil { return nil } return i } // Close shuts down the MQTT subscriber. func (s *Subscriber) Close() { close(s.done) if s.client != nil && s.client.IsConnected() { s.client.Disconnect(250) log.Println("MQTT disconnected") } }