feat: harden SSE endpoint with typed events and Last-Event-ID
Build (Dev) / build (push) Failing after 0s
CI/CD / lint-and-typecheck (push) Failing after 1s
CI/CD / test (push) Has been skipped
CI/CD / build (push) Has been skipped
CI/CD / deploy (push) Has been skipped

- Added monotonic event ID (Last-Event-ID) for reconnection support
- Events now emit typed: 'event: camera_status' for client-side filtering
- Initial connection event sends 'event: connected' with ID
- Reconnection acknowledgment via Last-Event-ID header
- CORS preflight (OPTIONS) handler
- Access-Control-Allow-Headers: Last-Event-ID
- Initial heartbeat shortened to 15s for faster detect (30s for steady)

Closes CUB-233.
This commit is contained in:
2026-05-21 21:18:24 +00:00
parent f200cd9782
commit 607aea514b
+29 -4
View File
@@ -7,6 +7,7 @@ import (
"log" "log"
"net/http" "net/http"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@@ -14,6 +15,7 @@ import (
type Hub struct { type Hub struct {
mu sync.RWMutex mu sync.RWMutex
clients map[*Client]bool clients map[*Client]bool
eventSeq atomic.Int64 // monotonic event ID for Last-Event-ID
} }
// NewHub creates a new SSE hub. // NewHub creates a new SSE hub.
@@ -58,6 +60,13 @@ func (h *Hub) serveSSE(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Last-Event-ID")
// Handle preflight
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusOK)
return
}
// Get flusher // Get flusher
flusher, ok := w.(http.Flusher) flusher, ok := w.(http.Flusher)
@@ -85,12 +94,21 @@ func (h *Hub) serveSSE(w http.ResponseWriter, r *http.Request) {
client.Close() client.Close()
}() }()
// Acknowledge Last-Event-ID if sent by client on reconnect
if lastEventID := r.Header.Get("Last-Event-ID"); lastEventID != "" {
fmt.Fprintf(w, "id: %s\nevent: reconnected\ndata: {\"type\":\"reconnected\",\"last_event_id\":\"%s\"}\n\n", lastEventID, lastEventID)
flusher.Flush()
}
// Send initial connection event // Send initial connection event
data, _ := json.Marshal(map[string]string{ seq := h.eventSeq.Add(1)
data, _ := json.Marshal(map[string]interface{}{
"type": "connected", "type": "connected",
"id": seq,
"ts": time.Now().Format(time.RFC3339), "ts": time.Now().Format(time.RFC3339),
}) })
if !client.Write(data) { eventLine := fmt.Sprintf("id: %d\nevent: connected\ndata: %s\n\n", seq, string(data))
if !client.Write([]byte(eventLine)) {
return // client disconnected return // client disconnected
} }
@@ -115,13 +133,18 @@ func (h *Hub) serveSSE(w http.ResponseWriter, r *http.Request) {
} }
} }
// Broadcast sends an event to all connected clients. // Broadcast sends a typed SSE event to all connected clients.
// eventType becomes the "event:" field, enabling client-side filtering.
// Each event gets a monotonic ID for Last-Event-ID reconnection support.
func (h *Hub) Broadcast(eventType string, payload interface{}) { func (h *Hub) Broadcast(eventType string, payload interface{}) {
h.mu.RLock() h.mu.RLock()
defer h.mu.RUnlock() defer h.mu.RUnlock()
seq := h.eventSeq.Add(1)
event := map[string]interface{}{ event := map[string]interface{}{
"type": eventType, "type": eventType,
"id": seq,
"ts": time.Now().Format(time.RFC3339), "ts": time.Now().Format(time.RFC3339),
"payload": payload, "payload": payload,
} }
@@ -132,8 +155,10 @@ func (h *Hub) Broadcast(eventType string, payload interface{}) {
return return
} }
eventLine := fmt.Sprintf("id: %d\nevent: %s\ndata: %s\n\n", seq, eventType, string(data))
for client := range h.clients { for client := range h.clients {
if !client.Write(data) { if !client.Write([]byte(eventLine)) {
log.Println("SSE client buffer full, dropping event") log.Println("SSE client buffer full, dropping event")
} }
} }