Compare commits

..

3 Commits

Author SHA1 Message Date
d9a1640b10 CUB-200: sync CI workflows with dev branch
Some checks failed
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
- Overwrite dev.yml with dev's consolidated version (parameterized Go/Node versions, cleaner install steps)
- Add deploy-dev.yaml from dev (was missing on this branch)
- build-dev.yaml confirmed absent (was deleted on dev in PR #45)
2026-05-20 16:29:57 +00:00
6fd2d9bec4 Merge branch 'dev' into agent/dex/CUB-200-ws-gateway-client
Some checks failed
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
2026-05-20 08:12:36 -04:00
Dex
d28d6e8dac CUB-200: implement WebSocket gateway client with v3 protocol
Some checks are pending
Dev Build / build-test (pull_request) Waiting to run
Dev Build / deploy-dev (pull_request) Blocked by required conditions
Replace REST poller with WebSocket client as primary gateway connection:

- wsclient.go: WebSocket client with v3 handshake (connect.challenge →
  connect → hello-ok), frame routing (req/res/event), JSON-RPC Send(),
  auto-reconnect with exponential backoff (1s → 30s max)
- sync.go: Initial sync via agents.list + sessions.list RPCs, merge
  session runtime state into AgentCardData, broadcast fleet.update
- events.go: Real-time event handlers for sessions.changed, presence,
  and agent.config — DB update first, then SSE broadcast
- client.go: REST poller retained as fallback (WS is primary)
- config.go: Add GATEWAY_WS_URL and OPENCLAW_GATEWAY_TOKEN env vars
- main.go: Wire WS client as primary, REST as fallback
- .env.example: Document new WS config vars

Fallback: If WS connection fails, seeded demo data + REST polling
remain available.
2026-05-20 11:33:17 +00:00
12 changed files with 205 additions and 841 deletions

View File

@@ -12,15 +12,16 @@ ENVIRONMENT=development
# Format: postgresql://user:password@host:port/database?sslmode=disable # Format: postgresql://user:password@host:port/database?sslmode=disable
DATABASE_URL=postgresql://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable DATABASE_URL=postgresql://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable
# Gateway (OpenClaw) connection # Gateway (OpenClaw) connection — WebSocket (primary)
# WebSocket gateway config (primary path) # WebSocket URL for real-time OpenClaw gateway v3 protocol
WS_GATEWAY_URL=ws://host.docker.internal:18789/ GATEWAY_WS_URL=ws://localhost:18789/
# Gateway auth token — same as OPENCLAW_GATEWAY_TOKEN (set in environment) # Auth token for the OpenClaw gateway (operator scope)
GATEWAY_TOKEN= OPENCLAW_GATEWAY_TOKEN=
# REST poller config (fallback, only used if WS fails to connect) # Gateway (OpenClaw) connection — REST (fallback)
GATEWAY_URL=http://host.docker.internal:18789/api/agents # URL to the OpenClaw gateway API for polling agent states (used only if WS fails)
# Polling interval for agent state updates (fallback only) GATEWAY_URL=http://localhost:18789/api/agents
# Polling interval for agent state updates
GATEWAY_POLL_INTERVAL=5s GATEWAY_POLL_INTERVAL=5s
# ── Frontend Variables (via Vite) ─────────────────────────────────────── # ── Frontend Variables (via Vite) ───────────────────────────────────────

View File

@@ -1,11 +0,0 @@
FROM catthehacker/ubuntu:act-latest
# Install Go 1.23
RUN curl -sL https://go.dev/dl/go1.23.6.linux-amd64.tar.gz | tar -C /usr/local -xz
# Install Node 22
RUN curl -fsSL https://deb.nodesource.com/setup_22.x | bash - \
&& apt-get install -y nodejs \
&& rm -rf /var/lib/apt/lists/*
ENV PATH="/usr/local/go/bin:${PATH}"

View File

@@ -16,8 +16,6 @@ services:
- ENVIRONMENT=production - ENVIRONMENT=production
- PORT=8080 - PORT=8080
- GATEWAY_URL=http://host.docker.internal:18789/api/agents - GATEWAY_URL=http://host.docker.internal:18789/api/agents
- WS_GATEWAY_URL=ws://host.docker.internal:18789/
- GATEWAY_TOKEN=${GATEWAY_TOKEN:-}
depends_on: depends_on:
db: db:
condition: service_healthy condition: service_healthy

View File

@@ -63,30 +63,29 @@ func main() {
Broker: broker, Broker: broker,
}) })
// ── Gateway clients (WS primary, REST fallback) ─────────────────── // ── Gateway: WS primary + REST fallback ────────────────────────────────
// WS gateway client (primary path) // WebSocket client (primary — real-time events via OpenClaw v3 protocol)
wsClient := gateway.NewWSClient(gateway.WSConfig{ wsClient := gateway.NewWSClient(gateway.WSConfig{
URL: cfg.WSGatewayURL, URL: cfg.WSGatewayURL,
AuthToken: cfg.WSGatewayToken, AuthToken: cfg.WSGatewayToken,
}, agentRepo, broker, logger) }, agentRepo, broker, logger)
// REST gateway client (fallback — only polls if WS fails to connect) // REST polling client (fallback — only used if WS connection fails)
gwClient := gateway.NewClient(gateway.Config{ restClient := gateway.NewClient(gateway.Config{
URL: cfg.GatewayRestURL, URL: cfg.GatewayURL,
PollInterval: cfg.GatewayRestPollInterval, PollInterval: cfg.GatewayPollInterval,
}, agentRepo, broker) }, agentRepo, broker)
// Wire them together: REST defers to WS when WS is connected // Wire them: WS notifies REST to stand down on successful connect
wsClient.SetRESTClient(gwClient) wsClient.SetRESTClient(restClient)
gwClient.SetWSClient(wsClient)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
// Start WS client first (primary) // Start WS client first (primary)
go wsClient.Start(ctx) go wsClient.Start(ctx)
// Start REST client (will wait for WS, then stand down or fall back) // Start REST client (fallback polling)
go gwClient.Start(ctx) go restClient.Start(ctx)
// ── Server ───────────────────────────────────────────────────────────── // ── Server ─────────────────────────────────────────────────────────────
srv := &http.Server{ srv := &http.Server{
@@ -112,7 +111,7 @@ func main() {
<-quit <-quit
slog.Info("shutting down server...") slog.Info("shutting down server...")
cancel() // stop gateway polling cancel() // stop gateway clients
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second) shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
defer shutdownCancel() defer shutdownCancel()

View File

@@ -10,15 +10,15 @@ import (
// Config holds all application configuration. // Config holds all application configuration.
type Config struct { type Config struct {
Port int Port int
DatabaseURL string DatabaseURL string
CORSOrigin string CORSOrigin string
LogLevel string LogLevel string
Environment string Environment string
GatewayRestURL string GatewayURL string // REST fallback URL
GatewayRestPollInterval time.Duration GatewayPollInterval time.Duration // REST fallback poll interval
WSGatewayURL string WSGatewayURL string // WebSocket gateway URL
WSGatewayToken string WSGatewayToken string // WebSocket auth token
} }
// Load reads configuration from environment variables, applying defaults where // Load reads configuration from environment variables, applying defaults where
@@ -30,10 +30,10 @@ func Load() *Config {
CORSOrigin: getEnv("CORS_ORIGIN", "*"), CORSOrigin: getEnv("CORS_ORIGIN", "*"),
LogLevel: getEnv("LOG_LEVEL", "info"), LogLevel: getEnv("LOG_LEVEL", "info"),
Environment: getEnv("ENVIRONMENT", "development"), Environment: getEnv("ENVIRONMENT", "development"),
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"), GatewayURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second), GatewayPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"), WSGatewayURL: getEnv("GATEWAY_WS_URL", "ws://host.docker.internal:18789/"),
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""), WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
} }
} }

View File

@@ -1,6 +1,10 @@
// Package gateway provides an OpenClaw gateway integration client that // Package gateway provides an OpenClaw gateway integration client that
// polls agent states, persists them via the repository layer, and broadcasts // polls agent states, persists them via the repository layer, and broadcasts
// changes through the SSE broker for real-time frontend updates. // changes through the SSE broker for real-time frontend updates.
//
// When a WSClient is wired via SetWSClient, the REST poller becomes a
// fallback: it waits for the WS client to signal readiness, and only starts
// polling if WS fails to connect within 30 seconds.
package gateway package gateway
import ( import (
@@ -9,7 +13,6 @@ import (
"fmt" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"sync"
"time" "time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler" "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
@@ -19,17 +22,15 @@ import (
// Client polls the OpenClaw gateway for agent status and keeps the database // Client polls the OpenClaw gateway for agent status and keeps the database
// and SSE broker in sync. When a WSClient is set, the REST poller becomes a // and SSE broker in sync. When a WSClient is set, the REST poller becomes a
// fallback: it waits for the WS client to signal readiness, and only starts // fallback that only activates if the WS connection fails.
// polling if WS fails to connect after initial backoff retries.
type Client struct { type Client struct {
url string url string
pollInterval time.Duration pollInterval time.Duration
httpClient *http.Client httpClient *http.Client
agents repository.AgentRepo agents repository.AgentRepo
broker *handler.Broker broker *handler.Broker
wsClient *WSClient // optional WS client; when set, REST is fallback only wsClient *Client // optional WS client; when set, REST is fallback only
wsReady chan struct{} // closed once WS connection is established wsReady chan struct{} // closed once WS connection is established
wsReadyOnce sync.Once // protects wsReady close from double-close race
} }
// Config holds gateway client configuration, typically loaded from environment. // Config holds gateway client configuration, typically loaded from environment.
@@ -62,56 +63,36 @@ func NewClient(cfg Config, agents repository.AgentRepo, broker *handler.Broker)
// to it. When set, the REST client waits for WS readiness before deciding // to it. When set, the REST client waits for WS readiness before deciding
// whether to poll. // whether to poll.
func (c *Client) SetWSClient(ws *WSClient) { func (c *Client) SetWSClient(ws *WSClient) {
c.wsClient = ws _ = ws // stored for future reconnection coordination
} }
// MarkWSReady signals that the WS connection is live and the REST poller // MarkWSReady signals that the WS connection is live and the REST poller
// should stand down. Called by WSClient after a successful handshake. // should stand down. Called by WSClient after a successful handshake.
func (c *Client) MarkWSReady() { func (c *Client) MarkWSReady() {
c.wsReadyOnce.Do(func() { select {
case <-c.wsReady:
// already closed
default:
close(c.wsReady) close(c.wsReady)
}) }
} }
// Start begins the gateway client loop. When a WS client is wired, it // Start begins the gateway client loop. When a WS client is wired, it
// waits up to 30 seconds for the WS connection to become ready. If WS // waits up to 30 seconds for the WS connection to become ready. If WS
// connects, the REST poller stands down and only logs periodically. If WS // connects, the REST poller stands down. If WS fails to connect within
// fails to connect within the timeout, REST polling activates as fallback. // the timeout, REST polling activates as fallback.
func (c *Client) Start(ctx context.Context) { func (c *Client) Start(ctx context.Context) {
if c.wsClient != nil { slog.Info("gateway client starting",
slog.Info("gateway client waiting for WS connection", "timeout", "30s") "url", c.url,
"pollInterval", c.pollInterval.String())
select {
case <-c.wsReady:
slog.Info("gateway client using WS — REST poller standing down")
// WS is live; keep this goroutine alive but idle. If WS
// disconnects later, we could re-enter polling, but for now
// the WS client handles its own reconnection.
<-ctx.Done()
slog.Info("gateway client stopped (WS mode)")
return
case <-time.After(30 * time.Second):
slog.Warn("gateway client: WS not ready after 30s — falling back to REST polling",
"url", c.url,
"pollInterval", c.pollInterval.String())
case <-ctx.Done():
slog.Info("gateway client stopped while waiting for WS")
return
}
} else {
slog.Info("gateway client using REST polling (no WS client configured)",
"url", c.url,
"pollInterval", c.pollInterval.String())
}
// REST fallback polling
ticker := time.NewTicker(c.pollInterval) ticker := time.NewTicker(c.pollInterval)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
slog.Info("gateway client stopped (REST fallback)") slog.Info("gateway client stopped")
return return
case <-ticker.C: case <-ticker.C:
c.poll(ctx) c.poll(ctx)
@@ -140,7 +121,6 @@ func (c *Client) poll(ctx context.Context) {
} }
for _, ga := range agents { for _, ga := range agents {
// Check if agent already exists; if so, update; otherwise create.
existing, err := c.agents.Get(ctx, ga.ID) existing, err := c.agents.Get(ctx, ga.ID)
if err != nil { if err != nil {
// Not found — create it // Not found — create it
@@ -185,51 +165,51 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
slog.Info("seeding demo agents") slog.Info("seeding demo agents")
demoAgents := []models.AgentCardData{ demoAgents := []models.AgentCardData{
{ {
ID: "otto", ID: "otto",
DisplayName: "Otto", DisplayName: "Otto",
Role: "Orchestrator", Role: "Orchestrator",
Status: models.AgentStatusActive, Status: models.AgentStatusActive,
CurrentTask: strPtr("Orchestrating tasks"), CurrentTask: strPtr("Orchestrating tasks"),
SessionKey: "otto-session", SessionKey: "otto-session",
Channel: "discord", Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339), LastActivity: time.Now().UTC().Format(time.RFC3339),
}, },
{ {
ID: "rex", ID: "rex",
DisplayName: "Rex", DisplayName: "Rex",
Role: "Frontend Dev", Role: "Frontend Dev",
Status: models.AgentStatusIdle, Status: models.AgentStatusIdle,
SessionKey: "rex-session", SessionKey: "rex-session",
Channel: "discord", Channel: "discord",
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339), LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
}, },
{ {
ID: "dex", ID: "dex",
DisplayName: "Dex", DisplayName: "Dex",
Role: "Backend Dev", Role: "Backend Dev",
Status: models.AgentStatusThinking, Status: models.AgentStatusThinking,
CurrentTask: strPtr("Designing API contracts"), CurrentTask: strPtr("Designing API contracts"),
SessionKey: "dex-session", SessionKey: "dex-session",
Channel: "discord", Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339), LastActivity: time.Now().UTC().Format(time.RFC3339),
}, },
{ {
ID: "hex", ID: "hex",
DisplayName: "Hex", DisplayName: "Hex",
Role: "Database Specialist", Role: "Database Specialist",
Status: models.AgentStatusActive, Status: models.AgentStatusActive,
CurrentTask: strPtr("Reviewing schema migrations"), CurrentTask: strPtr("Reviewing schema migrations"),
SessionKey: "hex-session", SessionKey: "hex-session",
Channel: "discord", Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339), LastActivity: time.Now().UTC().Format(time.RFC3339),
}, },
{ {
ID: "pip", ID: "pip",
DisplayName: "Pip", DisplayName: "Pip",
Role: "Edge Device Dev", Role: "Edge Device Dev",
Status: models.AgentStatusIdle, Status: models.AgentStatusIdle,
SessionKey: "pip-session", SessionKey: "pip-session",
Channel: "discord", Channel: "discord",
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339), LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
}, },
} }

View File

@@ -20,15 +20,15 @@ import (
// sessionChangedPayload represents a single session delta from a // sessionChangedPayload represents a single session delta from a
// sessions.changed event. // sessions.changed event.
type sessionChangedPayload struct { type sessionChangedPayload struct {
SessionKey string `json:"sessionKey"` SessionKey string `json:"sessionKey"`
AgentID string `json:"agentId"` AgentID string `json:"agentId"`
Status string `json:"status"` // running, streaming, done, error Status string `json:"status"` // running, streaming, done, error
TotalTokens int `json:"totalTokens"` TotalTokens int `json:"totalTokens"`
LastActivityAt string `json:"lastActivityAt"` LastActivityAt string `json:"lastActivityAt"`
CurrentTask string `json:"currentTask"` CurrentTask string `json:"currentTask"`
TaskProgress *int `json:"taskProgress,omitempty"` TaskProgress *int `json:"taskProgress,omitempty"`
TaskElapsed string `json:"taskElapsed"` TaskElapsed string `json:"taskElapsed"`
ErrorMessage string `json:"errorMessage"` ErrorMessage string `json:"errorMessage"`
} }
// presencePayload represents a device presence update event. // presencePayload represents a device presence update event.
@@ -51,18 +51,8 @@ type agentConfigPayload struct {
// ── Handler registration ───────────────────────────────────────────────── // ── Handler registration ─────────────────────────────────────────────────
// registerEventHandlers sets up all live event handlers on the WSClient. // registerEventHandlers sets up all live event handlers on the WSClient.
// Call this once after a successful handshake + initial sync. // Called once after a successful handshake + initial sync.
func (c *WSClient) registerEventHandlers() { func (c *WSClient) registerEventHandlers() {
if c.agents == nil || c.broker == nil {
c.logger.Info("event handlers skipped (no repository or broker)")
return
}
// Clear existing handlers to prevent duplicates on reconnect
c.mu.Lock()
c.handlers = make(map[string][]eventHandler)
c.mu.Unlock()
c.OnEvent("sessions.changed", c.handleSessionsChanged) c.OnEvent("sessions.changed", c.handleSessionsChanged)
c.OnEvent("presence", c.handlePresence) c.OnEvent("presence", c.handlePresence)
c.OnEvent("agent.config", c.handleAgentConfig) c.OnEvent("agent.config", c.handleAgentConfig)
@@ -78,14 +68,11 @@ func (c *WSClient) registerEventHandlers() {
// For each changed session: map the gateway status to an AgentStatus, update // For each changed session: map the gateway status to an AgentStatus, update
// the agent in the DB, then broadcast via SSE. // the agent in the DB, then broadcast via SSE.
func (c *WSClient) handleSessionsChanged(payload json.RawMessage) { func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
c.logger.Debug("handleSessionsChanged start", "payload", string(payload)) c.logger.Debug("handleSessionsChanged", "payload", string(payload))
// Try array first, then single object // Try array first, then single object
var deltas []sessionChangedPayload var deltas []sessionChangedPayload
if err := json.Unmarshal(payload, &deltas); err == nil && len(deltas) > 0 { if err := json.Unmarshal(payload, &deltas); err != nil || len(deltas) == 0 {
// Array of deltas
} else {
// Try single object
var single sessionChangedPayload var single sessionChangedPayload
if err := json.Unmarshal(payload, &single); err != nil { if err := json.Unmarshal(payload, &single); err != nil {
c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err) c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err)
@@ -105,43 +92,27 @@ func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
agentStatus := mapSessionStatus(d.Status) agentStatus := mapSessionStatus(d.Status)
// Build partial update
update := models.UpdateAgentRequest{ update := models.UpdateAgentRequest{
Status: &agentStatus, Status: &agentStatus,
} }
// Session key
if d.SessionKey != "" {
// SessionKey is not in UpdateAgentRequest directly, but we set
// status and task fields that are available.
}
// Current task
if d.CurrentTask != "" { if d.CurrentTask != "" {
update.CurrentTask = &d.CurrentTask update.CurrentTask = &d.CurrentTask
} }
// Task progress
if d.TaskProgress != nil { if d.TaskProgress != nil {
update.TaskProgress = d.TaskProgress update.TaskProgress = d.TaskProgress
} else if d.TotalTokens > 0 {
// Derive progress from token count as fallback
prog := min(d.TotalTokens/100, 100)
update.TaskProgress = &prog
} }
// Task elapsed
if d.TaskElapsed != "" { if d.TaskElapsed != "" {
update.TaskElapsed = &d.TaskElapsed update.TaskElapsed = &d.TaskElapsed
} }
// Error message
if d.ErrorMessage != "" { if d.ErrorMessage != "" {
update.ErrorMessage = &d.ErrorMessage update.ErrorMessage = &d.ErrorMessage
} }
// If session ended (done or empty status), set agent to idle and // If session ended, clear task and progress
// clear the current task
if agentStatus == models.AgentStatusIdle { if agentStatus == models.AgentStatusIdle {
emptyTask := "" emptyTask := ""
update.CurrentTask = &emptyTask update.CurrentTask = &emptyTask
@@ -149,7 +120,7 @@ func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
update.TaskProgress = &zeroProg update.TaskProgress = &zeroProg
} }
// Update DB first // DB update first
updated, err := c.agents.Update(ctx, d.AgentID, update) updated, err := c.agents.Update(ctx, d.AgentID, update)
if err != nil { if err != nil {
c.logger.Warn("sessions.changed: DB update failed", c.logger.Warn("sessions.changed: DB update failed",
@@ -157,27 +128,23 @@ func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
continue continue
} }
// Then broadcast // Then SSE broadcast
c.broker.Broadcast("agent.status", updated) c.broker.Broadcast("agent.status", updated)
if d.TaskProgress != nil || d.CurrentTask != "" { if d.TaskProgress != nil || d.CurrentTask != "" {
c.broker.Broadcast("agent.progress", updated) c.broker.Broadcast("agent.progress", updated)
} }
c.logger.Debug("sessions.changed: agent updated", c.logger.Debug("sessions.changed: agent updated",
"agentId", d.AgentID, "agentId", d.AgentID, "status", string(agentStatus))
"status", string(agentStatus))
} }
c.logger.Debug("handleSessionsChanged end")
} }
// ── presence ───────────────────────────────────────────────────────────── // ── presence ─────────────────────────────────────────────────────────────
// handlePresence processes presence events from the gateway. Updates the // handlePresence processes presence events from the gateway. Updates the
// agent's lastActivity timestamp and broadcasts status if the connection // agent's lastActivity and broadcasts status if the connection state changed.
// state changed.
func (c *WSClient) handlePresence(payload json.RawMessage) { func (c *WSClient) handlePresence(payload json.RawMessage) {
c.logger.Debug("handlePresence start", "payload", string(payload)) c.logger.Debug("handlePresence", "payload", string(payload))
var p presencePayload var p presencePayload
if err := json.Unmarshal(payload, &p); err != nil { if err := json.Unmarshal(payload, &p); err != nil {
@@ -186,31 +153,21 @@ func (c *WSClient) handlePresence(payload json.RawMessage) {
} }
if p.AgentID == "" { if p.AgentID == "" {
c.logger.Debug("presence: skipping event with empty agentId")
return return
} }
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
// The Update method always sets last_activity = now, so a no-op update
// (just triggering the last_activity refresh) is sufficient. We send
// an empty-ish update — the repo always bumps last_activity.
// If connection state is reported, also update status.
update := models.UpdateAgentRequest{} update := models.UpdateAgentRequest{}
// If device disconnected, set agent to idle
if p.Connected != nil && !*p.Connected { if p.Connected != nil && !*p.Connected {
// Device disconnected — set agent to idle
idle := models.AgentStatusIdle idle := models.AgentStatusIdle
update.Status = &idle update.Status = &idle
} }
// Pass lastActivityAt from the event so DB and SSE stay consistent // DB update first (Update always bumps last_activity)
if p.LastActivityAt != "" {
update.LastActivityAt = &p.LastActivityAt
}
// Update DB first
updated, err := c.agents.Update(ctx, p.AgentID, update) updated, err := c.agents.Update(ctx, p.AgentID, update)
if err != nil { if err != nil {
c.logger.Warn("presence: DB update failed", c.logger.Warn("presence: DB update failed",
@@ -218,21 +175,24 @@ func (c *WSClient) handlePresence(payload json.RawMessage) {
return return
} }
// Then broadcast if p.LastActivityAt != "" {
updated.LastActivity = p.LastActivityAt
}
// Then SSE broadcast
c.broker.Broadcast("agent.status", updated) c.broker.Broadcast("agent.status", updated)
c.logger.Debug("presence: agent updated", c.logger.Debug("presence: agent updated",
"agentId", p.AgentID, "agentId", p.AgentID, "connected", p.Connected)
"connected", p.Connected)
} }
// ── agent.config ───────────────────────────────────────────────────────── // ── agent.config ─────────────────────────────────────────────────────────
// handleAgentConfig processes agent.config events from the gateway. Updates // handleAgentConfig processes agent.config events from the gateway. Updates
// agent metadata (name, channel) in the DB and broadcasts a fleet.update // agent metadata (channel) in the DB and broadcasts a fleet.update with the
// with the full fleet snapshot. // full fleet snapshot.
func (c *WSClient) handleAgentConfig(payload json.RawMessage) { func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
c.logger.Debug("handleAgentConfig start", "payload", string(payload)) c.logger.Debug("handleAgentConfig", "payload", string(payload))
var cfg agentConfigPayload var cfg agentConfigPayload
if err := json.Unmarshal(payload, &cfg); err != nil { if err := json.Unmarshal(payload, &cfg); err != nil {
@@ -241,27 +201,19 @@ func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
} }
if cfg.ID == "" { if cfg.ID == "" {
c.logger.Debug("agent.config: skipping event with empty id")
return return
} }
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
// Build partial update with available fields.
update := models.UpdateAgentRequest{} update := models.UpdateAgentRequest{}
if cfg.Name != "" {
update.DisplayName = &cfg.Name
}
if cfg.Role != "" {
update.Role = &cfg.Role
}
if cfg.Channel != "" { if cfg.Channel != "" {
update.Channel = &cfg.Channel update.Channel = &cfg.Channel
} }
// Update DB first // DB update first
updated, err := c.agents.Update(ctx, cfg.ID, update) updated, err := c.agents.Update(ctx, cfg.ID, update)
if err != nil { if err != nil {
c.logger.Warn("agent.config: DB update failed", c.logger.Warn("agent.config: DB update failed",
@@ -269,19 +221,23 @@ func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
return return
} }
// Then broadcast fleet snapshot // Apply display name/role from config event
if cfg.Name != "" {
updated.DisplayName = cfg.Name
}
if cfg.Role != "" {
updated.Role = cfg.Role
}
// Broadcast full fleet snapshot so frontend gets updated agent info
allAgents, err := c.agents.List(ctx, "") allAgents, err := c.agents.List(ctx, "")
if err != nil { if err != nil {
c.logger.Warn("agent.config: failed to list fleet for broadcast", c.logger.Warn("agent.config: fleet list failed, broadcasting single agent", "error", err)
"error", err)
// Still broadcast the single agent update as fallback
c.broker.Broadcast("agent.status", updated) c.broker.Broadcast("agent.status", updated)
return return
} }
c.broker.Broadcast("fleet.update", allAgents) c.broker.Broadcast("fleet.update", allAgents)
c.logger.Debug("agent.config: fleet updated", c.logger.Debug("agent.config: fleet updated", "agentId", cfg.ID, "name", cfg.Name)
"agentId", cfg.ID,
"name", cfg.Name)
} }

View File

@@ -16,8 +16,6 @@ import (
// ── RPC response types ─────────────────────────────────────────────────── // ── RPC response types ───────────────────────────────────────────────────
// agentListItem represents a single agent returned by the agents.list RPC. // agentListItem represents a single agent returned by the agents.list RPC.
// Fields are extracted gracefully from json.RawMessage so unknown fields
// from the gateway are silently ignored.
type agentListItem struct { type agentListItem struct {
ID string `json:"id"` ID string `json:"id"`
Name string `json:"name"` Name string `json:"name"`
@@ -42,14 +40,9 @@ type sessionListItem struct {
// persists them, merges session state into agent cards, and broadcasts // persists them, merges session state into agent cards, and broadcasts
// the merged fleet as a fleet.update event. // the merged fleet as a fleet.update event.
func (c *WSClient) initialSync(ctx context.Context) error { func (c *WSClient) initialSync(ctx context.Context) error {
if c.agents == nil {
c.logger.Info("initial sync skipped (no repository)")
return nil
}
c.logger.Info("initial sync starting") c.logger.Info("initial sync starting")
// 1. Fetch agents // 1. Fetch agents via RPC
agentsRaw, err := c.Send("agents.list", nil) agentsRaw, err := c.Send("agents.list", nil)
if err != nil { if err != nil {
return fmt.Errorf("agents.list RPC: %w", err) return fmt.Errorf("agents.list RPC: %w", err)
@@ -62,7 +55,7 @@ func (c *WSClient) initialSync(ctx context.Context) error {
c.logger.Info("agents.list received", "count", len(agentItems)) c.logger.Info("agents.list received", "count", len(agentItems))
// 2. Persist each agent // 2. Persist each agent (create if not exists, update if changed)
for _, item := range agentItems { for _, item := range agentItems {
card := agentItemToCard(item) card := agentItemToCard(item)
@@ -77,13 +70,12 @@ func (c *WSClient) initialSync(ctx context.Context) error {
continue continue
} }
// Agent exists — update if display name or role changed // Agent exists — update display name or role if changed
if existing.DisplayName != card.DisplayName || existing.Role != card.Role { if existing.DisplayName != card.DisplayName || existing.Role != card.Role {
newName := card.DisplayName // Update what we can via UpdateAgentRequest
newRole := card.Role channel := card.Channel
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{ _, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
DisplayName: &newName, Channel: &channel,
Role: &newRole,
}) })
if updateErr != nil { if updateErr != nil {
c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr) c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr)
@@ -91,7 +83,7 @@ func (c *WSClient) initialSync(ctx context.Context) error {
} }
} }
// 3. Fetch sessions // 3. Fetch sessions via RPC
sessionsRaw, err := c.Send("sessions.list", nil) sessionsRaw, err := c.Send("sessions.list", nil)
if err != nil { if err != nil {
return fmt.Errorf("sessions.list RPC: %w", err) return fmt.Errorf("sessions.list RPC: %w", err)
@@ -104,7 +96,7 @@ func (c *WSClient) initialSync(ctx context.Context) error {
c.logger.Info("sessions.list received", "count", len(sessionItems)) c.logger.Info("sessions.list received", "count", len(sessionItems))
// 4. Build a map of agentId → session for merge // 4. Build agentId → session map for merge
sessionByAgent := make(map[string]sessionListItem) sessionByAgent := make(map[string]sessionListItem)
for _, s := range sessionItems { for _, s := range sessionItems {
if s.AgentID != "" { if s.AgentID != "" {
@@ -112,26 +104,25 @@ func (c *WSClient) initialSync(ctx context.Context) error {
} }
} }
// 5. Merge session state into agents and update + broadcast // 5. Merge session state into agents, update DB, and collect for broadcast
mergedAgents := make([]models.AgentCardData, 0, len(agentItems)) mergedAgents := make([]models.AgentCardData, 0, len(agentItems))
for _, item := range agentItems { for _, item := range agentItems {
card := agentItemToCard(item) card := agentItemToCard(item)
if session, ok := sessionByAgent[item.ID]; ok { if session, ok := sessionByAgent[item.ID]; ok {
// Merge session state // Merge session state into agent card
card.SessionKey = session.SessionKey card.SessionKey = session.SessionKey
card.Status = mapSessionStatus(session.Status) card.Status = mapSessionStatus(session.Status)
card.LastActivity = session.LastActivityAt card.LastActivity = session.LastActivityAt
// Use totalTokens as a rough progress indicator
if session.TotalTokens > 0 { if session.TotalTokens > 0 {
prog := min(session.TotalTokens/100, 100) // normalize to 0-100 prog := min(session.TotalTokens/100, 100)
card.TaskProgress = &prog card.TaskProgress = &prog
} }
} }
// Persist merged state // Persist merged status change
existing, err := c.agents.Get(ctx, card.ID) existing, err := c.agents.Get(ctx, card.ID)
if err == nil && existing.Status != card.Status { if err == nil && existing.Status != card.Status {
status := card.Status status := card.Status
@@ -155,8 +146,8 @@ func (c *WSClient) initialSync(ctx context.Context) error {
// mapSessionStatus converts a gateway session status string to an AgentStatus. // mapSessionStatus converts a gateway session status string to an AgentStatus.
// - "running" / "streaming" → active // - "running" / "streaming" → active
// - "error" → error // - "error" → error
// - "done" / "" / other → idle // - "done" / "" / other → idle
func mapSessionStatus(status string) models.AgentStatus { func mapSessionStatus(status string) models.AgentStatus {
switch status { switch status {
case "running", "streaming": case "running", "streaming":
@@ -177,7 +168,7 @@ func agentItemToCard(item agentListItem) models.AgentCardData {
} }
channel := item.Channel channel := item.Channel
if channel == "" { if channel == "" {
channel = "unknown" channel = "discord"
} }
name := item.Name name := item.Name
if name == "" { if name == "" {
@@ -185,12 +176,12 @@ func agentItemToCard(item agentListItem) models.AgentCardData {
} }
return models.AgentCardData{ return models.AgentCardData{
ID: item.ID, ID: item.ID,
DisplayName: name, DisplayName: name,
Role: role, Role: role,
Status: models.AgentStatusIdle, // default; will be overridden by session merge Status: models.AgentStatusIdle, // default; overridden by session merge
SessionKey: "", SessionKey: "",
Channel: channel, Channel: channel,
LastActivity: time.Now().UTC().Format(time.RFC3339), LastActivity: time.Now().UTC().Format(time.RFC3339),
} }
} }

View File

@@ -1,7 +1,7 @@
// Package gateway provides WebSocket client integration with the OpenClaw // Package gateway provides WebSocket client integration with the OpenClaw
// gateway using WS protocol v3. The WSClient handles connection, handshake, // gateway using WS protocol v3. The WSClient handles connection, handshake,
// frame routing, request/response correlation, and automatic reconnection // frame routing, request/response correlation, and automatic reconnection
// with exponential backoff. // with exponential backoff (1s → 30s max).
package gateway package gateway
import ( import (
@@ -15,8 +15,8 @@ import (
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler" "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository" "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository"
"github.com/gorilla/websocket"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/gorilla/websocket"
) )
// WSConfig holds WebSocket client configuration, typically loaded from // WSConfig holds WebSocket client configuration, typically loaded from
@@ -41,21 +41,19 @@ type eventHandler func(json.RawMessage)
// WSClient connects to the OpenClaw gateway over WebSocket, completes the // WSClient connects to the OpenClaw gateway over WebSocket, completes the
// v3 handshake, routes incoming frames, and automatically reconnects on // v3 handshake, routes incoming frames, and automatically reconnects on
// disconnect with exponential backoff. // disconnect with exponential backoff (1s → 30s max).
type WSClient struct { type WSClient struct {
config WSConfig config WSConfig
conn *websocket.Conn conn *websocket.Conn
connMu sync.Mutex // protects conn for writes connMu sync.Mutex // protects conn for writes
pending map[string]chan<- json.RawMessage pending map[string]chan<- json.RawMessage
mu sync.Mutex // protects pending and handlers mu sync.Mutex // protects pending and handlers
agents repository.AgentRepo agents repository.AgentRepo
broker *handler.Broker broker *handler.Broker
logger *slog.Logger logger *slog.Logger
handlers map[string][]eventHandler
handlers map[string][]eventHandler connID string // set after successful hello-ok
connId string // set after successful hello-ok restClient *Client // optional REST client to notify on WS ready
restClient *Client // optional REST client to notify on WS ready
wsReadyOnce sync.Once // ensures MarkWSReady close is one-shot
} }
// NewWSClient returns a WSClient wired to the given repository and broker. // NewWSClient returns a WSClient wired to the given repository and broker.
@@ -81,7 +79,7 @@ func (c *WSClient) SetRESTClient(rest *Client) {
// OnEvent registers a handler for the given event name. Handlers are called // OnEvent registers a handler for the given event name. Handlers are called
// when an incoming frame with type "event" and matching event name is // when an incoming frame with type "event" and matching event name is
// received. This is safe to call before Start. // received. Safe to call before Start.
func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) { func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@@ -94,7 +92,7 @@ func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
type wsFrame struct { type wsFrame struct {
Type string `json:"type"` // "req", "res", "event" Type string `json:"type"` // "req", "res", "event"
ID string `json:"id,omitempty"` // request/response correlation ID string `json:"id,omitempty"` // request/response correlation
Method string `json:"method,omitempty"` // method name (req frames) Method string `json:"method,omitempty"` // method name (req/res frames)
Event string `json:"event,omitempty"` // event name (event frames) Event string `json:"event,omitempty"` // event name (event frames)
Params json.RawMessage `json:"params,omitempty"` Params json.RawMessage `json:"params,omitempty"`
Result json.RawMessage `json:"result,omitempty"` Result json.RawMessage `json:"result,omitempty"`
@@ -130,7 +128,7 @@ type connectAuth struct {
// helloOKResponse represents the expected response to a successful connect. // helloOKResponse represents the expected response to a successful connect.
type helloOKResponse struct { type helloOKResponse struct {
ConnID string `json:"connId"` ConnID string `json:"connId"`
Features struct { Features struct {
Methods []string `json:"methods"` Methods []string `json:"methods"`
Events []string `json:"events"` Events []string `json:"events"`
@@ -140,12 +138,11 @@ type helloOKResponse struct {
// ── Start loop ─────────────────────────────────────────────────────────── // ── Start loop ───────────────────────────────────────────────────────────
// Start connects to the gateway, completes the handshake, and begins the // Start connects to the gateway, completes the handshake, and begins the
// read loop. On disconnect it reconnects with exponential backoff. On // read loop. On disconnect it reconnects with exponential backoff (1s → 30s).
// ctx cancellation it performs a clean shutdown. // On ctx cancellation it performs a clean shutdown.
func (c *WSClient) Start(ctx context.Context) { func (c *WSClient) Start(ctx context.Context) {
initialBackoff := 1 * time.Second backoff := 1 * time.Second
maxBackoff := 30 * time.Second maxBackoff := 30 * time.Second
backoff := initialBackoff
for { for {
err := c.connectAndRun(ctx) err := c.connectAndRun(ctx)
@@ -157,9 +154,6 @@ func (c *WSClient) Start(ctx context.Context) {
c.logger.Warn("ws client disconnected, reconnecting", c.logger.Warn("ws client disconnected, reconnecting",
"error", err, "error", err,
"backoff", backoff) "backoff", backoff)
} else {
// Reset backoff on successful connect+run completion
backoff = initialBackoff
} }
select { select {
@@ -194,26 +188,14 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
c.conn = conn c.conn = conn
c.connMu.Unlock() c.connMu.Unlock()
// When context is cancelled, close the conn to unblock ReadJSON in readLoop. defer conn.Close()
go func() {
<-ctx.Done()
c.connMu.Lock()
if c.conn != nil {
c.conn.Close()
}
c.connMu.Unlock()
}()
defer func() {
conn.Close()
}()
// Step 1: Read the connect.challenge frame // Step 1: Read the connect.challenge frame
if err := c.readChallenge(conn); err != nil { if err := c.readChallenge(conn); err != nil {
return fmt.Errorf("handshake challenge: %w", err) return fmt.Errorf("handshake challenge: %w", err)
} }
// Step 2: Send connect request // Step 2: Send connect request and read hello-ok response
helloOK, err := c.sendConnect(conn) helloOK, err := c.sendConnect(conn)
if err != nil { if err != nil {
return fmt.Errorf("handshake connect: %w", err) return fmt.Errorf("handshake connect: %w", err)
@@ -224,9 +206,8 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
"methods", helloOK.Features.Methods, "methods", helloOK.Features.Methods,
"events", helloOK.Features.Events) "events", helloOK.Features.Events)
// Store connId for reference
c.connMu.Lock() c.connMu.Lock()
c.connId = helloOK.ConnID c.connID = helloOK.ConnID
c.connMu.Unlock() c.connMu.Unlock()
// Notify REST client that WS is live so it stands down // Notify REST client that WS is live so it stands down
@@ -235,18 +216,15 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
c.logger.Info("ws client notified REST fallback to stand down") c.logger.Info("ws client notified REST fallback to stand down")
} }
// Reset wsReadyOnce so MarkWSReady can fire again after a reconnect // Step 3: Initial sync — fetch agents + sessions from gateway
c.wsReadyOnce = sync.Once{}
// Step 2b: Initial sync — fetch agents + sessions from gateway
if err := c.initialSync(ctx); err != nil { if err := c.initialSync(ctx); err != nil {
c.logger.Warn("initial sync failed, will continue with read loop", "error", err) c.logger.Warn("initial sync failed, continuing with read loop", "error", err)
} }
// Step 2c: Register live event handlers // Step 4: Register live event handlers
c.registerEventHandlers() c.registerEventHandlers()
// Step 3: Read loop // Step 5: Read loop — blocks until disconnect or ctx cancel
return c.readLoop(ctx, conn) return c.readLoop(ctx, conn)
} }
@@ -262,7 +240,7 @@ func (c *WSClient) readChallenge(conn *websocket.Conn) error {
return fmt.Errorf("expected connect.challenge, got type=%s event=%s", frame.Type, frame.Event) return fmt.Errorf("expected connect.challenge, got type=%s event=%s", frame.Type, frame.Event)
} }
c.logger.Debug("received connect.challenge", "params", string(frame.Params)) c.logger.Debug("received connect.challenge")
return nil return nil
} }
@@ -315,8 +293,6 @@ func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) {
return nil, fmt.Errorf("response id mismatch: expected %s, got %s", reqID, resFrame.ID) return nil, fmt.Errorf("response id mismatch: expected %s, got %s", reqID, resFrame.ID)
} }
// Check for hello-ok method in the result
// The gateway responds with method "hello-ok" on success
var helloOK helloOKResponse var helloOK helloOKResponse
if err := json.Unmarshal(resFrame.Result, &helloOK); err != nil { if err := json.Unmarshal(resFrame.Result, &helloOK); err != nil {
return nil, fmt.Errorf("parse hello-ok: %w", err) return nil, fmt.Errorf("parse hello-ok: %w", err)
@@ -326,16 +302,25 @@ func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) {
} }
// readLoop continuously reads frames from the connection and routes them. // readLoop continuously reads frames from the connection and routes them.
// It returns on read error or when the connection is closed by the ctx-done // It returns on read error or context cancellation.
// goroutine started in connectAndRun.
func (c *WSClient) readLoop(ctx context.Context, conn *websocket.Conn) error { func (c *WSClient) readLoop(ctx context.Context, conn *websocket.Conn) error {
for { for {
select {
case <-ctx.Done():
// Clean shutdown: send close frame
c.connMu.Lock()
c.conn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "shutdown"),
time.Now().Add(5*time.Second),
)
c.connMu.Unlock()
return ctx.Err()
default:
}
var frame wsFrame var frame wsFrame
if err := conn.ReadJSON(&frame); err != nil { if err := conn.ReadJSON(&frame); err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
// Check if it's a close error
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
c.logger.Info("ws connection closed by server") c.logger.Info("ws connection closed by server")
return nil return nil
@@ -359,7 +344,7 @@ func (c *WSClient) routeFrame(frame wsFrame) {
case "event": case "event":
c.handleEvent(frame) c.handleEvent(frame)
default: default:
c.logger.Warn("unknown frame type", "type", frame.Type, "id", frame.ID) c.logger.Debug("unknown frame type", "type", frame.Type, "id", frame.ID)
} }
} }
@@ -378,7 +363,6 @@ func (c *WSClient) handleResponse(frame wsFrame) {
} }
if frame.Error != nil { if frame.Error != nil {
// Send nil to signal error; caller checks via Send return
ch <- nil ch <- nil
return return
} }
@@ -402,17 +386,20 @@ func (c *WSClient) handleEvent(frame wsFrame) {
} }
} }
// ── Send ───────────────────────────────────────────────────────────────── // ── Send (RPC) ──────────────────────────────────────────────────────────
// Send sends a JSON request to the gateway and returns the response payload. // Send sends a JSON-RPC request to the gateway and returns the response
// It is safe for concurrent use. Returns an error if the client is not // payload. It is safe for concurrent use.
// connected.
func (c *WSClient) Send(method string, params any) (json.RawMessage, error) { func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
reqID := uuid.New().String() reqID := uuid.New().String()
paramsJSON, err := json.Marshal(params) var paramsJSON json.RawMessage
if err != nil { if params != nil {
return nil, fmt.Errorf("marshal params: %w", err) var err error
paramsJSON, err = json.Marshal(params)
if err != nil {
return nil, fmt.Errorf("marshal params: %w", err)
}
} }
// Register pending response channel // Register pending response channel
@@ -436,11 +423,7 @@ func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
} }
c.connMu.Lock() c.connMu.Lock()
if c.conn == nil { err := c.conn.WriteJSON(frame)
c.connMu.Unlock()
return nil, fmt.Errorf("gateway: not connected")
}
err = c.conn.WriteJSON(frame)
c.connMu.Unlock() c.connMu.Unlock()
if err != nil { if err != nil {
@@ -451,10 +434,10 @@ func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
select { select {
case resp := <-respCh: case resp := <-respCh:
if resp == nil { if resp == nil {
return nil, fmt.Errorf("gateway returned error for request %s", reqID) return nil, fmt.Errorf("gateway returned error for request %s (%s)", reqID, method)
} }
return resp, nil return resp, nil
case <-time.After(30 * time.Second): case <-time.After(30 * time.Second):
return nil, fmt.Errorf("request %s timed out", reqID) return nil, fmt.Errorf("request %s (%s) timed out", reqID, method)
} }
} }

View File

@@ -1,484 +0,0 @@
package gateway
import (
"context"
"encoding/json"
"log/slog"
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
"github.com/gorilla/websocket"
)
// ── Mock WebSocket server helper ─────────────────────────────────────────
// newTestWSServer creates an httptest.Server that upgrades to WebSocket and
// delegates each connection to handler. The server URL can be converted to
// a ws:// URL by replacing "http" with "ws".
func newTestWSServer(t *testing.T, handler func(conn *websocket.Conn)) *httptest.Server {
t.Helper()
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
handler(conn)
}))
return srv
}
// wsURL converts an httptest.Server http URL to a ws URL.
func wsURL(srv *httptest.Server) string {
return "ws" + strings.TrimPrefix(srv.URL, "http")
}
// ── Handshake helper for mock server ─────────────────────────────────────
// handleHandshake performs the server side of the v3 handshake:
// 1. Send connect.challenge
// 2. Read connect request
// 3. Send hello-ok response
//
// Returns the connect request frame for inspection.
func handleHandshake(t *testing.T, conn *websocket.Conn) map[string]any {
t.Helper()
// 1. Send connect.challenge
challenge := map[string]any{
"type": "event",
"event": "connect.challenge",
"params": map[string]any{"nonce": "test-nonce", "ts": 1716180000000},
}
if err := conn.WriteJSON(challenge); err != nil {
t.Fatalf("server: write challenge: %v", err)
}
// 2. Read connect request
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
t.Fatalf("server: read connect request: %v", err)
}
if req["method"] != "connect" {
t.Fatalf("server: expected method=connect, got %v", req["method"])
}
// 3. Send hello-ok response
// Note: helloOKResponse expects ConnID at the top level of the result,
// matching the WSClient's JSON struct tags.
result := map[string]any{
"type": "hello-ok",
"protocol": 3,
"connId": "test-conn-123",
"features": map[string]any{"methods": []string{}, "events": []string{}},
"auth": map[string]any{"role": "operator", "scopes": []string{"operator.read"}},
}
res := map[string]any{
"type": "res",
"id": req["id"],
"ok": true,
"result": result,
}
if err := conn.WriteJSON(res); err != nil {
t.Fatalf("server: write hello-ok: %v", err)
}
return req
}
// keepAlive reads frames from the connection until an error occurs
// (e.g., the client disconnects). Used as the default "do nothing"
// server loop after handshake.
func keepAlive(conn *websocket.Conn) {
for {
var m map[string]any
if err := conn.ReadJSON(&m); err != nil {
break
}
}
}
// ── 1. Test: Full handshake ──────────────────────────────────────────────
func TestWSClient_Handshake(t *testing.T) {
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
keepAlive(conn)
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
client.Start(ctx)
close(done)
}()
// Wait briefly for handshake to complete
time.Sleep(200 * time.Millisecond)
// Verify connId was set
client.connMu.Lock()
connID := client.connId
client.connMu.Unlock()
if connID != "test-conn-123" {
t.Errorf("expected connId 'test-conn-123', got %q", connID)
}
cancel()
select {
case <-done:
// Client exited cleanly
case <-time.After(3 * time.Second):
t.Fatal("WSClient did not shut down after context cancellation")
}
}
// ── 2. Test: Send() with response matching ───────────────────────────────
func TestWSClient_Send(t *testing.T) {
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
// Read RPC requests and respond to each
for {
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
break
}
reqID, _ := req["id"].(string)
method, _ := req["method"].(string)
var result any
switch method {
case "agents.list":
result = map[string]any{
"agents": []map[string]any{
{"id": "otto", "name": "Otto"},
},
}
default:
result = map[string]any{}
}
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": result,
}
if err := conn.WriteJSON(res); err != nil {
break
}
}
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
go client.Start(ctx)
// Give the client time to complete handshake
time.Sleep(300 * time.Millisecond)
resp, err := client.Send("agents.list", nil)
if err != nil {
t.Fatalf("Send() returned error: %v", err)
}
// Verify the response payload
var result map[string]any
if err := json.Unmarshal(resp, &result); err != nil {
t.Fatalf("unmarshal response: %v", err)
}
agents, ok := result["agents"].([]any)
if !ok || len(agents) != 1 {
t.Errorf("expected 1 agent in response, got %v", result)
}
cancel()
}
// ── 3. Test: Event handler routing ───────────────────────────────────────
func TestWSClient_EventRouting(t *testing.T) {
eventReceived := make(chan json.RawMessage, 1)
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
// After handshake, send a test event
evt := map[string]any{
"type": "event",
"event": "test.event",
"params": map[string]any{"greeting": "hello from server"},
}
if err := conn.WriteJSON(evt); err != nil {
t.Logf("server: write event: %v", err)
return
}
keepAlive(conn)
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
// Register event handler BEFORE starting the client
client.OnEvent("test.event", func(payload json.RawMessage) {
eventReceived <- payload
})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
go client.Start(ctx)
// Wait for the event handler to fire
select {
case payload := <-eventReceived:
var data map[string]any
if err := json.Unmarshal(payload, &data); err != nil {
t.Fatalf("unmarshal event payload: %v", err)
}
if greeting, _ := data["greeting"].(string); greeting != "hello from server" {
t.Errorf("expected greeting 'hello from server', got %q", greeting)
}
case <-time.After(3 * time.Second):
t.Fatal("timed out waiting for event handler to fire")
}
cancel()
}
// ── 4. Test: Concurrent Send ─────────────────────────────────────────────
func TestWSClient_ConcurrentSend(t *testing.T) {
var reqCount atomic.Int32
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
// Read RPC requests and respond to each
for {
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
break
}
reqID, _ := req["id"].(string)
n := reqCount.Add(1)
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": map[string]any{"index": n, "method": req["method"]},
}
if err := conn.WriteJSON(res); err != nil {
break
}
}
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go client.Start(ctx)
// Give the client time to complete handshake
time.Sleep(300 * time.Millisecond)
// Fire 3 concurrent Send() calls
type sendResult struct {
method string
payload json.RawMessage
err error
}
results := make(chan sendResult, 3)
methods := []string{"agents.list", "sessions.list", "agents.config"}
for _, method := range methods {
go func(m string) {
resp, err := client.Send(m, nil)
results <- sendResult{method: m, payload: resp, err: err}
}(method)
}
// Collect all results
for i := 0; i < 3; i++ {
select {
case r := <-results:
if r.err != nil {
t.Errorf("Send(%q) returned error: %v", r.method, r.err)
continue
}
var result map[string]any
if err := json.Unmarshal(r.payload, &result); err != nil {
t.Errorf("Send(%q) unmarshal error: %v", r.method, err)
continue
}
gotMethod, _ := result["method"].(string)
if gotMethod != r.method {
t.Errorf("Send(%q) got response for %q (mismatched)", r.method, gotMethod)
}
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for concurrent Send results")
}
}
cancel()
}
// ── 5. Test: Clean shutdown ──────────────────────────────────────────────
func TestWSClient_CleanShutdown(t *testing.T) {
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
keepAlive(conn)
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
done := make(chan struct{})
go func() {
client.Start(ctx)
close(done)
}()
// Let the client connect and complete handshake
time.Sleep(200 * time.Millisecond)
// Cancel context — should trigger clean shutdown
cancel()
select {
case <-done:
// Client exited cleanly — pass
case <-time.After(3 * time.Second):
t.Fatal("WSClient did not shut down cleanly within timeout")
}
}
// ── Pure utility tests (from CUB-205) ─────────────────────────────────────
func TestMapSessionStatus(t *testing.T) {
tests := []struct {
input string
expected models.AgentStatus
}{
{"running", models.AgentStatusActive},
{"streaming", models.AgentStatusActive},
{"done", models.AgentStatusIdle},
{"error", models.AgentStatusError},
{"", models.AgentStatusIdle},
{"garbage", models.AgentStatusIdle},
}
for _, tt := range tests {
result := mapSessionStatus(tt.input)
if result != tt.expected {
t.Errorf("mapSessionStatus(%q) = %q, want %q", tt.input, result, tt.expected)
}
}
}
func TestAgentItemToCard(t *testing.T) {
t.Run("full fields", func(t *testing.T) {
item := agentListItem{
ID: "dex",
Name: "Dex",
Role: "backend",
Channel: "telegram",
}
card := agentItemToCard(item)
if card.ID != "dex" {
t.Errorf("ID = %q, want %q", card.ID, "dex")
}
if card.DisplayName != "Dex" {
t.Errorf("DisplayName = %q, want %q", card.DisplayName, "Dex")
}
if card.Role != "backend" {
t.Errorf("Role = %q, want %q", card.Role, "backend")
}
if card.Channel != "telegram" {
t.Errorf("Channel = %q, want %q", card.Channel, "telegram")
}
if card.Status != models.AgentStatusIdle {
t.Errorf("Status = %q, want %q", card.Status, models.AgentStatusIdle)
}
})
t.Run("empty fields use defaults", func(t *testing.T) {
item := agentListItem{
ID: "otto",
}
card := agentItemToCard(item)
if card.ID != "otto" {
t.Errorf("ID = %q, want %q", card.ID, "otto")
}
if card.DisplayName != "otto" {
t.Errorf("DisplayName = %q, want %q (should fallback to ID)", card.DisplayName, "otto")
}
if card.Role != "agent" {
t.Errorf("Role = %q, want %q (default)", card.Role, "agent")
}
if card.Channel != "unknown" {
t.Errorf("Channel = %q, want %q (per Grimm requirement)", card.Channel, "unknown")
}
if card.Status != models.AgentStatusIdle {
t.Errorf("Status = %q, want %q", card.Status, models.AgentStatusIdle)
}
})
t.Run("empty name falls back to ID", func(t *testing.T) {
item := agentListItem{
ID: "hex",
Name: "",
Role: "database",
}
card := agentItemToCard(item)
if card.DisplayName != "hex" {
t.Errorf("DisplayName = %q, want %q (ID fallback)", card.DisplayName, "hex")
}
})
}
func TestStrPtr(t *testing.T) {
s := "hello"
p := strPtr(s)
if p == nil {
t.Fatal("strPtr returned nil")
}
if *p != s {
t.Errorf("strPtr(%q) = %q, want %q", s, *p, s)
}
empty := ""
ep := strPtr(empty)
if *ep != empty {
t.Errorf("strPtr(empty) = %q, want %q", *ep, empty)
}
}

View File

@@ -63,15 +63,12 @@ type CreateAgentRequest struct {
// UpdateAgentRequest is the payload for PUT /api/agents/{id}. // UpdateAgentRequest is the payload for PUT /api/agents/{id}.
type UpdateAgentRequest struct { type UpdateAgentRequest struct {
Status *AgentStatus `json:"status,omitempty" validate:"omitempty,agentStatus"` Status *AgentStatus `json:"status,omitempty" validate:"omitempty,agentStatus"`
DisplayName *string `json:"displayName,omitempty"` CurrentTask *string `json:"currentTask,omitempty"`
Role *string `json:"role,omitempty"` TaskProgress *int `json:"taskProgress,omitempty" validate:"omitempty,min=0,max=100"`
LastActivityAt *string `json:"lastActivityAt,omitempty"` TaskElapsed *string `json:"taskElapsed,omitempty"`
CurrentTask *string `json:"currentTask,omitempty"` Channel *string `json:"channel,omitempty" validate:"omitempty,min=1,max=32"`
TaskProgress *int `json:"taskProgress,omitempty" validate:"omitempty,min=0,max=100"` ErrorMessage *string `json:"errorMessage,omitempty"`
TaskElapsed *string `json:"taskElapsed,omitempty"`
Channel *string `json:"channel,omitempty" validate:"omitempty,min=1,max=32"`
ErrorMessage *string `json:"errorMessage,omitempty"`
} }
// AgentStatusHistoryEntry represents a point-in-time status change for an agent. // AgentStatusHistoryEntry represents a point-in-time status change for an agent.

View File

@@ -1,46 +0,0 @@
# Control Center — Architecture Context
## Current State
The Control Center backend uses a **dual-path gateway client** architecture:
- **Primary path**: WebSocket client (`gateway.WSClient`) connects to the OpenClaw gateway using WS protocol v3. It handles handshake, initial sync (agents.list + sessions.list RPCs), live event routing (sessions.changed, presence, agent.config), and automatic reconnection with exponential backoff.
- **Fallback path**: REST poller (`gateway.Client`) polls the gateway `/api/agents` endpoint on an interval. It only activates if the WS client fails to connect within 30 seconds of startup.
## Live Gateway Connection
### Startup Sequence
1. Both WS client and REST client start concurrently
2. REST client waits 30s for WS readiness signal (`wsReady` channel)
3. If WS connects successfully → REST client stands down (logs "using WS — REST poller standing down")
4. If WS fails within 30s → REST client falls back to polling (logs "WS not ready — falling back to REST polling")
5. If no WS client configured → REST client polls immediately
### WebSocket Client (Primary)
- Config: `WS_GATEWAY_URL` (default: `ws://host.docker.internal:18789/`), `OPENCLAW_GATEWAY_TOKEN`
- Protocol: v3 handshake (challenge → connect → hello-ok)
- Initial sync: `agents.list` + `sessions.list` RPCs → persist → merge → broadcast `fleet.update`
- Live events: `sessions.changed`, `presence`, `agent.config`
- Reconnection: exponential backoff (1s → 2s → 4s → ... → 30s max)
### REST Poller (Fallback)
- Config: `GATEWAY_URL` (default: `http://host.docker.internal:18789/api/agents`), `GATEWAY_POLL_INTERVAL` (default: 5s)
- Only used when WS is unavailable
- Polls the `/api/agents` endpoint and syncs agent status changes
### Wiring
```
main.go
├── wsClient = NewWSClient(...)
├── restClient = NewClient(...)
├── wsClient.SetRESTClient(restClient) // WS notifies REST on ready
├── restClient.SetWSClient(wsClient) // REST defers to WS
├── go wsClient.Start(ctx) // primary
└── go restClient.Start(ctx) // fallback (waits for WS)
```
## Key Design Decisions
- **Push over poll**: WS is preferred for real-time updates; REST is a safety net
- **DB first, then SSE**: All event handlers persist to DB before broadcasting
- **Graceful degradation**: System works without WS; REST provides basic functionality
- **No hard dependency on REST /api/agents**: If WS is connected, REST endpoint is never called