Merge pull request 'CUB-200: Implement WebSocket Gateway Client' (!42) from agent/dex/CUB-200-ws-gateway-client into dev
Dev Build & Deploy / test-and-build (push) Failing after 1s
Dev Build & Deploy / docker-build-push (push) Has been skipped

Reviewed-on: #42
This commit was merged in pull request #42.
This commit is contained in:
2026-05-21 06:54:55 -04:00
6 changed files with 320 additions and 66 deletions
+2 -2
View File
@@ -32,7 +32,7 @@ GATEWAY_POLL_INTERVAL=5s
# When using docker-compose, these are set in the services section # When using docker-compose, these are set in the services section
# See docker-compose.yml for service-specific environment variables # See docker-compose.yml for service-specific environment variables
# ── Database Configuration ───────────────────────────────────────────── # ── Database Configuration ───────────────────────────────────────────────
# Set in the db service environment section of docker-compose.yml # Set in the db service environment section of docker-compose.yml
# POSTGRES_USER=controlcenter # POSTGRES_USER=controlcenter
# POSTGRES_PASSWORD=controlcenter # POSTGRES_PASSWORD=controlcenter
@@ -47,4 +47,4 @@ GATEWAY_POLL_INTERVAL=5s
# For Docker deployment: # For Docker deployment:
# 1. Copy .env.example to .env (backend only) # 1. Copy .env.example to .env (backend only)
# 2. Run: docker compose up -d # 2. Run: docker compose up -d
# 3. Access frontend at http://localhost:3000 # 3. Access frontend at http://localhost:3000
+2 -2
View File
@@ -112,7 +112,7 @@ func main() {
<-quit <-quit
slog.Info("shutting down server...") slog.Info("shutting down server...")
cancel() // stop gateway polling cancel() // stop gateway clients
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second) shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
defer shutdownCancel() defer shutdownCancel()
@@ -136,4 +136,4 @@ func parseLogLevel(level string) slog.Level {
default: default:
return slog.LevelInfo return slog.LevelInfo
} }
} }
+19 -19
View File
@@ -10,30 +10,30 @@ import (
// Config holds all application configuration. // Config holds all application configuration.
type Config struct { type Config struct {
Port int Port int
DatabaseURL string DatabaseURL string
CORSOrigin string CORSOrigin string
LogLevel string LogLevel string
Environment string Environment string
GatewayRestURL string GatewayRestURL string
GatewayRestPollInterval time.Duration GatewayRestPollInterval time.Duration
WSGatewayURL string WSGatewayURL string
WSGatewayToken string WSGatewayToken string
} }
// Load reads configuration from environment variables, applying defaults where // Load reads configuration from environment variables, applying defaults where
// values are not set. All secrets come from the environment — nothing is hardcoded. // values are not set. All secrets come from the environment — nothing is hardcoded.
func Load() *Config { func Load() *Config {
return &Config{ return &Config{
Port: getEnvInt("PORT", 8080), Port: getEnvInt("PORT", 8080),
DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"), DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"),
CORSOrigin: getEnv("CORS_ORIGIN", "*"), CORSOrigin: getEnv("CORS_ORIGIN", "*"),
LogLevel: getEnv("LOG_LEVEL", "info"), LogLevel: getEnv("LOG_LEVEL", "info"),
Environment: getEnv("ENVIRONMENT", "development"), Environment: getEnv("ENVIRONMENT", "development"),
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"), GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second), GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"), WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""), WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
} }
} }
@@ -60,4 +60,4 @@ func getEnvDuration(key string, fallback time.Duration) time.Duration {
} }
} }
return fallback return fallback
} }
+31 -28
View File
@@ -1,6 +1,10 @@
// Package gateway provides an OpenClaw gateway integration client that // Package gateway provides an OpenClaw gateway integration client that
// polls agent states, persists them via the repository layer, and broadcasts // polls agent states, persists them via the repository layer, and broadcasts
// changes through the SSE broker for real-time frontend updates. // changes through the SSE broker for real-time frontend updates.
//
// When a WSClient is wired via SetWSClient, the REST poller becomes a
// fallback: it waits for the WS client to signal readiness, and only starts
// polling if WS fails to connect within 30 seconds.
package gateway package gateway
import ( import (
@@ -29,7 +33,7 @@ type Client struct {
broker *handler.Broker broker *handler.Broker
wsClient *WSClient // optional WS client; when set, REST is fallback only wsClient *WSClient // optional WS client; when set, REST is fallback only
wsReady chan struct{} // closed once WS connection is established wsReady chan struct{} // closed once WS connection is established
wsReadyOnce sync.Once // protects wsReady close from double-close race wsReadyOnce sync.Once // protects wsReady close from double-close race
} }
// Config holds gateway client configuration, typically loaded from environment. // Config holds gateway client configuration, typically loaded from environment.
@@ -140,7 +144,6 @@ func (c *Client) poll(ctx context.Context) {
} }
for _, ga := range agents { for _, ga := range agents {
// Check if agent already exists; if so, update; otherwise create.
existing, err := c.agents.Get(ctx, ga.ID) existing, err := c.agents.Get(ctx, ga.ID)
if err != nil { if err != nil {
// Not found — create it // Not found — create it
@@ -185,51 +188,51 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
slog.Info("seeding demo agents") slog.Info("seeding demo agents")
demoAgents := []models.AgentCardData{ demoAgents := []models.AgentCardData{
{ {
ID: "otto", ID: "otto",
DisplayName: "Otto", DisplayName: "Otto",
Role: "Orchestrator", Role: "Orchestrator",
Status: models.AgentStatusActive, Status: models.AgentStatusActive,
CurrentTask: strPtr("Orchestrating tasks"), CurrentTask: strPtr("Orchestrating tasks"),
SessionKey: "otto-session", SessionKey: "otto-session",
Channel: "discord", Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339), LastActivity: time.Now().UTC().Format(time.RFC3339),
}, },
{ {
ID: "rex", ID: "rex",
DisplayName: "Rex", DisplayName: "Rex",
Role: "Frontend Dev", Role: "Frontend Dev",
Status: models.AgentStatusIdle, Status: models.AgentStatusIdle,
SessionKey: "rex-session", SessionKey: "rex-session",
Channel: "discord", Channel: "discord",
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339), LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
}, },
{ {
ID: "dex", ID: "dex",
DisplayName: "Dex", DisplayName: "Dex",
Role: "Backend Dev", Role: "Backend Dev",
Status: models.AgentStatusThinking, Status: models.AgentStatusThinking,
CurrentTask: strPtr("Designing API contracts"), CurrentTask: strPtr("Designing API contracts"),
SessionKey: "dex-session", SessionKey: "dex-session",
Channel: "discord", Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339), LastActivity: time.Now().UTC().Format(time.RFC3339),
}, },
{ {
ID: "hex", ID: "hex",
DisplayName: "Hex", DisplayName: "Hex",
Role: "Database Specialist", Role: "Database Specialist",
Status: models.AgentStatusActive, Status: models.AgentStatusActive,
CurrentTask: strPtr("Reviewing schema migrations"), CurrentTask: strPtr("Reviewing schema migrations"),
SessionKey: "hex-session", SessionKey: "hex-session",
Channel: "discord", Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339), LastActivity: time.Now().UTC().Format(time.RFC3339),
}, },
{ {
ID: "pip", ID: "pip",
DisplayName: "Pip", DisplayName: "Pip",
Role: "Edge Device Dev", Role: "Edge Device Dev",
Status: models.AgentStatusIdle, Status: models.AgentStatusIdle,
SessionKey: "pip-session", SessionKey: "pip-session",
Channel: "discord", Channel: "discord",
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339), LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
}, },
} }
@@ -243,4 +246,4 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
return nil return nil
} }
func strPtr(s string) *string { return &s } func strPtr(s string) *string { return &s }
+35 -15
View File
@@ -92,10 +92,10 @@ func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
// wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol. // wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol.
type wsFrame struct { type wsFrame struct {
Type string `json:"type"` // "req", "res", "event" Type string `json:"type"` // "req", "res", "event"
ID string `json:"id,omitempty"` // request/response correlation ID string `json:"id,omitempty"` // request/response correlation
Method string `json:"method,omitempty"` // method name (req frames) Method string `json:"method,omitempty"` // method name (req frames)
Event string `json:"event,omitempty"` // event name (event frames) Event string `json:"event,omitempty"` // event name (event frames)
Params json.RawMessage `json:"params,omitempty"` Params json.RawMessage `json:"params,omitempty"`
Result json.RawMessage `json:"result,omitempty"` Result json.RawMessage `json:"result,omitempty"`
Error *wsError `json:"error,omitempty"` Error *wsError `json:"error,omitempty"`
@@ -229,7 +229,34 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
c.connId = helloOK.ConnID c.connId = helloOK.ConnID
c.connMu.Unlock() c.connMu.Unlock()
// Notify REST client that WS is live so it stands down // Step 2b: Register live event handlers BEFORE starting the read
// loop. This eliminates the race window where readLoop dispatches
// live events as "unhandled" because no handlers are registered yet.
// The handlers only depend on c.agents and c.broker, which are wired
// in the constructor — they do not need initialSync to have completed.
c.registerEventHandlers()
// Step 2c: Start the read loop in a goroutine so that Send() in
// initialSync can receive responses. The read loop goroutine will
// continue running after initialSync completes, routing live events
// and any future RPC responses. Because handlers are already
// registered, any events arriving during or after initialSync are
// dispatched correctly.
readLoopErrCh := make(chan error, 1)
go func() {
readLoopErrCh <- c.readLoop(ctx, conn)
}()
// Step 2d: Initial sync — fetch agents + sessions from gateway.
// This works because the read loop is active and will route
// response frames back to Send() via handleResponse.
if err := c.initialSync(ctx); err != nil {
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
}
// Notify REST client that WS is live so it stands down.
// This must happen AFTER initialSync so that the REST poller
// doesn't start polling while we're still syncing.
if c.restClient != nil { if c.restClient != nil {
c.restClient.MarkWSReady() c.restClient.MarkWSReady()
c.logger.Info("ws client notified REST fallback to stand down") c.logger.Info("ws client notified REST fallback to stand down")
@@ -238,16 +265,9 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
// Reset wsReadyOnce so MarkWSReady can fire again after a reconnect // Reset wsReadyOnce so MarkWSReady can fire again after a reconnect
c.wsReadyOnce = sync.Once{} c.wsReadyOnce = sync.Once{}
// Step 2b: Initial sync — fetch agents + sessions from gateway // Step 3: Wait for the read loop goroutine to finish (blocks
if err := c.initialSync(ctx); err != nil { // until the connection drops or context is cancelled).
c.logger.Warn("initial sync failed, will continue with read loop", "error", err) return <-readLoopErrCh
}
// Step 2c: Register live event handlers
c.registerEventHandlers()
// Step 3: Read loop
return c.readLoop(ctx, conn)
} }
// readChallenge reads the first frame from the gateway, which must be a // readChallenge reads the first frame from the gateway, which must be a
@@ -11,6 +11,7 @@ import (
"testing" "testing"
"time" "time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
@@ -466,6 +467,236 @@ func TestAgentItemToCard(t *testing.T) {
}) })
} }
// ── 6. Test: Initial sync ordering (readLoop active before Send) ──────────
// TestConnectAndRun_InitialSyncOrdering verifies that the WS client
// completes initial sync successfully. This test would hang/timeout if
// readLoop were NOT started before initialSync, because Send() relies on
// readLoop→routeFrame→handleResponse to deliver RPC responses.
func TestConnectAndRun_InitialSyncOrdering(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
srv := newTestWSServer(t, func(conn *websocket.Conn) {
// Handshake
handleHandshake(t, conn)
// After handshake, respond to RPCs
for {
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
break
}
reqID, _ := req["id"].(string)
method, _ := req["method"].(string)
var result any
switch method {
case "agents.list":
result = []map[string]any{
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
{"id": "dex", "name": "Dex", "role": "Backend Dev", "channel": "telegram"},
}
case "sessions.list":
result = []map[string]any{
{"sessionKey": "s1", "agentId": "otto", "status": "running", "totalTokens": 500, "lastActivityAt": "2025-05-20T12:00:00Z"},
}
default:
result = map[string]any{}
}
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": result,
}
if err := conn.WriteJSON(res); err != nil {
break
}
}
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
client.Start(ctx)
close(done)
}()
// Wait for initial sync to complete by checking repo state.
// The agents should be persisted from the RPC responses.
deadline := time.Now().Add(5 * time.Second)
for time.Now().Before(deadline) {
repo.mu.Lock()
_, ottoOK := repo.agents["otto"]
_, dexOK := repo.agents["dex"]
repo.mu.Unlock()
if ottoOK && dexOK {
break
}
time.Sleep(50 * time.Millisecond)
}
repo.mu.Lock()
_, ottoOK := repo.agents["otto"]
_, dexOK := repo.agents["dex"]
repo.mu.Unlock()
if !ottoOK {
t.Error("otto not found in repo after initial sync — readLoop may not have been active before Send()")
}
if !dexOK {
t.Error("dex not found in repo after initial sync — readLoop may not have been active before Send()")
}
cancel()
select {
case <-done:
case <-time.After(3 * time.Second):
t.Fatal("WSClient did not shut down cleanly")
}
}
// ── 7. Test: Event not lost during initial sync (regression) ───────────────
// TestConnectAndRun_EventNotLostDuringSync verifies that live gateway events
// arriving during initial sync are NOT dropped. This is a regression test
// for the race where readLoop started before registerEventHandlers(),
// causing events read during that window to be logged as "unhandled" and lost.
//
// The mock server sends a live event (sessions.changed) right after the
// handshake, interleaved with the RPC responses for agents.list and
// sessions.list. The test asserts the event is received by the handler.
func TestConnectAndRun_EventNotLostDuringSync(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
// Pre-seed an agent so the event handler can update it.
repo.agents["otto"] = models.AgentCardData{
ID: "otto",
DisplayName: "Otto",
Status: models.AgentStatusIdle,
}
srv := newTestWSServer(t, func(conn *websocket.Conn) {
// Handshake
handleHandshake(t, conn)
// After handshake, process RPCs and inject a live event.
for {
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
break
}
reqID, _ := req["id"].(string)
method, _ := req["method"].(string)
// Respond to agents.list RPC
if method == "agents.list" {
// Before responding, inject a live event — simulates
// a gateway pushing a presence update during sync.
evt := map[string]any{
"type": "event",
"event": "presence",
"params": map[string]any{"agentId": "otto", "connected": true, "lastActivityAt": "2025-05-20T12:30:00Z"},
}
if err := conn.WriteJSON(evt); err != nil {
break
}
// Now send the RPC response
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": []map[string]any{
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
},
}
if err := conn.WriteJSON(res); err != nil {
break
}
continue
}
// Respond to sessions.list RPC
if method == "sessions.list" {
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": []map[string]any{},
}
if err := conn.WriteJSON(res); err != nil {
break
}
continue
}
// Default response for other methods
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": map[string]any{},
}
if err := conn.WriteJSON(res); err != nil {
break
}
}
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
client.Start(ctx)
close(done)
}()
// Wait for the presence event to be processed by checking the repo.
// The presence handler updates the agent, so we check for the
// lastActivityAt change.
deadline := time.Now().Add(5 * time.Second)
var lastActivity string
for time.Now().Before(deadline) {
repo.mu.Lock()
if a, ok := repo.agents["otto"]; ok {
lastActivity = a.LastActivity
}
repo.mu.Unlock()
if lastActivity == "2025-05-20T12:30:00Z" {
break
}
time.Sleep(50 * time.Millisecond)
}
if lastActivity != "2025-05-20T12:30:00Z" {
t.Errorf("presence event during sync was lost: lastActivity = %q, want %q", lastActivity, "2025-05-20T12:30:00Z")
}
cancel()
select {
case <-done:
case <-time.After(3 * time.Second):
t.Fatal("WSClient did not shut down cleanly")
}
}
func TestStrPtr(t *testing.T) { func TestStrPtr(t *testing.T) {
s := "hello" s := "hello"
p := strPtr(s) p := strPtr(s)