// Package workers provides background goroutines for printer telemetry. package workers import ( "context" "fmt" "log/slog" "time" "github.com/CubeCraft-Creations/Extrudex/backend/internal/clients" "github.com/CubeCraft-Creations/Extrudex/backend/internal/models" "github.com/CubeCraft-Creations/Extrudex/backend/internal/repositories" "github.com/CubeCraft-Creations/Extrudex/backend/internal/sse" "github.com/jackc/pgx/v5/pgxpool" ) // MQTTSubscriberConfig controls MQTT background worker behaviour. type MQTTSubscriberConfig struct { ReconnectInterval time.Duration } // DefaultMQTTSubscriberConfig returns sensible defaults. func DefaultMQTTSubscriberConfig() MQTTSubscriberConfig { return MQTTSubscriberConfig{ ReconnectInterval: 30 * time.Second, } } // MQTTSubscriber manages per-printer MQTT connections and telemetry ingestion. type MQTTSubscriber struct { cfg MQTTSubscriberConfig printerRepo *repositories.PrinterRepository usageRepo *repositories.UsageLogRepository sseBC *sse.Broadcaster pool *pgxpool.Pool clients map[int]*clients.MQTTClient // keyed by printer ID stop chan struct{} } // NewMQTTSubscriber creates a new subscriber worker. func NewMQTTSubscriber( cfg MQTTSubscriberConfig, pool *pgxpool.Pool, printerRepo *repositories.PrinterRepository, usageRepo *repositories.UsageLogRepository, sseBC *sse.Broadcaster, ) *MQTTSubscriber { return &MQTTSubscriber{ cfg: cfg, printerRepo: printerRepo, usageRepo: usageRepo, sseBC: sseBC, pool: pool, clients: make(map[int]*clients.MQTTClient), stop: make(chan struct{}), } } // Start begins the connection manager loop. func (s *MQTTSubscriber) Start() { go s.loop() } // Stop signals the loop to exit and disconnects all clients. func (s *MQTTSubscriber) Stop() { close(s.stop) } func (s *MQTTSubscriber) loop() { // Initial connect attempt. s.connectAll() ticker := time.NewTicker(s.cfg.ReconnectInterval) defer ticker.Stop() for { select { case <-ticker.C: s.connectAll() case <-s.stop: slog.Info("mqtt subscriber stopped") for _, c := range s.clients { c.Disconnect(1000) } return } } } func (s *MQTTSubscriber) connectAll() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() printers, err := s.printerRepo.GetAll(ctx) if err != nil { slog.Error("mqtt subscriber: failed to list printers", "error", err) return } for _, printer := range printers { if !printer.IsActive || printer.MQTTBrokerHost == nil || *printer.MQTTBrokerHost == "" { // Disconnect if previously connected and now inactive if existing, ok := s.clients[printer.ID]; ok { existing.Disconnect(1000) delete(s.clients, printer.ID) } continue } if _, ok := s.clients[printer.ID]; ok { // Already connected — skip continue } topicPrefix := "" if printer.MQTTTopicPrefix != nil { topicPrefix = *printer.MQTTTopicPrefix } cfg := clients.MQTTConfig{ BrokerHost: *printer.MQTTBrokerHost, TopicPrefix: topicPrefix, TLSEnabled: printer.MQTTTLSEnabled, ClientID: fmt.Sprintf("extrudex-printer-%d", printer.ID), } c, err := clients.NewMQTTClient(cfg) if err != nil { slog.Warn("mqtt subscriber: connect failed", "printer", printer.Name, "broker", cfg.BrokerHost, "error", err, ) continue } s.clients[printer.ID] = c // Subscribe to telemetry topics topics := clients.DefaultBambuTopics(topicPrefix) for _, topic := range topics { if err := c.Subscribe(topic, 0, s.makeHandler(printer)); err != nil { slog.Warn("mqtt subscriber: subscribe failed", "printer", printer.Name, "topic", topic, "error", err, ) } } slog.Info("mqtt subscriber: connected", "printer", printer.Name, "broker", cfg.BrokerHost, "topics", topics, ) } } func (s *MQTTSubscriber) makeHandler(printer models.Printer) func([]byte) { return func(payload []byte) { slog.Debug("mqtt subscriber: message received", "printer", printer.Name, "size", len(payload), ) // Attempt Bambu Lab parse telemetry, err := clients.ParseBambuTelemetry(payload) if err != nil { slog.Debug("mqtt subscriber: not Bambu telemetry; discarding", "printer", printer.Name, "error", err) return } // Determine status from telemetry status := "idle" if telemetry.Print.Stage > 0 { status = "printing" } s.broadcastStatus(printer.ID, printer.Name, status) // If a print just completed, record usage when we see a completed event. // Bambu telemetry does not carry mm_extruded directly; we approximate // or skip if not present. Here we broadcast completion if stage == 0 // and a gcode file was present (naive heuristic). if telemetry.Print.GcodeFile != "" && telemetry.Print.Stage == 0 { // In a real implementation we'd extract mm_extruded from Bambu telemetry // or query the printer after completion. For now broadcast completion. s.broadcastJobCompleted(printer.ID, telemetry.Print.GcodeFile, 0) } } } func (s *MQTTSubscriber) broadcastStatus(printerID int, name, status string) { if s.sseBC == nil { return } ev, err := sse.NewEvent(sse.EventPrinterStatus, sse.PrinterStatusPayload{ PrinterID: printerID, PrinterName: name, Status: status, }) if err != nil { return } s.sseBC.Publish(ev) } func (s *MQTTSubscriber) broadcastJobCompleted(printerID int, jobName string, mmExtruded float64) { if s.sseBC == nil { return } grams := calculateGrams(mmExtruded, 1.75, 1.24) gramsInt := int(grams) ev, err := sse.NewEvent(sse.EventJobCompleted, sse.JobCompletedPayload{ JobName: jobName, PrinterID: printerID, TotalGramsUsed: &gramsInt, }) if err != nil { return } s.sseBC.Publish(ev) }