package workers

import (
	"context"
	"fmt"
	"log/slog"
	"sync"
	"time"

	"github.com/CubeCraft-Creations/Extrudex/backend/internal/clients"
	"github.com/jackc/pgx/v5/pgxpool"
)

// bambuJobState is a snapshot of the print job fields taken from the most
// recent MQTT report: the gcode file name, the gcode state string, and the
// reported percentage.
type bambuJobState struct {
	gcodeFile  string
	gcodeState string
	percent    int
}

// MQTTSubscriber is the worker that receives Bambu Lab MQTT telemetry and
// records print job changes in PostgreSQL.
//
// Client is exported so it can be assigned after construction; the remaining
// fields are populated by NewMQTTSubscriber. mu guards state.
type MQTTSubscriber struct {
	Client      *clients.MQTTClient
	pool        *pgxpool.Pool
	printerID   int
	printerName string

	mu    sync.Mutex
	state bambuJobState
}

// MQTTSubscriberConfig carries the values NewMQTTSubscriber copies into a new
// MQTTSubscriber.
type MQTTSubscriberConfig struct {
	Pool        *pgxpool.Pool
	PrinterID   int
	PrinterName string
}

// NewMQTTSubscriber builds an MQTTSubscriber from cfg. The returned worker has
// a nil Client; assign Client before calling Run.
func NewMQTTSubscriber(cfg MQTTSubscriberConfig) *MQTTSubscriber {
	sub := new(MQTTSubscriber)
	sub.pool = cfg.Pool
	sub.printerID = cfg.PrinterID
	sub.printerName = cfg.PrinterName
	return sub
}

// Run connects the MQTT client and then blocks until ctx is done.
//
// It returns an error when Client is nil or when connecting fails. Once ctx is
// done it disconnects the client and returns nil.
func (w *MQTTSubscriber) Run(ctx context.Context) error {
	slog.Info("mqtt subscriber: starting",
		slog.Int("printer_id", w.printerID),
		slog.String("printer_name", w.printerName),
	)

	if w.Client == nil {
		return fmt.Errorf("mqtt subscriber: Client is nil")
	}

	err := w.Client.Connect()
	if err != nil {
		return fmt.Errorf("mqtt subscriber: connect failed: %w", err)
	}
	defer w.Client.Disconnect()

	slog.Info("mqtt subscriber: connected", slog.Int("printer_id", w.printerID))

	// Park here until the caller cancels; the deferred Disconnect runs on return.
	<-ctx.Done()

	slog.Info("mqtt subscriber: shutting down", slog.Int("printer_id", w.printerID))
	return nil
}

// HandleBambuReport is the MQTT callback for Bambu telemetry messages.
func (w *MQTTSubscriber) HandleBambuReport(report clients.BambuPrintReport) error { w.mu.Lock() prev := w.state current := bambuJobState{ gcodeFile: report.Print.GcodeFile, gcodeState: report.Print.GcodeState, percent: report.Print.McPercent, } w.state = current w.mu.Unlock() if prev.gcodeState == current.gcodeState && prev.gcodeFile == current.gcodeFile { return nil } slog.Info("mqtt subscriber: state change", "printer_id", w.printerID, "file", current.gcodeFile, "state", current.gcodeState, "percent", current.percent, ) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() switch current.gcodeState { case "RUNNING": return w.handleState(ctx, current, "printing") case "FINISH": return w.handleState(ctx, current, "completed") case "FAILED": return w.handleState(ctx, current, "failed") } return nil } func (w *MQTTSubscriber) handleState(ctx context.Context, s bambuJobState, status string) error { jobID, err := w.ensurePrintJob(ctx, s.gcodeFile, status) if err != nil { return err } if status == "completed" || status == "failed" { _, _ = w.pool.Exec(ctx, ` UPDATE print_jobs SET job_status_id = (SELECT id FROM job_statuses WHERE name = $2), completed_at = CASE WHEN $2 = 'completed' THEN NOW() ELSE completed_at END, updated_at = NOW() WHERE id = $1 `, jobID, status) } else { _, _ = w.pool.Exec(ctx, ` UPDATE print_jobs SET job_status_id = (SELECT id FROM job_statuses WHERE name = $2), started_at = COALESCE(started_at, NOW()), updated_at = NOW() WHERE id = $1 `, jobID, status) } slog.Info("mqtt subscriber: job updated", "printer_id", w.printerID, "job_id", jobID, "status", status) return nil } func (w *MQTTSubscriber) ensurePrintJob(ctx context.Context, filename, status string) (int, error) { var jobID int err := w.pool.QueryRow(ctx, ` SELECT id FROM print_jobs WHERE printer_id = $1 AND file_name = $2 AND deleted_at IS NULL ORDER BY created_at DESC LIMIT 1 `, w.printerID, filename).Scan(&jobID) if err == nil { return jobID, nil } var 
statusID int err = w.pool.QueryRow(ctx, `SELECT id FROM job_statuses WHERE name = $1`, status).Scan(&statusID) if err != nil { return 0, fmt.Errorf("mqtt subscriber: unknown status '%s': %w", status, err) } err = w.pool.QueryRow(ctx, ` INSERT INTO print_jobs (printer_id, job_name, file_name, job_status_id, started_at) VALUES ($1, $2, $2, $3, NOW()) RETURNING id `, w.printerID, filename, statusID).Scan(&jobID) if err != nil { return 0, fmt.Errorf("mqtt subscriber: create print_job failed: %w", err) } slog.Info("mqtt subscriber: created print job", "job_id", jobID, "file", filename, "status", status) return jobID, nil }