Files
remote-rig/internal/events/sse.go
T

141 lines
2.8 KiB
Go
Raw Normal View History

// Package events provides Server-Sent Events (SSE) for real-time camera status.
package events
import (
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
)
// Hub is the registry of live SSE clients and the fan-out point for events
// delivered to them.
type Hub struct {
	// mu guards clients.
	mu sync.RWMutex
	// clients is the set of registered connections. Entries are stored with
	// the value true; only the key carries information.
	clients map[*Client]bool
}
// NewHub returns a Hub with an empty, ready-to-use client set.
func NewHub() *Hub {
	hub := new(Hub)
	hub.clients = make(map[*Client]bool)
	return hub
}
// Handler exposes the hub as an http.Handler; every request it receives is
// served as an SSE stream by serveSSE.
func (h *Hub) Handler() http.Handler {
	return http.HandlerFunc(h.serveSSE)
}
// Client represents an SSE client connection.
type Client struct {
	// done is closed by Close to signal that the client should stop.
	done chan struct{}
	// writes buffers serialized events waiting to be sent to the client.
	writes chan []byte
	// closeOnce makes Close idempotent; closing done twice would panic.
	closeOnce sync.Once
}

// Write queues data for delivery to the client without blocking.
//
// It returns true if the data was queued and false if the client's buffer is
// full, in which case the data is dropped.
func (c *Client) Write(data []byte) bool {
	select {
	case c.writes <- data:
		return true
	default:
		return false // buffer full
	}
}

// Close signals the client to stop by closing its done channel.
//
// Close is safe to call more than once; only the first call has an effect.
func (c *Client) Close() {
	c.closeOnce.Do(func() {
		close(c.done)
	})
}
// serveSSE streams hub events to a single HTTP client in Server-Sent Events
// format.
//
// It registers a new Client with the hub, queues an initial "connected" event,
// and then forwards queued events to the response, emitting a comment-line
// heartbeat every 30 seconds. The loop ends when the request context is
// cancelled, the client's done channel is closed, or a write to the response
// fails. On return the client is removed from the hub and closed.
func (h *Hub) serveSSE(w http.ResponseWriter, r *http.Request) {
	// Set SSE headers.
	hdr := w.Header()
	hdr.Set("Content-Type", "text/event-stream")
	hdr.Set("Cache-Control", "no-cache")
	hdr.Set("Connection", "keep-alive")
	hdr.Set("Access-Control-Allow-Origin", "*")

	// Streaming requires the ability to flush after every event.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
		return
	}

	client := &Client{
		done:   make(chan struct{}),
		writes: make(chan []byte, 256),
	}

	// Register the client so Broadcast can reach it.
	h.mu.Lock()
	h.clients[client] = true
	h.mu.Unlock()

	// Deregister before closing so Broadcast stops queueing to this client.
	defer func() {
		h.mu.Lock()
		delete(h.clients, client)
		h.mu.Unlock()
		client.Close()
	}()

	// Queue the initial connection event; it is sent by the loop below.
	data, err := json.Marshal(map[string]string{
		"type": "connected",
		"ts":   time.Now().Format(time.RFC3339),
	})
	if err != nil {
		log.Printf("SSE marshal error: %v", err)
		return
	}
	if !client.Write(data) {
		return // buffer already full; nothing useful can be streamed
	}

	heartbeat := time.NewTicker(30 * time.Second)
	defer heartbeat.Stop()

	ctx := r.Context()
	for {
		select {
		case <-client.done:
			return
		case <-ctx.Done():
			log.Println("SSE client disconnected")
			return
		case event := <-client.writes:
			// Stop on a failed write instead of continuing to format
			// events for a connection that can no longer receive them.
			if _, err := fmt.Fprintf(w, "data: %s\n\n", event); err != nil {
				log.Printf("SSE write error: %v", err)
				return
			}
			flusher.Flush()
		case <-heartbeat.C:
			// A line starting with ':' is an SSE comment.
			if _, err := fmt.Fprint(w, ": heartbeat\n\n"); err != nil {
				log.Printf("SSE write error: %v", err)
				return
			}
			flusher.Flush()
		}
	}
}
// Broadcast sends an event to all connected clients.
//
// The event is serialized once as a JSON object with the keys "type"
// (eventType), "ts" (current time in RFC 3339 format) and "payload". If
// serialization fails, the error is logged and nothing is sent. Delivery is
// non-blocking: a client whose buffer is full misses the event and a message
// is logged.
func (h *Hub) Broadcast(eventType string, payload interface{}) {
	// Marshal before taking the lock: encoding may run arbitrary MarshalJSON
	// code from the payload, which must not execute while the hub is locked.
	data, err := json.Marshal(map[string]interface{}{
		"type":    eventType,
		"ts":      time.Now().Format(time.RFC3339),
		"payload": payload,
	})
	if err != nil {
		log.Printf("SSE marshal error: %v", err)
		return
	}

	h.mu.RLock()
	defer h.mu.RUnlock()

	for client := range h.clients {
		if !client.Write(data) {
			log.Println("SSE client buffer full, dropping event")
		}
	}
}