// Package events provides Server-Sent Events (SSE) for real-time camera status. package events import ( "encoding/json" "fmt" "log" "net/http" "sync" "time" ) // Hub manages SSE client connections and event broadcasting. type Hub struct { mu sync.RWMutex clients map[*Client]bool } // NewHub creates a new SSE hub. func NewHub() *Hub { return &Hub{ clients: make(map[*Client]bool), } } // Handler returns an HTTP handler for SSE connections. func (h *Hub) Handler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { h.serveSSE(w, r) }) } // Client represents an SSE client connection. type Client struct { done chan struct{} writes chan []byte } // Write sends data to the client. func (c *Client) Write(data []byte) bool { select { case c.writes <- data: return true default: return false // buffer full } } // Close stops the client. func (c *Client) Close() { close(c.done) } // serveSSE handles SSE connections. func (h *Hub) serveSSE(w http.ResponseWriter, r *http.Request) { // Set SSE headers w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") // Get flusher flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming unsupported", http.StatusInternalServerError) return } // Create client client := &Client{ done: make(chan struct{}), writes: make(chan []byte, 256), } // Register client h.mu.Lock() h.clients[client] = true h.mu.Unlock() // Cleanup on disconnect defer func() { h.mu.Lock() delete(h.clients, client) h.mu.Unlock() client.Close() }() // Send initial connection event data, _ := json.Marshal(map[string]string{ "type": "connected", "ts": time.Now().Format(time.RFC3339), }) if !client.Write(data) { return // client disconnected } // Heartbeat and write loop heartbeat := time.NewTicker(30 * time.Second) defer heartbeat.Stop() for { select { case <-client.done: return case <-r.Context().Done(): log.Println("SSE client disconnected") return case event := <-client.writes: fmt.Fprintf(w, "data: %s\n\n", string(event)) flusher.Flush() case <-heartbeat.C: fmt.Fprint(w, ": heartbeat\n\n") flusher.Flush() } } } // Broadcast sends an event to all connected clients. func (h *Hub) Broadcast(eventType string, payload interface{}) { h.mu.RLock() defer h.mu.RUnlock() event := map[string]interface{}{ "type": eventType, "ts": time.Now().Format(time.RFC3339), "payload": payload, } data, err := json.Marshal(event) if err != nil { log.Printf("SSE marshal error: %v", err) return } for client := range h.clients { if !client.Write(data) { log.Println("SSE client buffer full, dropping event") } } }