generated from CubeCraft-Creations/Tracehound
435 lines
12 KiB
Go
435 lines
12 KiB
Go
|
|
// Package mqtt provides the MQTT subscriber that ingests ESP32 camera status
|
||
|
|
// via Mosquitto broker and persists to SQLite with SSE fan-out.
|
||
|
|
package mqtt
|
||
|
|
|
||
|
|
import (
|
||
|
|
"encoding/json"
|
||
|
|
"fmt"
|
||
|
|
"log"
|
||
|
|
"strings"
|
||
|
|
"sync"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"github.com/cubecraft/remoterig/internal/db"
|
||
|
|
"github.com/cubecraft/remoterig/internal/events"
|
||
|
|
"github.com/cubecraft/remoterig/pkg/models"
|
||
|
|
|
||
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||
|
|
)
|
||
|
|
|
||
|
|
// Subscriber connects to Mosquitto, subscribes to camera topics, and
|
||
|
|
// processes incoming status, heartbeat, and announce messages.
|
||
|
|
type Subscriber struct {
|
||
|
|
mu sync.Mutex
|
||
|
|
broker string
|
||
|
|
clientID string
|
||
|
|
client mqtt.Client
|
||
|
|
db *db.DB
|
||
|
|
hub *events.Hub
|
||
|
|
|
||
|
|
// Heartbeat tracking: last heartbeat time per camera_id
|
||
|
|
heartbeats map[string]time.Time
|
||
|
|
|
||
|
|
// Shutdown
|
||
|
|
done chan struct{}
|
||
|
|
}
|
||
|
|
|
||
|
|
// NewSubscriber creates a new MQTT subscriber (does not connect yet).
|
||
|
|
func NewSubscriber(broker, clientID string, database *db.DB, sseHub *events.Hub) *Subscriber {
|
||
|
|
return &Subscriber{
|
||
|
|
broker: broker,
|
||
|
|
clientID: clientID,
|
||
|
|
db: database,
|
||
|
|
hub: sseHub,
|
||
|
|
heartbeats: make(map[string]time.Time),
|
||
|
|
done: make(chan struct{}),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Connect establishes the MQTT connection and subscribes to topics.
|
||
|
|
// Returns an error if the initial connection fails.
|
||
|
|
func (s *Subscriber) Connect() error {
|
||
|
|
opts := mqtt.NewClientOptions().
|
||
|
|
AddBroker(fmt.Sprintf("tcp://%s", s.broker)).
|
||
|
|
SetClientID(s.clientID).
|
||
|
|
SetCleanSession(true).
|
||
|
|
SetAutoReconnect(true).
|
||
|
|
SetMaxReconnectInterval(30 * time.Second).
|
||
|
|
SetConnectRetry(true).
|
||
|
|
SetConnectRetryInterval(5 * time.Second).
|
||
|
|
SetOnConnectHandler(s.onConnect).
|
||
|
|
SetDefaultPublishHandler(s.onMessage).
|
||
|
|
SetConnectionLostHandler(func(c mqtt.Client, err error) {
|
||
|
|
log.Printf("MQTT connection lost: %v (will auto-reconnect)", err)
|
||
|
|
})
|
||
|
|
|
||
|
|
s.client = mqtt.NewClient(opts)
|
||
|
|
token := s.client.Connect()
|
||
|
|
if !token.WaitTimeout(10 * time.Second) {
|
||
|
|
return fmt.Errorf("mqtt connect timeout")
|
||
|
|
}
|
||
|
|
if err := token.Error(); err != nil {
|
||
|
|
return fmt.Errorf("mqtt connect: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
log.Printf("MQTT connected to %s as %s", s.broker, s.clientID)
|
||
|
|
|
||
|
|
// Start heartbeat watchdog (runs independently)
|
||
|
|
go s.heartbeatWatchdog()
|
||
|
|
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// onConnect is called after every (re)connection. It re-subscribes to topics.
|
||
|
|
func (s *Subscriber) onConnect(c mqtt.Client) {
|
||
|
|
topics := map[string]byte{
|
||
|
|
"remoterig/cameras/+/status": 1, // QoS 1
|
||
|
|
"remoterig/cameras/+/heartbeat": 1, // QoS 1
|
||
|
|
"remoterig/cameras/+/announce": 2, // QoS 2
|
||
|
|
"remoterig/cameras/+/command": 2, // QoS 2 (hub publishes, ESP32 receives)
|
||
|
|
}
|
||
|
|
|
||
|
|
token := c.SubscribeMultiple(topics, nil)
|
||
|
|
if token.WaitTimeout(5 * time.Second) {
|
||
|
|
if err := token.Error(); err != nil {
|
||
|
|
log.Printf("MQTT subscribe error: %v", err)
|
||
|
|
} else {
|
||
|
|
log.Printf("MQTT subscribed to %d topics", len(topics))
|
||
|
|
}
|
||
|
|
} else {
|
||
|
|
log.Println("MQTT subscribe timeout")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// onMessage is the default message handler. It routes by topic.
|
||
|
|
func (s *Subscriber) onMessage(c mqtt.Client, msg mqtt.Message) {
|
||
|
|
topic := msg.Topic()
|
||
|
|
payload := msg.Payload()
|
||
|
|
|
||
|
|
// Extract camera_id from topic: remoterig/cameras/<camera_id>/<type>
|
||
|
|
cameraID := extractCameraID(topic)
|
||
|
|
if cameraID == "" {
|
||
|
|
log.Printf("MQTT: could not extract camera_id from topic %q", topic)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
switch {
|
||
|
|
case strings.HasSuffix(topic, "/status"):
|
||
|
|
s.handleStatus(cameraID, payload)
|
||
|
|
case strings.HasSuffix(topic, "/heartbeat"):
|
||
|
|
s.handleHeartbeat(cameraID, payload)
|
||
|
|
case strings.HasSuffix(topic, "/announce"):
|
||
|
|
s.handleAnnounce(cameraID, payload)
|
||
|
|
default:
|
||
|
|
log.Printf("MQTT: unhandled topic %q", topic)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// ── Status handler ──────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
// statusPayload matches the MQTT contract status message.
|
||
|
|
type statusPayload struct {
|
||
|
|
CameraID string `json:"camera_id"`
|
||
|
|
Timestamp string `json:"timestamp"`
|
||
|
|
BatteryPct *int `json:"battery_pct"`
|
||
|
|
BatteryRaw *int `json:"battery_raw"`
|
||
|
|
VideoRemainingSec *int `json:"video_remaining_sec"`
|
||
|
|
Recording bool `json:"recording"`
|
||
|
|
Mode *string `json:"mode"`
|
||
|
|
Resolution *string `json:"resolution"`
|
||
|
|
FPS *int `json:"fps"`
|
||
|
|
Online bool `json:"online"`
|
||
|
|
RSSI *int `json:"rssi"`
|
||
|
|
UptimeSec *int `json:"uptime_sec"`
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Subscriber) handleStatus(cameraID string, payload []byte) {
|
||
|
|
var sp statusPayload
|
||
|
|
if err := json.Unmarshal(payload, &sp); err != nil {
|
||
|
|
log.Printf("MQTT status parse error for %s: %v", cameraID, err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
// Validate required fields
|
||
|
|
if sp.CameraID == "" || sp.Timestamp == "" {
|
||
|
|
log.Printf("MQTT status missing required fields (camera_id, timestamp) from %s", cameraID)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
// Validate timestamp sanity (reject >5min future, >24h past)
|
||
|
|
ts, err := time.Parse(time.RFC3339, sp.Timestamp)
|
||
|
|
if err != nil {
|
||
|
|
// Try ISO8601 without timezone
|
||
|
|
ts, err = time.Parse("2006-01-02T15:04:05", sp.Timestamp)
|
||
|
|
if err != nil {
|
||
|
|
log.Printf("MQTT status invalid timestamp %q from %s", sp.Timestamp, cameraID)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
}
|
||
|
|
now := time.Now()
|
||
|
|
if ts.After(now.Add(5 * time.Minute)) {
|
||
|
|
log.Printf("MQTT status timestamp too far in future (%s) from %s — using now", ts, cameraID)
|
||
|
|
ts = now
|
||
|
|
}
|
||
|
|
if ts.Before(now.Add(-24 * time.Hour)) {
|
||
|
|
log.Printf("MQTT status timestamp too far in past (%s) from %s — using now", ts, cameraID)
|
||
|
|
ts = now
|
||
|
|
}
|
||
|
|
|
||
|
|
// Clamp battery_pct to 0-100
|
||
|
|
batteryPct := sp.BatteryPct
|
||
|
|
if batteryPct != nil {
|
||
|
|
if *batteryPct < 0 {
|
||
|
|
v := 0
|
||
|
|
batteryPct = &v
|
||
|
|
}
|
||
|
|
if *batteryPct > 100 {
|
||
|
|
v := 100
|
||
|
|
batteryPct = &v
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
recordingState := 0
|
||
|
|
if sp.Recording {
|
||
|
|
recordingState = 1
|
||
|
|
}
|
||
|
|
onlineState := 0
|
||
|
|
if sp.Online {
|
||
|
|
onlineState = 1
|
||
|
|
}
|
||
|
|
|
||
|
|
// Detect recording state change by checking previous status
|
||
|
|
var prevRecording int
|
||
|
|
row := s.db.QueryRow(`
|
||
|
|
SELECT recording_state FROM status_logs
|
||
|
|
WHERE camera_id = ? AND recorded_at > datetime('now', '-120 seconds')
|
||
|
|
ORDER BY recorded_at DESC LIMIT 1
|
||
|
|
`, cameraID)
|
||
|
|
if err := row.Scan(&prevRecording); err != nil {
|
||
|
|
prevRecording = -1 // no previous status
|
||
|
|
}
|
||
|
|
|
||
|
|
// Insert status_log
|
||
|
|
_, err = s.db.Exec(`
|
||
|
|
INSERT INTO status_logs (camera_id, recorded_at, battery_pct,
|
||
|
|
video_remaining_sec, recording_state, mode, resolution, fps, online)
|
||
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
|
|
`, cameraID, ts, batteryPct, sp.VideoRemainingSec,
|
||
|
|
recordingState, stringPtr(sp.Mode), stringPtr(sp.Resolution),
|
||
|
|
intPtr(sp.FPS), onlineState)
|
||
|
|
if err != nil {
|
||
|
|
log.Printf("MQTT status insert error for %s: %v", cameraID, err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
// Handle recording state transitions
|
||
|
|
if prevRecording >= 0 && prevRecording != recordingState {
|
||
|
|
reason := "mqtt"
|
||
|
|
if recordingState == 1 {
|
||
|
|
// Started recording — open new recording_event
|
||
|
|
_, err = s.db.Exec(`
|
||
|
|
INSERT INTO recording_events (camera_id, started_at, reason)
|
||
|
|
VALUES (?, ?, ?)
|
||
|
|
`, cameraID, ts, reason)
|
||
|
|
if err != nil {
|
||
|
|
log.Printf("MQTT recording_event insert error for %s: %v", cameraID, err)
|
||
|
|
}
|
||
|
|
} else {
|
||
|
|
// Stopped recording — close most recent open event
|
||
|
|
_, err = s.db.Exec(`
|
||
|
|
UPDATE recording_events SET stopped_at = ?
|
||
|
|
WHERE camera_id = ? AND stopped_at IS NULL
|
||
|
|
ORDER BY started_at DESC LIMIT 1
|
||
|
|
`, ts, cameraID)
|
||
|
|
if err != nil {
|
||
|
|
log.Printf("MQTT recording_event close error for %s: %v", cameraID, err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Broadcast via SSE
|
||
|
|
cam, err := getCamera(s.db, cameraID)
|
||
|
|
if err == nil {
|
||
|
|
sl := models.StatusLog{
|
||
|
|
CameraID: cameraID,
|
||
|
|
RecordedAt: ts,
|
||
|
|
BatteryPct: batteryPct,
|
||
|
|
VideoRemainingSec: sp.VideoRemainingSec,
|
||
|
|
RecordingState: recordingState,
|
||
|
|
Mode: stringPtr(sp.Mode),
|
||
|
|
Resolution: stringPtr(sp.Resolution),
|
||
|
|
FPS: intPtr(sp.FPS),
|
||
|
|
Online: onlineState,
|
||
|
|
}
|
||
|
|
cs := models.NewCameraStatus(cam, sl)
|
||
|
|
s.hub.Broadcast("camera_status", cs)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// ── Heartbeat handler ───────────────────────────────────────────────────
|
||
|
|
|
||
|
|
type heartbeatPayload struct {
|
||
|
|
CameraID string `json:"camera_id"`
|
||
|
|
Timestamp string `json:"timestamp"`
|
||
|
|
UptimeSec *int `json:"uptime_sec"`
|
||
|
|
FreeHeap *int `json:"free_heap"`
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Subscriber) handleHeartbeat(cameraID string, payload []byte) {
|
||
|
|
var hp heartbeatPayload
|
||
|
|
if err := json.Unmarshal(payload, &hp); err != nil {
|
||
|
|
log.Printf("MQTT heartbeat parse error for %s: %v", cameraID, err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
s.mu.Lock()
|
||
|
|
s.heartbeats[cameraID] = time.Now()
|
||
|
|
s.mu.Unlock()
|
||
|
|
}
|
||
|
|
|
||
|
|
// heartbeatWatchdog runs every 10 seconds and marks cameras offline if
|
||
|
|
// no heartbeat received in 120 seconds.
|
||
|
|
func (s *Subscriber) heartbeatWatchdog() {
|
||
|
|
ticker := time.NewTicker(10 * time.Second)
|
||
|
|
defer ticker.Stop()
|
||
|
|
|
||
|
|
for {
|
||
|
|
select {
|
||
|
|
case <-s.done:
|
||
|
|
return
|
||
|
|
case <-ticker.C:
|
||
|
|
s.mu.Lock()
|
||
|
|
for cameraID, lastBeat := range s.heartbeats {
|
||
|
|
if time.Since(lastBeat) > 120*time.Second {
|
||
|
|
// Camera missed heartbeat — broadcast offline
|
||
|
|
cam, err := getCamera(s.db, cameraID)
|
||
|
|
if err == nil {
|
||
|
|
cs := models.CameraStatus{
|
||
|
|
CameraID: cameraID,
|
||
|
|
FriendlyName: cam.FriendlyName,
|
||
|
|
Online: false,
|
||
|
|
LastSeen: lastBeat,
|
||
|
|
}
|
||
|
|
s.hub.Broadcast("camera_offline", cs)
|
||
|
|
}
|
||
|
|
delete(s.heartbeats, cameraID)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
s.mu.Unlock()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// ── Announce handler (auto-registration) ───────────────────────────────
|
||
|
|
|
||
|
|
type announcePayload struct {
|
||
|
|
MacAddress string `json:"mac_address"`
|
||
|
|
FirmwareVersion string `json:"firmware_version"`
|
||
|
|
Capabilities []string `json:"capabilities"`
|
||
|
|
FriendlyName string `json:"friendly_name"`
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Subscriber) handleAnnounce(cameraID string, payload []byte) {
|
||
|
|
var ap announcePayload
|
||
|
|
if err := json.Unmarshal(payload, &ap); err != nil {
|
||
|
|
log.Printf("MQTT announce parse error for %s: %v", cameraID, err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
if ap.MacAddress == "" {
|
||
|
|
log.Printf("MQTT announce missing mac_address from %s", cameraID)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
// Check if this MAC is already registered
|
||
|
|
var existingID string
|
||
|
|
err := s.db.QueryRow(
|
||
|
|
"SELECT camera_id FROM cameras WHERE mac_address = ?", ap.MacAddress,
|
||
|
|
).Scan(&existingID)
|
||
|
|
|
||
|
|
if err == nil {
|
||
|
|
// Already registered — just update friendly_name
|
||
|
|
_, err = s.db.Exec(
|
||
|
|
"UPDATE cameras SET friendly_name = ?, updated_at = datetime('now') WHERE camera_id = ?",
|
||
|
|
ap.FriendlyName, existingID,
|
||
|
|
)
|
||
|
|
if err != nil {
|
||
|
|
log.Printf("MQTT announce update error for %s: %v", existingID, err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
log.Printf("MQTT announce: camera %s (%s) re-connected", existingID, ap.FriendlyName)
|
||
|
|
} else {
|
||
|
|
// New camera — generate sequential cam-NNN ID
|
||
|
|
var maxID string
|
||
|
|
s.db.QueryRow("SELECT MAX(camera_id) FROM cameras").Scan(&maxID)
|
||
|
|
|
||
|
|
seq := 1
|
||
|
|
if maxID != "" {
|
||
|
|
fmt.Sscanf(maxID, "cam-%d", &seq)
|
||
|
|
seq++
|
||
|
|
}
|
||
|
|
|
||
|
|
newID := fmt.Sprintf("cam-%03d", seq)
|
||
|
|
_, err = s.db.Exec(`
|
||
|
|
INSERT INTO cameras (camera_id, friendly_name, mac_address, created_at, updated_at)
|
||
|
|
VALUES (?, ?, ?, datetime('now'), datetime('now'))
|
||
|
|
`, newID, ap.FriendlyName, ap.MacAddress)
|
||
|
|
if err != nil {
|
||
|
|
log.Printf("MQTT announce insert error for %s: %v", ap.MacAddress, err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
log.Printf("MQTT announce: new camera registered as %s (%s)", newID, ap.FriendlyName)
|
||
|
|
|
||
|
|
// Broadcast new camera via SSE
|
||
|
|
cam, err := getCamera(s.db, newID)
|
||
|
|
if err == nil {
|
||
|
|
s.hub.Broadcast("camera_registered", cam)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// ── Helpers ─────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
// extractCameraID pulls <camera_id> from remoterig/cameras/<camera_id>/<type>
|
||
|
|
func extractCameraID(topic string) string {
|
||
|
|
parts := strings.Split(topic, "/")
|
||
|
|
if len(parts) >= 3 && parts[0] == "remoterig" && parts[1] == "cameras" {
|
||
|
|
return parts[2]
|
||
|
|
}
|
||
|
|
return ""
|
||
|
|
}
|
||
|
|
|
||
|
|
// getCamera fetches a camera by ID from the database.
|
||
|
|
func getCamera(db *db.DB, cameraID string) (models.Camera, error) {
|
||
|
|
var cam models.Camera
|
||
|
|
err := db.QueryRow(
|
||
|
|
"SELECT camera_id, friendly_name, COALESCE(mac_address,''), created_at, updated_at FROM cameras WHERE camera_id = ?",
|
||
|
|
cameraID,
|
||
|
|
).Scan(&cam.CameraID, &cam.FriendlyName, &cam.MacAddress, &cam.CreatedAt, &cam.UpdatedAt)
|
||
|
|
return cam, err
|
||
|
|
}
|
||
|
|
|
||
|
|
func stringPtr(s *string) *string {
|
||
|
|
if s == nil || *s == "" {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
return s
|
||
|
|
}
|
||
|
|
|
||
|
|
func intPtr(i *int) *int {
|
||
|
|
if i == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
return i
|
||
|
|
}
|
||
|
|
|
||
|
|
// Close shuts down the MQTT subscriber.
|
||
|
|
func (s *Subscriber) Close() {
|
||
|
|
close(s.done)
|
||
|
|
if s.client != nil && s.client.IsConnected() {
|
||
|
|
s.client.Disconnect(250)
|
||
|
|
log.Println("MQTT disconnected")
|
||
|
|
}
|
||
|
|
}
|