CUB-187+188+191+193: Recording handlers, status ingestion, SSE endpoint
ci/verify Branch verified

This commit is contained in:
2026-05-18 17:52:48 -04:00
parent 5cc57446e5
commit 111cce4c84
4 changed files with 360 additions and 7 deletions
+26 -7
View File
@@ -11,8 +11,10 @@ import (
"syscall"
"time"
"github.com/cubecraft/remoterig/internal/api"
"github.com/cubecraft/remoterig/internal/auth"
"github.com/cubecraft/remoterig/internal/db"
"github.com/cubecraft/remoterig/internal/events"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
@@ -47,13 +49,16 @@ func main() {
}
// Open database
db, err := db.Open(cfg.DBPath)
sqlDB, err := db.Open(cfg.DBPath)
if err != nil {
log.Fatalf("Failed to open database: %v", err)
}
defer db.Close()
defer sqlDB.Close()
log.Printf("Database open: %s", cfg.DBPath)
// Create SSE hub for real-time updates
sseHub := events.NewHub()
// Set up router
r := chi.NewRouter()
r.Use(middleware.RequestID)
@@ -69,7 +74,7 @@ func main() {
})
// API routes (auth required if API key is configured)
r.Mount("/api/v1", auth.Middleware(cfg.APIKey)(apiRouter(db)))
r.Mount("/api/v1", auth.Middleware(cfg.APIKey)(apiRouter(sseHub, sqlDB)))
// Create server
httpServer := &http.Server{
@@ -98,10 +103,24 @@ func main() {
}
// apiRouter creates the API route tree.
func apiRouter(database *db.DB) http.Handler {
func apiRouter(sseHub *events.Hub, database *db.DB) http.Handler {
r := chi.NewRouter()
// TODO: register handler routes here
// Example: r.Get("/cameras", handlers.ListCameras(database))
// Camera management routes
r.Get("/cameras", api.ListCameras(database))
r.Post("/cameras", api.RegisterCamera(database))
r.Get("/cameras/{id}", api.GetCameraDetail(database))
// Recording control routes
r.Post("/cameras/{id}/start", api.StartRecording(database))
r.Post("/cameras/{id}/stop", api.StopRecording(database))
// Status ingestion (from ESP32 nodes)
r.Post("/cameras/{id}/status", api.PushStatus(database))
// Real-time events (SSE)
r.Handle("/events/stream", sseHub.Handler())
return r
}
@@ -131,4 +150,4 @@ func loadConfig(path string) (*Config, error) {
}
return &cfg, nil
}
}
+88
View File
@@ -0,0 +1,88 @@
// Package api provides HTTP handlers for camera operations.
package api
import (
"log"
"net/http"
"github.com/cubecraft/remoterig/internal/db"
"github.com/go-chi/chi/v5"
)
// StartRecording returns a handler for POST /cameras/{id}/start.
func StartRecording(database *db.DB) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
cameraID := chi.URLParam(r, "id")
if cameraID == "" {
respondJSON(w, http.StatusBadRequest, map[string]string{"error": "camera_id required"})
return
}
// Check if camera is registered
var exists int
err := database.QueryRowContext(r.Context(),
"SELECT COUNT(*) FROM cameras WHERE camera_id = ?", cameraID).Scan(&exists)
if err != nil || exists == 0 {
respondJSON(w, http.StatusNotFound, map[string]string{"error": "camera not registered"})
return
}
// Open recording event
result, err := database.ExecContext(r.Context(), `
INSERT INTO recording_events (camera_id, started_at, reason)
VALUES (?, datetime('now'), 'manual')
`, cameraID)
if err != nil {
log.Printf("Error starting recording: %v", err)
respondJSON(w, http.StatusInternalServerError, map[string]string{"error": "database error"})
return
}
rows, _ := result.RowsAffected()
log.Printf("Recording started on %s (%d rows affected)", cameraID, rows)
respondJSON(w, http.StatusOK, map[string]string{
"status": "recording_started",
"camera_id": cameraID,
})
}
}
// StopRecording returns a handler for POST /cameras/{id}/stop.
func StopRecording(database *db.DB) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
cameraID := chi.URLParam(r, "id")
if cameraID == "" {
respondJSON(w, http.StatusBadRequest, map[string]string{"error": "camera_id required"})
return
}
// Check if camera is registered
var exists int
err := database.QueryRowContext(r.Context(),
"SELECT COUNT(*) FROM cameras WHERE camera_id = ?", cameraID).Scan(&exists)
if err != nil || exists == 0 {
respondJSON(w, http.StatusNotFound, map[string]string{"error": "camera not registered"})
return
}
// Close the most recent open recording event
result, err := database.ExecContext(r.Context(), `
UPDATE recording_events SET stopped_at = datetime('now'), reason = 'manual'
WHERE camera_id = ? AND stopped_at IS NULL
`, cameraID)
if err != nil {
log.Printf("Error stopping recording: %v", err)
respondJSON(w, http.StatusInternalServerError, map[string]string{"error": "database error"})
return
}
rows, _ := result.RowsAffected()
log.Printf("Recording stopped on %s (%d rows affected)", cameraID, rows)
respondJSON(w, http.StatusOK, map[string]string{
"status": "recording_stopped",
"camera_id": cameraID,
})
}
}
+106
View File
@@ -0,0 +1,106 @@
// Package api provides HTTP handlers for camera operations.
package api
import (
"encoding/json"
"log"
"net/http"
"github.com/cubecraft/remoterig/internal/db"
"github.com/go-chi/chi/v5"
)
// PushStatus accepts a status update from an ESP32 node and persists it.
func PushStatus(database *db.DB) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
cameraID := chi.URLParam(r, "id")
if cameraID == "" {
respondJSON(w, http.StatusBadRequest, map[string]string{"error": "camera_id required"})
return
}
var req struct {
BatteryPct *int `json:"battery_pct"`
VideoRemainingSec *int `json:"video_remaining_sec"`
Recording bool `json:"recording"`
Mode string `json:"mode"`
Resolution string `json:"resolution"`
FPS int `json:"fps"`
Online bool `json:"online"`
RawBatteryPct *float64 `json:"raw_battery_pct"`
Timestamp *string `json:"ts"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
respondJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request body"})
return
}
// Check if camera is registered
var exists int
err := database.QueryRowContext(r.Context(),
"SELECT COUNT(*) FROM cameras WHERE camera_id = ?", cameraID).Scan(&exists)
if err != nil || exists == 0 {
respondJSON(w, http.StatusNotFound, map[string]string{"error": "camera not registered"})
return
}
// Insert status log
result, err := database.ExecContext(r.Context(), `
INSERT INTO status_logs (camera_id, battery_pct, video_remaining_sec,
recording_state, mode, resolution, fps, online, raw_battery_pct)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`, cameraID, req.BatteryPct, req.VideoRemainingSec,
boolToInt(req.Recording), req.Mode, req.Resolution,
req.FPS, boolToInt(req.Online), req.RawBatteryPct)
if err != nil {
log.Printf("Error inserting status log: %v", err)
respondJSON(w, http.StatusInternalServerError, map[string]string{"error": "database error"})
return
}
// Check if recording state changed - update recording_events if so
var prevRecording int
err = database.QueryRowContext(r.Context(), `
SELECT recording_state FROM status_logs
WHERE camera_id = ? AND recorded_at > datetime('now', '-60 seconds')
ORDER BY recorded_at DESC LIMIT 1
`, cameraID).Scan(&prevRecording)
if err == nil && prevRecording != boolToInt(req.Recording) {
reason := "manual"
if req.Recording {
// Start recording - open a new event
_, err := database.ExecContext(r.Context(), `
INSERT INTO recording_events (camera_id, started_at, reason)
VALUES (?, datetime('now'), ?)
`, cameraID, reason)
if err != nil {
log.Printf("Error inserting recording event: %v", err)
}
} else {
// Stop recording - close the most recent open event
_, err := database.ExecContext(r.Context(), `
UPDATE recording_events SET stopped_at = datetime('now')
WHERE camera_id = ? AND stopped_at IS NULL
ORDER BY started_at DESC LIMIT 1
`, cameraID)
if err != nil {
log.Printf("Error updating recording event: %v", err)
}
}
}
_, _ = result.RowsAffected() // consume the result
respondJSON(w, http.StatusOK, map[string]string{
"status": "accepted",
})
}
}
// boolToInt converts a bool to 0 or 1 for SQLite storage.
func boolToInt(b bool) int {
if b {
return 1
}
return 0
}
+140
View File
@@ -0,0 +1,140 @@
// Package events provides Server-Sent Events (SSE) for real-time camera status.
package events
import (
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
)
// Hub manages SSE client connections and event broadcasting.
type Hub struct {
mu sync.RWMutex
clients map[*Client]bool
}
// NewHub creates a new SSE hub.
func NewHub() *Hub {
return &Hub{
clients: make(map[*Client]bool),
}
}
// Handler returns an HTTP handler for SSE connections.
func (h *Hub) Handler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
h.serveSSE(w, r)
})
}
// Client represents an SSE client connection.
type Client struct {
done chan struct{}
writes chan []byte
}
// Write sends data to the client.
func (c *Client) Write(data []byte) bool {
select {
case c.writes <- data:
return true
default:
return false // buffer full
}
}
// Close stops the client.
func (c *Client) Close() {
close(c.done)
}
// serveSSE handles SSE connections.
func (h *Hub) serveSSE(w http.ResponseWriter, r *http.Request) {
// Set SSE headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
// Get flusher
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}
// Create client
client := &Client{
done: make(chan struct{}),
writes: make(chan []byte, 256),
}
// Register client
h.mu.Lock()
h.clients[client] = true
h.mu.Unlock()
// Cleanup on disconnect
defer func() {
h.mu.Lock()
delete(h.clients, client)
h.mu.Unlock()
client.Close()
}()
// Send initial connection event
data, _ := json.Marshal(map[string]string{
"type": "connected",
"ts": time.Now().Format(time.RFC3339),
})
if !client.Write(data) {
return // client disconnected
}
// Heartbeat and write loop
heartbeat := time.NewTicker(30 * time.Second)
defer heartbeat.Stop()
for {
select {
case <-client.done:
return
case <-r.Context().Done():
log.Println("SSE client disconnected")
return
case event := <-client.writes:
fmt.Fprintf(w, "data: %s\n\n", string(event))
flusher.Flush()
case <-heartbeat.C:
fmt.Fprint(w, ": heartbeat\n\n")
flusher.Flush()
}
}
}
// Broadcast sends an event to all connected clients.
func (h *Hub) Broadcast(eventType string, payload interface{}) {
h.mu.RLock()
defer h.mu.RUnlock()
event := map[string]interface{}{
"type": eventType,
"ts": time.Now().Format(time.RFC3339),
"payload": payload,
}
data, err := json.Marshal(event)
if err != nil {
log.Printf("SSE marshal error: %v", err)
return
}
for client := range h.clients {
if !client.Write(data) {
log.Println("SSE client buffer full, dropping event")
}
}
}