diff --git a/cmd/server/main.go b/cmd/server/main.go index 8503e62..d10f663 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -11,8 +11,10 @@ import ( "syscall" "time" + "github.com/cubecraft/remoterig/internal/api" "github.com/cubecraft/remoterig/internal/auth" "github.com/cubecraft/remoterig/internal/db" + "github.com/cubecraft/remoterig/internal/events" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" @@ -47,13 +49,16 @@ func main() { } // Open database - db, err := db.Open(cfg.DBPath) + sqlDB, err := db.Open(cfg.DBPath) if err != nil { log.Fatalf("Failed to open database: %v", err) } - defer db.Close() + defer sqlDB.Close() log.Printf("Database open: %s", cfg.DBPath) + // Create SSE hub for real-time updates + sseHub := events.NewHub() + // Set up router r := chi.NewRouter() r.Use(middleware.RequestID) @@ -69,7 +74,7 @@ func main() { }) // API routes (auth required if API key is configured) - r.Mount("/api/v1", auth.Middleware(cfg.APIKey)(apiRouter(db))) + r.Mount("/api/v1", auth.Middleware(cfg.APIKey)(apiRouter(sseHub, sqlDB))) // Create server httpServer := &http.Server{ @@ -98,10 +103,24 @@ func main() { } // apiRouter creates the API route tree. -func apiRouter(database *db.DB) http.Handler { +func apiRouter(sseHub *events.Hub, database *db.DB) http.Handler { r := chi.NewRouter() - // TODO: register handler routes here - // Example: r.Get("/cameras", handlers.ListCameras(database)) + + // Camera management routes + r.Get("/cameras", api.ListCameras(database)) + r.Post("/cameras", api.RegisterCamera(database)) + r.Get("/cameras/{id}", api.GetCameraDetail(database)) + + // Recording control routes + r.Post("/cameras/{id}/start", api.StartRecording(database)) + r.Post("/cameras/{id}/stop", api.StopRecording(database)) + + // Status ingestion (from ESP32 nodes) + r.Post("/cameras/{id}/status", api.PushStatus(database)) + + // Real-time events (SSE) + r.Handle("/events/stream", sseHub.Handler()) + return r } @@ -131,4 +150,4 @@ func loadConfig(path string) (*Config, error) { } return &cfg, nil -} +} \ No newline at end of file diff --git a/internal/api/recording.go b/internal/api/recording.go new file mode 100644 index 0000000..fb5bfca --- /dev/null +++ b/internal/api/recording.go @@ -0,0 +1,88 @@ +// Package api provides HTTP handlers for camera operations. +package api + +import ( + "log" + "net/http" + + "github.com/cubecraft/remoterig/internal/db" + "github.com/go-chi/chi/v5" +) + +// StartRecording returns a handler for POST /cameras/{id}/start. +func StartRecording(database *db.DB) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + cameraID := chi.URLParam(r, "id") + if cameraID == "" { + respondJSON(w, http.StatusBadRequest, map[string]string{"error": "camera_id required"}) + return + } + + // Check if camera is registered + var exists int + err := database.QueryRowContext(r.Context(), + "SELECT COUNT(*) FROM cameras WHERE camera_id = ?", cameraID).Scan(&exists) + if err != nil || exists == 0 { + respondJSON(w, http.StatusNotFound, map[string]string{"error": "camera not registered"}) + return + } + + // Open recording event + result, err := database.ExecContext(r.Context(), ` + INSERT INTO recording_events (camera_id, started_at, reason) + VALUES (?, datetime('now'), 'manual') + `, cameraID) + if err != nil { + log.Printf("Error starting recording: %v", err) + respondJSON(w, http.StatusInternalServerError, map[string]string{"error": "database error"}) + return + } + + rows, _ := result.RowsAffected() + log.Printf("Recording started on %s (%d rows affected)", cameraID, rows) + + respondJSON(w, http.StatusOK, map[string]string{ + "status": "recording_started", + "camera_id": cameraID, + }) + } +} + +// StopRecording returns a handler for POST /cameras/{id}/stop. +func StopRecording(database *db.DB) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + cameraID := chi.URLParam(r, "id") + if cameraID == "" { + respondJSON(w, http.StatusBadRequest, map[string]string{"error": "camera_id required"}) + return + } + + // Check if camera is registered + var exists int + err := database.QueryRowContext(r.Context(), + "SELECT COUNT(*) FROM cameras WHERE camera_id = ?", cameraID).Scan(&exists) + if err != nil || exists == 0 { + respondJSON(w, http.StatusNotFound, map[string]string{"error": "camera not registered"}) + return + } + + // Close the most recent open recording event + result, err := database.ExecContext(r.Context(), ` + UPDATE recording_events SET stopped_at = datetime('now'), reason = 'manual' + WHERE camera_id = ? AND stopped_at IS NULL + `, cameraID) + if err != nil { + log.Printf("Error stopping recording: %v", err) + respondJSON(w, http.StatusInternalServerError, map[string]string{"error": "database error"}) + return + } + + rows, _ := result.RowsAffected() + log.Printf("Recording stopped on %s (%d rows affected)", cameraID, rows) + + respondJSON(w, http.StatusOK, map[string]string{ + "status": "recording_stopped", + "camera_id": cameraID, + }) + } +} diff --git a/internal/api/status.go b/internal/api/status.go new file mode 100644 index 0000000..5ba0f7d --- /dev/null +++ b/internal/api/status.go @@ -0,0 +1,106 @@ +// Package api provides HTTP handlers for camera operations. +package api + +import ( + "encoding/json" + "log" + "net/http" + + "github.com/cubecraft/remoterig/internal/db" + "github.com/go-chi/chi/v5" +) + +// PushStatus accepts a status update from an ESP32 node and persists it. +func PushStatus(database *db.DB) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + cameraID := chi.URLParam(r, "id") + if cameraID == "" { + respondJSON(w, http.StatusBadRequest, map[string]string{"error": "camera_id required"}) + return + } + + var req struct { + BatteryPct *int `json:"battery_pct"` + VideoRemainingSec *int `json:"video_remaining_sec"` + Recording bool `json:"recording"` + Mode string `json:"mode"` + Resolution string `json:"resolution"` + FPS int `json:"fps"` + Online bool `json:"online"` + RawBatteryPct *float64 `json:"raw_battery_pct"` + Timestamp *string `json:"ts"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + respondJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request body"}) + return + } + + // Check if camera is registered + var exists int + err := database.QueryRowContext(r.Context(), + "SELECT COUNT(*) FROM cameras WHERE camera_id = ?", cameraID).Scan(&exists) + if err != nil || exists == 0 { + respondJSON(w, http.StatusNotFound, map[string]string{"error": "camera not registered"}) + return + } + + // Insert status log + result, err := database.ExecContext(r.Context(), ` + INSERT INTO status_logs (camera_id, battery_pct, video_remaining_sec, + recording_state, mode, resolution, fps, online, raw_battery_pct) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + `, cameraID, req.BatteryPct, req.VideoRemainingSec, + boolToInt(req.Recording), req.Mode, req.Resolution, + req.FPS, boolToInt(req.Online), req.RawBatteryPct) + if err != nil { + log.Printf("Error inserting status log: %v", err) + respondJSON(w, http.StatusInternalServerError, map[string]string{"error": "database error"}) + return + } + + // Check if recording state changed - update recording_events if so + var prevRecording int + err = database.QueryRowContext(r.Context(), ` + SELECT recording_state FROM status_logs + WHERE camera_id = ? AND recorded_at > datetime('now', '-60 seconds') + ORDER BY recorded_at DESC LIMIT 1 + `, cameraID).Scan(&prevRecording) + if err == nil && prevRecording != boolToInt(req.Recording) { + reason := "manual" + if req.Recording { + // Start recording - open a new event + _, err := database.ExecContext(r.Context(), ` + INSERT INTO recording_events (camera_id, started_at, reason) + VALUES (?, datetime('now'), ?) + `, cameraID, reason) + if err != nil { + log.Printf("Error inserting recording event: %v", err) + } + } else { + // Stop recording - close the most recent open event + _, err := database.ExecContext(r.Context(), ` + UPDATE recording_events SET stopped_at = datetime('now') + WHERE camera_id = ? AND stopped_at IS NULL + ORDER BY started_at DESC LIMIT 1 + `, cameraID) + if err != nil { + log.Printf("Error updating recording event: %v", err) + } + } + } + + _, _ = result.RowsAffected() // consume the result + + respondJSON(w, http.StatusOK, map[string]string{ + "status": "accepted", + }) + } +} + +// boolToInt converts a bool to 0 or 1 for SQLite storage. +func boolToInt(b bool) int { + if b { + return 1 + } + return 0 +} diff --git a/internal/events/sse.go b/internal/events/sse.go new file mode 100644 index 0000000..0ed13d6 --- /dev/null +++ b/internal/events/sse.go @@ -0,0 +1,140 @@ +// Package events provides Server-Sent Events (SSE) for real-time camera status. +package events + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "sync" + "time" +) + +// Hub manages SSE client connections and event broadcasting. +type Hub struct { + mu sync.RWMutex + clients map[*Client]bool +} + +// NewHub creates a new SSE hub. +func NewHub() *Hub { + return &Hub{ + clients: make(map[*Client]bool), + } +} + +// Handler returns an HTTP handler for SSE connections. +func (h *Hub) Handler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + h.serveSSE(w, r) + }) +} + +// Client represents an SSE client connection. +type Client struct { + done chan struct{} + writes chan []byte +} + +// Write sends data to the client. +func (c *Client) Write(data []byte) bool { + select { + case c.writes <- data: + return true + default: + return false // buffer full + } +} + +// Close stops the client. +func (c *Client) Close() { + close(c.done) +} + +// serveSSE handles SSE connections. +func (h *Hub) serveSSE(w http.ResponseWriter, r *http.Request) { + // Set SSE headers + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + // Get flusher + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming unsupported", http.StatusInternalServerError) + return + } + + // Create client + client := &Client{ + done: make(chan struct{}), + writes: make(chan []byte, 256), + } + + // Register client + h.mu.Lock() + h.clients[client] = true + h.mu.Unlock() + + // Cleanup on disconnect + defer func() { + h.mu.Lock() + delete(h.clients, client) + h.mu.Unlock() + client.Close() + }() + + // Send initial connection event + data, _ := json.Marshal(map[string]string{ + "type": "connected", + "ts": time.Now().Format(time.RFC3339), + }) + if !client.Write(data) { + return // client disconnected + } + + // Heartbeat and write loop + heartbeat := time.NewTicker(30 * time.Second) + defer heartbeat.Stop() + + for { + select { + case <-client.done: + return + case <-r.Context().Done(): + log.Println("SSE client disconnected") + return + case event := <-client.writes: + fmt.Fprintf(w, "data: %s\n\n", string(event)) + flusher.Flush() + case <-heartbeat.C: + fmt.Fprint(w, ": heartbeat\n\n") + flusher.Flush() + } + } +} + +// Broadcast sends an event to all connected clients. +func (h *Hub) Broadcast(eventType string, payload interface{}) { + h.mu.RLock() + defer h.mu.RUnlock() + + event := map[string]interface{}{ + "type": eventType, + "ts": time.Now().Format(time.RFC3339), + "payload": payload, + } + + data, err := json.Marshal(event) + if err != nil { + log.Printf("SSE marshal error: %v", err) + return + } + + for client := range h.clients { + if !client.Write(data) { + log.Println("SSE client buffer full, dropping event") + } + } +}