// Package handler provides SSE (Server-Sent Events) streaming for the // Control Center API. The Broker manages client connections and broadcasts // typed events in text/event-stream format. package handler import ( "encoding/json" "fmt" "log/slog" "net/http" "sync" ) // SSEEvent represents a single event to stream to connected clients. type SSEEvent struct { EventType string `json:"eventType"` Data any `json:"data"` } // Broker manages SSE client connections and broadcasts events to all // connected listeners. It is safe for concurrent use. type Broker struct { mu sync.RWMutex clients map[chan SSEEvent]struct{} } // NewBroker returns an initialized Broker. func NewBroker() *Broker { return &Broker{ clients: make(map[chan SSEEvent]struct{}), } } // Subscribe registers a new client channel. The caller must read from // this channel and write SSE frames to the HTTP response writer. func (b *Broker) Subscribe() chan SSEEvent { b.mu.Lock() defer b.mu.Unlock() ch := make(chan SSEEvent, 32) // small buffer to avoid blocking bursts b.clients[ch] = struct{}{} return ch } // Unsubscribe removes a client channel and closes it. func (b *Broker) Unsubscribe(ch chan SSEEvent) { b.mu.Lock() defer b.mu.Unlock() if _, ok := b.clients[ch]; ok { delete(b.clients, ch) close(ch) } } // Broadcast sends evt to every connected client. Slow clients that cannot // receive within their buffer are silently dropped (non-blocking send). func (b *Broker) Broadcast(eventType string, data any) { evt := SSEEvent{EventType: eventType, Data: data} b.mu.RLock() defer b.mu.RUnlock() for ch := range b.clients { select { case ch <- evt: default: // Client too slow — drop this event for this client slog.Warn("sse client buffer full, dropping event", "eventType", eventType) } } } // ClientCount returns the number of currently connected SSE clients. func (b *Broker) ClientCount() int { b.mu.RLock() defer b.mu.RUnlock() return len(b.clients) } // ServeHTTP handles GET /api/events. It registers the client, streams // events in text/event-stream format, and cleans up on disconnect. func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Ensure we can flush flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming not supported", http.StatusInternalServerError) return } // SSE headers w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") // disable nginx buffering ch := b.Subscribe() defer b.Unsubscribe(ch) // Send initial connection event fmt.Fprintf(w, "event: connected\ndata: {\"clientCount\":%d}\n\n", b.ClientCount()) flusher.Flush() ctx := r.Context() for { select { case <-ctx.Done(): // Client disconnected slog.Debug("sse client disconnected") return case evt, ok := <-ch: if !ok { return } data, err := json.Marshal(evt.Data) if err != nil { slog.Error("sse marshal failed", "error", err) continue } fmt.Fprintf(w, "event: %s\ndata: %s\n\n", evt.EventType, string(data)) flusher.Flush() } } }