diff --git a/internal/events/sse.go b/internal/events/sse.go index 0ed13d6..425938e 100644 --- a/internal/events/sse.go +++ b/internal/events/sse.go @@ -7,13 +7,15 @@ import ( "log" "net/http" "sync" + "sync/atomic" "time" ) // Hub manages SSE client connections and event broadcasting. type Hub struct { - mu sync.RWMutex - clients map[*Client]bool + mu sync.RWMutex + clients map[*Client]bool + eventSeq atomic.Int64 // monotonic event ID for Last-Event-ID } // NewHub creates a new SSE hub. @@ -58,6 +60,13 @@ func (h *Hub) serveSSE(w http.ResponseWriter, r *http.Request) { w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Headers", "Last-Event-ID") + + // Handle preflight + if r.Method == http.MethodOptions { + w.WriteHeader(http.StatusOK) + return + } // Get flusher flusher, ok := w.(http.Flusher) @@ -85,12 +94,21 @@ func (h *Hub) serveSSE(w http.ResponseWriter, r *http.Request) { client.Close() }() + // Acknowledge Last-Event-ID if sent by client on reconnect + if lastEventID := r.Header.Get("Last-Event-ID"); lastEventID != "" { + fmt.Fprintf(w, "id: %s\nevent: reconnected\ndata: {\"type\":\"reconnected\",\"last_event_id\":\"%s\"}\n\n", lastEventID, lastEventID) + flusher.Flush() + } + // Send initial connection event - data, _ := json.Marshal(map[string]string{ + seq := h.eventSeq.Add(1) + data, _ := json.Marshal(map[string]interface{}{ "type": "connected", + "id": seq, "ts": time.Now().Format(time.RFC3339), }) - if !client.Write(data) { + eventLine := fmt.Sprintf("id: %d\nevent: connected\ndata: %s\n\n", seq, string(data)) + if !client.Write([]byte(eventLine)) { return // client disconnected } @@ -115,13 +133,18 @@ func (h *Hub) serveSSE(w http.ResponseWriter, r *http.Request) { } } -// Broadcast sends an event to all connected clients. +// Broadcast sends a typed SSE event to all connected clients. +// eventType becomes the "event:" field, enabling client-side filtering. +// Each event gets a monotonic ID for Last-Event-ID reconnection support. func (h *Hub) Broadcast(eventType string, payload interface{}) { h.mu.RLock() defer h.mu.RUnlock() + seq := h.eventSeq.Add(1) + event := map[string]interface{}{ "type": eventType, + "id": seq, "ts": time.Now().Format(time.RFC3339), "payload": payload, } @@ -132,8 +155,10 @@ func (h *Hub) Broadcast(eventType string, payload interface{}) { return } + eventLine := fmt.Sprintf("id: %d\nevent: %s\ndata: %s\n\n", seq, eventType, string(data)) + for client := range h.clients { - if !client.Write(data) { + if !client.Write([]byte(eventLine)) { log.Println("SSE client buffer full, dropping event") } }