Compare commits
1 Commits
agent/Dex/
...
38722e54e6
| Author | SHA1 | Date | |
|---|---|---|---|
| 38722e54e6 |
@@ -6,33 +6,32 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/CubeCraft-Creations/Extrudex/backend/internal/clients"
|
||||||
"github.com/CubeCraft-Creations/Extrudex/backend/internal/config"
|
"github.com/CubeCraft-Creations/Extrudex/backend/internal/config"
|
||||||
"github.com/CubeCraft-Creations/Extrudex/backend/internal/db"
|
"github.com/CubeCraft-Creations/Extrudex/backend/internal/db"
|
||||||
"github.com/CubeCraft-Creations/Extrudex/backend/internal/repositories"
|
"github.com/CubeCraft-Creations/Extrudex/backend/internal/models"
|
||||||
"github.com/CubeCraft-Creations/Extrudex/backend/internal/router"
|
"github.com/CubeCraft-Creations/Extrudex/backend/internal/router"
|
||||||
"github.com/CubeCraft-Creations/Extrudex/backend/internal/sse"
|
"github.com/CubeCraft-Creations/Extrudex/backend/internal/sse"
|
||||||
"github.com/CubeCraft-Creations/Extrudex/backend/internal/workers"
|
"github.com/CubeCraft-Creations/Extrudex/backend/internal/workers"
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
// Setup structured logging
|
|
||||||
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
|
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
|
||||||
Level: slog.LevelInfo,
|
Level: slog.LevelInfo,
|
||||||
})))
|
})))
|
||||||
|
|
||||||
// Load configuration
|
|
||||||
cfg, err := config.Load()
|
cfg, err := config.Load()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("failed to load config", "error", err)
|
slog.Error("failed to load config", "error", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
slog.Info("config loaded", "port", cfg.Port, "cors_origin", cfg.CorsOrigin)
|
|
||||||
|
|
||||||
// Connect to database
|
|
||||||
dbPool, err := db.NewPool(cfg.DatabaseURL)
|
dbPool, err := db.NewPool(cfg.DatabaseURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("failed to connect to database", "error", err)
|
slog.Error("failed to connect to database", "error", err)
|
||||||
@@ -40,48 +39,38 @@ func main() {
|
|||||||
}
|
}
|
||||||
defer db.ClosePool(dbPool)
|
defer db.ClosePool(dbPool)
|
||||||
|
|
||||||
slog.Info("database connected")
|
|
||||||
|
|
||||||
// Repositories (for background workers)
|
|
||||||
printerRepo := repositories.NewPrinterRepository(dbPool)
|
|
||||||
jobRepo := repositories.NewPrintJobRepository(dbPool)
|
|
||||||
usageLogRepo := repositories.NewUsageLogRepository(dbPool)
|
|
||||||
|
|
||||||
// Create SSE broadcaster and start it
|
|
||||||
sseBC := sse.NewBroadcaster(128)
|
sseBC := sse.NewBroadcaster(128)
|
||||||
sseBC.Start()
|
sseBC.Start()
|
||||||
defer sseBC.Stop()
|
defer sseBC.Stop()
|
||||||
|
|
||||||
slog.Info("sse broadcaster started")
|
|
||||||
|
|
||||||
// Start background workers
|
|
||||||
mrCfg := workers.DefaultMoonrakerPollerConfig()
|
|
||||||
mrPoller := workers.NewMoonrakerPoller(mrCfg, dbPool, printerRepo, jobRepo, usageLogRepo, sseBC)
|
|
||||||
mrPoller.Start()
|
|
||||||
defer mrPoller.Stop()
|
|
||||||
|
|
||||||
mqttCfg := workers.DefaultMQTTSubscriberConfig()
|
|
||||||
mqttSub := workers.NewMQTTSubscriber(mqttCfg, dbPool, printerRepo, usageLogRepo, sseBC)
|
|
||||||
mqttSub.Start()
|
|
||||||
defer mqttSub.Stop()
|
|
||||||
|
|
||||||
slog.Info("background workers started")
|
|
||||||
|
|
||||||
// Create router
|
|
||||||
r := router.New(cfg, dbPool, sseBC)
|
r := router.New(cfg, dbPool, sseBC)
|
||||||
|
|
||||||
// Create HTTP server
|
// ── Workers ─────────────────────────────────────────────────────────
|
||||||
// WriteTimeout is 0 for SSE support — the Chi middleware.Timeout(60s)
|
|
||||||
// handles request-level timeouts on non-SSE routes.
|
var wg sync.WaitGroup
|
||||||
|
workersCtx, cancelWorkers := context.WithCancel(context.Background())
|
||||||
|
defer cancelWorkers()
|
||||||
|
|
||||||
|
pollInterval, _ := time.ParseDuration(cfg.MoonrakerPollInterval)
|
||||||
|
if pollInterval <= 0 {
|
||||||
|
pollInterval = 10 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
activePrinters := listActivePrinters(workersCtx, dbPool)
|
||||||
|
for _, p := range activePrinters {
|
||||||
|
startWorkerForPrinter(workersCtx, &wg, cfg, dbPool, p, pollInterval)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── HTTP server ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
server := &http.Server{
|
server := &http.Server{
|
||||||
Addr: ":" + cfg.Port,
|
Addr: ":" + cfg.Port,
|
||||||
Handler: r,
|
Handler: r,
|
||||||
ReadTimeout: 15 * time.Second,
|
ReadTimeout: 15 * time.Second,
|
||||||
WriteTimeout: 0, // disabled for SSE long-lived connections
|
WriteTimeout: 0,
|
||||||
IdleTimeout: 60 * time.Second,
|
IdleTimeout: 60 * time.Second,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start server in goroutine
|
|
||||||
go func() {
|
go func() {
|
||||||
slog.Info("server starting", "addr", server.Addr)
|
slog.Info("server starting", "addr", server.Addr)
|
||||||
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||||
@@ -90,21 +79,119 @@ func main() {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Wait for shutdown signal
|
|
||||||
quit := make(chan os.Signal, 1)
|
quit := make(chan os.Signal, 1)
|
||||||
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
||||||
<-quit
|
<-quit
|
||||||
|
|
||||||
slog.Info("server shutting down")
|
slog.Info("server shutting down")
|
||||||
|
cancelWorkers()
|
||||||
|
|
||||||
// Graceful shutdown
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
defer shutdownCancel()
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
if err := server.Shutdown(ctx); err != nil {
|
if err := server.Shutdown(shutdownCtx); err != nil {
|
||||||
slog.Error("server shutdown error", "error", err)
|
slog.Error("server shutdown error", "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
db.ClosePool(dbPool)
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
slog.Info("all workers stopped")
|
||||||
|
case <-time.After(15 * time.Second):
|
||||||
|
slog.Warn("timed out waiting for workers to stop")
|
||||||
|
}
|
||||||
|
|
||||||
slog.Info("server stopped")
|
slog.Info("server stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func listActivePrinters(ctx context.Context, pool *pgxpool.Pool) []models.Printer {
|
||||||
|
rows, err := pool.Query(ctx, `
|
||||||
|
SELECT id, name, printer_type_id,
|
||||||
|
manufacturer, model,
|
||||||
|
moonraker_url, moonraker_api_key,
|
||||||
|
mqtt_broker_host, mqtt_topic_prefix,
|
||||||
|
mqtt_tls_enabled, is_active,
|
||||||
|
created_at, updated_at
|
||||||
|
FROM printers WHERE is_active = TRUE ORDER BY name
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("failed to query active printers", "error", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var printers []models.Printer
|
||||||
|
for rows.Next() {
|
||||||
|
var p models.Printer
|
||||||
|
if err := rows.Scan(
|
||||||
|
&p.ID, &p.Name, &p.PrinterTypeID,
|
||||||
|
&p.Manufacturer, &p.Model,
|
||||||
|
&p.MoonrakerURL, &p.MoonrakerAPIKey,
|
||||||
|
&p.MQTTBrokerHost, &p.MQTTTopicPrefix,
|
||||||
|
&p.MQTTTLSEnabled, &p.IsActive,
|
||||||
|
&p.CreatedAt, &p.UpdatedAt,
|
||||||
|
); err != nil {
|
||||||
|
slog.Warn("failed to scan printer row", "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
printers = append(printers, p)
|
||||||
|
}
|
||||||
|
return printers
|
||||||
|
}
|
||||||
|
|
||||||
|
func startWorkerForPrinter(
|
||||||
|
ctx context.Context,
|
||||||
|
wg *sync.WaitGroup,
|
||||||
|
cfg *config.Config,
|
||||||
|
pool *pgxpool.Pool,
|
||||||
|
printer models.Printer,
|
||||||
|
pollInterval time.Duration,
|
||||||
|
) {
|
||||||
|
if printer.MoonrakerURL != nil && *printer.MoonrakerURL != "" {
|
||||||
|
mc := clients.NewMoonrakerClient(*printer.MoonrakerURL)
|
||||||
|
poller := workers.NewMoonrakerPoller(workers.MoonrakerPollerConfig{
|
||||||
|
Client: mc,
|
||||||
|
Pool: pool,
|
||||||
|
PollInterval: pollInterval,
|
||||||
|
PrinterID: printer.ID,
|
||||||
|
PrinterName: printer.Name,
|
||||||
|
})
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
poller.Run(ctx)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
if printer.MQTTBrokerHost != nil && *printer.MQTTBrokerHost != "" {
|
||||||
|
topicPrefix := cfg.MQTTTopicPrefix
|
||||||
|
if printer.MQTTTopicPrefix != nil && *printer.MQTTTopicPrefix != "" {
|
||||||
|
topicPrefix = *printer.MQTTTopicPrefix
|
||||||
|
}
|
||||||
|
sub := workers.NewMQTTSubscriber(workers.MQTTSubscriberConfig{
|
||||||
|
Pool: pool,
|
||||||
|
PrinterID: printer.ID,
|
||||||
|
PrinterName: printer.Name,
|
||||||
|
})
|
||||||
|
mqttClient := clients.NewMQTTClient(clients.MQTTConfig{
|
||||||
|
Broker: *printer.MQTTBrokerHost,
|
||||||
|
ClientID: cfg.MQTTClientID + "-p" + strconv.Itoa(printer.ID),
|
||||||
|
TopicPrefix: topicPrefix,
|
||||||
|
TLSCert: cfg.MQTTTLSCert,
|
||||||
|
TLSKey: cfg.MQTTTLSKey,
|
||||||
|
Handler: sub.HandleBambuReport,
|
||||||
|
})
|
||||||
|
sub.Client = mqttClient
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
if err := sub.Run(ctx); err != nil {
|
||||||
|
slog.Error("mqtt subscriber error", "printer_id", printer.ID, "error", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,9 +1,13 @@
|
|||||||
module github.com/CubeCraft-Creations/Extrudex/backend
|
module github.com/CubeCraft-Creations/Extrudex/backend
|
||||||
|
|
||||||
go 1.24
|
go 1.24.0
|
||||||
|
|
||||||
|
toolchain go1.24.2
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/eclipse/paho.mqtt.golang v1.5.1
|
||||||
github.com/go-chi/chi/v5 v5.2.0
|
github.com/go-chi/chi/v5 v5.2.0
|
||||||
|
github.com/gorilla/websocket v1.5.3
|
||||||
github.com/jackc/pgx/v5 v5.7.4
|
github.com/jackc/pgx/v5 v5.7.4
|
||||||
github.com/kelseyhightower/envconfig v1.4.0
|
github.com/kelseyhightower/envconfig v1.4.0
|
||||||
)
|
)
|
||||||
@@ -12,7 +16,8 @@ require (
|
|||||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||||
golang.org/x/crypto v0.31.0 // indirect
|
golang.org/x/crypto v0.42.0 // indirect
|
||||||
golang.org/x/sync v0.10.0 // indirect
|
golang.org/x/net v0.44.0 // indirect
|
||||||
golang.org/x/text v0.21.0 // indirect
|
golang.org/x/sync v0.17.0 // indirect
|
||||||
|
golang.org/x/text v0.29.0 // indirect
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,8 +1,12 @@
|
|||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE=
|
||||||
|
github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU=
|
||||||
github.com/go-chi/chi/v5 v5.2.0 h1:Aj1EtB0qR2Rdo2dG4O94RIU35w2lvQSj6BRA4+qwFL0=
|
github.com/go-chi/chi/v5 v5.2.0 h1:Aj1EtB0qR2Rdo2dG4O94RIU35w2lvQSj6BRA4+qwFL0=
|
||||||
github.com/go-chi/chi/v5 v5.2.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
|
github.com/go-chi/chi/v5 v5.2.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
|
||||||
|
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||||
|
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||||
@@ -20,12 +24,14 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
|
|||||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||||
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
|
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
|
||||||
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
|
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
|
||||||
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
|
||||||
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
|
||||||
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
|
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
|
||||||
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
|
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||||
|
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
|
||||||
|
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
|
|||||||
@@ -1,161 +1,171 @@
|
|||||||
// Package clients provides third-party printer integrations.
|
// Package clients provides client implementations for printer integrations:
|
||||||
|
// Moonraker REST + WebSocket (Klipper-based printers) and MQTT (Bambu Lab).
|
||||||
package clients
|
package clients
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MoonrakerPrinterInfo represents the response from /api/printer/info.
|
// ── Moonraker response types ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
// moonrakerRPC is the generic JSON-RPC wrapper Moonraker uses for responses.
|
||||||
|
type moonrakerRPC struct {
|
||||||
|
Result json.RawMessage `json:"result"`
|
||||||
|
Error *moonrakerError `json:"error"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type moonrakerError struct {
|
||||||
|
Code int `json:"code"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Public DTOs ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
// MoonrakerPrinterInfo represents the /printer/info response.
|
||||||
type MoonrakerPrinterInfo struct {
|
type MoonrakerPrinterInfo struct {
|
||||||
State string `json:"state"`
|
State string `json:"state"`
|
||||||
Hostname string `json:"hostname,omitempty"`
|
StateMessage string `json:"state_message"`
|
||||||
SoftwareVersion string `json:"software_version,omitempty"`
|
KlippyReady bool `json:"klippy_ready"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// MoonrakerPrintStats represents the response from /api/printer/print_stats.
|
// MoonrakerPrintStats represents the print_stats object from
|
||||||
|
// /printer/objects/query?print_stats.
|
||||||
type MoonrakerPrintStats struct {
|
type MoonrakerPrintStats struct {
|
||||||
State string `json:"state"`
|
State string `json:"state"`
|
||||||
Filename string `json:"filename,omitempty"`
|
Filename *string `json:"filename"`
|
||||||
FilamentUsedMm float64 `json:"filament_used,omitempty"`
|
FilamentUsedMm float64 `json:"filament_used"`
|
||||||
TotalDuration float64 `json:"total_duration,omitempty"`
|
PrintDuration float64 `json:"print_duration"`
|
||||||
PrintDuration float64 `json:"print_duration,omitempty"`
|
Message *string `json:"message"`
|
||||||
Message string `json:"message,omitempty"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// MoonrakerPrintJob represents a single job from the history API.
|
// MoonrakerPrintJob represents a single entry in /server/history/items.
|
||||||
type MoonrakerPrintJob struct {
|
type MoonrakerPrintJob struct {
|
||||||
JobID string `json:"job_id,omitempty"`
|
JobID string `json:"job_id"`
|
||||||
Filename string `json:"filename"`
|
Filename string `json:"filename"`
|
||||||
Status string `json:"status"`
|
Status string `json:"status"`
|
||||||
StartTime time.Time `json:"start_time"`
|
FilamentUsedMm float64 `json:"filament_used"`
|
||||||
EndTime time.Time `json:"end_time,omitempty"`
|
PrintDuration float64 `json:"print_duration"`
|
||||||
FilamentUsedMm float64 `json:"filament_used,omitempty"`
|
TotalDuration float64 `json:"total_duration"`
|
||||||
TotalDuration float64 `json:"total_duration,omitempty"`
|
StartTime *float64 `json:"start_time"`
|
||||||
|
EndTime *float64 `json:"end_time"`
|
||||||
|
Metadata map[string]interface{} `json:"metadata"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// MoonrakerHistoryResponse represents the response from /api/server/history/job.
|
// MoonrakerHistoryResponse wraps the /server/history/items response.
|
||||||
type MoonrakerHistoryResponse struct {
|
type MoonrakerHistoryResponse struct {
|
||||||
Items []MoonrakerPrintJob `json:"jobs"`
|
Items []MoonrakerPrintJob `json:"items"`
|
||||||
|
TotalCount int `json:"count"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// MoonrakerClient is an HTTP client for the Moonraker API.
|
// ── Client ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
// MoonrakerClient is an HTTP client for the Moonraker REST API on
|
||||||
|
// Klipper-based printers (e.g., Elegoo Centauri Carbon).
|
||||||
type MoonrakerClient struct {
|
type MoonrakerClient struct {
|
||||||
HTTPClient *http.Client
|
baseURL string
|
||||||
|
httpClient *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMoonrakerClient creates a MoonrakerClient with the given request timeout.
|
// NewMoonrakerClient creates a MoonrakerClient that targets the given
|
||||||
func NewMoonrakerClient(timeout time.Duration) *MoonrakerClient {
|
// base URL (e.g., "http://192.168.1.50:7125"). The internal HTTP client
|
||||||
|
// uses a 15-second timeout.
|
||||||
|
func NewMoonrakerClient(baseURL string) *MoonrakerClient {
|
||||||
|
baseURL = strings.TrimRight(baseURL, "/")
|
||||||
return &MoonrakerClient{
|
return &MoonrakerClient{
|
||||||
HTTPClient: &http.Client{Timeout: timeout},
|
baseURL: baseURL,
|
||||||
|
httpClient: &http.Client{
|
||||||
|
Timeout: 15 * time.Second,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// baseURL builds the Moonraker base URL from host and port.
|
// GetPrinterInfo calls GET /printer/info and returns the Klipper state.
|
||||||
func (c *MoonrakerClient) baseURL(host string, port int) string {
|
// Returns nil when the printer is unreachable or the response cannot be parsed.
|
||||||
if port == 0 {
|
func (c *MoonrakerClient) GetPrinterInfo(ctx context.Context) (*MoonrakerPrinterInfo, error) {
|
||||||
port = 80
|
var info MoonrakerPrinterInfo
|
||||||
}
|
if err := c.getJSON(ctx, "/printer/info", &info); err != nil {
|
||||||
return fmt.Sprintf("http://%s:%d", host, port)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetPrinterInfo fetches printer info from Moonraker.
|
|
||||||
func (c *MoonrakerClient) GetPrinterInfo(ctx context.Context, host string, port int, apiKey string) (*MoonrakerPrinterInfo, error) {
|
|
||||||
url := c.baseURL(host, port) + "/api/printer/info"
|
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if apiKey != "" {
|
return &info, nil
|
||||||
req.Header.Set("X-Api-Key", apiKey)
|
}
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := c.HTTPClient.Do(req)
|
// GetPrintStats calls GET /printer/objects/query?print_stats and returns
|
||||||
|
// real-time print statistics including filament consumption.
|
||||||
|
// Returns nil when no print is active or the printer is unreachable.
|
||||||
|
func (c *MoonrakerClient) GetPrintStats(ctx context.Context) (*MoonrakerPrintStats, error) {
|
||||||
|
var stats MoonrakerPrintStats
|
||||||
|
// Moonraker wraps the object in status.print_stats
|
||||||
|
var wrapper struct {
|
||||||
|
Status struct {
|
||||||
|
PrintStats MoonrakerPrintStats `json:"print_stats"`
|
||||||
|
} `json:"status"`
|
||||||
|
}
|
||||||
|
if err := c.getJSON(ctx, "/printer/objects/query?print_stats", &wrapper); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
stats = wrapper.Status.PrintStats
|
||||||
|
return &stats, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetPrintHistory calls GET /server/history/items and returns recent print
|
||||||
|
// jobs. limit controls the maximum number of items (clamped 1-100).
|
||||||
|
func (c *MoonrakerClient) GetPrintHistory(ctx context.Context, limit int) (*MoonrakerHistoryResponse, error) {
|
||||||
|
if limit < 1 {
|
||||||
|
limit = 1
|
||||||
|
}
|
||||||
|
if limit > 100 {
|
||||||
|
limit = 100
|
||||||
|
}
|
||||||
|
var history MoonrakerHistoryResponse
|
||||||
|
if err := c.getJSON(ctx, fmt.Sprintf("/server/history/items?limit=%d", limit), &history); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &history, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Internal helpers ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
func (c *MoonrakerClient) getJSON(ctx context.Context, path string, target interface{}) error {
|
||||||
|
url := c.baseURL + path
|
||||||
|
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("moonraker getPrinterInfo request failed: %w", err)
|
return fmt.Errorf("moonraker: failed to build request: %w", err)
|
||||||
|
}
|
||||||
|
req.Header.Set("Accept", "application/json")
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("moonraker: request failed (%s): %w", url, err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
body, err := io.ReadAll(resp.Body)
|
||||||
return nil, fmt.Errorf("moonraker getPrinterInfo returned status %d", resp.StatusCode)
|
if err != nil {
|
||||||
|
return fmt.Errorf("moonraker: failed to read body: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var body struct {
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||||
Result MoonrakerPrinterInfo `json:"result"`
|
return fmt.Errorf("moonraker: %s returned HTTP %d: %s", url, resp.StatusCode, string(body))
|
||||||
}
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
|
|
||||||
return nil, fmt.Errorf("moonraker getPrinterInfo decode failed: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
slog.Debug("moonraker printer info", "host", host, "state", body.Result.State)
|
// Moonraker wraps responses in {"result": ...}
|
||||||
return &body.Result, nil
|
var rpc moonrakerRPC
|
||||||
}
|
if err := json.Unmarshal(body, &rpc); err != nil {
|
||||||
|
return fmt.Errorf("moonraker: failed to parse response: %w", err)
|
||||||
// GetPrintStats fetches current print statistics from Moonraker.
|
}
|
||||||
func (c *MoonrakerClient) GetPrintStats(ctx context.Context, host string, port int, apiKey string) (*MoonrakerPrintStats, error) {
|
if rpc.Error != nil && rpc.Error.Message != "" {
|
||||||
url := c.baseURL(host, port) + "/api/printer/print_stats"
|
return fmt.Errorf("moonraker: api error: %s", rpc.Error.Message)
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
}
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
if err := json.Unmarshal(rpc.Result, target); err != nil {
|
||||||
}
|
return fmt.Errorf("moonraker: failed to unmarshal result: %w (raw: %s)", err, string(rpc.Result))
|
||||||
if apiKey != "" {
|
}
|
||||||
req.Header.Set("X-Api-Key", apiKey)
|
return nil
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := c.HTTPClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("moonraker getPrintStats request failed: %w", err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
return nil, fmt.Errorf("moonraker getPrintStats returned status %d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
var body struct {
|
|
||||||
Result MoonrakerPrintStats `json:"result"`
|
|
||||||
}
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
|
|
||||||
return nil, fmt.Errorf("moonraker getPrintStats decode failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
slog.Debug("moonraker print stats", "host", host, "state", body.Result.State, "filename", body.Result.Filename)
|
|
||||||
return &body.Result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetPrintHistory fetches completed print job history from Moonraker.
|
|
||||||
func (c *MoonrakerClient) GetPrintHistory(ctx context.Context, host string, port int, apiKey string, limit int) (*MoonrakerHistoryResponse, error) {
|
|
||||||
if limit <= 0 {
|
|
||||||
limit = 25
|
|
||||||
}
|
|
||||||
url := fmt.Sprintf("%s/api/server/history/job?limit=%d", c.baseURL(host, port), limit)
|
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if apiKey != "" {
|
|
||||||
req.Header.Set("X-Api-Key", apiKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := c.HTTPClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("moonraker getPrintHistory request failed: %w", err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
return nil, fmt.Errorf("moonraker getPrintHistory returned status %d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
var body MoonrakerHistoryResponse
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
|
|
||||||
return nil, fmt.Errorf("moonraker getPrintHistory decode failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
slog.Debug("moonraker print history", "host", host, "count", len(body.Items))
|
|
||||||
return &body, nil
|
|
||||||
}
|
}
|
||||||
|
|||||||
229
backend/internal/clients/moonraker_ws.go
Normal file
229
backend/internal/clients/moonraker_ws.go
Normal file
@@ -0,0 +1,229 @@
|
|||||||
|
package clients
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ── WebSocket message types ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
// moonrakerWSMessage is a single JSON-RPC frame from the Moonraker WebSocket.
|
||||||
|
type moonrakerWSMessage struct {
|
||||||
|
JSONRPC string `json:"jsonrpc"`
|
||||||
|
Method string `json:"method"`
|
||||||
|
Params json.RawMessage `json:"params"`
|
||||||
|
ID *int `json:"id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// MoonrakerPrintEvent is the payload delivered by the "notify_status_update"
|
||||||
|
// subscription when print_stats or display_status change.
|
||||||
|
type MoonrakerPrintEvent struct {
|
||||||
|
PrintStats *MoonrakerPrintStats `json:"print_stats"`
|
||||||
|
DisplayStatus *MoonrakerDisplayStatus `json:"display_status"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// MoonrakerDisplayStatus carries progress and the LCD message.
|
||||||
|
type MoonrakerDisplayStatus struct {
|
||||||
|
Progress float64 `json:"progress"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// MoonrakerStatusHandler is called for every status update received from the
|
||||||
|
// Moonraker WebSocket. It receives the parsed event and the raw JSON.
|
||||||
|
type MoonrakerStatusHandler func(event MoonrakerPrintEvent) error
|
||||||
|
|
||||||
|
// ── WebSocket client ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
// MoonrakerWSClient maintains a persistent WebSocket connection to the
|
||||||
|
// Moonraker server and delivers parsed status updates to a handler.
|
||||||
|
type MoonrakerWSClient struct {
|
||||||
|
wsURL string
|
||||||
|
handler MoonrakerStatusHandler
|
||||||
|
dialer *websocket.Dialer
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
conn *websocket.Conn
|
||||||
|
done chan struct{}
|
||||||
|
once sync.Once
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMoonrakerWSClient creates a WebSocket client for the given Moonraker base
|
||||||
|
// URL. The handler is invoked on every status update.
|
||||||
|
func NewMoonrakerWSClient(baseURL string, handler MoonrakerStatusHandler) *MoonrakerWSClient {
|
||||||
|
baseURL = strings.TrimRight(baseURL, "/")
|
||||||
|
wsURL := strings.Replace(baseURL, "http://", "ws://", 1)
|
||||||
|
wsURL = strings.Replace(wsURL, "https://", "wss://", 1)
|
||||||
|
wsURL += "/websocket"
|
||||||
|
|
||||||
|
return &MoonrakerWSClient{
|
||||||
|
wsURL: wsURL,
|
||||||
|
handler: handler,
|
||||||
|
dialer: &websocket.Dialer{
|
||||||
|
Proxy: http.ProxyFromEnvironment,
|
||||||
|
HandshakeTimeout: 10 * time.Second,
|
||||||
|
},
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect establishes the WebSocket, subscribes to status updates, and
|
||||||
|
// starts the read loop in a background goroutine. It retries on failure
|
||||||
|
// with exponential backoff up to a 60-second cap.
|
||||||
|
func (c *MoonrakerWSClient) Connect(ctx context.Context) {
|
||||||
|
go c.run(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown gracefully closes the WebSocket and stops the read loop.
|
||||||
|
func (c *MoonrakerWSClient) Shutdown() {
|
||||||
|
c.once.Do(func() {
|
||||||
|
close(c.done)
|
||||||
|
})
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
if c.conn != nil {
|
||||||
|
c.conn.Close()
|
||||||
|
c.conn = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// run is the main connection loop with reconnect backoff.
|
||||||
|
func (c *MoonrakerWSClient) run(ctx context.Context) {
|
||||||
|
backoff := 1 * time.Second
|
||||||
|
const maxBackoff = 60 * time.Second
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
slog.Info("moonraker ws: context cancelled, stopping")
|
||||||
|
return
|
||||||
|
case <-c.done:
|
||||||
|
slog.Info("moonraker ws: shutdown requested")
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.connectAndRead(ctx); err != nil {
|
||||||
|
slog.Error("moonraker ws: connection error, retrying", "error", err, "backoff", backoff)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exponential backoff.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-c.done:
|
||||||
|
return
|
||||||
|
case <-time.After(backoff):
|
||||||
|
}
|
||||||
|
backoff *= 2
|
||||||
|
if backoff > maxBackoff {
|
||||||
|
backoff = maxBackoff
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *MoonrakerWSClient) connectAndRead(ctx context.Context) error {
|
||||||
|
slog.Info("moonraker ws: connecting", "url", c.wsURL)
|
||||||
|
|
||||||
|
conn, _, err := c.dialer.DialContext(ctx, c.wsURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("dial failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.mu.Lock()
|
||||||
|
if c.conn != nil {
|
||||||
|
c.conn.Close()
|
||||||
|
}
|
||||||
|
c.conn = conn
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
c.mu.Lock()
|
||||||
|
if c.conn == conn {
|
||||||
|
c.conn = nil
|
||||||
|
}
|
||||||
|
c.mu.Unlock()
|
||||||
|
conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Subscribe to status updates.
|
||||||
|
subReq := map[string]interface{}{
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"method": "printer.objects.subscribe",
|
||||||
|
"params": map[string]interface{}{
|
||||||
|
"objects": map[string]interface{}{
|
||||||
|
"print_stats": nil,
|
||||||
|
"display_status": nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"id": 1,
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(subReq); err != nil {
|
||||||
|
return fmt.Errorf("subscribe failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("moonraker ws: subscribed to status updates")
|
||||||
|
|
||||||
|
// Set read deadline to detect stale connections.
|
||||||
|
// 120s is long enough to avoid false positives.
|
||||||
|
pingPeriod := 60 * time.Second
|
||||||
|
|
||||||
|
for {
|
||||||
|
// Set read deadline.
|
||||||
|
if err := conn.SetReadDeadline(time.Now().Add(150 * time.Second)); err != nil {
|
||||||
|
return fmt.Errorf("set read deadline: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, raw, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("read message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send periodic pings to keep the connection alive.
|
||||||
|
go func() {
|
||||||
|
time.Sleep(pingPeriod)
|
||||||
|
c.mu.Lock()
|
||||||
|
if c.conn == conn {
|
||||||
|
c.conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second))
|
||||||
|
}
|
||||||
|
c.mu.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
|
var msg moonrakerWSMessage
|
||||||
|
if err := json.Unmarshal(raw, &msg); err != nil {
|
||||||
|
slog.Warn("moonraker ws: failed to parse message", "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only process notify_status_update messages.
|
||||||
|
if msg.Method != "notify_status_update" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var statusWrapper []MoonrakerPrintEvent
|
||||||
|
if err := json.Unmarshal(msg.Params, &statusWrapper); err != nil {
|
||||||
|
// Params might be an object, not an array.
|
||||||
|
var singleEvent MoonrakerPrintEvent
|
||||||
|
if err2 := json.Unmarshal(msg.Params, &singleEvent); err2 != nil {
|
||||||
|
slog.Warn("moonraker ws: failed to unmarshal status params", "error", err2)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
statusWrapper = []MoonrakerPrintEvent{singleEvent}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ev := range statusWrapper {
|
||||||
|
if c.handler != nil {
|
||||||
|
if err := c.handler(ev); err != nil {
|
||||||
|
slog.Error("moonraker ws: handler error", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,119 +1,183 @@
|
|||||||
// Package clients provides third-party printer integrations.
|
|
||||||
package clients
|
package clients
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MQTTClient wraps the Eclipse Paho MQTT client for printer telemetry.
|
// ── Bambu Lab telemetry types ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
// BambuPrintReport is the JSON payload published by Bambu Lab printers
|
||||||
|
// on the MQTT report topic. The structure varies by printer model;
|
||||||
|
// we extract the common fields needed for filament tracking.
|
||||||
|
type BambuPrintReport struct {
|
||||||
|
// Print holds the active print job data.
|
||||||
|
Print BambuPrintData `json:"print"`
|
||||||
|
|
||||||
|
// VtTray contains AMS tray info; the extruded length is per-tray.
|
||||||
|
VtTray *BambuVtTray `json:"vt_tray,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// BambuPrintData carries the active print state from a Bambu report.
|
||||||
|
type BambuPrintData struct {
|
||||||
|
// GcodeFile is the filename being printed.
|
||||||
|
GcodeFile string `json:"gcode_file"`
|
||||||
|
// GcodeState describes the current print state:
|
||||||
|
// "IDLE", "RUNNING", "PAUSE", "FINISH", "FAILED".
|
||||||
|
GcodeState string `json:"gcode_state"`
|
||||||
|
// McPercent is the progress as a percentage (0-100).
|
||||||
|
McPercent int `json:"mc_percent"`
|
||||||
|
// McRemainingTime is the estimated remaining time in minutes.
|
||||||
|
McRemainingTime int `json:"mc_remaining_time"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// BambuVtTray holds AMS tray telemetry from Bambu printers.
|
||||||
|
type BambuVtTray struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
TagUID string `json:"tag_uid"`
|
||||||
|
TrayIDName string `json:"tray_id_name"`
|
||||||
|
// TrayInfoIdx is the hex color code for the tray's filament.
|
||||||
|
TrayInfoIdx string `json:"tray_info_idx"`
|
||||||
|
// TrayColor is a hex color string like "FF0000FF".
|
||||||
|
TrayColor string `json:"tray_color"`
|
||||||
|
// Remain is the percentage of filament remaining on this tray (0-100).
|
||||||
|
Remain int `json:"remain"`
|
||||||
|
// K is a temperature coefficient.
|
||||||
|
K float64 `json:"k"`
|
||||||
|
// N is a second temperature coefficient.
|
||||||
|
N float64 `json:"n"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// BambuReportHandler is called for each parsed Bambu telemetry message.
|
||||||
|
type BambuReportHandler func(report BambuPrintReport) error
|
||||||
|
|
||||||
|
// ── MQTT client ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
// MQTTClient wraps the Eclipse Paho MQTT client for Bambu Lab printer
|
||||||
|
// telemetry with optional TLS support.
|
||||||
type MQTTClient struct {
|
type MQTTClient struct {
|
||||||
|
broker string
|
||||||
|
clientID string
|
||||||
|
topicPrefix string
|
||||||
|
tlsCert string
|
||||||
|
tlsKey string
|
||||||
|
handler BambuReportHandler
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
client mqtt.Client
|
client mqtt.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
// MQTTConfig holds per-printer MQTT connection settings.
|
// MQTTConfig holds the configuration for creating an MQTTClient.
|
||||||
type MQTTConfig struct {
|
type MQTTConfig struct {
|
||||||
BrokerHost string
|
Broker string // e.g., "ssl://192.168.1.50:8883"
|
||||||
BrokerPort int
|
ClientID string // unique MQTT client id, defaults to "extrudex"
|
||||||
TopicPrefix string
|
TopicPrefix string // topic prefix, defaults to "device/+/report"
|
||||||
TLSEnabled bool
|
TLSCert string // path to TLS client certificate (optional)
|
||||||
ClientID string
|
TLSKey string // path to TLS client key (optional)
|
||||||
|
Handler BambuReportHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
// BambuPrintStatus is the known Bambu Lab print-status payload shape.
|
// NewMQTTClient creates a new MQTTClient. The connection is not established
|
||||||
type BambuPrintStatus struct {
|
// until Connect is called.
|
||||||
Print struct {
|
func NewMQTTClient(cfg MQTTConfig) *MQTTClient {
|
||||||
GcodeFile string `json:"gcode_file,omitempty"`
|
|
||||||
Stage int `json:"stage,omitempty"`
|
|
||||||
SubTaskName string `json:"subtask_name,omitempty"`
|
|
||||||
PrintType string `json:"print_type,omitempty"`
|
|
||||||
FilamentUsedMm float64 `json:"mc_percent,omitempty"` // placeholder; real telemetry varies
|
|
||||||
} `json:"print,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewMQTTClient creates an MQTT client connected to the given broker.
|
|
||||||
func NewMQTTClient(cfg MQTTConfig) (*MQTTClient, error) {
|
|
||||||
if cfg.BrokerPort == 0 {
|
|
||||||
if cfg.TLSEnabled {
|
|
||||||
cfg.BrokerPort = 8883
|
|
||||||
} else {
|
|
||||||
cfg.BrokerPort = 1883
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if cfg.ClientID == "" {
|
if cfg.ClientID == "" {
|
||||||
cfg.ClientID = fmt.Sprintf("extrudex-%d", time.Now().Unix())
|
cfg.ClientID = "extrudex"
|
||||||
}
|
}
|
||||||
|
if cfg.TopicPrefix == "" {
|
||||||
|
cfg.TopicPrefix = "device/+/report"
|
||||||
|
}
|
||||||
|
return &MQTTClient{
|
||||||
|
broker: cfg.Broker,
|
||||||
|
clientID: cfg.ClientID,
|
||||||
|
topicPrefix: cfg.TopicPrefix,
|
||||||
|
tlsCert: cfg.TLSCert,
|
||||||
|
tlsKey: cfg.TLSKey,
|
||||||
|
handler: cfg.Handler,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect establishes the MQTT connection and subscribes to the configured
|
||||||
|
// topic prefix. Returns an error if the initial connection fails.
|
||||||
|
func (c *MQTTClient) Connect() error {
|
||||||
opts := mqtt.NewClientOptions().
|
opts := mqtt.NewClientOptions().
|
||||||
AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.BrokerHost, cfg.BrokerPort)).
|
AddBroker(c.broker).
|
||||||
SetClientID(cfg.ClientID).
|
SetClientID(c.clientID).
|
||||||
SetAutoReconnect(true).
|
SetAutoReconnect(true).
|
||||||
SetConnectTimeout(10 * time.Second).
|
SetMaxReconnectInterval(30 * time.Second).
|
||||||
SetOrderMatters(false)
|
SetKeepAlive(30 * time.Second).
|
||||||
|
SetPingTimeout(10 * time.Second).
|
||||||
|
SetConnectTimeout(15 * time.Second).
|
||||||
|
SetOnConnectHandler(func(client mqtt.Client) {
|
||||||
|
slog.Info("mqtt: connected", "broker", c.broker)
|
||||||
|
// Subscribe on every reconnect.
|
||||||
|
token := client.Subscribe(c.topicPrefix, 0, c.messageHandler)
|
||||||
|
token.Wait()
|
||||||
|
if err := token.Error(); err != nil {
|
||||||
|
slog.Error("mqtt: subscribe failed on reconnect", "topic", c.topicPrefix, "error", err)
|
||||||
|
} else {
|
||||||
|
slog.Info("mqtt: subscribed", "topic", c.topicPrefix)
|
||||||
|
}
|
||||||
|
}).
|
||||||
|
SetConnectionLostHandler(func(client mqtt.Client, err error) {
|
||||||
|
slog.Warn("mqtt: connection lost", "error", err)
|
||||||
|
})
|
||||||
|
|
||||||
if cfg.TLSEnabled {
|
// Configure TLS if cert and key are provided.
|
||||||
opts = opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: false})
|
if c.tlsCert != "" && c.tlsKey != "" {
|
||||||
|
cert, err := tls.LoadX509KeyPair(c.tlsCert, c.tlsKey)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("mqtt: failed to load TLS cert/key: %w", err)
|
||||||
|
}
|
||||||
|
opts.SetTLSConfig(&tls.Config{
|
||||||
|
Certificates: []tls.Certificate{cert},
|
||||||
|
MinVersion: tls.VersionTLS12,
|
||||||
|
})
|
||||||
|
slog.Info("mqtt: TLS configured", "cert", c.tlsCert)
|
||||||
}
|
}
|
||||||
|
|
||||||
client := mqtt.NewClient(opts)
|
c.client = mqtt.NewClient(opts)
|
||||||
token := client.Connect()
|
token := c.client.Connect()
|
||||||
if token.Wait() && token.Error() != nil {
|
if !token.WaitTimeout(15 * time.Second) {
|
||||||
return nil, fmt.Errorf("mqtt connect failed: %w", token.Error())
|
return fmt.Errorf("mqtt: connect timed out to %s", c.broker)
|
||||||
|
}
|
||||||
|
if err := token.Error(); err != nil {
|
||||||
|
return fmt.Errorf("mqtt: connect failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
slog.Info("mqtt client connected", "broker", cfg.BrokerHost, "port", cfg.BrokerPort, "tls", cfg.TLSEnabled)
|
slog.Info("mqtt: initial connection established", "broker", c.broker)
|
||||||
return &MQTTClient{client: client}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Subscribe registers a callback for messages matching topic.
|
|
||||||
func (c *MQTTClient) Subscribe(topic string, qos byte, callback func([]byte)) error {
|
|
||||||
token := c.client.Subscribe(topic, qos, func(_ mqtt.Client, msg mqtt.Message) {
|
|
||||||
callback(msg.Payload())
|
|
||||||
})
|
|
||||||
if token.Wait() && token.Error() != nil {
|
|
||||||
return fmt.Errorf("mqtt subscribe failed: %w", token.Error())
|
|
||||||
}
|
|
||||||
slog.Info("mqtt subscribed", "topic", topic, "qos", qos)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unsubscribe removes a subscription.
|
// Disconnect gracefully closes the MQTT connection.
|
||||||
func (c *MQTTClient) Unsubscribe(topics ...string) error {
|
func (c *MQTTClient) Disconnect() {
|
||||||
token := c.client.Unsubscribe(topics...)
|
c.mu.Lock()
|
||||||
if token.Wait() && token.Error() != nil {
|
defer c.mu.Unlock()
|
||||||
return fmt.Errorf("mqtt unsubscribe failed: %w", token.Error())
|
if c.client != nil && c.client.IsConnected() {
|
||||||
}
|
c.client.Disconnect(2500) // wait up to 2.5s
|
||||||
return nil
|
slog.Info("mqtt: disconnected")
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// Disconnect cleanly disconnects the MQTT client.
|
|
||||||
func (c *MQTTClient) Disconnect(quiesceMs uint) {
|
// messageHandler is the MQTT callback invoked for every message received on
|
||||||
c.client.Disconnect(quiesceMs)
|
// the subscribed topic.
|
||||||
}
|
func (c *MQTTClient) messageHandler(_ mqtt.Client, msg mqtt.Message) {
|
||||||
|
if c.handler == nil {
|
||||||
// IsConnected returns whether the underlying client is connected.
|
return
|
||||||
func (c *MQTTClient) IsConnected() bool {
|
}
|
||||||
return c.client.IsConnected()
|
|
||||||
}
|
var report BambuPrintReport
|
||||||
|
if err := json.Unmarshal(msg.Payload(), &report); err != nil {
|
||||||
// ParseBambuTelemetry attempts to parse a Bambu Lab telemetry JSON payload.
|
slog.Warn("mqtt: failed to parse bambu report", "topic", msg.Topic(), "error", err)
|
||||||
func ParseBambuTelemetry(payload []byte) (*BambuPrintStatus, error) {
|
return
|
||||||
var msg BambuPrintStatus
|
}
|
||||||
if err := json.Unmarshal(payload, &msg); err != nil {
|
|
||||||
return nil, fmt.Errorf("parse bambu telemetry failed: %w", err)
|
if err := c.handler(report); err != nil {
|
||||||
}
|
slog.Error("mqtt: handler error", "topic", msg.Topic(), "error", err)
|
||||||
return &msg, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// DefaultBambuTopics returns the default topic patterns for Bambu Lab printers.
|
|
||||||
func DefaultBambuTopics(topicPrefix string) []string {
|
|
||||||
return []string{
|
|
||||||
topicPrefix + "/report",
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,6 +12,17 @@ type Config struct {
|
|||||||
Port string `envconfig:"port" default:"8080"`
|
Port string `envconfig:"port" default:"8080"`
|
||||||
CorsOrigin string `envconfig:"cors_origin" default:"*"`
|
CorsOrigin string `envconfig:"cors_origin" default:"*"`
|
||||||
LogLevel string `envconfig:"log_level" default:"info"`
|
LogLevel string `envconfig:"log_level" default:"info"`
|
||||||
|
|
||||||
|
// Moonraker integration.
|
||||||
|
MoonrakerURL string `envconfig:"moonraker_url" default:"http://localhost:7125"`
|
||||||
|
MoonrakerPollInterval string `envconfig:"moonraker_poll_interval" default:"10s"`
|
||||||
|
|
||||||
|
// MQTT (Bambu Lab) integration.
|
||||||
|
MQTTBroker string `envconfig:"mqtt_broker" default:"localhost:1883"`
|
||||||
|
MQTTTopicPrefix string `envconfig:"mqtt_topic_prefix" default:"device/+/report"`
|
||||||
|
MQTTClientID string `envconfig:"mqtt_client_id" default:"extrudex"`
|
||||||
|
MQTTTLSCert string `envconfig:"mqtt_tls_cert" default:""`
|
||||||
|
MQTTTLSKey string `envconfig:"mqtt_tls_key" default:""`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load reads configuration from environment variables and returns a populated Config.
|
// Load reads configuration from environment variables and returns a populated Config.
|
||||||
|
|||||||
@@ -1,321 +1,255 @@
|
|||||||
// Package workers provides background goroutines for printer telemetry.
|
|
||||||
package workers
|
package workers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"strconv"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/CubeCraft-Creations/Extrudex/backend/internal/clients"
|
"github.com/CubeCraft-Creations/Extrudex/backend/internal/clients"
|
||||||
"github.com/CubeCraft-Creations/Extrudex/backend/internal/models"
|
|
||||||
"github.com/CubeCraft-Creations/Extrudex/backend/internal/repositories"
|
|
||||||
"github.com/CubeCraft-Creations/Extrudex/backend/internal/sse"
|
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MoonrakerPollerConfig controls the background polling behaviour.
|
// ── deduplication ───────────────────────────────────────────────────────────
|
||||||
type MoonrakerPollerConfig struct {
|
|
||||||
PollInterval time.Duration
|
// jobTrack holds the last-seen filename and filament_used for dedup.
|
||||||
RequestTimeout time.Duration
|
type jobTrack struct {
|
||||||
|
filename string
|
||||||
|
filamentUsed float64
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultMoonrakerPollerConfig returns sensible defaults.
|
// MoonrakerPoller periodically queries the Moonraker REST API for print stats
|
||||||
func DefaultMoonrakerPollerConfig() MoonrakerPollerConfig {
|
// and logs filament usage to PostgreSQL. It deduplicates by tracking the
|
||||||
return MoonrakerPollerConfig{
|
// last-known filament_used value for the active job on this printer.
|
||||||
PollInterval: 30 * time.Second,
|
|
||||||
RequestTimeout: 10 * time.Second,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// MoonrakerPoller periodically polls Moonraker printers for status and usage.
|
|
||||||
type MoonrakerPoller struct {
|
type MoonrakerPoller struct {
|
||||||
cfg MoonrakerPollerConfig
|
client *clients.MoonrakerClient
|
||||||
client *clients.MoonrakerClient
|
pool *pgxpool.Pool
|
||||||
printerRepo *repositories.PrinterRepository
|
pollInterval time.Duration
|
||||||
jobRepo *repositories.PrintJobRepository
|
printerID int
|
||||||
usageRepo *repositories.UsageLogRepository
|
printerName string
|
||||||
sseBC *sse.Broadcaster
|
|
||||||
pool *pgxpool.Pool
|
mu sync.Mutex
|
||||||
stop chan struct{}
|
track jobTrack
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMoonrakerPoller creates a poller. It uses the pool directly for
|
// MoonrakerPollerConfig holds configuration for the Moonraker polling worker.
|
||||||
// transaction-scoped writes that the repository layer cannot span.
|
type MoonrakerPollerConfig struct {
|
||||||
func NewMoonrakerPoller(
|
Client *clients.MoonrakerClient
|
||||||
cfg MoonrakerPollerConfig,
|
Pool *pgxpool.Pool
|
||||||
pool *pgxpool.Pool,
|
PollInterval time.Duration
|
||||||
printerRepo *repositories.PrinterRepository,
|
PrinterID int
|
||||||
jobRepo *repositories.PrintJobRepository,
|
PrinterName string
|
||||||
usageRepo *repositories.UsageLogRepository,
|
}
|
||||||
sseBC *sse.Broadcaster,
|
|
||||||
) *MoonrakerPoller {
|
// NewMoonrakerPoller creates a new MoonrakerPoller worker.
|
||||||
|
func NewMoonrakerPoller(cfg MoonrakerPollerConfig) *MoonrakerPoller {
|
||||||
|
if cfg.PollInterval <= 0 {
|
||||||
|
cfg.PollInterval = 10 * time.Second
|
||||||
|
}
|
||||||
return &MoonrakerPoller{
|
return &MoonrakerPoller{
|
||||||
cfg: cfg,
|
client: cfg.Client,
|
||||||
client: clients.NewMoonrakerClient(cfg.RequestTimeout),
|
pool: cfg.Pool,
|
||||||
printerRepo: printerRepo,
|
pollInterval: cfg.PollInterval,
|
||||||
jobRepo: jobRepo,
|
printerID: cfg.PrinterID,
|
||||||
usageRepo: usageRepo,
|
printerName: cfg.PrinterName,
|
||||||
sseBC: sseBC,
|
|
||||||
pool: pool,
|
|
||||||
stop: make(chan struct{}),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start begins the polling loop in a goroutine.
|
// Run starts the polling loop. It blocks until ctx is cancelled.
|
||||||
func (p *MoonrakerPoller) Start() {
|
func (w *MoonrakerPoller) Run(ctx context.Context) {
|
||||||
go p.loop()
|
slog.Info("moonraker poller: starting",
|
||||||
}
|
"printer_id", w.printerID,
|
||||||
|
"printer_name", w.printerName,
|
||||||
|
"interval", w.pollInterval,
|
||||||
|
)
|
||||||
|
|
||||||
// Stop signals the loop to exit.
|
ticker := time.NewTicker(w.pollInterval)
|
||||||
func (p *MoonrakerPoller) Stop() {
|
|
||||||
close(p.stop)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *MoonrakerPoller) loop() {
|
|
||||||
ticker := time.NewTicker(p.cfg.PollInterval)
|
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
// Immediate first tick.
|
w.poll(ctx)
|
||||||
p.pollCycle()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ctx.Done():
|
||||||
p.pollCycle()
|
slog.Info("moonraker poller: stopping", "printer_id", w.printerID)
|
||||||
case <-p.stop:
|
|
||||||
slog.Info("moonraker poller stopped")
|
|
||||||
return
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
w.poll(ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *MoonrakerPoller) pollCycle() {
|
func (w *MoonrakerPoller) poll(ctx context.Context) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
stats, err := w.client.GetPrintStats(ctx)
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
printers, err := p.printerRepo.GetAll(ctx)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("moonraker poller: failed to list printers", "error", err)
|
slog.Warn("moonraker poller: failed to get print stats",
|
||||||
|
"printer_id", w.printerID, "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, printer := range printers {
|
if stats.State == "" {
|
||||||
if !printer.IsActive || printer.MoonrakerURL == nil || *printer.MoonrakerURL == "" {
|
return
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := p.pollPrinter(ctx, printer); err != nil {
|
|
||||||
slog.Warn("moonraker poller: poll failed",
|
|
||||||
"printer", printer.Name,
|
|
||||||
"error", err,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
jobName := "unknown"
|
||||||
|
if stats.Filename != nil {
|
||||||
|
jobName = *stats.Filename
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute delta under lock; release before I/O.
|
||||||
|
w.mu.Lock()
|
||||||
|
prevName := w.track.filename
|
||||||
|
prevUsed := w.track.filamentUsed
|
||||||
|
|
||||||
|
if jobName != prevName {
|
||||||
|
w.track.filename = jobName
|
||||||
|
w.track.filamentUsed = 0
|
||||||
|
prevUsed = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
deltaMM := stats.FilamentUsedMm - prevUsed
|
||||||
|
totalMM := stats.FilamentUsedMm
|
||||||
|
if deltaMM <= 0 && jobName == prevName {
|
||||||
|
w.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.mu.Unlock()
|
||||||
|
|
||||||
|
slog.Info("moonraker poller: filament usage",
|
||||||
|
"printer_id", w.printerID,
|
||||||
|
"job", jobName,
|
||||||
|
"delta_mm", deltaMM,
|
||||||
|
"total_mm", totalMM,
|
||||||
|
"state", stats.State,
|
||||||
|
)
|
||||||
|
|
||||||
|
jobID, err := w.ensurePrintJob(ctx, jobName, stats.State)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("moonraker poller: failed to ensure print job",
|
||||||
|
"printer_id", w.printerID, "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
spoolID, density := lookupActiveSpool(ctx, w.pool, w.printerID)
|
||||||
|
|
||||||
|
if err := insertUsageLog(ctx, w.pool, jobID, spoolID, deltaMM, density); err != nil {
|
||||||
|
slog.Error("moonraker poller: failed to log usage",
|
||||||
|
"printer_id", w.printerID, "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.mu.Lock()
|
||||||
|
w.track.filamentUsed = totalMM
|
||||||
|
w.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// pollPrinter performs a single Moonraker poll for a printer.
|
func (w *MoonrakerPoller) ensurePrintJob(ctx context.Context, jobName, state string) (int, error) {
|
||||||
func (p *MoonrakerPoller) pollPrinter(ctx context.Context, printer models.Printer) error {
|
var jobID int
|
||||||
host := *printer.MoonrakerURL
|
err := w.pool.QueryRow(ctx, `
|
||||||
var apiKey string
|
SELECT pj.id FROM print_jobs pj
|
||||||
if printer.MoonrakerAPIKey != nil {
|
JOIN job_statuses js ON pj.job_status_id = js.id
|
||||||
apiKey = *printer.MoonrakerAPIKey
|
WHERE pj.printer_id = $1
|
||||||
|
AND pj.job_name = $2
|
||||||
|
AND pj.deleted_at IS NULL
|
||||||
|
AND js.name IN ('printing', 'pending')
|
||||||
|
ORDER BY pj.created_at DESC
|
||||||
|
LIMIT 1
|
||||||
|
`, w.printerID, jobName).Scan(&jobID)
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
_, _ = w.pool.Exec(ctx, `
|
||||||
|
UPDATE print_jobs SET
|
||||||
|
job_status_id = (SELECT id FROM job_statuses WHERE name = 'printing'),
|
||||||
|
started_at = COALESCE(started_at, NOW()),
|
||||||
|
updated_at = NOW()
|
||||||
|
WHERE id = $1
|
||||||
|
AND job_status_id = (SELECT id FROM job_statuses WHERE name = 'pending')
|
||||||
|
`, jobID)
|
||||||
|
return jobID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch printer info (status)
|
var statusID int
|
||||||
info, err := p.client.GetPrinterInfo(ctx, host, 80, apiKey)
|
err = w.pool.QueryRow(ctx, `SELECT id FROM job_statuses WHERE name = 'printing'`).Scan(&statusID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.broadcastStatus(printer.ID, printer.Name, "offline")
|
return 0, fmt.Errorf("moonraker poller: missing 'printing' job status: %w", err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
status := mapMoonrakerState(info.State)
|
err = w.pool.QueryRow(ctx, `
|
||||||
p.broadcastStatus(printer.ID, printer.Name, status)
|
INSERT INTO print_jobs (printer_id, job_name, file_name, job_status_id, started_at)
|
||||||
|
VALUES ($1, $2, $3, $4, NOW())
|
||||||
// Fetch print stats
|
RETURNING id
|
||||||
stats, err := p.client.GetPrintStats(ctx, host, 80, apiKey)
|
`, w.printerID, jobName, jobName, statusID).Scan(&jobID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("getPrintStats failed: %w", err)
|
return 0, fmt.Errorf("moonraker poller: failed to create print job: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if status == "printing" && stats.Filename != "" {
|
slog.Info("moonraker poller: created print job", "job_id", jobID, "job_name", jobName)
|
||||||
p.broadcastJobStarted(printer.ID, stats.Filename)
|
return jobID, nil
|
||||||
}
|
|
||||||
|
|
||||||
if isCompleteState(stats.State) && stats.FilamentUsedMm > 0 {
|
|
||||||
// Record usage
|
|
||||||
if err := p.recordUsage(ctx, printer, stats); err != nil {
|
|
||||||
slog.Error("moonraker poller: record usage failed",
|
|
||||||
"printer", printer.Name, "error", err)
|
|
||||||
} else {
|
|
||||||
p.broadcastJobCompleted(printer.ID, stats.Filename, stats.FilamentUsedMm)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *MoonrakerPoller) recordUsage(ctx context.Context, printer models.Printer, stats *clients.MoonrakerPrintStats) error {
|
// ── Package-level helpers (shared by both workers) ──────────────────────────
|
||||||
// Find active spool for printer — for now use the first active spool
|
|
||||||
// or fallback to the one referenced by the printer if available.
|
|
||||||
// In a real scenario we'd query AMS slots or fallback logic.
|
|
||||||
// Here we simply look for the most recently used spool in usage_logs.
|
|
||||||
var spoolID int
|
|
||||||
row := p.pool.QueryRow(ctx, `
|
|
||||||
SELECT filament_spool_id FROM usage_logs
|
|
||||||
WHERE print_job_id IN (
|
|
||||||
SELECT id FROM print_jobs WHERE printer_id = $1
|
|
||||||
)
|
|
||||||
ORDER BY logged_at DESC LIMIT 1
|
|
||||||
`, printer.ID)
|
|
||||||
_ = row.Scan(&spoolID)
|
|
||||||
|
|
||||||
if spoolID == 0 {
|
// lookupActiveSpool finds the most recently used spool for a given printer.
|
||||||
// No prior usage — skip recording (no known spool to deduct from)
|
func lookupActiveSpool(ctx context.Context, pool *pgxpool.Pool, printerID int) (int, float64) {
|
||||||
slog.Warn("moonraker poller: no known spool for printer; skipping usage record",
|
type result struct {
|
||||||
"printer", printer.Name)
|
id int
|
||||||
|
density float64
|
||||||
|
}
|
||||||
|
var res result
|
||||||
|
|
||||||
|
err := pool.QueryRow(ctx, `
|
||||||
|
SELECT fs.id, COALESCE(mb.density_g_cm3, 1.24)
|
||||||
|
FROM filament_spools fs
|
||||||
|
JOIN material_bases mb ON fs.material_base_id = mb.id
|
||||||
|
JOIN print_jobs pj ON pj.filament_spool_id = fs.id
|
||||||
|
WHERE pj.printer_id = $1 AND fs.deleted_at IS NULL
|
||||||
|
ORDER BY pj.created_at DESC LIMIT 1
|
||||||
|
`, printerID).Scan(&res.id, &res.density)
|
||||||
|
if err == nil {
|
||||||
|
return res.id, res.density
|
||||||
|
}
|
||||||
|
|
||||||
|
err = pool.QueryRow(ctx, `
|
||||||
|
SELECT fs.id, COALESCE(mb.density_g_cm3, 1.24)
|
||||||
|
FROM filament_spools fs
|
||||||
|
JOIN material_bases mb ON fs.material_base_id = mb.id
|
||||||
|
WHERE fs.deleted_at IS NULL
|
||||||
|
ORDER BY fs.created_at DESC LIMIT 1
|
||||||
|
`).Scan(&res.id, &res.density)
|
||||||
|
if err == nil {
|
||||||
|
return res.id, res.density
|
||||||
|
}
|
||||||
|
|
||||||
|
return 1, 1.24
|
||||||
|
}
|
||||||
|
|
||||||
|
// insertUsageLog inserts a usage_log entry and decrements the spool's remaining grams.
|
||||||
|
func insertUsageLog(ctx context.Context, pool *pgxpool.Pool, jobID, spoolID int, deltaMM, densityGCm3 float64) error {
|
||||||
|
const crossSectionCm2 = 0.02405 // π * (0.0875cm)² for 1.75mm filament
|
||||||
|
gramsUsed := crossSectionCm2 * (deltaMM / 10.0) * densityGCm3
|
||||||
|
|
||||||
|
if gramsUsed <= 0 || deltaMM <= 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compute grams from mm extruded using defaults (1.75mm diameter, PLA density 1.24)
|
if _, err := pool.Exec(ctx, `
|
||||||
grams := calculateGrams(stats.FilamentUsedMm, 1.75, 1.24)
|
|
||||||
|
|
||||||
// Create a print job record
|
|
||||||
var jobID int
|
|
||||||
err := p.pool.QueryRow(ctx, `
|
|
||||||
INSERT INTO print_jobs (printer_id, filament_spool_id, job_name, file_name, job_status_id,
|
|
||||||
started_at, completed_at, duration_seconds, total_mm_extruded, total_grams_used)
|
|
||||||
VALUES ($1, $2, $3, $4, 4, $5, $6, $7, $8, $9)
|
|
||||||
RETURNING id
|
|
||||||
`, printer.ID, spoolID, stats.Filename, stats.Filename,
|
|
||||||
time.Now().Add(-time.Duration(stats.TotalDuration)*time.Second),
|
|
||||||
time.Now(),
|
|
||||||
int(stats.TotalDuration),
|
|
||||||
stats.FilamentUsedMm,
|
|
||||||
grams,
|
|
||||||
).Scan(&jobID)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("insert print_job failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create usage_log
|
|
||||||
_, err = p.pool.Exec(ctx, `
|
|
||||||
INSERT INTO usage_logs (print_job_id, filament_spool_id, mm_extruded, grams_used, logged_at)
|
INSERT INTO usage_logs (print_job_id, filament_spool_id, mm_extruded, grams_used, logged_at)
|
||||||
VALUES ($1, $2, $3, $4, NOW())
|
VALUES ($1, $2, $3, $4, NOW())
|
||||||
`, jobID, spoolID, stats.FilamentUsedMm, grams)
|
`, jobID, spoolID, deltaMM, gramsUsed); err != nil {
|
||||||
|
return fmt.Errorf("usage_log insert failed: %w", err)
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("insert usage_log failed: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
slog.Info("moonraker poller: recorded usage",
|
_, _ = pool.Exec(ctx, `
|
||||||
"printer", printer.Name,
|
UPDATE filament_spools
|
||||||
"job", stats.Filename,
|
SET remaining_grams = GREATEST(remaining_grams - $2::int, 0),
|
||||||
"mm", stats.FilamentUsedMm,
|
updated_at = NOW()
|
||||||
"grams", grams,
|
WHERE id = $1
|
||||||
|
`, spoolID, int(gramsUsed))
|
||||||
|
|
||||||
|
slog.Debug("moonraker poller: logged usage",
|
||||||
|
"job_id", jobID, "spool_id", spoolID,
|
||||||
|
"mm_extruded", deltaMM, "grams_used", gramsUsed,
|
||||||
)
|
)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *MoonrakerPoller) broadcastStatus(printerID int, name, status string) {
|
|
||||||
if p.sseBC == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
ev, err := sse.NewEvent(sse.EventPrinterStatus, sse.PrinterStatusPayload{
|
|
||||||
PrinterID: printerID,
|
|
||||||
PrinterName: name,
|
|
||||||
Status: status,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
p.sseBC.Publish(ev)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *MoonrakerPoller) broadcastJobStarted(printerID int, jobName string) {
|
|
||||||
if p.sseBC == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
ev, err := sse.NewEvent(sse.EventJobStarted, sse.JobStartedPayload{
|
|
||||||
JobName: jobName,
|
|
||||||
PrinterID: printerID,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
p.sseBC.Publish(ev)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *MoonrakerPoller) broadcastJobCompleted(printerID int, jobName string, mmExtruded float64) {
|
|
||||||
if p.sseBC == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
grams := calculateGrams(mmExtruded, 1.75, 1.24)
|
|
||||||
gramsInt := int(grams)
|
|
||||||
ev, err := sse.NewEvent(sse.EventJobCompleted, sse.JobCompletedPayload{
|
|
||||||
JobName: jobName,
|
|
||||||
PrinterID: printerID,
|
|
||||||
TotalGramsUsed: &gramsInt,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
p.sseBC.Publish(ev)
|
|
||||||
}
|
|
||||||
|
|
||||||
func mapMoonrakerState(state string) string {
|
|
||||||
switch state {
|
|
||||||
case "printing":
|
|
||||||
return "printing"
|
|
||||||
case "paused":
|
|
||||||
return "paused"
|
|
||||||
case "complete", "standby", "cancelled":
|
|
||||||
return "idle"
|
|
||||||
case "error":
|
|
||||||
return "error"
|
|
||||||
default:
|
|
||||||
return "offline"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func isCompleteState(state string) bool {
|
|
||||||
return state == "complete" || state == "completed"
|
|
||||||
}
|
|
||||||
|
|
||||||
func calculateGrams(mmExtruded, diameterMm, densityGcm3 float64) float64 {
|
|
||||||
if mmExtruded <= 0 {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
radiusCm := diameterMm / 2.0 / 10.0
|
|
||||||
crossSection := 3.141592653589793 * radiusCm * radiusCm
|
|
||||||
volumeCm3 := (mmExtruded / 10.0) * crossSection
|
|
||||||
return volumeCm3 * densityGcm3
|
|
||||||
}
|
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
// Helper for port parsing (Moonraker URL may contain port)
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
func extractHostPort(rawURL string) (string, int) {
|
|
||||||
// Very simplistic: if rawURL contains ":" after a dot, parse host:port.
|
|
||||||
// Otherwise assume host only and return port 80.
|
|
||||||
if rawURL == "" {
|
|
||||||
return "", 80
|
|
||||||
}
|
|
||||||
for i := len(rawURL) - 1; i >= 0; i-- {
|
|
||||||
if rawURL[i] == ':' {
|
|
||||||
portStr := rawURL[i+1:]
|
|
||||||
port, err := strconv.Atoi(portStr)
|
|
||||||
if err == nil {
|
|
||||||
return rawURL[:i], port
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if rawURL[i] == '/' {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return rawURL, 80
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -1,223 +1,170 @@
|
|||||||
// Package workers provides background goroutines for printer telemetry.
|
|
||||||
package workers
|
package workers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/CubeCraft-Creations/Extrudex/backend/internal/clients"
|
"github.com/CubeCraft-Creations/Extrudex/backend/internal/clients"
|
||||||
"github.com/CubeCraft-Creations/Extrudex/backend/internal/models"
|
|
||||||
"github.com/CubeCraft-Creations/Extrudex/backend/internal/repositories"
|
|
||||||
"github.com/CubeCraft-Creations/Extrudex/backend/internal/sse"
|
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MQTTSubscriberConfig controls MQTT background worker behaviour.
|
// bambuJobState tracks the active print job detected via MQTT.
|
||||||
type MQTTSubscriberConfig struct {
|
type bambuJobState struct {
|
||||||
ReconnectInterval time.Duration
|
gcodeFile string
|
||||||
|
gcodeState string
|
||||||
|
percent int
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultMQTTSubscriberConfig returns sensible defaults.
|
// MQTTSubscriber listens to Bambu Lab MQTT telemetry topics and logs
|
||||||
func DefaultMQTTSubscriberConfig() MQTTSubscriberConfig {
|
// filament usage events to PostgreSQL.
|
||||||
return MQTTSubscriberConfig{
|
|
||||||
ReconnectInterval: 30 * time.Second,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// MQTTSubscriber manages per-printer MQTT connections and telemetry ingestion.
|
|
||||||
type MQTTSubscriber struct {
|
type MQTTSubscriber struct {
|
||||||
cfg MQTTSubscriberConfig
|
Client *clients.MQTTClient
|
||||||
printerRepo *repositories.PrinterRepository
|
pool *pgxpool.Pool
|
||||||
usageRepo *repositories.UsageLogRepository
|
|
||||||
sseBC *sse.Broadcaster
|
printerID int
|
||||||
pool *pgxpool.Pool
|
printerName string
|
||||||
clients map[int]*clients.MQTTClient // keyed by printer ID
|
|
||||||
stop chan struct{}
|
mu sync.Mutex
|
||||||
|
state bambuJobState
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMQTTSubscriber creates a new subscriber worker.
|
// MQTTSubscriberConfig holds configuration for the MQTT subscriber worker.
|
||||||
func NewMQTTSubscriber(
|
type MQTTSubscriberConfig struct {
|
||||||
cfg MQTTSubscriberConfig,
|
Pool *pgxpool.Pool
|
||||||
pool *pgxpool.Pool,
|
PrinterID int
|
||||||
printerRepo *repositories.PrinterRepository,
|
PrinterName string
|
||||||
usageRepo *repositories.UsageLogRepository,
|
}
|
||||||
sseBC *sse.Broadcaster,
|
|
||||||
) *MQTTSubscriber {
|
// NewMQTTSubscriber creates a new MQTTSubscriber worker. Set Client after
|
||||||
|
// construction to wire the handler.
|
||||||
|
func NewMQTTSubscriber(cfg MQTTSubscriberConfig) *MQTTSubscriber {
|
||||||
return &MQTTSubscriber{
|
return &MQTTSubscriber{
|
||||||
cfg: cfg,
|
pool: cfg.Pool,
|
||||||
printerRepo: printerRepo,
|
printerID: cfg.PrinterID,
|
||||||
usageRepo: usageRepo,
|
printerName: cfg.PrinterName,
|
||||||
sseBC: sseBC,
|
|
||||||
pool: pool,
|
|
||||||
clients: make(map[int]*clients.MQTTClient),
|
|
||||||
stop: make(chan struct{}),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start begins the connection manager loop.
|
// Run connects to MQTT and blocks until ctx is cancelled.
|
||||||
func (s *MQTTSubscriber) Start() {
|
func (w *MQTTSubscriber) Run(ctx context.Context) error {
|
||||||
go s.loop()
|
slog.Info("mqtt subscriber: starting",
|
||||||
}
|
"printer_id", w.printerID,
|
||||||
|
"printer_name", w.printerName,
|
||||||
|
)
|
||||||
|
|
||||||
// Stop signals the loop to exit and disconnects all clients.
|
if w.Client == nil {
|
||||||
func (s *MQTTSubscriber) Stop() {
|
return fmt.Errorf("mqtt subscriber: Client is nil")
|
||||||
close(s.stop)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *MQTTSubscriber) loop() {
|
|
||||||
// Initial connect attempt.
|
|
||||||
s.connectAll()
|
|
||||||
|
|
||||||
ticker := time.NewTicker(s.cfg.ReconnectInterval)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ticker.C:
|
|
||||||
s.connectAll()
|
|
||||||
case <-s.stop:
|
|
||||||
slog.Info("mqtt subscriber stopped")
|
|
||||||
for _, c := range s.clients {
|
|
||||||
c.Disconnect(1000)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := w.Client.Connect(); err != nil {
|
||||||
|
return fmt.Errorf("mqtt subscriber: connect failed: %w", err)
|
||||||
|
}
|
||||||
|
defer w.Client.Disconnect()
|
||||||
|
|
||||||
|
slog.Info("mqtt subscriber: connected", "printer_id", w.printerID)
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
|
slog.Info("mqtt subscriber: shutting down", "printer_id", w.printerID)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MQTTSubscriber) connectAll() {
|
// HandleBambuReport is the MQTT callback for Bambu telemetry messages.
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
func (w *MQTTSubscriber) HandleBambuReport(report clients.BambuPrintReport) error {
|
||||||
|
w.mu.Lock()
|
||||||
|
prev := w.state
|
||||||
|
current := bambuJobState{
|
||||||
|
gcodeFile: report.Print.GcodeFile,
|
||||||
|
gcodeState: report.Print.GcodeState,
|
||||||
|
percent: report.Print.McPercent,
|
||||||
|
}
|
||||||
|
w.state = current
|
||||||
|
w.mu.Unlock()
|
||||||
|
|
||||||
|
if prev.gcodeState == current.gcodeState && prev.gcodeFile == current.gcodeFile {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("mqtt subscriber: state change",
|
||||||
|
"printer_id", w.printerID,
|
||||||
|
"file", current.gcodeFile,
|
||||||
|
"state", current.gcodeState,
|
||||||
|
"percent", current.percent,
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
printers, err := s.printerRepo.GetAll(ctx)
|
switch current.gcodeState {
|
||||||
|
case "RUNNING":
|
||||||
|
return w.handleState(ctx, current, "printing")
|
||||||
|
case "FINISH":
|
||||||
|
return w.handleState(ctx, current, "completed")
|
||||||
|
case "FAILED":
|
||||||
|
return w.handleState(ctx, current, "failed")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *MQTTSubscriber) handleState(ctx context.Context, s bambuJobState, status string) error {
|
||||||
|
jobID, err := w.ensurePrintJob(ctx, s.gcodeFile, status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("mqtt subscriber: failed to list printers", "error", err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, printer := range printers {
|
if status == "completed" || status == "failed" {
|
||||||
if !printer.IsActive || printer.MQTTBrokerHost == nil || *printer.MQTTBrokerHost == "" {
|
_, _ = w.pool.Exec(ctx, `
|
||||||
// Disconnect if previously connected and now inactive
|
UPDATE print_jobs SET
|
||||||
if existing, ok := s.clients[printer.ID]; ok {
|
job_status_id = (SELECT id FROM job_statuses WHERE name = $2),
|
||||||
existing.Disconnect(1000)
|
completed_at = CASE WHEN $2 = 'completed' THEN NOW() ELSE completed_at END,
|
||||||
delete(s.clients, printer.ID)
|
updated_at = NOW()
|
||||||
}
|
WHERE id = $1
|
||||||
continue
|
`, jobID, status)
|
||||||
}
|
} else {
|
||||||
|
_, _ = w.pool.Exec(ctx, `
|
||||||
if _, ok := s.clients[printer.ID]; ok {
|
UPDATE print_jobs SET
|
||||||
// Already connected — skip
|
job_status_id = (SELECT id FROM job_statuses WHERE name = $2),
|
||||||
continue
|
started_at = COALESCE(started_at, NOW()),
|
||||||
}
|
updated_at = NOW()
|
||||||
|
WHERE id = $1
|
||||||
topicPrefix := ""
|
`, jobID, status)
|
||||||
if printer.MQTTTopicPrefix != nil {
|
|
||||||
topicPrefix = *printer.MQTTTopicPrefix
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg := clients.MQTTConfig{
|
|
||||||
BrokerHost: *printer.MQTTBrokerHost,
|
|
||||||
TopicPrefix: topicPrefix,
|
|
||||||
TLSEnabled: printer.MQTTTLSEnabled,
|
|
||||||
ClientID: fmt.Sprintf("extrudex-printer-%d", printer.ID),
|
|
||||||
}
|
|
||||||
|
|
||||||
c, err := clients.NewMQTTClient(cfg)
|
|
||||||
if err != nil {
|
|
||||||
slog.Warn("mqtt subscriber: connect failed",
|
|
||||||
"printer", printer.Name,
|
|
||||||
"broker", cfg.BrokerHost,
|
|
||||||
"error", err,
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
s.clients[printer.ID] = c
|
|
||||||
|
|
||||||
// Subscribe to telemetry topics
|
|
||||||
topics := clients.DefaultBambuTopics(topicPrefix)
|
|
||||||
for _, topic := range topics {
|
|
||||||
if err := c.Subscribe(topic, 0, s.makeHandler(printer)); err != nil {
|
|
||||||
slog.Warn("mqtt subscriber: subscribe failed",
|
|
||||||
"printer", printer.Name,
|
|
||||||
"topic", topic,
|
|
||||||
"error", err,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
slog.Info("mqtt subscriber: connected",
|
|
||||||
"printer", printer.Name,
|
|
||||||
"broker", cfg.BrokerHost,
|
|
||||||
"topics", topics,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
slog.Info("mqtt subscriber: job updated",
|
||||||
|
"printer_id", w.printerID, "job_id", jobID, "status", status)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MQTTSubscriber) makeHandler(printer models.Printer) func([]byte) {
|
func (w *MQTTSubscriber) ensurePrintJob(ctx context.Context, filename, status string) (int, error) {
|
||||||
return func(payload []byte) {
|
var jobID int
|
||||||
slog.Debug("mqtt subscriber: message received",
|
err := w.pool.QueryRow(ctx, `
|
||||||
"printer", printer.Name,
|
SELECT id FROM print_jobs
|
||||||
"size", len(payload),
|
WHERE printer_id = $1 AND file_name = $2 AND deleted_at IS NULL
|
||||||
)
|
ORDER BY created_at DESC LIMIT 1
|
||||||
|
`, w.printerID, filename).Scan(&jobID)
|
||||||
|
|
||||||
// Attempt Bambu Lab parse
|
if err == nil {
|
||||||
telemetry, err := clients.ParseBambuTelemetry(payload)
|
return jobID, nil
|
||||||
if err != nil {
|
|
||||||
slog.Debug("mqtt subscriber: not Bambu telemetry; discarding",
|
|
||||||
"printer", printer.Name, "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Determine status from telemetry
|
|
||||||
status := "idle"
|
|
||||||
if telemetry.Print.Stage > 0 {
|
|
||||||
status = "printing"
|
|
||||||
}
|
|
||||||
s.broadcastStatus(printer.ID, printer.Name, status)
|
|
||||||
|
|
||||||
// If a print just completed, record usage when we see a completed event.
|
|
||||||
// Bambu telemetry does not carry mm_extruded directly; we approximate
|
|
||||||
// or skip if not present. Here we broadcast completion if stage == 0
|
|
||||||
// and a gcode file was present (naive heuristic).
|
|
||||||
if telemetry.Print.GcodeFile != "" && telemetry.Print.Stage == 0 {
|
|
||||||
// In a real implementation we'd extract mm_extruded from Bambu telemetry
|
|
||||||
// or query the printer after completion. For now broadcast completion.
|
|
||||||
s.broadcastJobCompleted(printer.ID, telemetry.Print.GcodeFile, 0)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
func (s *MQTTSubscriber) broadcastStatus(printerID int, name, status string) {
|
var statusID int
|
||||||
if s.sseBC == nil {
|
err = w.pool.QueryRow(ctx, `SELECT id FROM job_statuses WHERE name = $1`, status).Scan(&statusID)
|
||||||
return
|
|
||||||
}
|
|
||||||
ev, err := sse.NewEvent(sse.EventPrinterStatus, sse.PrinterStatusPayload{
|
|
||||||
PrinterID: printerID,
|
|
||||||
PrinterName: name,
|
|
||||||
Status: status,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return 0, fmt.Errorf("mqtt subscriber: unknown status '%s': %w", status, err)
|
||||||
}
|
}
|
||||||
s.sseBC.Publish(ev)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *MQTTSubscriber) broadcastJobCompleted(printerID int, jobName string, mmExtruded float64) {
|
err = w.pool.QueryRow(ctx, `
|
||||||
if s.sseBC == nil {
|
INSERT INTO print_jobs (printer_id, job_name, file_name, job_status_id, started_at)
|
||||||
return
|
VALUES ($1, $2, $2, $3, NOW())
|
||||||
}
|
RETURNING id
|
||||||
grams := calculateGrams(mmExtruded, 1.75, 1.24)
|
`, w.printerID, filename, statusID).Scan(&jobID)
|
||||||
gramsInt := int(grams)
|
|
||||||
ev, err := sse.NewEvent(sse.EventJobCompleted, sse.JobCompletedPayload{
|
|
||||||
JobName: jobName,
|
|
||||||
PrinterID: printerID,
|
|
||||||
TotalGramsUsed: &gramsInt,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return 0, fmt.Errorf("mqtt subscriber: create print_job failed: %w", err)
|
||||||
}
|
}
|
||||||
s.sseBC.Publish(ev)
|
|
||||||
|
slog.Info("mqtt subscriber: created print job",
|
||||||
|
"job_id", jobID, "file", filename, "status", status)
|
||||||
|
return jobID, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user