package sse import ( "log/slog" "sync" ) // client represents a single SSE subscriber — identified by its send channel. type client struct { ch chan string } // Broadcaster receives Events on its input channel and fans them out to every // connected client. Subscribe adds a new client; Unsubscribe removes one. // Start must be called before the broadcaster accepts events. type Broadcaster struct { input chan Event subscribe chan client unsubscribe chan client clients map[chan string]struct{} done chan struct{} once sync.Once } // NewBroadcaster creates a Broadcaster. bufSize controls the buffer depth for // the input channel as well as for each per-client outbound channel. func NewBroadcaster(bufSize int) *Broadcaster { if bufSize <= 0 { bufSize = 64 } return &Broadcaster{ input: make(chan Event, bufSize), subscribe: make(chan client), unsubscribe: make(chan client), clients: make(map[chan string]struct{}), done: make(chan struct{}), } } // Publish pushes an event into the broadcaster. Safe for concurrent use. func (b *Broadcaster) Publish(ev Event) { select { case b.input <- ev: case <-b.done: // Silently drop during shutdown. } } // Start launches the broadcaster's fan-out loop in a goroutine. // It must be called before Publish is used. func (b *Broadcaster) Start() { go b.loop() } // Stop terminates the fan-out loop and closes all client channels. // It is safe to call multiple times. func (b *Broadcaster) Stop() { b.once.Do(func() { close(b.done) }) } // Subscribe returns a new client channel that receives SSE-formatted strings. func (b *Broadcaster) Subscribe() chan string { c := client{ch: make(chan string, 64)} select { case b.subscribe <- c: case <-b.done: // Broadcaster already stopped — return a closed chan so the handler // can bail out quickly. ch := make(chan string) close(ch) return ch } return c.ch } // Unsubscribe removes a client channel and closes it. func (b *Broadcaster) Unsubscribe(ch chan string) { c := client{ch: ch} select { case b.unsubscribe <- c: case <-b.done: // Already shutting down — channels will be cleaned up by Stop. } } // loop is the core fan-out goroutine. func (b *Broadcaster) loop() { for { select { case ev := <-b.input: sse := ev.toSSE() for ch := range b.clients { // Non-blocking send — slow clients are dropped. select { case ch <- sse: default: slog.Warn("sse broadcaster: dropping event for slow client", "type", ev.Type) } } case c := <-b.subscribe: b.clients[c.ch] = struct{}{} slog.Debug("sse broadcaster: client connected", "total_clients", len(b.clients)) case c := <-b.unsubscribe: if _, ok := b.clients[c.ch]; ok { delete(b.clients, c.ch) close(c.ch) slog.Debug("sse broadcaster: client disconnected", "total_clients", len(b.clients)) } case <-b.done: // Drain remaining events in input before shutting down. for ev := range b.input { sse := ev.toSSE() for ch := range b.clients { select { case ch <- sse: default: } } } // Close all remaining client channels. for ch := range b.clients { close(ch) } b.clients = nil return } } }