Files
Extrudex/backend/internal/sse/broadcaster.go
hex-bot 41f66005a6
All checks were successful
Dev Build / build-test (pull_request) Successful in 2m9s
CUB-136: add SSE endpoint in Go backend
2026-05-07 08:29:34 -04:00

134 lines
3.2 KiB
Go

package sse
import (
"log/slog"
"sync"
)
// client wraps the outbound channel of a single SSE subscriber. The channel
// doubles as the subscriber's identity.
type client struct {
	ch chan string // SSE-formatted strings destined for this subscriber
}
// Broadcaster takes Events from its input channel and forwards each one to
// every registered client channel. Clients are added through Subscribe and
// removed through Unsubscribe. Nothing is consumed from the input or the
// membership channels until Start has launched the fan-out loop.
type Broadcaster struct {
	input       chan Event               // events queued by Publish
	subscribe   chan client              // registration requests sent by Subscribe
	unsubscribe chan client              // removal requests sent by Unsubscribe
	clients     map[chan string]struct{} // set of registered client channels
	done        chan struct{}            // closed by Stop to signal shutdown
	once        sync.Once                // makes closing done happen at most once
}
// NewBroadcaster allocates a Broadcaster whose input channel can buffer
// bufSize events. A bufSize of zero or less falls back to 64. The subscribe
// and unsubscribe channels are unbuffered. The returned Broadcaster does not
// process anything until Start is called.
func NewBroadcaster(bufSize int) *Broadcaster {
	const fallbackDepth = 64

	depth := bufSize
	if depth < 1 {
		depth = fallbackDepth
	}

	b := new(Broadcaster)
	b.input = make(chan Event, depth)
	b.subscribe = make(chan client)
	b.unsubscribe = make(chan client)
	b.clients = make(map[chan string]struct{})
	b.done = make(chan struct{})
	return b
}
// Publish queues ev for fan-out. It may be called from multiple goroutines.
// When the input buffer is full the call blocks until space frees up or the
// broadcaster is stopped; once stopped, the event may be discarded without
// notice.
func (b *Broadcaster) Publish(ev Event) {
	select {
	case <-b.done:
		// Shutting down: the event is intentionally discarded.
		return
	case b.input <- ev:
		return
	}
}
// Start runs the fan-out loop on a new goroutine and returns immediately.
// Until it is called nothing consumes published events or subscription
// requests. Each call launches another loop over the same state, so call it
// once per Broadcaster.
func (b *Broadcaster) Start() {
	go func() {
		b.loop()
	}()
}
// Stop signals shutdown by closing the done channel. The fan-out loop reacts
// to that signal by closing the client channels and exiting; Stop itself does
// not wait for the loop to finish. Repeated calls are harmless because the
// close is guarded by a sync.Once.
func (b *Broadcaster) Stop() {
	signalShutdown := func() {
		close(b.done)
	}
	b.once.Do(signalShutdown)
}
// Subscribe registers a new client with the fan-out loop and returns the
// channel on which that client receives SSE-formatted strings.
//
// The channel's buffer depth equals the capacity of the broadcaster's input
// channel, so the bufSize chosen in NewBroadcaster governs per-client
// buffering as well.
//
// Subscribe blocks until the loop accepts the registration or the broadcaster
// is stopped. If the broadcaster has been stopped, it returns an
// already-closed channel so the caller's receive loop ends immediately.
func (b *Broadcaster) Subscribe() chan string {
	// Derive the depth from the input channel rather than a hard-coded value
	// so that per-client buffering follows the configured bufSize.
	c := client{ch: make(chan string, cap(b.input))}

	select {
	case b.subscribe <- c:
		return c.ch
	case <-b.done:
		// Never registered: hand back a closed channel that is not shared
		// with anyone else.
		closed := make(chan string)
		close(closed)
		return closed
	}
}
// Unsubscribe asks the fan-out loop to drop ch from the set of clients. The
// loop, not this method, removes and closes the channel, and only if it is
// still registered. The call blocks until the loop accepts the request or
// the broadcaster is stopped.
func (b *Broadcaster) Unsubscribe(ch chan string) {
	request := client{ch: ch}
	select {
	case <-b.done:
		// Shutdown in progress: the loop closes remaining client channels
		// itself, so there is nothing to hand over.
		return
	case b.unsubscribe <- request:
		return
	}
}
// loop is the core fan-out goroutine. It is the only code that reads or
// writes b.clients, which is why the map needs no lock. On each iteration it
// handles one of:
//
//   - an event from input, which is forwarded to every client;
//   - a subscribe request, which registers the client channel;
//   - an unsubscribe request, which removes and closes the client channel;
//   - the done signal, which flushes buffered events, closes every client
//     channel and returns.
func (b *Broadcaster) loop() {
	for {
		select {
		case ev := <-b.input:
			b.fanOut(ev, true)
		case c := <-b.subscribe:
			b.clients[c.ch] = struct{}{}
			slog.Debug("sse broadcaster: client connected", "total_clients", len(b.clients))
		case c := <-b.unsubscribe:
			// Skip channels that are not registered so that an unknown or
			// already-removed channel is never closed a second time.
			if _, ok := b.clients[c.ch]; !ok {
				continue
			}
			delete(b.clients, c.ch)
			close(c.ch)
			slog.Debug("sse broadcaster: client disconnected", "total_clients", len(b.clients))
		case <-b.done:
			b.drainAndClose()
			return
		}
	}
}

// fanOut renders ev once and offers it to every registered client without
// blocking. A client whose buffer is full misses this event but stays
// subscribed. warnOnDrop controls whether each miss is logged.
func (b *Broadcaster) fanOut(ev Event, warnOnDrop bool) {
	msg := ev.toSSE()
	for ch := range b.clients {
		select {
		case ch <- msg:
		default:
			if warnOnDrop {
				slog.Warn("sse broadcaster: dropping event for slow client", "type", ev.Type)
			}
		}
	}
}

// drainAndClose delivers the events already buffered in input, then closes
// every client channel and releases the client set.
func (b *Broadcaster) drainAndClose() {
	for {
		select {
		case ev := <-b.input:
			b.fanOut(ev, false)
		default:
			// input is never closed, so ranging over it would block forever
			// once the buffer is empty. A non-blocking receive stops as soon
			// as nothing is left.
			for ch := range b.clients {
				close(ch)
			}
			b.clients = nil
			return
		}
	}
}