From d538dd3b70c43501c2ae5a0139dea37ea097a4c9 Mon Sep 17 00:00:00 2001 From: Joshua King Date: Fri, 5 Jun 2026 20:28:26 -0400 Subject: [PATCH] hub: actually send start/stop commands over MQTT MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The /cameras/{id}/start and /stop handlers only wrote a recording_events row — they never published the command, so the camera never recorded. Add Subscriber.PublishCommand (publishes {"command":...} to remoterig/cameras//command, which the XIAO forwards to the ESP-01S), thread a CommandPublisher into the recording handlers, and wire mqttSub in via apiRouter. Tests pass nil (publish skipped). Co-Authored-By: Claude Opus 4.8 --- cmd/server/main.go | 8 ++++---- internal/api/camera_test.go | 4 ++-- internal/api/recording.go | 28 ++++++++++++++++++++++++++-- internal/mqtt/subscriber.go | 12 ++++++++++++ 4 files changed, 44 insertions(+), 8 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index dd27687..7f7014b 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -89,7 +89,7 @@ func main() { }) // API routes (auth required if API key is configured) - r.Mount("/api/v1", auth.Middleware(cfg.APIKey)(apiRouter(sseHub, sqlDB))) + r.Mount("/api/v1", auth.Middleware(cfg.APIKey)(apiRouter(sseHub, sqlDB, mqttSub))) // Serve embedded React frontend with SPA fallback r.Mount("/", frontendHandler()) @@ -122,7 +122,7 @@ func main() { } // apiRouter creates the API route tree. -func apiRouter(sseHub *events.Hub, database *db.DB) http.Handler { +func apiRouter(sseHub *events.Hub, database *db.DB, pub api.CommandPublisher) http.Handler { r := chi.NewRouter() // Camera management routes @@ -131,8 +131,8 @@ func apiRouter(sseHub *events.Hub, database *db.DB) http.Handler { r.Get("/cameras/{id}", api.GetCameraDetail(database)) // Recording control routes - r.Post("/cameras/{id}/start", api.StartRecording(database)) - r.Post("/cameras/{id}/stop", api.StopRecording(database)) + r.Post("/cameras/{id}/start", api.StartRecording(database, pub)) + r.Post("/cameras/{id}/stop", api.StopRecording(database, pub)) // Status ingestion (from ESP32 nodes) r.Post("/cameras/{id}/status", api.PushStatus(database)) diff --git a/internal/api/camera_test.go b/internal/api/camera_test.go index 5d087ac..255ac25 100644 --- a/internal/api/camera_test.go +++ b/internal/api/camera_test.go @@ -26,8 +26,8 @@ func setupTestRouter(t *testing.T) (*db.DB, chi.Router) { r.Get("/cameras", ListCameras(database)) r.Post("/cameras", RegisterCamera(database)) r.Get("/cameras/{id}", GetCameraDetail(database)) - r.Post("/cameras/{id}/start", StartRecording(database)) - r.Post("/cameras/{id}/stop", StopRecording(database)) + r.Post("/cameras/{id}/start", StartRecording(database, nil)) + r.Post("/cameras/{id}/stop", StopRecording(database, nil)) r.Post("/cameras/{id}/status", PushStatus(database)) return database, r diff --git a/internal/api/recording.go b/internal/api/recording.go index e862f81..7e9af92 100644 --- a/internal/api/recording.go +++ b/internal/api/recording.go @@ -9,8 +9,14 @@ import ( "github.com/go-chi/chi/v5" ) +// CommandPublisher sends a command to a camera (implemented by the MQTT +// subscriber). Nil is allowed (e.g. in tests) — the command is then skipped. +type CommandPublisher interface { + PublishCommand(cameraID, command string) error +} + // StartRecording returns a handler for POST /cameras/{id}/start. -func StartRecording(database *db.DB) http.HandlerFunc { +func StartRecording(database *db.DB, pub CommandPublisher) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { cameraID := chi.URLParam(r, "id") if !validateCameraID(w, cameraID) { @@ -45,6 +51,15 @@ func StartRecording(database *db.DB) http.HandlerFunc { rowsAffected, _ := result.RowsAffected() log.Printf("Recording started on %s (%d rows affected)", cameraID, rowsAffected) + // Send the actual command to the camera over MQTT. + if pub != nil { + if err := pub.PublishCommand(cameraID, "start_recording"); err != nil { + log.Printf("Error sending start_recording to %s: %v", cameraID, err) + respondError(w, http.StatusBadGateway, "failed to send command to camera", err.Error()) + return + } + } + respondJSON(w, http.StatusOK, map[string]string{ "status": "recording_started", "camera_id": cameraID, @@ -53,7 +68,7 @@ func StartRecording(database *db.DB) http.HandlerFunc { } // StopRecording returns a handler for POST /cameras/{id}/stop. -func StopRecording(database *db.DB) http.HandlerFunc { +func StopRecording(database *db.DB, pub CommandPublisher) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { cameraID := chi.URLParam(r, "id") if !validateCameraID(w, cameraID) { @@ -88,6 +103,15 @@ func StopRecording(database *db.DB) http.HandlerFunc { rowsAffected, _ := result.RowsAffected() log.Printf("Recording stopped on %s (%d rows affected)", cameraID, rowsAffected) + // Send the actual command to the camera over MQTT. + if pub != nil { + if err := pub.PublishCommand(cameraID, "stop_recording"); err != nil { + log.Printf("Error sending stop_recording to %s: %v", cameraID, err) + respondError(w, http.StatusBadGateway, "failed to send command to camera", err.Error()) + return + } + } + respondJSON(w, http.StatusOK, map[string]string{ "status": "recording_stopped", "camera_id": cameraID, diff --git a/internal/mqtt/subscriber.go b/internal/mqtt/subscriber.go index abe5cf8..a3ba875 100644 --- a/internal/mqtt/subscriber.go +++ b/internal/mqtt/subscriber.go @@ -143,6 +143,18 @@ type statusPayload struct { UptimeSec *int `json:"uptime_sec"` } +// PublishCommand sends a command (e.g. "start_recording") to a camera's +// command topic, which its ESP32 bridge subscribes to and forwards over UART. +func (s *Subscriber) PublishCommand(cameraID, command string) error { + topic := "remoterig/cameras/" + cameraID + "/command" + payload, _ := json.Marshal(map[string]string{"command": command}) + tok := s.client.Publish(topic, 2, false, payload) + if !tok.WaitTimeout(3 * time.Second) { + return fmt.Errorf("publish to %s timed out", topic) + } + return tok.Error() +} + func (s *Subscriber) handleStatus(cameraID string, payload []byte) { var sp statusPayload if err := json.Unmarshal(payload, &sp); err != nil {