diff --git a/backend/cmd/server/main.go b/backend/cmd/server/main.go index 5e9cbe5..bdb0778 100644 --- a/backend/cmd/server/main.go +++ b/backend/cmd/server/main.go @@ -12,6 +12,7 @@ import ( "github.com/CubeCraft-Creations/Extrudex/backend/internal/config" "github.com/CubeCraft-Creations/Extrudex/backend/internal/db" "github.com/CubeCraft-Creations/Extrudex/backend/internal/router" + "github.com/CubeCraft-Creations/Extrudex/backend/internal/sse" ) func main() { @@ -39,15 +40,24 @@ func main() { slog.Info("database connected") + // Create SSE broadcaster and start it + sseBC := sse.NewBroadcaster(128) + sseBC.Start() + defer sseBC.Stop() + + slog.Info("sse broadcaster started") + // Create router - r := router.New(cfg, dbPool) + r := router.New(cfg, dbPool, sseBC) // Create HTTP server + // WriteTimeout is 0 for SSE support — the Chi middleware.Timeout(60s) + // handles request-level timeouts on non-SSE routes. server := &http.Server{ Addr: ":" + cfg.Port, Handler: r, ReadTimeout: 15 * time.Second, - WriteTimeout: 15 * time.Second, + WriteTimeout: 0, // disabled for SSE long-lived connections IdleTimeout: 60 * time.Second, } diff --git a/backend/internal/router/router.go b/backend/internal/router/router.go index d8ce058..61e429f 100644 --- a/backend/internal/router/router.go +++ b/backend/internal/router/router.go @@ -8,13 +8,14 @@ import ( "github.com/CubeCraft-Creations/Extrudex/backend/internal/handlers" "github.com/CubeCraft-Creations/Extrudex/backend/internal/repositories" "github.com/CubeCraft-Creations/Extrudex/backend/internal/services" + "github.com/CubeCraft-Creations/Extrudex/backend/internal/sse" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/jackc/pgx/v5/pgxpool" ) // New creates and configures a Chi router with all middleware and handlers mounted. -func New(cfg *config.Config, dbPool *pgxpool.Pool) chi.Router { +func New(cfg *config.Config, dbPool *pgxpool.Pool, sseBC *sse.Broadcaster) chi.Router { r := chi.NewRouter() // Middleware @@ -22,7 +23,7 @@ func New(cfg *config.Config, dbPool *pgxpool.Pool) chi.Router { r.Use(middleware.RealIP) r.Use(middleware.Logger) r.Use(middleware.Recoverer) - r.Use(middleware.Timeout(60 * time.Second)) + // Timeout middleware is applied per-route below to exclude SSE // CORS r.Use(func(next http.Handler) http.Handler { @@ -38,9 +39,9 @@ func New(cfg *config.Config, dbPool *pgxpool.Pool) chi.Router { }) }) - // Health check + // Health check (with timeout) healthHandler := handlers.NewHealthHandler(dbPool) - r.Get("/health", healthHandler.ServeHTTP) + r.With(middleware.Timeout(30 * time.Second)).Get("/health", healthHandler.ServeHTTP) // ── Repositories ────────────────────────────────────────────────────── materialRepo := repositories.NewMaterialRepository(dbPool) @@ -61,8 +62,9 @@ func New(cfg *config.Config, dbPool *pgxpool.Pool) chi.Router { printJobHandler := handlers.NewPrintJobHandler(printJobService) usageLogHandler := handlers.NewUsageLogHandler(usageLogRepo) - // ── API Routes ──────────────────────────────────────────────────────── + // ── API Routes (with timeout) ───────────────────────────────────────── r.Route("/api", func(r chi.Router) { + r.Use(middleware.Timeout(60 * time.Second)) r.Get("/materials", materialHandler.List) r.Route("/filaments", func(r chi.Router) { @@ -78,6 +80,10 @@ func New(cfg *config.Config, dbPool *pgxpool.Pool) chi.Router { r.Get("/printers", printerHandler.List) r.Get("/print-jobs", printJobHandler.List) r.Get("/usage-logs", usageLogHandler.List) + + // SSE Events stream + sseHandler := sse.NewHandler(sseBC) + r.Get("/events", sseHandler.ServeHTTP) }) return r diff --git a/backend/internal/sse/broadcaster.go b/backend/internal/sse/broadcaster.go new file mode 100644 index 0000000..e1b7cf2 --- /dev/null +++ b/backend/internal/sse/broadcaster.go @@ -0,0 +1,133 @@ +package sse + +import ( + "log/slog" + "sync" +) + +// client represents a single SSE subscriber — identified by its send channel. +type client struct { + ch chan string +} + +// Broadcaster receives Events on its input channel and fans them out to every +// connected client. Subscribe adds a new client; Unsubscribe removes one. +// Start must be called before the broadcaster accepts events. +type Broadcaster struct { + input chan Event + subscribe chan client + unsubscribe chan client + clients map[chan string]struct{} + done chan struct{} + once sync.Once +} + +// NewBroadcaster creates a Broadcaster. bufSize controls the buffer depth for +// the input channel as well as for each per-client outbound channel. +func NewBroadcaster(bufSize int) *Broadcaster { + if bufSize <= 0 { + bufSize = 64 + } + return &Broadcaster{ + input: make(chan Event, bufSize), + subscribe: make(chan client), + unsubscribe: make(chan client), + clients: make(map[chan string]struct{}), + done: make(chan struct{}), + } +} + +// Publish pushes an event into the broadcaster. Safe for concurrent use. +func (b *Broadcaster) Publish(ev Event) { + select { + case b.input <- ev: + case <-b.done: + // Silently drop during shutdown. + } +} + +// Start launches the broadcaster's fan-out loop in a goroutine. +// It must be called before Publish is used. +func (b *Broadcaster) Start() { + go b.loop() +} + +// Stop terminates the fan-out loop and closes all client channels. +// It is safe to call multiple times. +func (b *Broadcaster) Stop() { + b.once.Do(func() { + close(b.done) + }) +} + +// Subscribe returns a new client channel that receives SSE-formatted strings. +func (b *Broadcaster) Subscribe() chan string { + c := client{ch: make(chan string, 64)} + select { + case b.subscribe <- c: + case <-b.done: + // Broadcaster already stopped — return a closed chan so the handler + // can bail out quickly. + ch := make(chan string) + close(ch) + return ch + } + return c.ch +} + +// Unsubscribe removes a client channel and closes it. +func (b *Broadcaster) Unsubscribe(ch chan string) { + c := client{ch: ch} + select { + case b.unsubscribe <- c: + case <-b.done: + // Already shutting down — channels will be cleaned up by Stop. + } +} + +// loop is the core fan-out goroutine. +func (b *Broadcaster) loop() { + for { + select { + case ev := <-b.input: + sse := ev.toSSE() + for ch := range b.clients { + // Non-blocking send — slow clients are dropped. + select { + case ch <- sse: + default: + slog.Warn("sse broadcaster: dropping event for slow client", "type", ev.Type) + } + } + + case c := <-b.subscribe: + b.clients[c.ch] = struct{}{} + slog.Debug("sse broadcaster: client connected", "total_clients", len(b.clients)) + + case c := <-b.unsubscribe: + if _, ok := b.clients[c.ch]; ok { + delete(b.clients, c.ch) + close(c.ch) + slog.Debug("sse broadcaster: client disconnected", "total_clients", len(b.clients)) + } + + case <-b.done: + // Drain remaining events in input before shutting down. + for ev := range b.input { + sse := ev.toSSE() + for ch := range b.clients { + select { + case ch <- sse: + default: + } + } + } + // Close all remaining client channels. + for ch := range b.clients { + close(ch) + } + b.clients = nil + return + } + } +} diff --git a/backend/internal/sse/events.go b/backend/internal/sse/events.go new file mode 100644 index 0000000..76d41ac --- /dev/null +++ b/backend/internal/sse/events.go @@ -0,0 +1,92 @@ +// Package sse provides Server-Sent Events infrastructure for real-time updates. +// Includes event types, a central broadcaster, and an HTTP handler. +package sse + +import ( + "encoding/json" + "time" +) + +// EventType identifies the category of an SSE event. +type EventType string + +const ( + EventPrinterStatus EventType = "printer.status" + EventJobStarted EventType = "job.started" + EventJobCompleted EventType = "job.completed" + EventFilamentLow EventType = "filament.low" +) + +// Event is a JSON-serializable SSE event pushed through the broadcaster. +type Event struct { + Type EventType `json:"type"` + Payload json.RawMessage `json:"payload"` + Timestamp time.Time `json:"timestamp"` +} + +// PrinterStatusPayload carries printer online/offline/printing state. +type PrinterStatusPayload struct { + PrinterID int `json:"printer_id"` + PrinterName string `json:"printer_name"` + Status string `json:"status"` // "online", "offline", "printing" +} + +// JobStartedPayload carries initial print job info. +type JobStartedPayload struct { + JobID int `json:"job_id"` + JobName string `json:"job_name"` + PrinterID int `json:"printer_id"` + SpoolID *int `json:"spool_id,omitempty"` +} + +// JobCompletedPayload carries final print job data including usage. +type JobCompletedPayload struct { + JobID int `json:"job_id"` + JobName string `json:"job_name"` + PrinterID int `json:"printer_id"` + DurationSeconds *int `json:"duration_seconds,omitempty"` + TotalGramsUsed *float64 `json:"total_grams_used,omitempty"` + TotalCostUSD *float64 `json:"total_cost_usd,omitempty"` +} + +// FilamentLowPayload alerts that a spool is below its threshold. +type FilamentLowPayload struct { + SpoolID int `json:"spool_id"` + SpoolName string `json:"spool_name"` + RemainingGrams int `json:"remaining_grams"` + ThresholdGrams int `json:"threshold_grams"` +} + +// NewEvent creates an Event with the current timestamp from a typed payload. +func NewEvent(eventType EventType, payload any) (Event, error) { + raw, err := json.Marshal(payload) + if err != nil { + return Event{}, err + } + return Event{ + Type: eventType, + Payload: raw, + Timestamp: time.Now().UTC(), + }, nil +} + +// MustEvent creates an Event and panics on marshal failure (for use with +// known-good payloads in tests and internal wiring). +func MustEvent(eventType EventType, payload any) Event { + ev, err := NewEvent(eventType, payload) + if err != nil { + panic("sse.MustEvent: failed to marshal payload: " + err.Error()) + } + return ev +} + +// toSSE formats this Event as a standard SSE message string ready to be +// written to a response writer. The format is: +// +// event: +// data: +// +func (e Event) toSSE() string { + data, _ := json.Marshal(e) + return "event: " + string(e.Type) + "\n" + "data: " + string(data) + "\n\n" +} diff --git a/backend/internal/sse/handler.go b/backend/internal/sse/handler.go new file mode 100644 index 0000000..26b42c3 --- /dev/null +++ b/backend/internal/sse/handler.go @@ -0,0 +1,59 @@ +package sse + +import ( + "net/http" +) + +// Handler is the HTTP handler for the GET /api/events SSE stream. +// It registers a client with the broadcaster, streams events as they arrive, +// and unregisters on disconnect. +type Handler struct { + bc *Broadcaster +} + +// NewHandler creates a Handler backed by the given Broadcaster. +func NewHandler(bc *Broadcaster) *Handler { + return &Handler{bc: bc} +} + +// ServeHTTP implements the SSE streaming endpoint. +// Flusher is required; clients that do not support flushing receive a 501. +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming not supported", http.StatusNotImplemented) + return + } + + // SSE-specific headers + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") // Disable nginx buffering + + // Write headers immediately + flusher.Flush() + + // Subscribe to the broadcaster + ch := h.bc.Subscribe() + defer h.bc.Unsubscribe(ch) + + // Use request context for cancellation when the client disconnects. + ctx := r.Context() + + for { + select { + case <-ctx.Done(): + return + case msg, ok := <-ch: + if !ok { + return + } + _, err := w.Write([]byte(msg)) + if err != nil { + return + } + flusher.Flush() + } + } +}