From 38722e54e63d65ff631995a0d3eec91c73127016 Mon Sep 17 00:00:00 2001 From: Joshua Date: Tue, 12 May 2026 01:02:49 -0400 Subject: [PATCH] CUB-117: Port Moonraker + MQTT printer integrations to Go - Moonraker REST client with GetPrinterInfo, GetPrintStats, GetPrintHistory - Moonraker WebSocket client with auto-reconnect + telemetry parsing - MQTT client via paho.mqtt.golang with TLS support for Bambu Lab - Moonraker poller worker: background polling, dedup, usage logging to PostgreSQL - MQTT subscriber worker: Bambu telemetry parsing, print job tracking - Config: 7 new env vars (MOONRAKER_URL, MQTT_BROKER, etc.) - main.go: per-printer worker discovery, graceful shutdown --- backend/cmd/server/main.go | 151 +++++++++-- backend/go.mod | 13 +- backend/go.sum | 18 +- backend/internal/clients/moonraker.go | 171 +++++++++++++ backend/internal/clients/moonraker_ws.go | 229 +++++++++++++++++ backend/internal/clients/mqtt.go | 183 +++++++++++++ backend/internal/config/config.go | 11 + backend/internal/workers/moonraker_poller.go | 255 +++++++++++++++++++ backend/internal/workers/mqtt_subscriber.go | 170 +++++++++++++ 9 files changed, 1169 insertions(+), 32 deletions(-) create mode 100644 backend/internal/clients/moonraker.go create mode 100644 backend/internal/clients/moonraker_ws.go create mode 100644 backend/internal/clients/mqtt.go create mode 100644 backend/internal/workers/moonraker_poller.go create mode 100644 backend/internal/workers/mqtt_subscriber.go diff --git a/backend/cmd/server/main.go b/backend/cmd/server/main.go index bdb0778..8b711d0 100644 --- a/backend/cmd/server/main.go +++ b/backend/cmd/server/main.go @@ -6,31 +6,32 @@ import ( "net/http" "os" "os/signal" + "strconv" + "sync" "syscall" "time" + "github.com/CubeCraft-Creations/Extrudex/backend/internal/clients" "github.com/CubeCraft-Creations/Extrudex/backend/internal/config" "github.com/CubeCraft-Creations/Extrudex/backend/internal/db" + "github.com/CubeCraft-Creations/Extrudex/backend/internal/models" "github.com/CubeCraft-Creations/Extrudex/backend/internal/router" "github.com/CubeCraft-Creations/Extrudex/backend/internal/sse" + "github.com/CubeCraft-Creations/Extrudex/backend/internal/workers" + "github.com/jackc/pgx/v5/pgxpool" ) func main() { - // Setup structured logging slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelInfo, }))) - // Load configuration cfg, err := config.Load() if err != nil { slog.Error("failed to load config", "error", err) os.Exit(1) } - slog.Info("config loaded", "port", cfg.Port, "cors_origin", cfg.CorsOrigin) - - // Connect to database dbPool, err := db.NewPool(cfg.DatabaseURL) if err != nil { slog.Error("failed to connect to database", "error", err) @@ -38,30 +39,38 @@ func main() { } defer db.ClosePool(dbPool) - slog.Info("database connected") - - // Create SSE broadcaster and start it sseBC := sse.NewBroadcaster(128) sseBC.Start() defer sseBC.Stop() - slog.Info("sse broadcaster started") - - // Create router r := router.New(cfg, dbPool, sseBC) - // Create HTTP server - // WriteTimeout is 0 for SSE support — the Chi middleware.Timeout(60s) - // handles request-level timeouts on non-SSE routes. + // ── Workers ───────────────────────────────────────────────────────── + + var wg sync.WaitGroup + workersCtx, cancelWorkers := context.WithCancel(context.Background()) + defer cancelWorkers() + + pollInterval, _ := time.ParseDuration(cfg.MoonrakerPollInterval) + if pollInterval <= 0 { + pollInterval = 10 * time.Second + } + + activePrinters := listActivePrinters(workersCtx, dbPool) + for _, p := range activePrinters { + startWorkerForPrinter(workersCtx, &wg, cfg, dbPool, p, pollInterval) + } + + // ── HTTP server ───────────────────────────────────────────────────── + server := &http.Server{ Addr: ":" + cfg.Port, Handler: r, ReadTimeout: 15 * time.Second, - WriteTimeout: 0, // disabled for SSE long-lived connections + WriteTimeout: 0, IdleTimeout: 60 * time.Second, } - // Start server in goroutine go func() { slog.Info("server starting", "addr", server.Addr) if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { @@ -70,21 +79,119 @@ func main() { } }() - // Wait for shutdown signal quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit slog.Info("server shutting down") + cancelWorkers() - // Graceful shutdown - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer shutdownCancel() - if err := server.Shutdown(ctx); err != nil { + if err := server.Shutdown(shutdownCtx); err != nil { slog.Error("server shutdown error", "error", err) } - db.ClosePool(dbPool) + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + slog.Info("all workers stopped") + case <-time.After(15 * time.Second): + slog.Warn("timed out waiting for workers to stop") + } + slog.Info("server stopped") } + +func listActivePrinters(ctx context.Context, pool *pgxpool.Pool) []models.Printer { + rows, err := pool.Query(ctx, ` + SELECT id, name, printer_type_id, + manufacturer, model, + moonraker_url, moonraker_api_key, + mqtt_broker_host, mqtt_topic_prefix, + mqtt_tls_enabled, is_active, + created_at, updated_at + FROM printers WHERE is_active = TRUE ORDER BY name + `) + if err != nil { + slog.Warn("failed to query active printers", "error", err) + return nil + } + defer rows.Close() + + var printers []models.Printer + for rows.Next() { + var p models.Printer + if err := rows.Scan( + &p.ID, &p.Name, &p.PrinterTypeID, + &p.Manufacturer, &p.Model, + &p.MoonrakerURL, &p.MoonrakerAPIKey, + &p.MQTTBrokerHost, &p.MQTTTopicPrefix, + &p.MQTTTLSEnabled, &p.IsActive, + &p.CreatedAt, &p.UpdatedAt, + ); err != nil { + slog.Warn("failed to scan printer row", "error", err) + continue + } + printers = append(printers, p) + } + return printers +} + +func startWorkerForPrinter( + ctx context.Context, + wg *sync.WaitGroup, + cfg *config.Config, + pool *pgxpool.Pool, + printer models.Printer, + pollInterval time.Duration, +) { + if printer.MoonrakerURL != nil && *printer.MoonrakerURL != "" { + mc := clients.NewMoonrakerClient(*printer.MoonrakerURL) + poller := workers.NewMoonrakerPoller(workers.MoonrakerPollerConfig{ + Client: mc, + Pool: pool, + PollInterval: pollInterval, + PrinterID: printer.ID, + PrinterName: printer.Name, + }) + wg.Add(1) + go func() { + defer wg.Done() + poller.Run(ctx) + }() + } + + if printer.MQTTBrokerHost != nil && *printer.MQTTBrokerHost != "" { + topicPrefix := cfg.MQTTTopicPrefix + if printer.MQTTTopicPrefix != nil && *printer.MQTTTopicPrefix != "" { + topicPrefix = *printer.MQTTTopicPrefix + } + sub := workers.NewMQTTSubscriber(workers.MQTTSubscriberConfig{ + Pool: pool, + PrinterID: printer.ID, + PrinterName: printer.Name, + }) + mqttClient := clients.NewMQTTClient(clients.MQTTConfig{ + Broker: *printer.MQTTBrokerHost, + ClientID: cfg.MQTTClientID + "-p" + strconv.Itoa(printer.ID), + TopicPrefix: topicPrefix, + TLSCert: cfg.MQTTTLSCert, + TLSKey: cfg.MQTTTLSKey, + Handler: sub.HandleBambuReport, + }) + sub.Client = mqttClient + wg.Add(1) + go func() { + defer wg.Done() + if err := sub.Run(ctx); err != nil { + slog.Error("mqtt subscriber error", "printer_id", printer.ID, "error", err) + } + }() + } +} diff --git a/backend/go.mod b/backend/go.mod index f22ab99..731b5c6 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -1,9 +1,13 @@ module github.com/CubeCraft-Creations/Extrudex/backend -go 1.24 +go 1.24.0 + +toolchain go1.24.2 require ( + github.com/eclipse/paho.mqtt.golang v1.5.1 github.com/go-chi/chi/v5 v5.2.0 + github.com/gorilla/websocket v1.5.3 github.com/jackc/pgx/v5 v5.7.4 github.com/kelseyhightower/envconfig v1.4.0 ) @@ -12,7 +16,8 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect - golang.org/x/crypto v0.31.0 // indirect - golang.org/x/sync v0.10.0 // indirect - golang.org/x/text v0.21.0 // indirect + golang.org/x/crypto v0.42.0 // indirect + golang.org/x/net v0.44.0 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/text v0.29.0 // indirect ) diff --git a/backend/go.sum b/backend/go.sum index 6f9c4a1..50903fb 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -1,8 +1,12 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE= +github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= github.com/go-chi/chi/v5 v5.2.0 h1:Aj1EtB0qR2Rdo2dG4O94RIU35w2lvQSj6BRA4+qwFL0= github.com/go-chi/chi/v5 v5.2.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -20,12 +24,14 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= -golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= -golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= -golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= +golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= +golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= +golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/backend/internal/clients/moonraker.go b/backend/internal/clients/moonraker.go new file mode 100644 index 0000000..2e7e49c --- /dev/null +++ b/backend/internal/clients/moonraker.go @@ -0,0 +1,171 @@ +// Package clients provides client implementations for printer integrations: +// Moonraker REST + WebSocket (Klipper-based printers) and MQTT (Bambu Lab). +package clients + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +// ── Moonraker response types ──────────────────────────────────────────────── + +// moonrakerRPC is the generic JSON-RPC wrapper Moonraker uses for responses. +type moonrakerRPC struct { + Result json.RawMessage `json:"result"` + Error *moonrakerError `json:"error"` +} + +type moonrakerError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// ── Public DTOs ───────────────────────────────────────────────────────────── + +// MoonrakerPrinterInfo represents the /printer/info response. +type MoonrakerPrinterInfo struct { + State string `json:"state"` + StateMessage string `json:"state_message"` + KlippyReady bool `json:"klippy_ready"` +} + +// MoonrakerPrintStats represents the print_stats object from +// /printer/objects/query?print_stats. +type MoonrakerPrintStats struct { + State string `json:"state"` + Filename *string `json:"filename"` + FilamentUsedMm float64 `json:"filament_used"` + PrintDuration float64 `json:"print_duration"` + Message *string `json:"message"` +} + +// MoonrakerPrintJob represents a single entry in /server/history/items. +type MoonrakerPrintJob struct { + JobID string `json:"job_id"` + Filename string `json:"filename"` + Status string `json:"status"` + FilamentUsedMm float64 `json:"filament_used"` + PrintDuration float64 `json:"print_duration"` + TotalDuration float64 `json:"total_duration"` + StartTime *float64 `json:"start_time"` + EndTime *float64 `json:"end_time"` + Metadata map[string]interface{} `json:"metadata"` +} + +// MoonrakerHistoryResponse wraps the /server/history/items response. +type MoonrakerHistoryResponse struct { + Items []MoonrakerPrintJob `json:"items"` + TotalCount int `json:"count"` +} + +// ── Client ────────────────────────────────────────────────────────────────── + +// MoonrakerClient is an HTTP client for the Moonraker REST API on +// Klipper-based printers (e.g., Elegoo Centauri Carbon). +type MoonrakerClient struct { + baseURL string + httpClient *http.Client +} + +// NewMoonrakerClient creates a MoonrakerClient that targets the given +// base URL (e.g., "http://192.168.1.50:7125"). The internal HTTP client +// uses a 15-second timeout. +func NewMoonrakerClient(baseURL string) *MoonrakerClient { + baseURL = strings.TrimRight(baseURL, "/") + return &MoonrakerClient{ + baseURL: baseURL, + httpClient: &http.Client{ + Timeout: 15 * time.Second, + }, + } +} + +// GetPrinterInfo calls GET /printer/info and returns the Klipper state. +// Returns nil when the printer is unreachable or the response cannot be parsed. +func (c *MoonrakerClient) GetPrinterInfo(ctx context.Context) (*MoonrakerPrinterInfo, error) { + var info MoonrakerPrinterInfo + if err := c.getJSON(ctx, "/printer/info", &info); err != nil { + return nil, err + } + return &info, nil +} + +// GetPrintStats calls GET /printer/objects/query?print_stats and returns +// real-time print statistics including filament consumption. +// Returns nil when no print is active or the printer is unreachable. +func (c *MoonrakerClient) GetPrintStats(ctx context.Context) (*MoonrakerPrintStats, error) { + var stats MoonrakerPrintStats + // Moonraker wraps the object in status.print_stats + var wrapper struct { + Status struct { + PrintStats MoonrakerPrintStats `json:"print_stats"` + } `json:"status"` + } + if err := c.getJSON(ctx, "/printer/objects/query?print_stats", &wrapper); err != nil { + return nil, err + } + stats = wrapper.Status.PrintStats + return &stats, nil +} + +// GetPrintHistory calls GET /server/history/items and returns recent print +// jobs. limit controls the maximum number of items (clamped 1-100). +func (c *MoonrakerClient) GetPrintHistory(ctx context.Context, limit int) (*MoonrakerHistoryResponse, error) { + if limit < 1 { + limit = 1 + } + if limit > 100 { + limit = 100 + } + var history MoonrakerHistoryResponse + if err := c.getJSON(ctx, fmt.Sprintf("/server/history/items?limit=%d", limit), &history); err != nil { + return nil, err + } + return &history, nil +} + +// ── Internal helpers ──────────────────────────────────────────────────────── + +func (c *MoonrakerClient) getJSON(ctx context.Context, path string, target interface{}) error { + url := c.baseURL + path + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return fmt.Errorf("moonraker: failed to build request: %w", err) + } + req.Header.Set("Accept", "application/json") + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("moonraker: request failed (%s): %w", url, err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("moonraker: failed to read body: %w", err) + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("moonraker: %s returned HTTP %d: %s", url, resp.StatusCode, string(body)) + } + + // Moonraker wraps responses in {"result": ...} + var rpc moonrakerRPC + if err := json.Unmarshal(body, &rpc); err != nil { + return fmt.Errorf("moonraker: failed to parse response: %w", err) + } + if rpc.Error != nil && rpc.Error.Message != "" { + return fmt.Errorf("moonraker: api error: %s", rpc.Error.Message) + } + + if err := json.Unmarshal(rpc.Result, target); err != nil { + return fmt.Errorf("moonraker: failed to unmarshal result: %w (raw: %s)", err, string(rpc.Result)) + } + return nil +} diff --git a/backend/internal/clients/moonraker_ws.go b/backend/internal/clients/moonraker_ws.go new file mode 100644 index 0000000..4db7392 --- /dev/null +++ b/backend/internal/clients/moonraker_ws.go @@ -0,0 +1,229 @@ +package clients + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "strings" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +// ── WebSocket message types ───────────────────────────────────────────────── + +// moonrakerWSMessage is a single JSON-RPC frame from the Moonraker WebSocket. +type moonrakerWSMessage struct { + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` + ID *int `json:"id"` +} + +// MoonrakerPrintEvent is the payload delivered by the "notify_status_update" +// subscription when print_stats or display_status change. +type MoonrakerPrintEvent struct { + PrintStats *MoonrakerPrintStats `json:"print_stats"` + DisplayStatus *MoonrakerDisplayStatus `json:"display_status"` +} + +// MoonrakerDisplayStatus carries progress and the LCD message. +type MoonrakerDisplayStatus struct { + Progress float64 `json:"progress"` + Message string `json:"message"` +} + +// MoonrakerStatusHandler is called for every status update received from the +// Moonraker WebSocket. It receives the parsed event and the raw JSON. +type MoonrakerStatusHandler func(event MoonrakerPrintEvent) error + +// ── WebSocket client ──────────────────────────────────────────────────────── + +// MoonrakerWSClient maintains a persistent WebSocket connection to the +// Moonraker server and delivers parsed status updates to a handler. +type MoonrakerWSClient struct { + wsURL string + handler MoonrakerStatusHandler + dialer *websocket.Dialer + + mu sync.Mutex + conn *websocket.Conn + done chan struct{} + once sync.Once +} + +// NewMoonrakerWSClient creates a WebSocket client for the given Moonraker base +// URL. The handler is invoked on every status update. +func NewMoonrakerWSClient(baseURL string, handler MoonrakerStatusHandler) *MoonrakerWSClient { + baseURL = strings.TrimRight(baseURL, "/") + wsURL := strings.Replace(baseURL, "http://", "ws://", 1) + wsURL = strings.Replace(wsURL, "https://", "wss://", 1) + wsURL += "/websocket" + + return &MoonrakerWSClient{ + wsURL: wsURL, + handler: handler, + dialer: &websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: 10 * time.Second, + }, + done: make(chan struct{}), + } +} + +// Connect establishes the WebSocket, subscribes to status updates, and +// starts the read loop in a background goroutine. It retries on failure +// with exponential backoff up to a 60-second cap. +func (c *MoonrakerWSClient) Connect(ctx context.Context) { + go c.run(ctx) +} + +// Shutdown gracefully closes the WebSocket and stops the read loop. +func (c *MoonrakerWSClient) Shutdown() { + c.once.Do(func() { + close(c.done) + }) + c.mu.Lock() + defer c.mu.Unlock() + if c.conn != nil { + c.conn.Close() + c.conn = nil + } +} + +// run is the main connection loop with reconnect backoff. +func (c *MoonrakerWSClient) run(ctx context.Context) { + backoff := 1 * time.Second + const maxBackoff = 60 * time.Second + + for { + select { + case <-ctx.Done(): + slog.Info("moonraker ws: context cancelled, stopping") + return + case <-c.done: + slog.Info("moonraker ws: shutdown requested") + return + default: + } + + if err := c.connectAndRead(ctx); err != nil { + slog.Error("moonraker ws: connection error, retrying", "error", err, "backoff", backoff) + } + + // Exponential backoff. + select { + case <-ctx.Done(): + return + case <-c.done: + return + case <-time.After(backoff): + } + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } +} + +func (c *MoonrakerWSClient) connectAndRead(ctx context.Context) error { + slog.Info("moonraker ws: connecting", "url", c.wsURL) + + conn, _, err := c.dialer.DialContext(ctx, c.wsURL, nil) + if err != nil { + return fmt.Errorf("dial failed: %w", err) + } + + c.mu.Lock() + if c.conn != nil { + c.conn.Close() + } + c.conn = conn + c.mu.Unlock() + + defer func() { + c.mu.Lock() + if c.conn == conn { + c.conn = nil + } + c.mu.Unlock() + conn.Close() + }() + + // Subscribe to status updates. + subReq := map[string]interface{}{ + "jsonrpc": "2.0", + "method": "printer.objects.subscribe", + "params": map[string]interface{}{ + "objects": map[string]interface{}{ + "print_stats": nil, + "display_status": nil, + }, + }, + "id": 1, + } + if err := conn.WriteJSON(subReq); err != nil { + return fmt.Errorf("subscribe failed: %w", err) + } + + slog.Info("moonraker ws: subscribed to status updates") + + // Set read deadline to detect stale connections. + // 120s is long enough to avoid false positives. + pingPeriod := 60 * time.Second + + for { + // Set read deadline. + if err := conn.SetReadDeadline(time.Now().Add(150 * time.Second)); err != nil { + return fmt.Errorf("set read deadline: %w", err) + } + + _, raw, err := conn.ReadMessage() + if err != nil { + return fmt.Errorf("read message: %w", err) + } + + // Send periodic pings to keep the connection alive. + go func() { + time.Sleep(pingPeriod) + c.mu.Lock() + if c.conn == conn { + c.conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)) + } + c.mu.Unlock() + }() + + var msg moonrakerWSMessage + if err := json.Unmarshal(raw, &msg); err != nil { + slog.Warn("moonraker ws: failed to parse message", "error", err) + continue + } + + // Only process notify_status_update messages. + if msg.Method != "notify_status_update" { + continue + } + + var statusWrapper []MoonrakerPrintEvent + if err := json.Unmarshal(msg.Params, &statusWrapper); err != nil { + // Params might be an object, not an array. + var singleEvent MoonrakerPrintEvent + if err2 := json.Unmarshal(msg.Params, &singleEvent); err2 != nil { + slog.Warn("moonraker ws: failed to unmarshal status params", "error", err2) + continue + } + statusWrapper = []MoonrakerPrintEvent{singleEvent} + } + + for _, ev := range statusWrapper { + if c.handler != nil { + if err := c.handler(ev); err != nil { + slog.Error("moonraker ws: handler error", "error", err) + } + } + } + } +} diff --git a/backend/internal/clients/mqtt.go b/backend/internal/clients/mqtt.go new file mode 100644 index 0000000..a938f74 --- /dev/null +++ b/backend/internal/clients/mqtt.go @@ -0,0 +1,183 @@ +package clients + +import ( + "crypto/tls" + "encoding/json" + "fmt" + "log/slog" + "sync" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +// ── Bambu Lab telemetry types ─────────────────────────────────────────────── + +// BambuPrintReport is the JSON payload published by Bambu Lab printers +// on the MQTT report topic. The structure varies by printer model; +// we extract the common fields needed for filament tracking. +type BambuPrintReport struct { + // Print holds the active print job data. + Print BambuPrintData `json:"print"` + + // VtTray contains AMS tray info; the extruded length is per-tray. + VtTray *BambuVtTray `json:"vt_tray,omitempty"` +} + +// BambuPrintData carries the active print state from a Bambu report. +type BambuPrintData struct { + // GcodeFile is the filename being printed. + GcodeFile string `json:"gcode_file"` + // GcodeState describes the current print state: + // "IDLE", "RUNNING", "PAUSE", "FINISH", "FAILED". + GcodeState string `json:"gcode_state"` + // McPercent is the progress as a percentage (0-100). + McPercent int `json:"mc_percent"` + // McRemainingTime is the estimated remaining time in minutes. + McRemainingTime int `json:"mc_remaining_time"` +} + +// BambuVtTray holds AMS tray telemetry from Bambu printers. +type BambuVtTray struct { + ID string `json:"id"` + TagUID string `json:"tag_uid"` + TrayIDName string `json:"tray_id_name"` + // TrayInfoIdx is the hex color code for the tray's filament. + TrayInfoIdx string `json:"tray_info_idx"` + // TrayColor is a hex color string like "FF0000FF". + TrayColor string `json:"tray_color"` + // Remain is the percentage of filament remaining on this tray (0-100). + Remain int `json:"remain"` + // K is a temperature coefficient. + K float64 `json:"k"` + // N is a second temperature coefficient. + N float64 `json:"n"` +} + +// BambuReportHandler is called for each parsed Bambu telemetry message. +type BambuReportHandler func(report BambuPrintReport) error + +// ── MQTT client ───────────────────────────────────────────────────────────── + +// MQTTClient wraps the Eclipse Paho MQTT client for Bambu Lab printer +// telemetry with optional TLS support. +type MQTTClient struct { + broker string + clientID string + topicPrefix string + tlsCert string + tlsKey string + handler BambuReportHandler + + mu sync.Mutex + client mqtt.Client +} + +// MQTTConfig holds the configuration for creating an MQTTClient. +type MQTTConfig struct { + Broker string // e.g., "ssl://192.168.1.50:8883" + ClientID string // unique MQTT client id, defaults to "extrudex" + TopicPrefix string // topic prefix, defaults to "device/+/report" + TLSCert string // path to TLS client certificate (optional) + TLSKey string // path to TLS client key (optional) + Handler BambuReportHandler +} + +// NewMQTTClient creates a new MQTTClient. The connection is not established +// until Connect is called. +func NewMQTTClient(cfg MQTTConfig) *MQTTClient { + if cfg.ClientID == "" { + cfg.ClientID = "extrudex" + } + if cfg.TopicPrefix == "" { + cfg.TopicPrefix = "device/+/report" + } + return &MQTTClient{ + broker: cfg.Broker, + clientID: cfg.ClientID, + topicPrefix: cfg.TopicPrefix, + tlsCert: cfg.TLSCert, + tlsKey: cfg.TLSKey, + handler: cfg.Handler, + } +} + +// Connect establishes the MQTT connection and subscribes to the configured +// topic prefix. Returns an error if the initial connection fails. +func (c *MQTTClient) Connect() error { + opts := mqtt.NewClientOptions(). + AddBroker(c.broker). + SetClientID(c.clientID). + SetAutoReconnect(true). + SetMaxReconnectInterval(30 * time.Second). + SetKeepAlive(30 * time.Second). + SetPingTimeout(10 * time.Second). + SetConnectTimeout(15 * time.Second). + SetOnConnectHandler(func(client mqtt.Client) { + slog.Info("mqtt: connected", "broker", c.broker) + // Subscribe on every reconnect. + token := client.Subscribe(c.topicPrefix, 0, c.messageHandler) + token.Wait() + if err := token.Error(); err != nil { + slog.Error("mqtt: subscribe failed on reconnect", "topic", c.topicPrefix, "error", err) + } else { + slog.Info("mqtt: subscribed", "topic", c.topicPrefix) + } + }). + SetConnectionLostHandler(func(client mqtt.Client, err error) { + slog.Warn("mqtt: connection lost", "error", err) + }) + + // Configure TLS if cert and key are provided. + if c.tlsCert != "" && c.tlsKey != "" { + cert, err := tls.LoadX509KeyPair(c.tlsCert, c.tlsKey) + if err != nil { + return fmt.Errorf("mqtt: failed to load TLS cert/key: %w", err) + } + opts.SetTLSConfig(&tls.Config{ + Certificates: []tls.Certificate{cert}, + MinVersion: tls.VersionTLS12, + }) + slog.Info("mqtt: TLS configured", "cert", c.tlsCert) + } + + c.client = mqtt.NewClient(opts) + token := c.client.Connect() + if !token.WaitTimeout(15 * time.Second) { + return fmt.Errorf("mqtt: connect timed out to %s", c.broker) + } + if err := token.Error(); err != nil { + return fmt.Errorf("mqtt: connect failed: %w", err) + } + + slog.Info("mqtt: initial connection established", "broker", c.broker) + return nil +} + +// Disconnect gracefully closes the MQTT connection. +func (c *MQTTClient) Disconnect() { + c.mu.Lock() + defer c.mu.Unlock() + if c.client != nil && c.client.IsConnected() { + c.client.Disconnect(2500) // wait up to 2.5s + slog.Info("mqtt: disconnected") + } +} + +// messageHandler is the MQTT callback invoked for every message received on +// the subscribed topic. +func (c *MQTTClient) messageHandler(_ mqtt.Client, msg mqtt.Message) { + if c.handler == nil { + return + } + + var report BambuPrintReport + if err := json.Unmarshal(msg.Payload(), &report); err != nil { + slog.Warn("mqtt: failed to parse bambu report", "topic", msg.Topic(), "error", err) + return + } + + if err := c.handler(report); err != nil { + slog.Error("mqtt: handler error", "topic", msg.Topic(), "error", err) + } +} diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 64c4bd8..331bebc 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -12,6 +12,17 @@ type Config struct { Port string `envconfig:"port" default:"8080"` CorsOrigin string `envconfig:"cors_origin" default:"*"` LogLevel string `envconfig:"log_level" default:"info"` + + // Moonraker integration. + MoonrakerURL string `envconfig:"moonraker_url" default:"http://localhost:7125"` + MoonrakerPollInterval string `envconfig:"moonraker_poll_interval" default:"10s"` + + // MQTT (Bambu Lab) integration. + MQTTBroker string `envconfig:"mqtt_broker" default:"localhost:1883"` + MQTTTopicPrefix string `envconfig:"mqtt_topic_prefix" default:"device/+/report"` + MQTTClientID string `envconfig:"mqtt_client_id" default:"extrudex"` + MQTTTLSCert string `envconfig:"mqtt_tls_cert" default:""` + MQTTTLSKey string `envconfig:"mqtt_tls_key" default:""` } // Load reads configuration from environment variables and returns a populated Config. diff --git a/backend/internal/workers/moonraker_poller.go b/backend/internal/workers/moonraker_poller.go new file mode 100644 index 0000000..e299732 --- /dev/null +++ b/backend/internal/workers/moonraker_poller.go @@ -0,0 +1,255 @@ +package workers + +import ( + "context" + "fmt" + "log/slog" + "sync" + "time" + + "github.com/CubeCraft-Creations/Extrudex/backend/internal/clients" + "github.com/jackc/pgx/v5/pgxpool" +) + +// ── deduplication ─────────────────────────────────────────────────────────── + +// jobTrack holds the last-seen filename and filament_used for dedup. +type jobTrack struct { + filename string + filamentUsed float64 +} + +// MoonrakerPoller periodically queries the Moonraker REST API for print stats +// and logs filament usage to PostgreSQL. It deduplicates by tracking the +// last-known filament_used value for the active job on this printer. +type MoonrakerPoller struct { + client *clients.MoonrakerClient + pool *pgxpool.Pool + pollInterval time.Duration + printerID int + printerName string + + mu sync.Mutex + track jobTrack +} + +// MoonrakerPollerConfig holds configuration for the Moonraker polling worker. +type MoonrakerPollerConfig struct { + Client *clients.MoonrakerClient + Pool *pgxpool.Pool + PollInterval time.Duration + PrinterID int + PrinterName string +} + +// NewMoonrakerPoller creates a new MoonrakerPoller worker. +func NewMoonrakerPoller(cfg MoonrakerPollerConfig) *MoonrakerPoller { + if cfg.PollInterval <= 0 { + cfg.PollInterval = 10 * time.Second + } + return &MoonrakerPoller{ + client: cfg.Client, + pool: cfg.Pool, + pollInterval: cfg.PollInterval, + printerID: cfg.PrinterID, + printerName: cfg.PrinterName, + } +} + +// Run starts the polling loop. It blocks until ctx is cancelled. +func (w *MoonrakerPoller) Run(ctx context.Context) { + slog.Info("moonraker poller: starting", + "printer_id", w.printerID, + "printer_name", w.printerName, + "interval", w.pollInterval, + ) + + ticker := time.NewTicker(w.pollInterval) + defer ticker.Stop() + + w.poll(ctx) + + for { + select { + case <-ctx.Done(): + slog.Info("moonraker poller: stopping", "printer_id", w.printerID) + return + case <-ticker.C: + w.poll(ctx) + } + } +} + +func (w *MoonrakerPoller) poll(ctx context.Context) { + stats, err := w.client.GetPrintStats(ctx) + if err != nil { + slog.Warn("moonraker poller: failed to get print stats", + "printer_id", w.printerID, "error", err) + return + } + + if stats.State == "" { + return + } + + jobName := "unknown" + if stats.Filename != nil { + jobName = *stats.Filename + } + + // Compute delta under lock; release before I/O. + w.mu.Lock() + prevName := w.track.filename + prevUsed := w.track.filamentUsed + + if jobName != prevName { + w.track.filename = jobName + w.track.filamentUsed = 0 + prevUsed = 0 + } + + deltaMM := stats.FilamentUsedMm - prevUsed + totalMM := stats.FilamentUsedMm + if deltaMM <= 0 && jobName == prevName { + w.mu.Unlock() + return + } + w.mu.Unlock() + + slog.Info("moonraker poller: filament usage", + "printer_id", w.printerID, + "job", jobName, + "delta_mm", deltaMM, + "total_mm", totalMM, + "state", stats.State, + ) + + jobID, err := w.ensurePrintJob(ctx, jobName, stats.State) + if err != nil { + slog.Error("moonraker poller: failed to ensure print job", + "printer_id", w.printerID, "error", err) + return + } + + spoolID, density := lookupActiveSpool(ctx, w.pool, w.printerID) + + if err := insertUsageLog(ctx, w.pool, jobID, spoolID, deltaMM, density); err != nil { + slog.Error("moonraker poller: failed to log usage", + "printer_id", w.printerID, "error", err) + return + } + + w.mu.Lock() + w.track.filamentUsed = totalMM + w.mu.Unlock() +} + +func (w *MoonrakerPoller) ensurePrintJob(ctx context.Context, jobName, state string) (int, error) { + var jobID int + err := w.pool.QueryRow(ctx, ` + SELECT pj.id FROM print_jobs pj + JOIN job_statuses js ON pj.job_status_id = js.id + WHERE pj.printer_id = $1 + AND pj.job_name = $2 + AND pj.deleted_at IS NULL + AND js.name IN ('printing', 'pending') + ORDER BY pj.created_at DESC + LIMIT 1 + `, w.printerID, jobName).Scan(&jobID) + + if err == nil { + _, _ = w.pool.Exec(ctx, ` + UPDATE print_jobs SET + job_status_id = (SELECT id FROM job_statuses WHERE name = 'printing'), + started_at = COALESCE(started_at, NOW()), + updated_at = NOW() + WHERE id = $1 + AND job_status_id = (SELECT id FROM job_statuses WHERE name = 'pending') + `, jobID) + return jobID, nil + } + + var statusID int + err = w.pool.QueryRow(ctx, `SELECT id FROM job_statuses WHERE name = 'printing'`).Scan(&statusID) + if err != nil { + return 0, fmt.Errorf("moonraker poller: missing 'printing' job status: %w", err) + } + + err = w.pool.QueryRow(ctx, ` + INSERT INTO print_jobs (printer_id, job_name, file_name, job_status_id, started_at) + VALUES ($1, $2, $3, $4, NOW()) + RETURNING id + `, w.printerID, jobName, jobName, statusID).Scan(&jobID) + if err != nil { + return 0, fmt.Errorf("moonraker poller: failed to create print job: %w", err) + } + + slog.Info("moonraker poller: created print job", "job_id", jobID, "job_name", jobName) + return jobID, nil +} + +// ── Package-level helpers (shared by both workers) ────────────────────────── + +// lookupActiveSpool finds the most recently used spool for a given printer. +func lookupActiveSpool(ctx context.Context, pool *pgxpool.Pool, printerID int) (int, float64) { + type result struct { + id int + density float64 + } + var res result + + err := pool.QueryRow(ctx, ` + SELECT fs.id, COALESCE(mb.density_g_cm3, 1.24) + FROM filament_spools fs + JOIN material_bases mb ON fs.material_base_id = mb.id + JOIN print_jobs pj ON pj.filament_spool_id = fs.id + WHERE pj.printer_id = $1 AND fs.deleted_at IS NULL + ORDER BY pj.created_at DESC LIMIT 1 + `, printerID).Scan(&res.id, &res.density) + if err == nil { + return res.id, res.density + } + + err = pool.QueryRow(ctx, ` + SELECT fs.id, COALESCE(mb.density_g_cm3, 1.24) + FROM filament_spools fs + JOIN material_bases mb ON fs.material_base_id = mb.id + WHERE fs.deleted_at IS NULL + ORDER BY fs.created_at DESC LIMIT 1 + `).Scan(&res.id, &res.density) + if err == nil { + return res.id, res.density + } + + return 1, 1.24 +} + +// insertUsageLog inserts a usage_log entry and decrements the spool's remaining grams. +func insertUsageLog(ctx context.Context, pool *pgxpool.Pool, jobID, spoolID int, deltaMM, densityGCm3 float64) error { + const crossSectionCm2 = 0.02405 // π * (0.0875cm)² for 1.75mm filament + gramsUsed := crossSectionCm2 * (deltaMM / 10.0) * densityGCm3 + + if gramsUsed <= 0 || deltaMM <= 0 { + return nil + } + + if _, err := pool.Exec(ctx, ` + INSERT INTO usage_logs (print_job_id, filament_spool_id, mm_extruded, grams_used, logged_at) + VALUES ($1, $2, $3, $4, NOW()) + `, jobID, spoolID, deltaMM, gramsUsed); err != nil { + return fmt.Errorf("usage_log insert failed: %w", err) + } + + _, _ = pool.Exec(ctx, ` + UPDATE filament_spools + SET remaining_grams = GREATEST(remaining_grams - $2::int, 0), + updated_at = NOW() + WHERE id = $1 + `, spoolID, int(gramsUsed)) + + slog.Debug("moonraker poller: logged usage", + "job_id", jobID, "spool_id", spoolID, + "mm_extruded", deltaMM, "grams_used", gramsUsed, + ) + return nil +} diff --git a/backend/internal/workers/mqtt_subscriber.go b/backend/internal/workers/mqtt_subscriber.go new file mode 100644 index 0000000..cd75710 --- /dev/null +++ b/backend/internal/workers/mqtt_subscriber.go @@ -0,0 +1,170 @@ +package workers + +import ( + "context" + "fmt" + "log/slog" + "sync" + "time" + + "github.com/CubeCraft-Creations/Extrudex/backend/internal/clients" + "github.com/jackc/pgx/v5/pgxpool" +) + +// bambuJobState tracks the active print job detected via MQTT. +type bambuJobState struct { + gcodeFile string + gcodeState string + percent int +} + +// MQTTSubscriber listens to Bambu Lab MQTT telemetry topics and logs +// filament usage events to PostgreSQL. +type MQTTSubscriber struct { + Client *clients.MQTTClient + pool *pgxpool.Pool + + printerID int + printerName string + + mu sync.Mutex + state bambuJobState +} + +// MQTTSubscriberConfig holds configuration for the MQTT subscriber worker. +type MQTTSubscriberConfig struct { + Pool *pgxpool.Pool + PrinterID int + PrinterName string +} + +// NewMQTTSubscriber creates a new MQTTSubscriber worker. Set Client after +// construction to wire the handler. +func NewMQTTSubscriber(cfg MQTTSubscriberConfig) *MQTTSubscriber { + return &MQTTSubscriber{ + pool: cfg.Pool, + printerID: cfg.PrinterID, + printerName: cfg.PrinterName, + } +} + +// Run connects to MQTT and blocks until ctx is cancelled. +func (w *MQTTSubscriber) Run(ctx context.Context) error { + slog.Info("mqtt subscriber: starting", + "printer_id", w.printerID, + "printer_name", w.printerName, + ) + + if w.Client == nil { + return fmt.Errorf("mqtt subscriber: Client is nil") + } + + if err := w.Client.Connect(); err != nil { + return fmt.Errorf("mqtt subscriber: connect failed: %w", err) + } + defer w.Client.Disconnect() + + slog.Info("mqtt subscriber: connected", "printer_id", w.printerID) + + <-ctx.Done() + slog.Info("mqtt subscriber: shutting down", "printer_id", w.printerID) + return nil +} + +// HandleBambuReport is the MQTT callback for Bambu telemetry messages. +func (w *MQTTSubscriber) HandleBambuReport(report clients.BambuPrintReport) error { + w.mu.Lock() + prev := w.state + current := bambuJobState{ + gcodeFile: report.Print.GcodeFile, + gcodeState: report.Print.GcodeState, + percent: report.Print.McPercent, + } + w.state = current + w.mu.Unlock() + + if prev.gcodeState == current.gcodeState && prev.gcodeFile == current.gcodeFile { + return nil + } + + slog.Info("mqtt subscriber: state change", + "printer_id", w.printerID, + "file", current.gcodeFile, + "state", current.gcodeState, + "percent", current.percent, + ) + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + switch current.gcodeState { + case "RUNNING": + return w.handleState(ctx, current, "printing") + case "FINISH": + return w.handleState(ctx, current, "completed") + case "FAILED": + return w.handleState(ctx, current, "failed") + } + return nil +} + +func (w *MQTTSubscriber) handleState(ctx context.Context, s bambuJobState, status string) error { + jobID, err := w.ensurePrintJob(ctx, s.gcodeFile, status) + if err != nil { + return err + } + + if status == "completed" || status == "failed" { + _, _ = w.pool.Exec(ctx, ` + UPDATE print_jobs SET + job_status_id = (SELECT id FROM job_statuses WHERE name = $2), + completed_at = CASE WHEN $2 = 'completed' THEN NOW() ELSE completed_at END, + updated_at = NOW() + WHERE id = $1 + `, jobID, status) + } else { + _, _ = w.pool.Exec(ctx, ` + UPDATE print_jobs SET + job_status_id = (SELECT id FROM job_statuses WHERE name = $2), + started_at = COALESCE(started_at, NOW()), + updated_at = NOW() + WHERE id = $1 + `, jobID, status) + } + + slog.Info("mqtt subscriber: job updated", + "printer_id", w.printerID, "job_id", jobID, "status", status) + return nil +} + +func (w *MQTTSubscriber) ensurePrintJob(ctx context.Context, filename, status string) (int, error) { + var jobID int + err := w.pool.QueryRow(ctx, ` + SELECT id FROM print_jobs + WHERE printer_id = $1 AND file_name = $2 AND deleted_at IS NULL + ORDER BY created_at DESC LIMIT 1 + `, w.printerID, filename).Scan(&jobID) + + if err == nil { + return jobID, nil + } + + var statusID int + err = w.pool.QueryRow(ctx, `SELECT id FROM job_statuses WHERE name = $1`, status).Scan(&statusID) + if err != nil { + return 0, fmt.Errorf("mqtt subscriber: unknown status '%s': %w", status, err) + } + + err = w.pool.QueryRow(ctx, ` + INSERT INTO print_jobs (printer_id, job_name, file_name, job_status_id, started_at) + VALUES ($1, $2, $2, $3, NOW()) + RETURNING id + `, w.printerID, filename, statusID).Scan(&jobID) + if err != nil { + return 0, fmt.Errorf("mqtt subscriber: create print_job failed: %w", err) + } + + slog.Info("mqtt subscriber: created print job", + "job_id", jobID, "file", filename, "status", status) + return jobID, nil +}