diff --git a/backend/cmd/server/main.go b/backend/cmd/server/main.go index bdb0778..a840161 100644 --- a/backend/cmd/server/main.go +++ b/backend/cmd/server/main.go @@ -11,8 +11,10 @@ import ( "github.com/CubeCraft-Creations/Extrudex/backend/internal/config" "github.com/CubeCraft-Creations/Extrudex/backend/internal/db" + "github.com/CubeCraft-Creations/Extrudex/backend/internal/repositories" "github.com/CubeCraft-Creations/Extrudex/backend/internal/router" "github.com/CubeCraft-Creations/Extrudex/backend/internal/sse" + "github.com/CubeCraft-Creations/Extrudex/backend/internal/workers" ) func main() { @@ -40,6 +42,11 @@ func main() { slog.Info("database connected") + // Repositories (for background workers) + printerRepo := repositories.NewPrinterRepository(dbPool) + jobRepo := repositories.NewPrintJobRepository(dbPool) + usageLogRepo := repositories.NewUsageLogRepository(dbPool) + // Create SSE broadcaster and start it sseBC := sse.NewBroadcaster(128) sseBC.Start() @@ -47,6 +54,19 @@ func main() { slog.Info("sse broadcaster started") + // Start background workers + mrCfg := workers.DefaultMoonrakerPollerConfig() + mrPoller := workers.NewMoonrakerPoller(mrCfg, dbPool, printerRepo, jobRepo, usageLogRepo, sseBC) + mrPoller.Start() + defer mrPoller.Stop() + + mqttCfg := workers.DefaultMQTTSubscriberConfig() + mqttSub := workers.NewMQTTSubscriber(mqttCfg, dbPool, printerRepo, usageLogRepo, sseBC) + mqttSub.Start() + defer mqttSub.Stop() + + slog.Info("background workers started") + // Create router r := router.New(cfg, dbPool, sseBC) diff --git a/backend/internal/clients/moonraker.go b/backend/internal/clients/moonraker.go new file mode 100644 index 0000000..a9ef78b --- /dev/null +++ b/backend/internal/clients/moonraker.go @@ -0,0 +1,161 @@ +// Package clients provides third-party printer integrations. +package clients + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "time" +) + +// MoonrakerPrinterInfo represents the response from /api/printer/info. +type MoonrakerPrinterInfo struct { + State string `json:"state"` + Hostname string `json:"hostname,omitempty"` + SoftwareVersion string `json:"software_version,omitempty"` +} + +// MoonrakerPrintStats represents the response from /api/printer/print_stats. +type MoonrakerPrintStats struct { + State string `json:"state"` + Filename string `json:"filename,omitempty"` + FilamentUsedMm float64 `json:"filament_used,omitempty"` + TotalDuration float64 `json:"total_duration,omitempty"` + PrintDuration float64 `json:"print_duration,omitempty"` + Message string `json:"message,omitempty"` +} + +// MoonrakerPrintJob represents a single job from the history API. +type MoonrakerPrintJob struct { + JobID string `json:"job_id,omitempty"` + Filename string `json:"filename"` + Status string `json:"status"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time,omitempty"` + FilamentUsedMm float64 `json:"filament_used,omitempty"` + TotalDuration float64 `json:"total_duration,omitempty"` +} + +// MoonrakerHistoryResponse represents the response from /api/server/history/job. +type MoonrakerHistoryResponse struct { + Items []MoonrakerPrintJob `json:"jobs"` +} + +// MoonrakerClient is an HTTP client for the Moonraker API. +type MoonrakerClient struct { + HTTPClient *http.Client +} + +// NewMoonrakerClient creates a MoonrakerClient with the given request timeout. +func NewMoonrakerClient(timeout time.Duration) *MoonrakerClient { + return &MoonrakerClient{ + HTTPClient: &http.Client{Timeout: timeout}, + } +} + +// baseURL builds the Moonraker base URL from host and port. +func (c *MoonrakerClient) baseURL(host string, port int) string { + if port == 0 { + port = 80 + } + return fmt.Sprintf("http://%s:%d", host, port) +} + +// GetPrinterInfo fetches printer info from Moonraker. +func (c *MoonrakerClient) GetPrinterInfo(ctx context.Context, host string, port int, apiKey string) (*MoonrakerPrinterInfo, error) { + url := c.baseURL(host, port) + "/api/printer/info" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + if apiKey != "" { + req.Header.Set("X-Api-Key", apiKey) + } + + resp, err := c.HTTPClient.Do(req) + if err != nil { + return nil, fmt.Errorf("moonraker getPrinterInfo request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("moonraker getPrinterInfo returned status %d", resp.StatusCode) + } + + var body struct { + Result MoonrakerPrinterInfo `json:"result"` + } + if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { + return nil, fmt.Errorf("moonraker getPrinterInfo decode failed: %w", err) + } + + slog.Debug("moonraker printer info", "host", host, "state", body.Result.State) + return &body.Result, nil +} + +// GetPrintStats fetches current print statistics from Moonraker. +func (c *MoonrakerClient) GetPrintStats(ctx context.Context, host string, port int, apiKey string) (*MoonrakerPrintStats, error) { + url := c.baseURL(host, port) + "/api/printer/print_stats" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + if apiKey != "" { + req.Header.Set("X-Api-Key", apiKey) + } + + resp, err := c.HTTPClient.Do(req) + if err != nil { + return nil, fmt.Errorf("moonraker getPrintStats request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("moonraker getPrintStats returned status %d", resp.StatusCode) + } + + var body struct { + Result MoonrakerPrintStats `json:"result"` + } + if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { + return nil, fmt.Errorf("moonraker getPrintStats decode failed: %w", err) + } + + slog.Debug("moonraker print stats", "host", host, "state", body.Result.State, "filename", body.Result.Filename) + return &body.Result, nil +} + +// GetPrintHistory fetches completed print job history from Moonraker. +func (c *MoonrakerClient) GetPrintHistory(ctx context.Context, host string, port int, apiKey string, limit int) (*MoonrakerHistoryResponse, error) { + if limit <= 0 { + limit = 25 + } + url := fmt.Sprintf("%s/api/server/history/job?limit=%d", c.baseURL(host, port), limit) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + if apiKey != "" { + req.Header.Set("X-Api-Key", apiKey) + } + + resp, err := c.HTTPClient.Do(req) + if err != nil { + return nil, fmt.Errorf("moonraker getPrintHistory request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("moonraker getPrintHistory returned status %d", resp.StatusCode) + } + + var body MoonrakerHistoryResponse + if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { + return nil, fmt.Errorf("moonraker getPrintHistory decode failed: %w", err) + } + + slog.Debug("moonraker print history", "host", host, "count", len(body.Items)) + return &body, nil +} diff --git a/backend/internal/clients/mqtt.go b/backend/internal/clients/mqtt.go new file mode 100644 index 0000000..7358497 --- /dev/null +++ b/backend/internal/clients/mqtt.go @@ -0,0 +1,119 @@ +// Package clients provides third-party printer integrations. +package clients + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "log/slog" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +// MQTTClient wraps the Eclipse Paho MQTT client for printer telemetry. +type MQTTClient struct { + client mqtt.Client +} + +// MQTTConfig holds per-printer MQTT connection settings. +type MQTTConfig struct { + BrokerHost string + BrokerPort int + TopicPrefix string + TLSEnabled bool + ClientID string +} + +// BambuPrintStatus is the known Bambu Lab print-status payload shape. +type BambuPrintStatus struct { + Print struct { + GcodeFile string `json:"gcode_file,omitempty"` + Stage int `json:"stage,omitempty"` + SubTaskName string `json:"subtask_name,omitempty"` + PrintType string `json:"print_type,omitempty"` + FilamentUsedMm float64 `json:"mc_percent,omitempty"` // placeholder; real telemetry varies + } `json:"print,omitempty"` +} + +// NewMQTTClient creates an MQTT client connected to the given broker. +func NewMQTTClient(cfg MQTTConfig) (*MQTTClient, error) { + if cfg.BrokerPort == 0 { + if cfg.TLSEnabled { + cfg.BrokerPort = 8883 + } else { + cfg.BrokerPort = 1883 + } + } + if cfg.ClientID == "" { + cfg.ClientID = fmt.Sprintf("extrudex-%d", time.Now().Unix()) + } + + opts := mqtt.NewClientOptions(). + AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.BrokerHost, cfg.BrokerPort)). + SetClientID(cfg.ClientID). + SetAutoReconnect(true). + SetConnectTimeout(10 * time.Second). + SetOrderMatters(false) + + if cfg.TLSEnabled { + opts = opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: false}) + } + + client := mqtt.NewClient(opts) + token := client.Connect() + if token.Wait() && token.Error() != nil { + return nil, fmt.Errorf("mqtt connect failed: %w", token.Error()) + } + + slog.Info("mqtt client connected", "broker", cfg.BrokerHost, "port", cfg.BrokerPort, "tls", cfg.TLSEnabled) + return &MQTTClient{client: client}, nil +} + +// Subscribe registers a callback for messages matching topic. +func (c *MQTTClient) Subscribe(topic string, qos byte, callback func([]byte)) error { + token := c.client.Subscribe(topic, qos, func(_ mqtt.Client, msg mqtt.Message) { + callback(msg.Payload()) + }) + if token.Wait() && token.Error() != nil { + return fmt.Errorf("mqtt subscribe failed: %w", token.Error()) + } + slog.Info("mqtt subscribed", "topic", topic, "qos", qos) + return nil +} + +// Unsubscribe removes a subscription. +func (c *MQTTClient) Unsubscribe(topics ...string) error { + token := c.client.Unsubscribe(topics...) + if token.Wait() && token.Error() != nil { + return fmt.Errorf("mqtt unsubscribe failed: %w", token.Error()) + } + return nil +} + +// Disconnect cleanly disconnects the MQTT client. +func (c *MQTTClient) Disconnect(quiesceMs uint) { + c.client.Disconnect(quiesceMs) +} + +// IsConnected returns whether the underlying client is connected. +func (c *MQTTClient) IsConnected() bool { + return c.client.IsConnected() +} + +// ParseBambuTelemetry attempts to parse a Bambu Lab telemetry JSON payload. +func ParseBambuTelemetry(payload []byte) (*BambuPrintStatus, error) { + var msg BambuPrintStatus + if err := json.Unmarshal(payload, &msg); err != nil { + return nil, fmt.Errorf("parse bambu telemetry failed: %w", err) + } + return &msg, nil +} + +// DefaultBambuTopics returns the default topic patterns for Bambu Lab printers. +func DefaultBambuTopics(topicPrefix string) []string { + return []string{ + topicPrefix + "/report", + } +} diff --git a/backend/internal/workers/moonraker_poller.go b/backend/internal/workers/moonraker_poller.go new file mode 100644 index 0000000..44de9f5 --- /dev/null +++ b/backend/internal/workers/moonraker_poller.go @@ -0,0 +1,321 @@ +// Package workers provides background goroutines for printer telemetry. +package workers + +import ( + "context" + "fmt" + "log/slog" + "strconv" + "time" + + "github.com/CubeCraft-Creations/Extrudex/backend/internal/clients" + "github.com/CubeCraft-Creations/Extrudex/backend/internal/models" + "github.com/CubeCraft-Creations/Extrudex/backend/internal/repositories" + "github.com/CubeCraft-Creations/Extrudex/backend/internal/sse" + "github.com/jackc/pgx/v5/pgxpool" +) + +// MoonrakerPollerConfig controls the background polling behaviour. +type MoonrakerPollerConfig struct { + PollInterval time.Duration + RequestTimeout time.Duration +} + +// DefaultMoonrakerPollerConfig returns sensible defaults. +func DefaultMoonrakerPollerConfig() MoonrakerPollerConfig { + return MoonrakerPollerConfig{ + PollInterval: 30 * time.Second, + RequestTimeout: 10 * time.Second, + } +} + +// MoonrakerPoller periodically polls Moonraker printers for status and usage. +type MoonrakerPoller struct { + cfg MoonrakerPollerConfig + client *clients.MoonrakerClient + printerRepo *repositories.PrinterRepository + jobRepo *repositories.PrintJobRepository + usageRepo *repositories.UsageLogRepository + sseBC *sse.Broadcaster + pool *pgxpool.Pool + stop chan struct{} +} + +// NewMoonrakerPoller creates a poller. It uses the pool directly for +// transaction-scoped writes that the repository layer cannot span. +func NewMoonrakerPoller( + cfg MoonrakerPollerConfig, + pool *pgxpool.Pool, + printerRepo *repositories.PrinterRepository, + jobRepo *repositories.PrintJobRepository, + usageRepo *repositories.UsageLogRepository, + sseBC *sse.Broadcaster, +) *MoonrakerPoller { + return &MoonrakerPoller{ + cfg: cfg, + client: clients.NewMoonrakerClient(cfg.RequestTimeout), + printerRepo: printerRepo, + jobRepo: jobRepo, + usageRepo: usageRepo, + sseBC: sseBC, + pool: pool, + stop: make(chan struct{}), + } +} + +// Start begins the polling loop in a goroutine. +func (p *MoonrakerPoller) Start() { + go p.loop() +} + +// Stop signals the loop to exit. +func (p *MoonrakerPoller) Stop() { + close(p.stop) +} + +func (p *MoonrakerPoller) loop() { + ticker := time.NewTicker(p.cfg.PollInterval) + defer ticker.Stop() + + // Immediate first tick. + p.pollCycle() + + for { + select { + case <-ticker.C: + p.pollCycle() + case <-p.stop: + slog.Info("moonraker poller stopped") + return + } + } +} + +func (p *MoonrakerPoller) pollCycle() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + printers, err := p.printerRepo.GetAll(ctx) + if err != nil { + slog.Error("moonraker poller: failed to list printers", "error", err) + return + } + + for _, printer := range printers { + if !printer.IsActive || printer.MoonrakerURL == nil || *printer.MoonrakerURL == "" { + continue + } + + if err := p.pollPrinter(ctx, printer); err != nil { + slog.Warn("moonraker poller: poll failed", + "printer", printer.Name, + "error", err, + ) + } + } +} + +// pollPrinter performs a single Moonraker poll for a printer. +func (p *MoonrakerPoller) pollPrinter(ctx context.Context, printer models.Printer) error { + host := *printer.MoonrakerURL + var apiKey string + if printer.MoonrakerAPIKey != nil { + apiKey = *printer.MoonrakerAPIKey + } + + // Fetch printer info (status) + info, err := p.client.GetPrinterInfo(ctx, host, 80, apiKey) + if err != nil { + p.broadcastStatus(printer.ID, printer.Name, "offline") + return err + } + + status := mapMoonrakerState(info.State) + p.broadcastStatus(printer.ID, printer.Name, status) + + // Fetch print stats + stats, err := p.client.GetPrintStats(ctx, host, 80, apiKey) + if err != nil { + return fmt.Errorf("getPrintStats failed: %w", err) + } + + if status == "printing" && stats.Filename != "" { + p.broadcastJobStarted(printer.ID, stats.Filename) + } + + if isCompleteState(stats.State) && stats.FilamentUsedMm > 0 { + // Record usage + if err := p.recordUsage(ctx, printer, stats); err != nil { + slog.Error("moonraker poller: record usage failed", + "printer", printer.Name, "error", err) + } else { + p.broadcastJobCompleted(printer.ID, stats.Filename, stats.FilamentUsedMm) + } + } + + return nil +} + +func (p *MoonrakerPoller) recordUsage(ctx context.Context, printer models.Printer, stats *clients.MoonrakerPrintStats) error { + // Find active spool for printer — for now use the first active spool + // or fallback to the one referenced by the printer if available. + // In a real scenario we'd query AMS slots or fallback logic. + // Here we simply look for the most recently used spool in usage_logs. + var spoolID int + row := p.pool.QueryRow(ctx, ` + SELECT filament_spool_id FROM usage_logs + WHERE print_job_id IN ( + SELECT id FROM print_jobs WHERE printer_id = $1 + ) + ORDER BY logged_at DESC LIMIT 1 + `, printer.ID) + _ = row.Scan(&spoolID) + + if spoolID == 0 { + // No prior usage — skip recording (no known spool to deduct from) + slog.Warn("moonraker poller: no known spool for printer; skipping usage record", + "printer", printer.Name) + return nil + } + + // Compute grams from mm extruded using defaults (1.75mm diameter, PLA density 1.24) + grams := calculateGrams(stats.FilamentUsedMm, 1.75, 1.24) + + // Create a print job record + var jobID int + err := p.pool.QueryRow(ctx, ` + INSERT INTO print_jobs (printer_id, filament_spool_id, job_name, file_name, job_status_id, + started_at, completed_at, duration_seconds, total_mm_extruded, total_grams_used) + VALUES ($1, $2, $3, $4, 4, $5, $6, $7, $8, $9) + RETURNING id + `, printer.ID, spoolID, stats.Filename, stats.Filename, + time.Now().Add(-time.Duration(stats.TotalDuration)*time.Second), + time.Now(), + int(stats.TotalDuration), + stats.FilamentUsedMm, + grams, + ).Scan(&jobID) + + if err != nil { + return fmt.Errorf("insert print_job failed: %w", err) + } + + // Create usage_log + _, err = p.pool.Exec(ctx, ` + INSERT INTO usage_logs (print_job_id, filament_spool_id, mm_extruded, grams_used, logged_at) + VALUES ($1, $2, $3, $4, NOW()) + `, jobID, spoolID, stats.FilamentUsedMm, grams) + + if err != nil { + return fmt.Errorf("insert usage_log failed: %w", err) + } + + slog.Info("moonraker poller: recorded usage", + "printer", printer.Name, + "job", stats.Filename, + "mm", stats.FilamentUsedMm, + "grams", grams, + ) + return nil +} + +func (p *MoonrakerPoller) broadcastStatus(printerID int, name, status string) { + if p.sseBC == nil { + return + } + ev, err := sse.NewEvent(sse.EventPrinterStatus, sse.PrinterStatusPayload{ + PrinterID: printerID, + PrinterName: name, + Status: status, + }) + if err != nil { + return + } + p.sseBC.Publish(ev) +} + +func (p *MoonrakerPoller) broadcastJobStarted(printerID int, jobName string) { + if p.sseBC == nil { + return + } + ev, err := sse.NewEvent(sse.EventJobStarted, sse.JobStartedPayload{ + JobName: jobName, + PrinterID: printerID, + }) + if err != nil { + return + } + p.sseBC.Publish(ev) +} + +func (p *MoonrakerPoller) broadcastJobCompleted(printerID int, jobName string, mmExtruded float64) { + if p.sseBC == nil { + return + } + grams := calculateGrams(mmExtruded, 1.75, 1.24) + gramsInt := int(grams) + ev, err := sse.NewEvent(sse.EventJobCompleted, sse.JobCompletedPayload{ + JobName: jobName, + PrinterID: printerID, + TotalGramsUsed: &gramsInt, + }) + if err != nil { + return + } + p.sseBC.Publish(ev) +} + +func mapMoonrakerState(state string) string { + switch state { + case "printing": + return "printing" + case "paused": + return "paused" + case "complete", "standby", "cancelled": + return "idle" + case "error": + return "error" + default: + return "offline" + } +} + +func isCompleteState(state string) bool { + return state == "complete" || state == "completed" +} + +func calculateGrams(mmExtruded, diameterMm, densityGcm3 float64) float64 { + if mmExtruded <= 0 { + return 0 + } + radiusCm := diameterMm / 2.0 / 10.0 + crossSection := 3.141592653589793 * radiusCm * radiusCm + volumeCm3 := (mmExtruded / 10.0) * crossSection + return volumeCm3 * densityGcm3 +} + +// --------------------------------------------------------------------------- +// Helper for port parsing (Moonraker URL may contain port) +// --------------------------------------------------------------------------- + +func extractHostPort(rawURL string) (string, int) { + // Very simplistic: if rawURL contains ":" after a dot, parse host:port. + // Otherwise assume host only and return port 80. + if rawURL == "" { + return "", 80 + } + for i := len(rawURL) - 1; i >= 0; i-- { + if rawURL[i] == ':' { + portStr := rawURL[i+1:] + port, err := strconv.Atoi(portStr) + if err == nil { + return rawURL[:i], port + } + break + } + if rawURL[i] == '/' { + break + } + } + return rawURL, 80 +} diff --git a/backend/internal/workers/mqtt_subscriber.go b/backend/internal/workers/mqtt_subscriber.go new file mode 100644 index 0000000..11a8980 --- /dev/null +++ b/backend/internal/workers/mqtt_subscriber.go @@ -0,0 +1,223 @@ +// Package workers provides background goroutines for printer telemetry. +package workers + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/CubeCraft-Creations/Extrudex/backend/internal/clients" + "github.com/CubeCraft-Creations/Extrudex/backend/internal/models" + "github.com/CubeCraft-Creations/Extrudex/backend/internal/repositories" + "github.com/CubeCraft-Creations/Extrudex/backend/internal/sse" + "github.com/jackc/pgx/v5/pgxpool" +) + +// MQTTSubscriberConfig controls MQTT background worker behaviour. +type MQTTSubscriberConfig struct { + ReconnectInterval time.Duration +} + +// DefaultMQTTSubscriberConfig returns sensible defaults. +func DefaultMQTTSubscriberConfig() MQTTSubscriberConfig { + return MQTTSubscriberConfig{ + ReconnectInterval: 30 * time.Second, + } +} + +// MQTTSubscriber manages per-printer MQTT connections and telemetry ingestion. +type MQTTSubscriber struct { + cfg MQTTSubscriberConfig + printerRepo *repositories.PrinterRepository + usageRepo *repositories.UsageLogRepository + sseBC *sse.Broadcaster + pool *pgxpool.Pool + clients map[int]*clients.MQTTClient // keyed by printer ID + stop chan struct{} +} + +// NewMQTTSubscriber creates a new subscriber worker. +func NewMQTTSubscriber( + cfg MQTTSubscriberConfig, + pool *pgxpool.Pool, + printerRepo *repositories.PrinterRepository, + usageRepo *repositories.UsageLogRepository, + sseBC *sse.Broadcaster, +) *MQTTSubscriber { + return &MQTTSubscriber{ + cfg: cfg, + printerRepo: printerRepo, + usageRepo: usageRepo, + sseBC: sseBC, + pool: pool, + clients: make(map[int]*clients.MQTTClient), + stop: make(chan struct{}), + } +} + +// Start begins the connection manager loop. +func (s *MQTTSubscriber) Start() { + go s.loop() +} + +// Stop signals the loop to exit and disconnects all clients. +func (s *MQTTSubscriber) Stop() { + close(s.stop) +} + +func (s *MQTTSubscriber) loop() { + // Initial connect attempt. + s.connectAll() + + ticker := time.NewTicker(s.cfg.ReconnectInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + s.connectAll() + case <-s.stop: + slog.Info("mqtt subscriber stopped") + for _, c := range s.clients { + c.Disconnect(1000) + } + return + } + } +} + +func (s *MQTTSubscriber) connectAll() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + printers, err := s.printerRepo.GetAll(ctx) + if err != nil { + slog.Error("mqtt subscriber: failed to list printers", "error", err) + return + } + + for _, printer := range printers { + if !printer.IsActive || printer.MQTTBrokerHost == nil || *printer.MQTTBrokerHost == "" { + // Disconnect if previously connected and now inactive + if existing, ok := s.clients[printer.ID]; ok { + existing.Disconnect(1000) + delete(s.clients, printer.ID) + } + continue + } + + if _, ok := s.clients[printer.ID]; ok { + // Already connected — skip + continue + } + + topicPrefix := "" + if printer.MQTTTopicPrefix != nil { + topicPrefix = *printer.MQTTTopicPrefix + } + + cfg := clients.MQTTConfig{ + BrokerHost: *printer.MQTTBrokerHost, + TopicPrefix: topicPrefix, + TLSEnabled: printer.MQTTTLSEnabled, + ClientID: fmt.Sprintf("extrudex-printer-%d", printer.ID), + } + + c, err := clients.NewMQTTClient(cfg) + if err != nil { + slog.Warn("mqtt subscriber: connect failed", + "printer", printer.Name, + "broker", cfg.BrokerHost, + "error", err, + ) + continue + } + + s.clients[printer.ID] = c + + // Subscribe to telemetry topics + topics := clients.DefaultBambuTopics(topicPrefix) + for _, topic := range topics { + if err := c.Subscribe(topic, 0, s.makeHandler(printer)); err != nil { + slog.Warn("mqtt subscriber: subscribe failed", + "printer", printer.Name, + "topic", topic, + "error", err, + ) + } + } + + slog.Info("mqtt subscriber: connected", + "printer", printer.Name, + "broker", cfg.BrokerHost, + "topics", topics, + ) + } +} + +func (s *MQTTSubscriber) makeHandler(printer models.Printer) func([]byte) { + return func(payload []byte) { + slog.Debug("mqtt subscriber: message received", + "printer", printer.Name, + "size", len(payload), + ) + + // Attempt Bambu Lab parse + telemetry, err := clients.ParseBambuTelemetry(payload) + if err != nil { + slog.Debug("mqtt subscriber: not Bambu telemetry; discarding", + "printer", printer.Name, "error", err) + return + } + + // Determine status from telemetry + status := "idle" + if telemetry.Print.Stage > 0 { + status = "printing" + } + s.broadcastStatus(printer.ID, printer.Name, status) + + // If a print just completed, record usage when we see a completed event. + // Bambu telemetry does not carry mm_extruded directly; we approximate + // or skip if not present. Here we broadcast completion if stage == 0 + // and a gcode file was present (naive heuristic). + if telemetry.Print.GcodeFile != "" && telemetry.Print.Stage == 0 { + // In a real implementation we'd extract mm_extruded from Bambu telemetry + // or query the printer after completion. For now broadcast completion. + s.broadcastJobCompleted(printer.ID, telemetry.Print.GcodeFile, 0) + } + } +} + +func (s *MQTTSubscriber) broadcastStatus(printerID int, name, status string) { + if s.sseBC == nil { + return + } + ev, err := sse.NewEvent(sse.EventPrinterStatus, sse.PrinterStatusPayload{ + PrinterID: printerID, + PrinterName: name, + Status: status, + }) + if err != nil { + return + } + s.sseBC.Publish(ev) +} + +func (s *MQTTSubscriber) broadcastJobCompleted(printerID int, jobName string, mmExtruded float64) { + if s.sseBC == nil { + return + } + grams := calculateGrams(mmExtruded, 1.75, 1.24) + gramsInt := int(grams) + ev, err := sse.NewEvent(sse.EventJobCompleted, sse.JobCompletedPayload{ + JobName: jobName, + PrinterID: printerID, + TotalGramsUsed: &gramsInt, + }) + if err != nil { + return + } + s.sseBC.Publish(ev) +}