Files
Control-Center/go-backend/internal/gateway/client.go
Dex 4569fef11d
Some checks failed
Dev Build / deploy-dev (pull_request) Blocked by required conditions
Dev Build / build-test (pull_request) Waiting to run
Build (Dev) / build-go-backend (pull_request) Failing after 0s
Build (Dev) / trigger-deploy (pull_request) Has been skipped
Build (Dev) / build-frontend (pull_request) Failing after 1s
openclaw/grimm-review All 11 findings resolved. Approved.
CUB-203: fix Grimm review blocking issues (PR #41)
🔴 readLoop race: replace WriteControl close with ctx-done goroutine that closes conn
🔴 duplicate event handlers: clear handlers map before re-registering on reconnect
🔴 sync.go CurrentTask abuse: add DisplayName field to UpdateAgentRequest, use it
🔴 sync.go newRole dead code: add Role field to UpdateAgentRequest, use it
🔴 events.go handlePresence DB/SSE inconsistency: pass LastActivityAt in update, don't mutate after DB
🔴 events.go handleAgentConfig DB/SSE inconsistency: use DisplayName/Role fields in update
🟠 Send() nil-conn panic: check conn != nil before WriteJSON
🟠 readLoop prompt ctx cancellation: fixed by item #1
🟠 backoff never resets: reset to initialBackoff after successful connectAndRun
🟠 MarkWSReady double-close race: use sync.Once in Client
Extra json:"-" dead fields: removed from sessionChangedPayload, presencePayload, agentConfigPayload
UpdateAgentRequest: added DisplayName, Role, LastActivityAt fields
2026-05-20 11:47:11 +00:00

247 lines
7.5 KiB
Go

// Package gateway provides an OpenClaw gateway integration client that
// polls agent states, persists them via the repository layer, and broadcasts
// changes through the SSE broker for real-time frontend updates.
package gateway
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"sync"
"time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository"
)
// Client polls the OpenClaw gateway for agent status and keeps the database
// and SSE broker in sync. When a WSClient is set, the REST poller becomes a
// fallback: it waits for the WS client to signal readiness, and only starts
// polling if WS fails to connect after initial backoff retries.
type Client struct {
url string
pollInterval time.Duration
httpClient *http.Client
agents repository.AgentRepo
broker *handler.Broker
wsClient *WSClient // optional WS client; when set, REST is fallback only
wsReady chan struct{} // closed once WS connection is established
wsReadyOnce sync.Once // protects wsReady close from double-close race
}
// Config holds gateway client configuration, typically loaded from environment.
type Config struct {
URL string
PollInterval time.Duration
}
// DefaultConfig returns sensible defaults for local development.
func DefaultConfig() Config {
return Config{
URL: "http://localhost:18789/api/agents",
PollInterval: 5 * time.Second,
}
}
// NewClient returns a gateway client wired to the given repository and broker.
func NewClient(cfg Config, agents repository.AgentRepo, broker *handler.Broker) *Client {
return &Client{
url: cfg.URL,
pollInterval: cfg.PollInterval,
httpClient: &http.Client{Timeout: 10 * time.Second},
agents: agents,
broker: broker,
wsReady: make(chan struct{}),
}
}
// SetWSClient wires the WebSocket client so the REST poller knows to defer
// to it. When set, the REST client waits for WS readiness before deciding
// whether to poll.
func (c *Client) SetWSClient(ws *WSClient) {
c.wsClient = ws
}
// MarkWSReady signals that the WS connection is live and the REST poller
// should stand down. Called by WSClient after a successful handshake.
func (c *Client) MarkWSReady() {
c.wsReadyOnce.Do(func() {
close(c.wsReady)
})
}
// Start begins the gateway client loop. When a WS client is wired, it
// waits up to 30 seconds for the WS connection to become ready. If WS
// connects, the REST poller stands down and only logs periodically. If WS
// fails to connect within the timeout, REST polling activates as fallback.
func (c *Client) Start(ctx context.Context) {
if c.wsClient != nil {
slog.Info("gateway client waiting for WS connection", "timeout", "30s")
select {
case <-c.wsReady:
slog.Info("gateway client using WS — REST poller standing down")
// WS is live; keep this goroutine alive but idle. If WS
// disconnects later, we could re-enter polling, but for now
// the WS client handles its own reconnection.
<-ctx.Done()
slog.Info("gateway client stopped (WS mode)")
return
case <-time.After(30 * time.Second):
slog.Warn("gateway client: WS not ready after 30s — falling back to REST polling",
"url", c.url,
"pollInterval", c.pollInterval.String())
case <-ctx.Done():
slog.Info("gateway client stopped while waiting for WS")
return
}
} else {
slog.Info("gateway client using REST polling (no WS client configured)",
"url", c.url,
"pollInterval", c.pollInterval.String())
}
// REST fallback polling
ticker := time.NewTicker(c.pollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
slog.Info("gateway client stopped (REST fallback)")
return
case <-ticker.C:
c.poll(ctx)
}
}
}
// poll fetches agent states from the gateway and syncs to the database.
func (c *Client) poll(ctx context.Context) {
resp, err := c.httpClient.Get(c.url)
if err != nil {
slog.Warn("gateway poll failed", "error", err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
slog.Warn("gateway returned non-200", "status", resp.StatusCode)
return
}
var agents []models.AgentCardData
if err := json.NewDecoder(resp.Body).Decode(&agents); err != nil {
slog.Warn("gateway response parse failed", "error", err)
return
}
for _, ga := range agents {
// Check if agent already exists; if so, update; otherwise create.
existing, err := c.agents.Get(ctx, ga.ID)
if err != nil {
// Not found — create it
if err := c.agents.Create(ctx, ga); err != nil {
slog.Warn("gateway agent create failed", "id", ga.ID, "error", err)
continue
}
slog.Info("gateway agent created", "id", ga.ID, "status", ga.Status)
c.broker.Broadcast("agent.status", ga)
continue
}
// If status changed, update and broadcast
if existing.Status != ga.Status {
updated, err := c.agents.Update(ctx, ga.ID, models.UpdateAgentRequest{
Status: &ga.Status,
})
if err != nil {
slog.Warn("gateway agent update failed", "id", ga.ID, "error", err)
continue
}
c.broker.Broadcast("agent.status", updated)
slog.Debug("agent status changed",
"id", ga.ID,
"from", existing.Status,
"to", ga.Status)
}
}
}
// SeedDemoAgents inserts the five known demo agents if the agents table is
// empty. Call this once on application startup after migrations have run.
func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
count, err := agents.Count(ctx)
if err != nil {
return fmt.Errorf("count agents for seeding: %w", err)
}
if count > 0 {
return nil // already seeded
}
slog.Info("seeding demo agents")
demoAgents := []models.AgentCardData{
{
ID: "otto",
DisplayName: "Otto",
Role: "Orchestrator",
Status: models.AgentStatusActive,
CurrentTask: strPtr("Orchestrating tasks"),
SessionKey: "otto-session",
Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339),
},
{
ID: "rex",
DisplayName: "Rex",
Role: "Frontend Dev",
Status: models.AgentStatusIdle,
SessionKey: "rex-session",
Channel: "discord",
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
},
{
ID: "dex",
DisplayName: "Dex",
Role: "Backend Dev",
Status: models.AgentStatusThinking,
CurrentTask: strPtr("Designing API contracts"),
SessionKey: "dex-session",
Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339),
},
{
ID: "hex",
DisplayName: "Hex",
Role: "Database Specialist",
Status: models.AgentStatusActive,
CurrentTask: strPtr("Reviewing schema migrations"),
SessionKey: "hex-session",
Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339),
},
{
ID: "pip",
DisplayName: "Pip",
Role: "Edge Device Dev",
Status: models.AgentStatusIdle,
SessionKey: "pip-session",
Channel: "discord",
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
},
}
for _, a := range demoAgents {
if err := agents.Create(ctx, a); err != nil {
return fmt.Errorf("seed agent %s: %w", a.ID, err)
}
}
slog.Info("demo agents seeded", "count", len(demoAgents))
return nil
}
func strPtr(s string) *string { return &s }