Files
Extrudex/backend/internal/workers/mqtt_subscriber.go
Joshua 38722e54e6
All checks were successful
Dev Build / build-test (pull_request) Successful in 1m29s
CUB-117: Port Moonraker + MQTT printer integrations to Go
- Moonraker REST client with GetPrinterInfo, GetPrintStats, GetPrintHistory
- Moonraker WebSocket client with auto-reconnect + telemetry parsing
- MQTT client via paho.mqtt.golang with TLS support for Bambu Lab
- Moonraker poller worker: background polling, dedup, usage logging to PostgreSQL
- MQTT subscriber worker: Bambu telemetry parsing, print job tracking
- Config: 7 new env vars (MOONRAKER_URL, MQTT_BROKER, etc.)
- main.go: per-printer worker discovery, graceful shutdown
2026-05-12 01:02:49 -04:00

171 lines
4.4 KiB
Go

package workers
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"github.com/CubeCraft-Creations/Extrudex/backend/internal/clients"
"github.com/jackc/pgx/v5/pgxpool"
)
// bambuJobState tracks the active print job detected via MQTT.
type bambuJobState struct {
gcodeFile string
gcodeState string
percent int
}
// MQTTSubscriber listens to Bambu Lab MQTT telemetry topics and logs
// filament usage events to PostgreSQL.
type MQTTSubscriber struct {
Client *clients.MQTTClient
pool *pgxpool.Pool
printerID int
printerName string
mu sync.Mutex
state bambuJobState
}
// MQTTSubscriberConfig holds configuration for the MQTT subscriber worker.
type MQTTSubscriberConfig struct {
Pool *pgxpool.Pool
PrinterID int
PrinterName string
}
// NewMQTTSubscriber creates a new MQTTSubscriber worker. Set Client after
// construction to wire the handler.
func NewMQTTSubscriber(cfg MQTTSubscriberConfig) *MQTTSubscriber {
return &MQTTSubscriber{
pool: cfg.Pool,
printerID: cfg.PrinterID,
printerName: cfg.PrinterName,
}
}
// Run connects to MQTT and blocks until ctx is cancelled.
func (w *MQTTSubscriber) Run(ctx context.Context) error {
slog.Info("mqtt subscriber: starting",
"printer_id", w.printerID,
"printer_name", w.printerName,
)
if w.Client == nil {
return fmt.Errorf("mqtt subscriber: Client is nil")
}
if err := w.Client.Connect(); err != nil {
return fmt.Errorf("mqtt subscriber: connect failed: %w", err)
}
defer w.Client.Disconnect()
slog.Info("mqtt subscriber: connected", "printer_id", w.printerID)
<-ctx.Done()
slog.Info("mqtt subscriber: shutting down", "printer_id", w.printerID)
return nil
}
// HandleBambuReport is the MQTT callback for Bambu telemetry messages.
func (w *MQTTSubscriber) HandleBambuReport(report clients.BambuPrintReport) error {
w.mu.Lock()
prev := w.state
current := bambuJobState{
gcodeFile: report.Print.GcodeFile,
gcodeState: report.Print.GcodeState,
percent: report.Print.McPercent,
}
w.state = current
w.mu.Unlock()
if prev.gcodeState == current.gcodeState && prev.gcodeFile == current.gcodeFile {
return nil
}
slog.Info("mqtt subscriber: state change",
"printer_id", w.printerID,
"file", current.gcodeFile,
"state", current.gcodeState,
"percent", current.percent,
)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
switch current.gcodeState {
case "RUNNING":
return w.handleState(ctx, current, "printing")
case "FINISH":
return w.handleState(ctx, current, "completed")
case "FAILED":
return w.handleState(ctx, current, "failed")
}
return nil
}
func (w *MQTTSubscriber) handleState(ctx context.Context, s bambuJobState, status string) error {
jobID, err := w.ensurePrintJob(ctx, s.gcodeFile, status)
if err != nil {
return err
}
if status == "completed" || status == "failed" {
_, _ = w.pool.Exec(ctx, `
UPDATE print_jobs SET
job_status_id = (SELECT id FROM job_statuses WHERE name = $2),
completed_at = CASE WHEN $2 = 'completed' THEN NOW() ELSE completed_at END,
updated_at = NOW()
WHERE id = $1
`, jobID, status)
} else {
_, _ = w.pool.Exec(ctx, `
UPDATE print_jobs SET
job_status_id = (SELECT id FROM job_statuses WHERE name = $2),
started_at = COALESCE(started_at, NOW()),
updated_at = NOW()
WHERE id = $1
`, jobID, status)
}
slog.Info("mqtt subscriber: job updated",
"printer_id", w.printerID, "job_id", jobID, "status", status)
return nil
}
func (w *MQTTSubscriber) ensurePrintJob(ctx context.Context, filename, status string) (int, error) {
var jobID int
err := w.pool.QueryRow(ctx, `
SELECT id FROM print_jobs
WHERE printer_id = $1 AND file_name = $2 AND deleted_at IS NULL
ORDER BY created_at DESC LIMIT 1
`, w.printerID, filename).Scan(&jobID)
if err == nil {
return jobID, nil
}
var statusID int
err = w.pool.QueryRow(ctx, `SELECT id FROM job_statuses WHERE name = $1`, status).Scan(&statusID)
if err != nil {
return 0, fmt.Errorf("mqtt subscriber: unknown status '%s': %w", status, err)
}
err = w.pool.QueryRow(ctx, `
INSERT INTO print_jobs (printer_id, job_name, file_name, job_status_id, started_at)
VALUES ($1, $2, $2, $3, NOW())
RETURNING id
`, w.printerID, filename, statusID).Scan(&jobID)
if err != nil {
return 0, fmt.Errorf("mqtt subscriber: create print_job failed: %w", err)
}
slog.Info("mqtt subscriber: created print job",
"job_id", jobID, "file", filename, "status", status)
return jobID, nil
}