CUB-200: Implement WebSocket Gateway Client #42

Merged
overseer merged 6 commits from agent/dex/CUB-200-ws-gateway-client into dev 2026-05-21 06:54:55 -04:00
6 changed files with 320 additions and 66 deletions
+1 -1
View File
@@ -32,7 +32,7 @@ GATEWAY_POLL_INTERVAL=5s
# When using docker-compose, these are set in the services section
# See docker-compose.yml for service-specific environment variables
# ── Database Configuration ─────────────────────────────────────────────
# ── Database Configuration ───────────────────────────────────────────────
# Set in the db service environment section of docker-compose.yml
# POSTGRES_USER=controlcenter
# POSTGRES_PASSWORD=controlcenter
+1 -1
View File
@@ -112,7 +112,7 @@ func main() {
<-quit
slog.Info("shutting down server...")
cancel() // stop gateway polling
cancel() // stop gateway clients
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
defer shutdownCancel()
+18 -18
View File
@@ -10,30 +10,30 @@ import (
// Config holds all application configuration.
type Config struct {
Port int
DatabaseURL string
CORSOrigin string
LogLevel string
Environment string
GatewayRestURL string
GatewayRestPollInterval time.Duration
WSGatewayURL string
WSGatewayToken string
Port int
DatabaseURL string
CORSOrigin string
LogLevel string
Environment string
GatewayRestURL string
GatewayRestPollInterval time.Duration
WSGatewayURL string
WSGatewayToken string
}
// Load reads configuration from environment variables, applying defaults where
// values are not set. All secrets come from the environment — nothing is hardcoded.
func Load() *Config {
return &Config{
Port: getEnvInt("PORT", 8080),
DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"),
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
LogLevel: getEnv("LOG_LEVEL", "info"),
Environment: getEnv("ENVIRONMENT", "development"),
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
Port: getEnvInt("PORT", 8080),
DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"),
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
LogLevel: getEnv("LOG_LEVEL", "info"),
Environment: getEnv("ENVIRONMENT", "development"),
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
}
}
+30 -27
View File
@@ -1,6 +1,10 @@
// Package gateway provides an OpenClaw gateway integration client that
// polls agent states, persists them via the repository layer, and broadcasts
// changes through the SSE broker for real-time frontend updates.
//
// When a WSClient is wired via SetWSClient, the REST poller becomes a
// fallback: it waits for the WS client to signal readiness, and only starts
// polling if WS fails to connect within 30 seconds.
package gateway
import (
@@ -29,7 +33,7 @@ type Client struct {
broker *handler.Broker
wsClient *WSClient // optional WS client; when set, REST is fallback only
wsReady chan struct{} // closed once WS connection is established
wsReadyOnce sync.Once // protects wsReady close from double-close race
wsReadyOnce sync.Once // protects wsReady close from double-close race
}
// Config holds gateway client configuration, typically loaded from environment.
@@ -140,7 +144,6 @@ func (c *Client) poll(ctx context.Context) {
}
for _, ga := range agents {
// Check if agent already exists; if so, update; otherwise create.
existing, err := c.agents.Get(ctx, ga.ID)
if err != nil {
// Not found — create it
@@ -185,51 +188,51 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
slog.Info("seeding demo agents")
demoAgents := []models.AgentCardData{
{
ID: "otto",
DisplayName: "Otto",
Role: "Orchestrator",
Status: models.AgentStatusActive,
ID: "otto",
DisplayName: "Otto",
Role: "Orchestrator",
Status: models.AgentStatusActive,
CurrentTask: strPtr("Orchestrating tasks"),
SessionKey: "otto-session",
Channel: "discord",
Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339),
},
{
ID: "rex",
DisplayName: "Rex",
Role: "Frontend Dev",
Status: models.AgentStatusIdle,
ID: "rex",
DisplayName: "Rex",
Role: "Frontend Dev",
Status: models.AgentStatusIdle,
SessionKey: "rex-session",
Channel: "discord",
Channel: "discord",
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
},
{
ID: "dex",
DisplayName: "Dex",
Role: "Backend Dev",
Status: models.AgentStatusThinking,
ID: "dex",
DisplayName: "Dex",
Role: "Backend Dev",
Status: models.AgentStatusThinking,
CurrentTask: strPtr("Designing API contracts"),
SessionKey: "dex-session",
Channel: "discord",
Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339),
},
{
ID: "hex",
DisplayName: "Hex",
Role: "Database Specialist",
Status: models.AgentStatusActive,
ID: "hex",
DisplayName: "Hex",
Role: "Database Specialist",
Status: models.AgentStatusActive,
CurrentTask: strPtr("Reviewing schema migrations"),
SessionKey: "hex-session",
Channel: "discord",
Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339),
},
{
ID: "pip",
DisplayName: "Pip",
Role: "Edge Device Dev",
Status: models.AgentStatusIdle,
ID: "pip",
DisplayName: "Pip",
Role: "Edge Device Dev",
Status: models.AgentStatusIdle,
SessionKey: "pip-session",
Channel: "discord",
Channel: "discord",
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
},
}
+35 -15
View File
@@ -92,10 +92,10 @@ func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
// wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol.
type wsFrame struct {
Type string `json:"type"` // "req", "res", "event"
ID string `json:"id,omitempty"` // request/response correlation
Method string `json:"method,omitempty"` // method name (req frames)
Event string `json:"event,omitempty"` // event name (event frames)
Type string `json:"type"` // "req", "res", "event"
ID string `json:"id,omitempty"` // request/response correlation
Method string `json:"method,omitempty"` // method name (req frames)
Event string `json:"event,omitempty"` // event name (event frames)
Params json.RawMessage `json:"params,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
Error *wsError `json:"error,omitempty"`
@@ -229,7 +229,34 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
c.connId = helloOK.ConnID
c.connMu.Unlock()
// Notify REST client that WS is live so it stands down
// Step 2b: Register live event handlers BEFORE starting the read
// loop. This eliminates the race window where readLoop dispatches
// live events as "unhandled" because no handlers are registered yet.
// The handlers only depend on c.agents and c.broker, which are wired
// in the constructor — they do not need initialSync to have completed.
c.registerEventHandlers()
// Step 2c: Start the read loop in a goroutine so that Send() in
// initialSync can receive responses. The read loop goroutine will
// continue running after initialSync completes, routing live events
// and any future RPC responses. Because handlers are already
// registered, any events arriving during or after initialSync are
// dispatched correctly.
readLoopErrCh := make(chan error, 1)
go func() {
readLoopErrCh <- c.readLoop(ctx, conn)
}()
// Step 2d: Initial sync — fetch agents + sessions from gateway.
// This works because the read loop is active and will route
// response frames back to Send() via handleResponse.
if err := c.initialSync(ctx); err != nil {
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
}
// Notify REST client that WS is live so it stands down.
// This must happen AFTER initialSync so that the REST poller
// doesn't start polling while we're still syncing.
if c.restClient != nil {
c.restClient.MarkWSReady()
c.logger.Info("ws client notified REST fallback to stand down")
@@ -238,16 +265,9 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
// Reset wsReadyOnce so MarkWSReady can fire again after a reconnect
c.wsReadyOnce = sync.Once{}
// Step 2b: Initial sync — fetch agents + sessions from gateway
if err := c.initialSync(ctx); err != nil {
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
}
// Step 2c: Register live event handlers
c.registerEventHandlers()
// Step 3: Read loop
return c.readLoop(ctx, conn)
// Step 3: Wait for the read loop goroutine to finish (blocks
// until the connection drops or context is cancelled).
return <-readLoopErrCh
}
// readChallenge reads the first frame from the gateway, which must be a
@@ -11,6 +11,7 @@ import (
"testing"
"time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
"github.com/gorilla/websocket"
@@ -466,6 +467,236 @@ func TestAgentItemToCard(t *testing.T) {
})
}
// ── 6. Test: Initial sync ordering (readLoop active before Send) ──────────
// TestConnectAndRun_InitialSyncOrdering verifies that the WS client
// completes initial sync successfully. This test would hang/timeout if
// readLoop were NOT started before initialSync, because Send() relies on
// readLoop→routeFrame→handleResponse to deliver RPC responses.
func TestConnectAndRun_InitialSyncOrdering(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
srv := newTestWSServer(t, func(conn *websocket.Conn) {
// Handshake
handleHandshake(t, conn)
// After handshake, respond to RPCs
for {
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
break
}
reqID, _ := req["id"].(string)
method, _ := req["method"].(string)
var result any
switch method {
case "agents.list":
result = []map[string]any{
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
{"id": "dex", "name": "Dex", "role": "Backend Dev", "channel": "telegram"},
}
case "sessions.list":
result = []map[string]any{
{"sessionKey": "s1", "agentId": "otto", "status": "running", "totalTokens": 500, "lastActivityAt": "2025-05-20T12:00:00Z"},
}
default:
result = map[string]any{}
}
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": result,
}
if err := conn.WriteJSON(res); err != nil {
break
}
}
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
client.Start(ctx)
close(done)
}()
// Wait for initial sync to complete by checking repo state.
// The agents should be persisted from the RPC responses.
deadline := time.Now().Add(5 * time.Second)
for time.Now().Before(deadline) {
repo.mu.Lock()
_, ottoOK := repo.agents["otto"]
_, dexOK := repo.agents["dex"]
repo.mu.Unlock()
if ottoOK && dexOK {
break
}
time.Sleep(50 * time.Millisecond)
}
repo.mu.Lock()
_, ottoOK := repo.agents["otto"]
_, dexOK := repo.agents["dex"]
repo.mu.Unlock()
if !ottoOK {
t.Error("otto not found in repo after initial sync — readLoop may not have been active before Send()")
}
if !dexOK {
t.Error("dex not found in repo after initial sync — readLoop may not have been active before Send()")
}
cancel()
select {
case <-done:
case <-time.After(3 * time.Second):
t.Fatal("WSClient did not shut down cleanly")
}
}
// ── 7. Test: Event not lost during initial sync (regression) ───────────────
// TestConnectAndRun_EventNotLostDuringSync verifies that live gateway events
// arriving during initial sync are NOT dropped. This is a regression test
// for the race where readLoop started before registerEventHandlers(),
// causing events read during that window to be logged as "unhandled" and lost.
//
// The mock server sends a live event (sessions.changed) right after the
// handshake, interleaved with the RPC responses for agents.list and
// sessions.list. The test asserts the event is received by the handler.
func TestConnectAndRun_EventNotLostDuringSync(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
// Pre-seed an agent so the event handler can update it.
repo.agents["otto"] = models.AgentCardData{
ID: "otto",
DisplayName: "Otto",
Status: models.AgentStatusIdle,
}
srv := newTestWSServer(t, func(conn *websocket.Conn) {
// Handshake
handleHandshake(t, conn)
// After handshake, process RPCs and inject a live event.
for {
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
break
}
reqID, _ := req["id"].(string)
method, _ := req["method"].(string)
// Respond to agents.list RPC
if method == "agents.list" {
// Before responding, inject a live event — simulates
// a gateway pushing a presence update during sync.
evt := map[string]any{
"type": "event",
"event": "presence",
"params": map[string]any{"agentId": "otto", "connected": true, "lastActivityAt": "2025-05-20T12:30:00Z"},
}
if err := conn.WriteJSON(evt); err != nil {
break
}
// Now send the RPC response
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": []map[string]any{
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
},
}
if err := conn.WriteJSON(res); err != nil {
break
}
continue
}
// Respond to sessions.list RPC
if method == "sessions.list" {
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": []map[string]any{},
}
if err := conn.WriteJSON(res); err != nil {
break
}
continue
}
// Default response for other methods
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": map[string]any{},
}
if err := conn.WriteJSON(res); err != nil {
break
}
}
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
client.Start(ctx)
close(done)
}()
// Wait for the presence event to be processed by checking the repo.
// The presence handler updates the agent, so we check for the
// lastActivityAt change.
deadline := time.Now().Add(5 * time.Second)
var lastActivity string
for time.Now().Before(deadline) {
repo.mu.Lock()
if a, ok := repo.agents["otto"]; ok {
lastActivity = a.LastActivity
}
repo.mu.Unlock()
if lastActivity == "2025-05-20T12:30:00Z" {
break
}
time.Sleep(50 * time.Millisecond)
}
if lastActivity != "2025-05-20T12:30:00Z" {
t.Errorf("presence event during sync was lost: lastActivity = %q, want %q", lastActivity, "2025-05-20T12:30:00Z")
}
cancel()
select {
case <-done:
case <-time.After(3 * time.Second):
t.Fatal("WSClient did not shut down cleanly")
}
}
func TestStrPtr(t *testing.T) {
s := "hello"
p := strPtr(s)