// Package events provides Server-Sent Events (SSE) for real-time camera status. package events import ( "encoding/json" "fmt" "log" "net/http" "sync" "sync/atomic" "time" ) // Hub manages SSE client connections and event broadcasting. type Hub struct { mu sync.RWMutex clients map[*Client]bool eventSeq atomic.Int64 // monotonic event ID for Last-Event-ID } // NewHub creates a new SSE hub. func NewHub() *Hub { return &Hub{ clients: make(map[*Client]bool), } } // Handler returns an HTTP handler for SSE connections. func (h *Hub) Handler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { h.serveSSE(w, r) }) } // Client represents an SSE client connection. type Client struct { done chan struct{} writes chan []byte } // Write sends data to the client. func (c *Client) Write(data []byte) bool { select { case c.writes <- data: return true default: return false // buffer full } } // Close stops the client. func (c *Client) Close() { close(c.done) } // serveSSE handles SSE connections. func (h *Hub) serveSSE(w http.ResponseWriter, r *http.Request) { // Set SSE headers w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Headers", "Last-Event-ID") // Handle preflight if r.Method == http.MethodOptions { w.WriteHeader(http.StatusOK) return } // Get flusher flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming unsupported", http.StatusInternalServerError) return } // Create client client := &Client{ done: make(chan struct{}), writes: make(chan []byte, 256), } // Register client h.mu.Lock() h.clients[client] = true h.mu.Unlock() // Cleanup on disconnect defer func() { h.mu.Lock() delete(h.clients, client) h.mu.Unlock() client.Close() }() // Acknowledge Last-Event-ID if sent by client on reconnect if lastEventID := r.Header.Get("Last-Event-ID"); lastEventID != "" { fmt.Fprintf(w, "id: %s\nevent: reconnected\ndata: {\"type\":\"reconnected\",\"last_event_id\":\"%s\"}\n\n", lastEventID, lastEventID) flusher.Flush() } // Send initial connection event seq := h.eventSeq.Add(1) data, _ := json.Marshal(map[string]interface{}{ "type": "connected", "id": seq, "ts": time.Now().Format(time.RFC3339), }) eventLine := fmt.Sprintf("id: %d\nevent: connected\ndata: %s\n\n", seq, string(data)) if !client.Write([]byte(eventLine)) { return // client disconnected } // Heartbeat and write loop heartbeat := time.NewTicker(30 * time.Second) defer heartbeat.Stop() for { select { case <-client.done: return case <-r.Context().Done(): log.Println("SSE client disconnected") return case event := <-client.writes: fmt.Fprintf(w, "data: %s\n\n", string(event)) flusher.Flush() case <-heartbeat.C: fmt.Fprint(w, ": heartbeat\n\n") flusher.Flush() } } } // Broadcast sends a typed SSE event to all connected clients. // eventType becomes the "event:" field, enabling client-side filtering. // Each event gets a monotonic ID for Last-Event-ID reconnection support. func (h *Hub) Broadcast(eventType string, payload interface{}) { h.mu.RLock() defer h.mu.RUnlock() seq := h.eventSeq.Add(1) event := map[string]interface{}{ "type": eventType, "id": seq, "ts": time.Now().Format(time.RFC3339), "payload": payload, } data, err := json.Marshal(event) if err != nil { log.Printf("SSE marshal error: %v", err) return } eventLine := fmt.Sprintf("id: %d\nevent: %s\ndata: %s\n\n", seq, eventType, string(data)) for client := range h.clients { if !client.Write([]byte(eventLine)) { log.Println("SSE client buffer full, dropping event") } } }