// Package gateway provides an OpenClaw gateway integration client that
// polls agent states, persists them via the repository layer, and broadcasts
// changes through the SSE broker for real-time frontend updates.
package gateway

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"net/http"
	"sync"
	"time"

	"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
	"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
	"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository"
)

// Client polls the OpenClaw gateway for agent status and keeps the database
// and SSE broker in sync. When a WSClient is set, the REST poller becomes a
// fallback: Start waits for MarkWSReady to signal readiness, and only begins
// polling if that signal does not arrive within Start's wait window.
type Client struct {
	// url is the gateway endpoint fetched on every poll.
	url string
	// pollInterval is the ticker period used for REST polling.
	pollInterval time.Duration
	// httpClient performs the gateway requests; NewClient gives it a 10s timeout.
	httpClient *http.Client
	// agents is the repository used to look up, create, and update agents.
	agents repository.AgentRepo
	// broker receives an "agent.status" broadcast for every created or changed agent.
	broker      *handler.Broker
	wsClient    *WSClient     // optional WS client; when set, REST is fallback only
	wsReady     chan struct{} // closed by MarkWSReady once the WS connection is established
	wsReadyOnce sync.Once     // protects wsReady close from double-close race
}

// Config holds gateway client configuration, typically loaded from environment.
type Config struct {
	// URL is the gateway endpoint that returns the agent list.
	URL string
	// PollInterval is the delay between two REST polls.
	PollInterval time.Duration
}

// DefaultConfig returns sensible defaults for local development: a gateway
// on localhost port 18789 polled every five seconds.
func DefaultConfig() Config {
	return Config{
		URL:          "http://localhost:18789/api/agents",
		PollInterval: 5 * time.Second,
	}
}

// NewClient returns a gateway client wired to the given repository and broker.
func NewClient(cfg Config, agents repository.AgentRepo, broker *handler.Broker) *Client { return &Client{ url: cfg.URL, pollInterval: cfg.PollInterval, httpClient: &http.Client{Timeout: 10 * time.Second}, agents: agents, broker: broker, wsReady: make(chan struct{}), } } // SetWSClient wires the WebSocket client so the REST poller knows to defer // to it. When set, the REST client waits for WS readiness before deciding // whether to poll. func (c *Client) SetWSClient(ws *WSClient) { c.wsClient = ws } // MarkWSReady signals that the WS connection is live and the REST poller // should stand down. Called by WSClient after a successful handshake. func (c *Client) MarkWSReady() { c.wsReadyOnce.Do(func() { close(c.wsReady) }) } // Start begins the gateway client loop. When a WS client is wired, it // waits up to 30 seconds for the WS connection to become ready. If WS // connects, the REST poller stands down and only logs periodically. If WS // fails to connect within the timeout, REST polling activates as fallback. func (c *Client) Start(ctx context.Context) { if c.wsClient != nil { slog.Info("gateway client waiting for WS connection", "timeout", "30s") select { case <-c.wsReady: slog.Info("gateway client using WS — REST poller standing down") // WS is live; keep this goroutine alive but idle. If WS // disconnects later, we could re-enter polling, but for now // the WS client handles its own reconnection. 
<-ctx.Done() slog.Info("gateway client stopped (WS mode)") return case <-time.After(30 * time.Second): slog.Warn("gateway client: WS not ready after 30s — falling back to REST polling", "url", c.url, "pollInterval", c.pollInterval.String()) case <-ctx.Done(): slog.Info("gateway client stopped while waiting for WS") return } } else { slog.Info("gateway client using REST polling (no WS client configured)", "url", c.url, "pollInterval", c.pollInterval.String()) } // REST fallback polling ticker := time.NewTicker(c.pollInterval) defer ticker.Stop() for { select { case <-ctx.Done(): slog.Info("gateway client stopped (REST fallback)") return case <-ticker.C: c.poll(ctx) } } } // poll fetches agent states from the gateway and syncs to the database. func (c *Client) poll(ctx context.Context) { resp, err := c.httpClient.Get(c.url) if err != nil { slog.Warn("gateway poll failed", "error", err) return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { slog.Warn("gateway returned non-200", "status", resp.StatusCode) return } var agents []models.AgentCardData if err := json.NewDecoder(resp.Body).Decode(&agents); err != nil { slog.Warn("gateway response parse failed", "error", err) return } for _, ga := range agents { // Check if agent already exists; if so, update; otherwise create. 
existing, err := c.agents.Get(ctx, ga.ID) if err != nil { // Not found — create it if err := c.agents.Create(ctx, ga); err != nil { slog.Warn("gateway agent create failed", "id", ga.ID, "error", err) continue } slog.Info("gateway agent created", "id", ga.ID, "status", ga.Status) c.broker.Broadcast("agent.status", ga) continue } // If status changed, update and broadcast if existing.Status != ga.Status { updated, err := c.agents.Update(ctx, ga.ID, models.UpdateAgentRequest{ Status: &ga.Status, }) if err != nil { slog.Warn("gateway agent update failed", "id", ga.ID, "error", err) continue } c.broker.Broadcast("agent.status", updated) slog.Debug("agent status changed", "id", ga.ID, "from", existing.Status, "to", ga.Status) } } } // SeedDemoAgents inserts the five known demo agents if the agents table is // empty. Call this once on application startup after migrations have run. func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error { count, err := agents.Count(ctx) if err != nil { return fmt.Errorf("count agents for seeding: %w", err) } if count > 0 { return nil // already seeded } slog.Info("seeding demo agents") demoAgents := []models.AgentCardData{ { ID: "otto", DisplayName: "Otto", Role: "Orchestrator", Status: models.AgentStatusActive, CurrentTask: strPtr("Orchestrating tasks"), SessionKey: "otto-session", Channel: "discord", LastActivity: time.Now().UTC().Format(time.RFC3339), }, { ID: "rex", DisplayName: "Rex", Role: "Frontend Dev", Status: models.AgentStatusIdle, SessionKey: "rex-session", Channel: "discord", LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339), }, { ID: "dex", DisplayName: "Dex", Role: "Backend Dev", Status: models.AgentStatusThinking, CurrentTask: strPtr("Designing API contracts"), SessionKey: "dex-session", Channel: "discord", LastActivity: time.Now().UTC().Format(time.RFC3339), }, { ID: "hex", DisplayName: "Hex", Role: "Database Specialist", Status: models.AgentStatusActive, CurrentTask: 
strPtr("Reviewing schema migrations"), SessionKey: "hex-session", Channel: "discord", LastActivity: time.Now().UTC().Format(time.RFC3339), }, { ID: "pip", DisplayName: "Pip", Role: "Edge Device Dev", Status: models.AgentStatusIdle, SessionKey: "pip-session", Channel: "discord", LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339), }, } for _, a := range demoAgents { if err := agents.Create(ctx, a); err != nil { return fmt.Errorf("seed agent %s: %w", a.ID, err) } } slog.Info("demo agents seeded", "count", len(demoAgents)) return nil } func strPtr(s string) *string { return &s }