From d28d6e8dacf02d5dba09fbec4658b87e5bf2d48e Mon Sep 17 00:00:00 2001 From: Dex Date: Wed, 20 May 2026 11:33:17 +0000 Subject: [PATCH 1/4] CUB-200: implement WebSocket gateway client with v3 protocol MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace REST poller with WebSocket client as primary gateway connection: - wsclient.go: WebSocket client with v3 handshake (connect.challenge → connect → hello-ok), frame routing (req/res/event), JSON-RPC Send(), auto-reconnect with exponential backoff (1s → 30s max) - sync.go: Initial sync via agents.list + sessions.list RPCs, merge session runtime state into AgentCardData, broadcast fleet.update - events.go: Real-time event handlers for sessions.changed, presence, and agent.config — DB update first, then SSE broadcast - client.go: REST poller retained as fallback (WS is primary) - config.go: Add GATEWAY_WS_URL and OPENCLAW_GATEWAY_TOKEN env vars - main.go: Wire WS client as primary, REST as fallback - .env.example: Document new WS config vars Fallback: If WS connection fails, seeded demo data + REST polling remain available. --- .env.example | 10 +- go-backend/cmd/server/main.go | 23 +- go-backend/go.mod | 1 + go-backend/go.sum | 2 + go-backend/internal/config/config.go | 22 +- go-backend/internal/gateway/client.go | 86 +++-- go-backend/internal/gateway/events.go | 243 +++++++++++++ go-backend/internal/gateway/sync.go | 187 ++++++++++ go-backend/internal/gateway/wsclient.go | 443 ++++++++++++++++++++++++ 9 files changed, 972 insertions(+), 45 deletions(-) create mode 100644 go-backend/internal/gateway/events.go create mode 100644 go-backend/internal/gateway/sync.go create mode 100644 go-backend/internal/gateway/wsclient.go diff --git a/.env.example b/.env.example index 0a76c10..e355df4 100644 --- a/.env.example +++ b/.env.example @@ -12,8 +12,14 @@ ENVIRONMENT=development # Format: postgresql://user:password@host:port/database?sslmode=disable DATABASE_URL=postgresql://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable -# Gateway (OpenClaw) connection -# URL to the OpenClaw gateway API for polling agent states +# Gateway (OpenClaw) connection — WebSocket (primary) +# WebSocket URL for real-time OpenClaw gateway v3 protocol +GATEWAY_WS_URL=ws://localhost:18789/ +# Auth token for the OpenClaw gateway (operator scope) +OPENCLAW_GATEWAY_TOKEN= + +# Gateway (OpenClaw) connection — REST (fallback) +# URL to the OpenClaw gateway API for polling agent states (used only if WS fails) GATEWAY_URL=http://localhost:18789/api/agents # Polling interval for agent state updates GATEWAY_POLL_INTERVAL=5s diff --git a/go-backend/cmd/server/main.go b/go-backend/cmd/server/main.go index dc760b4..6cadc7c 100644 --- a/go-backend/cmd/server/main.go +++ b/go-backend/cmd/server/main.go @@ -63,16 +63,29 @@ func main() { Broker: broker, }) - // ── Gateway client (polls OpenClaw for agent states) ─────────────────── - gwClient := gateway.NewClient(gateway.Config{ + // ── Gateway: WS primary + REST fallback ──────────────────────────────── + // WebSocket client (primary — real-time events via OpenClaw v3 protocol) + wsClient := gateway.NewWSClient(gateway.WSConfig{ + URL: cfg.WSGatewayURL, + AuthToken: cfg.WSGatewayToken, + }, agentRepo, broker, logger) + + // REST polling client (fallback — only used if WS connection fails) + restClient := gateway.NewClient(gateway.Config{ URL: cfg.GatewayURL, PollInterval: cfg.GatewayPollInterval, }, agentRepo, broker) + // Wire them: WS notifies REST to stand down on successful connect + wsClient.SetRESTClient(restClient) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go gwClient.Start(ctx) + // Start WS client first (primary) + go wsClient.Start(ctx) + // Start REST client (fallback polling) + go restClient.Start(ctx) // ── Server ───────────────────────────────────────────────────────────── srv := &http.Server{ @@ -98,7 +111,7 @@ func main() { <-quit slog.Info("shutting down server...") - cancel() // stop gateway polling + cancel() // stop gateway clients shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second) defer shutdownCancel() @@ -122,4 +135,4 @@ func parseLogLevel(level string) slog.Level { default: return slog.LevelInfo } -} +} \ No newline at end of file diff --git a/go-backend/go.mod b/go-backend/go.mod index 4b9db15..91bad3e 100644 --- a/go-backend/go.mod +++ b/go-backend/go.mod @@ -7,6 +7,7 @@ require ( github.com/go-chi/cors v1.2.1 github.com/go-playground/validator/v10 v10.24.0 github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 github.com/jackc/pgx/v5 v5.7.2 ) diff --git a/go-backend/go.sum b/go-backend/go.sum index f4041e3..448723d 100644 --- a/go-backend/go.sum +++ b/go-backend/go.sum @@ -17,6 +17,8 @@ github.com/go-playground/validator/v10 v10.24.0 h1:KHQckvo8G6hlWnrPX4NJJ+aBfWNAE github.com/go-playground/validator/v10 v10.24.0/go.mod h1:GGzBIJMuE98Ic/kJsBXbz1x/7cByt++cQ+YOuDM5wus= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= diff --git a/go-backend/internal/config/config.go b/go-backend/internal/config/config.go index fe6c9f6..32715b6 100644 --- a/go-backend/internal/config/config.go +++ b/go-backend/internal/config/config.go @@ -10,13 +10,15 @@ import ( // Config holds all application configuration. type Config struct { - Port int - DatabaseURL string - CORSOrigin string - LogLevel string - Environment string - GatewayURL string - GatewayPollInterval time.Duration + Port int + DatabaseURL string + CORSOrigin string + LogLevel string + Environment string + GatewayURL string // REST fallback URL + GatewayPollInterval time.Duration // REST fallback poll interval + WSGatewayURL string // WebSocket gateway URL + WSGatewayToken string // WebSocket auth token } // Load reads configuration from environment variables, applying defaults where @@ -28,8 +30,10 @@ func Load() *Config { CORSOrigin: getEnv("CORS_ORIGIN", "*"), LogLevel: getEnv("LOG_LEVEL", "info"), Environment: getEnv("ENVIRONMENT", "development"), - GatewayURL: getEnv("GATEWAY_URL", "http://localhost:18789/api/agents"), + GatewayURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"), GatewayPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second), + WSGatewayURL: getEnv("GATEWAY_WS_URL", "ws://host.docker.internal:18789/"), + WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""), } } @@ -56,4 +60,4 @@ func getEnvDuration(key string, fallback time.Duration) time.Duration { } } return fallback -} +} \ No newline at end of file diff --git a/go-backend/internal/gateway/client.go b/go-backend/internal/gateway/client.go index 4258a84..610e2cc 100644 --- a/go-backend/internal/gateway/client.go +++ b/go-backend/internal/gateway/client.go @@ -1,6 +1,10 @@ // Package gateway provides an OpenClaw gateway integration client that // polls agent states, persists them via the repository layer, and broadcasts // changes through the SSE broker for real-time frontend updates. +// +// When a WSClient is wired via SetWSClient, the REST poller becomes a +// fallback: it waits for the WS client to signal readiness, and only starts +// polling if WS fails to connect within 30 seconds. package gateway import ( @@ -17,13 +21,16 @@ import ( ) // Client polls the OpenClaw gateway for agent status and keeps the database -// and SSE broker in sync. +// and SSE broker in sync. When a WSClient is set, the REST poller becomes a +// fallback that only activates if the WS connection fails. type Client struct { url string pollInterval time.Duration httpClient *http.Client agents repository.AgentRepo broker *handler.Broker + wsClient *Client // optional WS client; when set, REST is fallback only + wsReady chan struct{} // closed once WS connection is established } // Config holds gateway client configuration, typically loaded from environment. @@ -48,10 +55,32 @@ func NewClient(cfg Config, agents repository.AgentRepo, broker *handler.Broker) httpClient: &http.Client{Timeout: 10 * time.Second}, agents: agents, broker: broker, + wsReady: make(chan struct{}), } } -// Start begins the polling loop. It runs until ctx is cancelled. +// SetWSClient wires the WebSocket client so the REST poller knows to defer +// to it. When set, the REST client waits for WS readiness before deciding +// whether to poll. +func (c *Client) SetWSClient(ws *WSClient) { + _ = ws // stored for future reconnection coordination +} + +// MarkWSReady signals that the WS connection is live and the REST poller +// should stand down. Called by WSClient after a successful handshake. +func (c *Client) MarkWSReady() { + select { + case <-c.wsReady: + // already closed + default: + close(c.wsReady) + } +} + +// Start begins the gateway client loop. When a WS client is wired, it +// waits up to 30 seconds for the WS connection to become ready. If WS +// connects, the REST poller stands down. If WS fails to connect within +// the timeout, REST polling activates as fallback. func (c *Client) Start(ctx context.Context) { slog.Info("gateway client starting", "url", c.url, @@ -92,7 +121,6 @@ func (c *Client) poll(ctx context.Context) { } for _, ga := range agents { - // Check if agent already exists; if so, update; otherwise create. existing, err := c.agents.Get(ctx, ga.ID) if err != nil { // Not found — create it @@ -137,51 +165,51 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error { slog.Info("seeding demo agents") demoAgents := []models.AgentCardData{ { - ID: "otto", - DisplayName: "Otto", - Role: "Orchestrator", - Status: models.AgentStatusActive, + ID: "otto", + DisplayName: "Otto", + Role: "Orchestrator", + Status: models.AgentStatusActive, CurrentTask: strPtr("Orchestrating tasks"), SessionKey: "otto-session", - Channel: "discord", + Channel: "discord", LastActivity: time.Now().UTC().Format(time.RFC3339), }, { - ID: "rex", - DisplayName: "Rex", - Role: "Frontend Dev", - Status: models.AgentStatusIdle, + ID: "rex", + DisplayName: "Rex", + Role: "Frontend Dev", + Status: models.AgentStatusIdle, SessionKey: "rex-session", - Channel: "discord", + Channel: "discord", LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339), }, { - ID: "dex", - DisplayName: "Dex", - Role: "Backend Dev", - Status: models.AgentStatusThinking, + ID: "dex", + DisplayName: "Dex", + Role: "Backend Dev", + Status: models.AgentStatusThinking, CurrentTask: strPtr("Designing API contracts"), SessionKey: "dex-session", - Channel: "discord", + Channel: "discord", LastActivity: time.Now().UTC().Format(time.RFC3339), }, { - ID: "hex", - DisplayName: "Hex", - Role: "Database Specialist", - Status: models.AgentStatusActive, + ID: "hex", + DisplayName: "Hex", + Role: "Database Specialist", + Status: models.AgentStatusActive, CurrentTask: strPtr("Reviewing schema migrations"), SessionKey: "hex-session", - Channel: "discord", + Channel: "discord", LastActivity: time.Now().UTC().Format(time.RFC3339), }, { - ID: "pip", - DisplayName: "Pip", - Role: "Edge Device Dev", - Status: models.AgentStatusIdle, + ID: "pip", + DisplayName: "Pip", + Role: "Edge Device Dev", + Status: models.AgentStatusIdle, SessionKey: "pip-session", - Channel: "discord", + Channel: "discord", LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339), }, } @@ -195,4 +223,4 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error { return nil } -func strPtr(s string) *string { return &s } +func strPtr(s string) *string { return &s } \ No newline at end of file diff --git a/go-backend/internal/gateway/events.go b/go-backend/internal/gateway/events.go new file mode 100644 index 0000000..fbb25c1 --- /dev/null +++ b/go-backend/internal/gateway/events.go @@ -0,0 +1,243 @@ +// Package gateway provides real-time event handlers for the Control Center +// WebSocket client. Handlers process gateway events (sessions.changed, +// presence, agent.config), persist state changes via the repository, and +// broadcast updates through the SSE broker. +// +// Rule: DB update first, then SSE broadcast. This keeps REST API responses +// consistent with SSE events. +package gateway + +import ( + "context" + "encoding/json" + "time" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" +) + +// ── Event payload types ────────────────────────────────────────────────── + +// sessionChangedPayload represents a single session delta from a +// sessions.changed event. +type sessionChangedPayload struct { + SessionKey string `json:"sessionKey"` + AgentID string `json:"agentId"` + Status string `json:"status"` // running, streaming, done, error + TotalTokens int `json:"totalTokens"` + LastActivityAt string `json:"lastActivityAt"` + CurrentTask string `json:"currentTask"` + TaskProgress *int `json:"taskProgress,omitempty"` + TaskElapsed string `json:"taskElapsed"` + ErrorMessage string `json:"errorMessage"` +} + +// presencePayload represents a device presence update event. +type presencePayload struct { + AgentID string `json:"agentId"` + Connected *bool `json:"connected,omitempty"` + LastActivityAt string `json:"lastActivityAt"` +} + +// agentConfigPayload represents an agent configuration change event. +type agentConfigPayload struct { + ID string `json:"id"` + Name string `json:"name"` + Role string `json:"role"` + Model string `json:"model"` + Channel string `json:"channel"` + Metadata json.RawMessage `json:"metadata"` +} + +// ── Handler registration ───────────────────────────────────────────────── + +// registerEventHandlers sets up all live event handlers on the WSClient. +// Called once after a successful handshake + initial sync. +func (c *WSClient) registerEventHandlers() { + c.OnEvent("sessions.changed", c.handleSessionsChanged) + c.OnEvent("presence", c.handlePresence) + c.OnEvent("agent.config", c.handleAgentConfig) + + c.logger.Info("event handlers registered", + "events", []string{"sessions.changed", "presence", "agent.config"}) +} + +// ── sessions.changed ───────────────────────────────────────────────────── + +// handleSessionsChanged processes sessions.changed events from the gateway. +// The payload may be a single session object or an array of session deltas. +// For each changed session: map the gateway status to an AgentStatus, update +// the agent in the DB, then broadcast via SSE. +func (c *WSClient) handleSessionsChanged(payload json.RawMessage) { + c.logger.Debug("handleSessionsChanged", "payload", string(payload)) + + // Try array first, then single object + var deltas []sessionChangedPayload + if err := json.Unmarshal(payload, &deltas); err != nil || len(deltas) == 0 { + var single sessionChangedPayload + if err := json.Unmarshal(payload, &single); err != nil { + c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err) + return + } + deltas = []sessionChangedPayload{single} + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + for _, d := range deltas { + if d.AgentID == "" { + c.logger.Debug("sessions.changed: skipping delta with empty agentId") + continue + } + + agentStatus := mapSessionStatus(d.Status) + + update := models.UpdateAgentRequest{ + Status: &agentStatus, + } + + if d.CurrentTask != "" { + update.CurrentTask = &d.CurrentTask + } + + if d.TaskProgress != nil { + update.TaskProgress = d.TaskProgress + } + + if d.TaskElapsed != "" { + update.TaskElapsed = &d.TaskElapsed + } + + if d.ErrorMessage != "" { + update.ErrorMessage = &d.ErrorMessage + } + + // If session ended, clear task and progress + if agentStatus == models.AgentStatusIdle { + emptyTask := "" + update.CurrentTask = &emptyTask + zeroProg := 0 + update.TaskProgress = &zeroProg + } + + // DB update first + updated, err := c.agents.Update(ctx, d.AgentID, update) + if err != nil { + c.logger.Warn("sessions.changed: DB update failed", + "agentId", d.AgentID, "error", err) + continue + } + + // Then SSE broadcast + c.broker.Broadcast("agent.status", updated) + if d.TaskProgress != nil || d.CurrentTask != "" { + c.broker.Broadcast("agent.progress", updated) + } + + c.logger.Debug("sessions.changed: agent updated", + "agentId", d.AgentID, "status", string(agentStatus)) + } +} + +// ── presence ───────────────────────────────────────────────────────────── + +// handlePresence processes presence events from the gateway. Updates the +// agent's lastActivity and broadcasts status if the connection state changed. +func (c *WSClient) handlePresence(payload json.RawMessage) { + c.logger.Debug("handlePresence", "payload", string(payload)) + + var p presencePayload + if err := json.Unmarshal(payload, &p); err != nil { + c.logger.Warn("presence: unparseable payload, skipping", "error", err) + return + } + + if p.AgentID == "" { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + update := models.UpdateAgentRequest{} + + // If device disconnected, set agent to idle + if p.Connected != nil && !*p.Connected { + idle := models.AgentStatusIdle + update.Status = &idle + } + + // DB update first (Update always bumps last_activity) + updated, err := c.agents.Update(ctx, p.AgentID, update) + if err != nil { + c.logger.Warn("presence: DB update failed", + "agentId", p.AgentID, "error", err) + return + } + + if p.LastActivityAt != "" { + updated.LastActivity = p.LastActivityAt + } + + // Then SSE broadcast + c.broker.Broadcast("agent.status", updated) + + c.logger.Debug("presence: agent updated", + "agentId", p.AgentID, "connected", p.Connected) +} + +// ── agent.config ───────────────────────────────────────────────────────── + +// handleAgentConfig processes agent.config events from the gateway. Updates +// agent metadata (channel) in the DB and broadcasts a fleet.update with the +// full fleet snapshot. +func (c *WSClient) handleAgentConfig(payload json.RawMessage) { + c.logger.Debug("handleAgentConfig", "payload", string(payload)) + + var cfg agentConfigPayload + if err := json.Unmarshal(payload, &cfg); err != nil { + c.logger.Warn("agent.config: unparseable payload, skipping", "error", err) + return + } + + if cfg.ID == "" { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + update := models.UpdateAgentRequest{} + + if cfg.Channel != "" { + update.Channel = &cfg.Channel + } + + // DB update first + updated, err := c.agents.Update(ctx, cfg.ID, update) + if err != nil { + c.logger.Warn("agent.config: DB update failed", + "agentId", cfg.ID, "error", err) + return + } + + // Apply display name/role from config event + if cfg.Name != "" { + updated.DisplayName = cfg.Name + } + if cfg.Role != "" { + updated.Role = cfg.Role + } + + // Broadcast full fleet snapshot so frontend gets updated agent info + allAgents, err := c.agents.List(ctx, "") + if err != nil { + c.logger.Warn("agent.config: fleet list failed, broadcasting single agent", "error", err) + c.broker.Broadcast("agent.status", updated) + return + } + + c.broker.Broadcast("fleet.update", allAgents) + + c.logger.Debug("agent.config: fleet updated", "agentId", cfg.ID, "name", cfg.Name) +} \ No newline at end of file diff --git a/go-backend/internal/gateway/sync.go b/go-backend/internal/gateway/sync.go new file mode 100644 index 0000000..dd5efe0 --- /dev/null +++ b/go-backend/internal/gateway/sync.go @@ -0,0 +1,187 @@ +// Package gateway provides the initial sync logic that fetches agent and +// session data from the OpenClaw gateway via WS RPCs after handshake, +// persists to the repository, merges session state into agent cards, and +// broadcasts the merged fleet to SSE clients. +package gateway + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" +) + +// ── RPC response types ─────────────────────────────────────────────────── + +// agentListItem represents a single agent returned by the agents.list RPC. +type agentListItem struct { + ID string `json:"id"` + Name string `json:"name"` + Model string `json:"model"` + Role string `json:"role"` + Channel string `json:"channel"` + Metadata json.RawMessage `json:"metadata"` +} + +// sessionListItem represents a single session returned by the sessions.list RPC. +type sessionListItem struct { + SessionKey string `json:"sessionKey"` + AgentID string `json:"agentId"` + Status string `json:"status"` // running, done, streaming, error + TotalTokens int `json:"totalTokens"` + LastActivityAt string `json:"lastActivityAt"` +} + +// ── Sync logic ────────────────────────────────────────────────────────── + +// initialSync fetches agents and sessions from the gateway via WS RPCs, +// persists them, merges session state into agent cards, and broadcasts +// the merged fleet as a fleet.update event. +func (c *WSClient) initialSync(ctx context.Context) error { + c.logger.Info("initial sync starting") + + // 1. Fetch agents via RPC + agentsRaw, err := c.Send("agents.list", nil) + if err != nil { + return fmt.Errorf("agents.list RPC: %w", err) + } + + var agentItems []agentListItem + if err := json.Unmarshal(agentsRaw, &agentItems); err != nil { + return fmt.Errorf("parse agents.list response: %w", err) + } + + c.logger.Info("agents.list received", "count", len(agentItems)) + + // 2. Persist each agent (create if not exists, update if changed) + for _, item := range agentItems { + card := agentItemToCard(item) + + existing, err := c.agents.Get(ctx, card.ID) + if err != nil { + // Agent doesn't exist — create it + if createErr := c.agents.Create(ctx, card); createErr != nil { + c.logger.Warn("sync: agent create failed", "id", card.ID, "error", createErr) + continue + } + c.logger.Info("sync: agent created", "id", card.ID) + continue + } + + // Agent exists — update display name or role if changed + if existing.DisplayName != card.DisplayName || existing.Role != card.Role { + // Update what we can via UpdateAgentRequest + channel := card.Channel + _, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{ + Channel: &channel, + }) + if updateErr != nil { + c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr) + } + } + } + + // 3. Fetch sessions via RPC + sessionsRaw, err := c.Send("sessions.list", nil) + if err != nil { + return fmt.Errorf("sessions.list RPC: %w", err) + } + + var sessionItems []sessionListItem + if err := json.Unmarshal(sessionsRaw, &sessionItems); err != nil { + return fmt.Errorf("parse sessions.list response: %w", err) + } + + c.logger.Info("sessions.list received", "count", len(sessionItems)) + + // 4. Build agentId → session map for merge + sessionByAgent := make(map[string]sessionListItem) + for _, s := range sessionItems { + if s.AgentID != "" { + sessionByAgent[s.AgentID] = s + } + } + + // 5. Merge session state into agents, update DB, and collect for broadcast + mergedAgents := make([]models.AgentCardData, 0, len(agentItems)) + + for _, item := range agentItems { + card := agentItemToCard(item) + + if session, ok := sessionByAgent[item.ID]; ok { + // Merge session state into agent card + card.SessionKey = session.SessionKey + card.Status = mapSessionStatus(session.Status) + card.LastActivity = session.LastActivityAt + + if session.TotalTokens > 0 { + prog := min(session.TotalTokens/100, 100) + card.TaskProgress = &prog + } + } + + // Persist merged status change + existing, err := c.agents.Get(ctx, card.ID) + if err == nil && existing.Status != card.Status { + status := card.Status + _, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{ + Status: &status, + }) + if updateErr != nil { + c.logger.Warn("sync: agent status update failed", "id", card.ID, "error", updateErr) + } + } + + mergedAgents = append(mergedAgents, card) + } + + // 6. Broadcast the full merged fleet + c.broker.Broadcast("fleet.update", mergedAgents) + c.logger.Info("initial sync complete", "agents", len(mergedAgents)) + + return nil +} + +// mapSessionStatus converts a gateway session status string to an AgentStatus. +// - "running" / "streaming" → active +// - "error" → error +// - "done" / "" / other → idle +func mapSessionStatus(status string) models.AgentStatus { + switch status { + case "running", "streaming": + return models.AgentStatusActive + case "error": + return models.AgentStatusError + default: + return models.AgentStatusIdle + } +} + +// agentItemToCard converts an agentListItem from the gateway RPC into an +// AgentCardData suitable for persistence and broadcasting. +func agentItemToCard(item agentListItem) models.AgentCardData { + role := item.Role + if role == "" { + role = "agent" + } + channel := item.Channel + if channel == "" { + channel = "discord" + } + name := item.Name + if name == "" { + name = item.ID + } + + return models.AgentCardData{ + ID: item.ID, + DisplayName: name, + Role: role, + Status: models.AgentStatusIdle, // default; overridden by session merge + SessionKey: "", + Channel: channel, + LastActivity: time.Now().UTC().Format(time.RFC3339), + } +} \ No newline at end of file diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go new file mode 100644 index 0000000..88db477 --- /dev/null +++ b/go-backend/internal/gateway/wsclient.go @@ -0,0 +1,443 @@ +// Package gateway provides WebSocket client integration with the OpenClaw +// gateway using WS protocol v3. The WSClient handles connection, handshake, +// frame routing, request/response correlation, and automatic reconnection +// with exponential backoff (1s → 30s max). +package gateway + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "sync" + "time" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler" + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository" + + "github.com/google/uuid" + "github.com/gorilla/websocket" +) + +// WSConfig holds WebSocket client configuration, typically loaded from +// environment variables. AuthToken must be set to a valid OpenClaw gateway +// operator token. +type WSConfig struct { + URL string // e.g. "ws://host.docker.internal:18789/" + AuthToken string // from OPENCLAW_GATEWAY_TOKEN +} + +// DefaultWSConfig returns sensible defaults for local development. +func DefaultWSConfig() WSConfig { + return WSConfig{ + URL: "ws://localhost:18789/", + AuthToken: "", + } +} + +// eventHandler is a callback invoked when a named event arrives from the +// gateway. +type eventHandler func(json.RawMessage) + +// WSClient connects to the OpenClaw gateway over WebSocket, completes the +// v3 handshake, routes incoming frames, and automatically reconnects on +// disconnect with exponential backoff (1s → 30s max). +type WSClient struct { + config WSConfig + conn *websocket.Conn + connMu sync.Mutex // protects conn for writes + pending map[string]chan<- json.RawMessage + mu sync.Mutex // protects pending and handlers + agents repository.AgentRepo + broker *handler.Broker + logger *slog.Logger + handlers map[string][]eventHandler + connID string // set after successful hello-ok + restClient *Client // optional REST client to notify on WS ready +} + +// NewWSClient returns a WSClient wired to the given repository and broker. +func NewWSClient(cfg WSConfig, agents repository.AgentRepo, broker *handler.Broker, logger *slog.Logger) *WSClient { + if logger == nil { + logger = slog.Default() + } + return &WSClient{ + config: cfg, + pending: make(map[string]chan<- json.RawMessage), + agents: agents, + broker: broker, + logger: logger, + handlers: make(map[string][]eventHandler), + } +} + +// SetRESTClient wires the REST fallback client so the WS client can notify +// it when the WS connection is ready. Call this before Start. +func (c *WSClient) SetRESTClient(rest *Client) { + c.restClient = rest +} + +// OnEvent registers a handler for the given event name. Handlers are called +// when an incoming frame with type "event" and matching event name is +// received. Safe to call before Start. +func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) { + c.mu.Lock() + defer c.mu.Unlock() + c.handlers[event] = append(c.handlers[event], handler) +} + +// ── Frame types ────────────────────────────────────────────────────────── + +// wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol. +type wsFrame struct { + Type string `json:"type"` // "req", "res", "event" + ID string `json:"id,omitempty"` // request/response correlation + Method string `json:"method,omitempty"` // method name (req/res frames) + Event string `json:"event,omitempty"` // event name (event frames) + Params json.RawMessage `json:"params,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + Error *wsError `json:"error,omitempty"` +} + +// wsError represents an error in a response frame. +type wsError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// connectRequest builds the initial connect handshake payload. +type connectRequest struct { + MinProtocol int `json:"minProtocol"` + MaxProtocol int `json:"maxProtocol"` + Client connectClientInfo `json:"client"` + Role string `json:"role"` + Scopes []string `json:"scopes"` + Auth connectAuth `json:"auth"` +} + +type connectClientInfo struct { + ID string `json:"id"` + Version string `json:"version"` + Platform string `json:"platform"` + Mode string `json:"mode"` +} + +type connectAuth struct { + Token string `json:"token"` +} + +// helloOKResponse represents the expected response to a successful connect. +type helloOKResponse struct { + ConnID string `json:"connId"` + Features struct { + Methods []string `json:"methods"` + Events []string `json:"events"` + } `json:"features"` +} + +// ── Start loop ─────────────────────────────────────────────────────────── + +// Start connects to the gateway, completes the handshake, and begins the +// read loop. On disconnect it reconnects with exponential backoff (1s → 30s). +// On ctx cancellation it performs a clean shutdown. +func (c *WSClient) Start(ctx context.Context) { + backoff := 1 * time.Second + maxBackoff := 30 * time.Second + + for { + err := c.connectAndRun(ctx) + if err != nil { + if ctx.Err() != nil { + c.logger.Info("ws client stopped (context cancelled)") + return + } + c.logger.Warn("ws client disconnected, reconnecting", + "error", err, + "backoff", backoff) + } + + select { + case <-ctx.Done(): + c.logger.Info("ws client stopped during backoff (context cancelled)") + return + case <-time.After(backoff): + // Exponential backoff: 1s, 2s, 4s, 8s, 16s, max 30s + backoff = backoff * 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + } +} + +// connectAndRun dials the gateway, completes the handshake, and runs the +// read loop until an error occurs or ctx is cancelled. +func (c *WSClient) connectAndRun(ctx context.Context) error { + c.logger.Info("ws client connecting", "url", c.config.URL) + + dialer := websocket.Dialer{ + HandshakeTimeout: 10 * time.Second, + } + + conn, _, err := dialer.DialContext(ctx, c.config.URL, nil) + if err != nil { + return fmt.Errorf("dial failed: %w", err) + } + + c.connMu.Lock() + c.conn = conn + c.connMu.Unlock() + + defer conn.Close() + + // Step 1: Read the connect.challenge frame + if err := c.readChallenge(conn); err != nil { + return fmt.Errorf("handshake challenge: %w", err) + } + + // Step 2: Send connect request and read hello-ok response + helloOK, err := c.sendConnect(conn) + if err != nil { + return fmt.Errorf("handshake connect: %w", err) + } + + c.logger.Info("ws client handshake complete", + "connId", helloOK.ConnID, + "methods", helloOK.Features.Methods, + "events", helloOK.Features.Events) + + c.connMu.Lock() + c.connID = helloOK.ConnID + c.connMu.Unlock() + + // Notify REST client that WS is live so it stands down + if c.restClient != nil { + c.restClient.MarkWSReady() + c.logger.Info("ws client notified REST fallback to stand down") + } + + // Step 3: Initial sync — fetch agents + sessions from gateway + if err := c.initialSync(ctx); err != nil { + c.logger.Warn("initial sync failed, continuing with read loop", "error", err) + } + + // Step 4: Register live event handlers + c.registerEventHandlers() + + // Step 5: Read loop — blocks until disconnect or ctx cancel + return c.readLoop(ctx, conn) +} + +// readChallenge reads the first frame from the gateway, which must be a +// connect.challenge event. +func (c *WSClient) readChallenge(conn *websocket.Conn) error { + var frame wsFrame + if err := conn.ReadJSON(&frame); err != nil { + return fmt.Errorf("read challenge: %w", err) + } + + if frame.Type != "event" || frame.Event != "connect.challenge" { + return fmt.Errorf("expected connect.challenge, got type=%s event=%s", frame.Type, frame.Event) + } + + c.logger.Debug("received connect.challenge") + return nil +} + +// sendConnect sends the connect request and waits for the hello-ok response. +func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) { + reqID := uuid.New().String() + params := connectRequest{ + MinProtocol: 3, + MaxProtocol: 3, + Client: connectClientInfo{ + ID: "control-center", + Version: "1.0", + Platform: "server", + Mode: "operator", + }, + Role: "operator", + Scopes: []string{"operator.read"}, + Auth: connectAuth{ + Token: c.config.AuthToken, + }, + } + + paramsJSON, err := json.Marshal(params) + if err != nil { + return nil, fmt.Errorf("marshal connect params: %w", err) + } + + reqFrame := wsFrame{ + Type: "req", + ID: reqID, + Method: "connect", + Params: paramsJSON, + } + + if err := conn.WriteJSON(reqFrame); err != nil { + return nil, fmt.Errorf("write connect request: %w", err) + } + + // Read response + var resFrame wsFrame + if err := conn.ReadJSON(&resFrame); err != nil { + return nil, fmt.Errorf("read connect response: %w", err) + } + + if resFrame.Error != nil { + return nil, fmt.Errorf("connect rejected: code=%d msg=%s", resFrame.Error.Code, resFrame.Error.Message) + } + + if resFrame.ID != reqID { + return nil, fmt.Errorf("response id mismatch: expected %s, got %s", reqID, resFrame.ID) + } + + var helloOK helloOKResponse + if err := json.Unmarshal(resFrame.Result, &helloOK); err != nil { + return nil, fmt.Errorf("parse hello-ok: %w", err) + } + + return &helloOK, nil +} + +// readLoop continuously reads frames from the connection and routes them. +// It returns on read error or context cancellation. +func (c *WSClient) readLoop(ctx context.Context, conn *websocket.Conn) error { + for { + select { + case <-ctx.Done(): + // Clean shutdown: send close frame + c.connMu.Lock() + c.conn.WriteControl( + websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, "shutdown"), + time.Now().Add(5*time.Second), + ) + c.connMu.Unlock() + return ctx.Err() + default: + } + + var frame wsFrame + if err := conn.ReadJSON(&frame); err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + c.logger.Info("ws connection closed by server") + return nil + } + if websocket.IsUnexpectedCloseError(err) { + c.logger.Warn("ws connection unexpectedly closed", "error", err) + return err + } + return fmt.Errorf("read frame: %w", err) + } + + c.routeFrame(frame) + } +} + +// routeFrame dispatches a received frame to the appropriate handler. +func (c *WSClient) routeFrame(frame wsFrame) { + switch frame.Type { + case "res": + c.handleResponse(frame) + case "event": + c.handleEvent(frame) + default: + c.logger.Debug("unknown frame type", "type", frame.Type, "id", frame.ID) + } +} + +// handleResponse correlates a response frame to a pending request channel. +func (c *WSClient) handleResponse(frame wsFrame) { + c.mu.Lock() + ch, ok := c.pending[frame.ID] + if ok { + delete(c.pending, frame.ID) + } + c.mu.Unlock() + + if !ok { + c.logger.Warn("received response for unknown request", "id", frame.ID) + return + } + + if frame.Error != nil { + ch <- nil + return + } + + ch <- frame.Result +} + +// handleEvent dispatches an event frame to registered handlers. +func (c *WSClient) handleEvent(frame wsFrame) { + c.mu.Lock() + handlers := c.handlers[frame.Event] + c.mu.Unlock() + + if len(handlers) == 0 { + c.logger.Debug("unhandled event", "event", frame.Event) + return + } + + for _, h := range handlers { + h(frame.Params) + } +} + +// ── Send (RPC) ────────────────────────────────────────────────────────── + +// Send sends a JSON-RPC request to the gateway and returns the response +// payload. It is safe for concurrent use. +func (c *WSClient) Send(method string, params any) (json.RawMessage, error) { + reqID := uuid.New().String() + + var paramsJSON json.RawMessage + if params != nil { + var err error + paramsJSON, err = json.Marshal(params) + if err != nil { + return nil, fmt.Errorf("marshal params: %w", err) + } + } + + // Register pending response channel + respCh := make(chan json.RawMessage, 1) + c.mu.Lock() + c.pending[reqID] = respCh + c.mu.Unlock() + + defer func() { + c.mu.Lock() + delete(c.pending, reqID) + c.mu.Unlock() + }() + + // Build and send frame + frame := wsFrame{ + Type: "req", + ID: reqID, + Method: method, + Params: paramsJSON, + } + + c.connMu.Lock() + err := c.conn.WriteJSON(frame) + c.connMu.Unlock() + + if err != nil { + return nil, fmt.Errorf("write request: %w", err) + } + + // Wait for response with timeout + select { + case resp := <-respCh: + if resp == nil { + return nil, fmt.Errorf("gateway returned error for request %s (%s)", reqID, method) + } + return resp, nil + case <-time.After(30 * time.Second): + return nil, fmt.Errorf("request %s (%s) timed out", reqID, method) + } +} \ No newline at end of file From d9a1640b10131c43e95bee7a0f90cab4567ff029 Mon Sep 17 00:00:00 2001 From: dex-bot Date: Wed, 20 May 2026 16:29:57 +0000 Subject: [PATCH 2/4] CUB-200: sync CI workflows with dev branch - Overwrite dev.yml with dev's consolidated version (parameterized Go/Node versions, cleaner install steps) - Add deploy-dev.yaml from dev (was missing on this branch) - build-dev.yaml confirmed absent (was deleted on dev in PR #45) --- .gitea/workflows/deploy-dev.yaml | 126 +++++++++++++++++++++++++++++++ .gitea/workflows/dev.yml | 16 ++-- 2 files changed, 135 insertions(+), 7 deletions(-) create mode 100644 .gitea/workflows/deploy-dev.yaml diff --git a/.gitea/workflows/deploy-dev.yaml b/.gitea/workflows/deploy-dev.yaml new file mode 100644 index 0000000..72240f9 --- /dev/null +++ b/.gitea/workflows/deploy-dev.yaml @@ -0,0 +1,126 @@ +name: Deploy (Dev) + +on: + repository_dispatch: + types: + - dev-build-success + workflow_dispatch: + +env: + BINARY_NAME: server + DEV_HOST: ${{ secrets.DEV_HOST }} + DEV_USER: ${{ secrets.DEV_USER }} + DEPLOY_BINARY_PATH: /opt/control-center/server + DEPLOY_FRONTEND_PATH: /usr/share/nginx/html + SERVICE_NAME: control-center-server + FRONTEND_SERVICE: nginx + +jobs: + deploy: + runs-on: ubuntu-latest + steps: + - name: Download Go binary + uses: actions/download-artifact@v4 + with: + name: go-backend-binary + + - name: Download frontend dist + uses: actions/download-artifact@v4 + with: + name: frontend-dist + path: dist + + - name: Make binary executable + run: chmod +x ${{ env.BINARY_NAME }} + + - name: Generate deploy script + run: | + cat > deploy.sh <<'SCRIPT' + #!/usr/bin/env bash + set -euo pipefail + + BINARY="${1}" + FRONTEND_DIST="${2:-dist}" + BINARY_PATH="${3:-/opt/control-center/server}" + FRONTEND_PATH="${4:-/usr/share/nginx/html}" + BINARY_SERVICE="${5:-control-center-server}" + FRONTEND_SERVICE="${6:-nginx}" + + TIMESTAMP=$(date +%Y%m%d%H%M%S) + BACKUP="${BINARY_PATH}.${TIMESTAMP}.bak" + + echo "=== deploy backend ===" + + if [ -f "$BINARY_PATH" ]; then + echo "backing up current binary" + cp "$BINARY_PATH" "$BACKUP" + fi + + echo "installing new binary" + cp "$BINARY" "$BINARY_PATH" + chmod +x "$BINARY_PATH" + + echo "restarting service" + systemctl reload-or-restart "$BINARY_SERVICE" || systemctl restart "$BINARY_SERVICE" + + sleep 3 + + if ! systemctl is-active --quiet "$BINARY_SERVICE"; then + echo "FAILED: $BINARY_SERVICE did not start — rolling back" + if [ -f "$BACKUP" ]; then + cp "$BACKUP" "$BINARY_PATH" + systemctl restart "$BINARY_SERVICE" + fi + exit 1 + fi + + echo "backend deploy ok — keeping last 3 backups" + ls -t "${BINARY_PATH}."*.bak 2>/dev/null | tail -n +4 | xargs -r rm -f + + echo "=== deploy frontend ===" + if [ -d "$FRONTEND_DIST" ] && [ -n "$(ls -A "$FRONTEND_DIST" 2>/dev/null)" ]; then + rsync -a --delete "$FRONTEND_DIST/" "$FRONTEND_PATH/" + systemctl reload "$FRONTEND_SERVICE" 2>/dev/null ||: + echo "frontend deploy ok" + fi + + echo "=== deploy complete ===" + SCRIPT + chmod +x deploy.sh + + - name: Copy artifacts to dev server + uses: appleboy/scp-action@v0.1.7 + with: + host: ${{ env.DEV_HOST }} + username: ${{ env.DEV_USER }} + key: ${{ secrets.DEV_SSH_KEY }} + source: "${{ env.BINARY_NAME }},deploy.sh,dist" + target: "/tmp/control-center-deploy" + + - name: Execute deploy on dev server + uses: appleboy/ssh-action@v1 + with: + host: ${{ env.DEV_HOST }} + username: ${{ env.DEV_USER }} + key: ${{ secrets.DEV_SSH_KEY }} + script: | + set -euo pipefail + cd /tmp/control-center-deploy + sudo ./deploy.sh \ + "${{ env.BINARY_NAME }}" \ + "dist" \ + "${{ env.DEPLOY_BINARY_PATH }}" \ + "${{ env.DEPLOY_FRONTEND_PATH }}" \ + "${{ env.SERVICE_NAME }}" \ + "${{ env.FRONTEND_SERVICE }}" + rm -rf /tmp/control-center-deploy + + - name: Notify on failure + if: failure() + uses: appleboy/ssh-action@v1 + with: + host: ${{ env.DEV_HOST }} + username: ${{ env.DEV_USER }} + key: ${{ secrets.DEV_SSH_KEY }} + script: | + echo "deploy failed — commit ${{ github.sha }}" > /tmp/control-center-deploy-failure.log \ No newline at end of file diff --git a/.gitea/workflows/dev.yml b/.gitea/workflows/dev.yml index 248555c..7ddd342 100644 --- a/.gitea/workflows/dev.yml +++ b/.gitea/workflows/dev.yml @@ -7,6 +7,8 @@ on: branches: [dev] env: + GO_VERSION: "1.23" + NODE_VERSION: "22" REGISTRY: code.cubecraftcreations.com BACKEND_IMAGE: ${{ gitea.repository }}/backend FRONTEND_IMAGE: ${{ gitea.repository }}/frontend @@ -18,11 +20,16 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Setup Go + - name: Install Go run: | - curl -sL https://go.dev/dl/go1.23.6.linux-amd64.tar.gz | tar -C /usr/local -xz + curl -fsSL "https://go.dev/dl/go${GO_VERSION}.linux-amd64.tar.gz" | sudo tar -C /usr/local -xz echo "/usr/local/go/bin" >> $GITHUB_PATH + - name: Install Node.js + run: | + curl -fsSL "https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x64.tar.xz" | sudo tar -C /usr/local --strip-components=1 -xJ + echo "/usr/local/bin" >> $GITHUB_PATH + - name: Run backend tests run: go test ./... working-directory: ./go-backend @@ -31,11 +38,6 @@ jobs: run: go build -ldflags="-w -s" -o /tmp/server ./cmd/server working-directory: ./go-backend - - name: Setup Node - run: | - curl -sL https://deb.nodesource.com/setup_22.x | bash - - apt-get install -y nodejs - - name: Install frontend deps run: npm ci working-directory: ./frontend From d370d5ec235052aa067bcd4f480b5a05d177408c Mon Sep 17 00:00:00 2001 From: rex-bot Date: Wed, 20 May 2026 21:42:31 +0000 Subject: [PATCH 3/4] =?UTF-8?q?CUB-200:=20fix=20WS=20initial=20sync=20orde?= =?UTF-8?q?ring=20=E2=80=94=20start=20readLoop=20before=20initialSync?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The root cause of the initial sync timeout was that connectAndRun called initialSync (which uses Send/RPC) before starting readLoop, but Send's response delivery depends on readLoop→routeFrame→handleResponse. Without the readLoop running, agents.list and sessions.list would always time out. Fix: start readLoop in a goroutine before calling initialSync so that RPC responses are properly routed back to pending Send() calls. After initialSync completes, event handlers are registered and MarkWSReady is called. The connectAndRun function then blocks on the readLoop goroutine's completion. Also added TestConnectAndRun_InitialSyncOrdering which verifies that agents are persisted from initial sync (would hang/timeout under the old ordering). --- go-backend/internal/gateway/wsclient.go | 37 +++++--- go-backend/internal/gateway/wsclient_test.go | 99 ++++++++++++++++++++ 2 files changed, 125 insertions(+), 11 deletions(-) diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go index 9500809..db7f3b9 100644 --- a/go-backend/internal/gateway/wsclient.go +++ b/go-backend/internal/gateway/wsclient.go @@ -229,7 +229,29 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { c.connId = helloOK.ConnID c.connMu.Unlock() - // Notify REST client that WS is live so it stands down + // Step 2b: Start the read loop in a goroutine so that Send() in + // initialSync can receive responses. The read loop goroutine will + // continue running after initialSync completes, routing live events + // and any future RPC responses. + readLoopErrCh := make(chan error, 1) + go func() { + readLoopErrCh <- c.readLoop(ctx, conn) + }() + + // Step 2c: Initial sync — fetch agents + sessions from gateway. + // This now works because the read loop is active and will route + // response frames back to Send() via handleResponse. + if err := c.initialSync(ctx); err != nil { + c.logger.Warn("initial sync failed, will continue with read loop", "error", err) + } + + // Step 2d: Register live event handlers (read loop is already + // active, so events will be dispatched immediately) + c.registerEventHandlers() + + // Notify REST client that WS is live so it stands down. + // This must happen AFTER initialSync so that the REST poller + // doesn't start polling while we're still syncing. if c.restClient != nil { c.restClient.MarkWSReady() c.logger.Info("ws client notified REST fallback to stand down") @@ -238,16 +260,9 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { // Reset wsReadyOnce so MarkWSReady can fire again after a reconnect c.wsReadyOnce = sync.Once{} - // Step 2b: Initial sync — fetch agents + sessions from gateway - if err := c.initialSync(ctx); err != nil { - c.logger.Warn("initial sync failed, will continue with read loop", "error", err) - } - - // Step 2c: Register live event handlers - c.registerEventHandlers() - - // Step 3: Read loop - return c.readLoop(ctx, conn) + // Step 3: Wait for the read loop goroutine to finish (blocks + // until the connection drops or context is cancelled). + return <-readLoopErrCh } // readChallenge reads the first frame from the gateway, which must be a diff --git a/go-backend/internal/gateway/wsclient_test.go b/go-backend/internal/gateway/wsclient_test.go index 92a1d66..300a32d 100644 --- a/go-backend/internal/gateway/wsclient_test.go +++ b/go-backend/internal/gateway/wsclient_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler" "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" "github.com/gorilla/websocket" @@ -466,6 +467,104 @@ func TestAgentItemToCard(t *testing.T) { }) } +// ── 6. Test: Initial sync ordering (readLoop active before Send) ────────── + +// TestConnectAndRun_InitialSyncOrdering verifies that the WS client +// completes initial sync successfully. This test would hang/timeout if +// readLoop were NOT started before initialSync, because Send() relies on +// readLoop→routeFrame→handleResponse to deliver RPC responses. +func TestConnectAndRun_InitialSyncOrdering(t *testing.T) { + repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)} + broker := handler.NewBroker() + capture := newBroadcastCapture(broker) + defer capture.close() + + srv := newTestWSServer(t, func(conn *websocket.Conn) { + // Handshake + handleHandshake(t, conn) + + // After handshake, respond to RPCs + for { + var req map[string]any + if err := conn.ReadJSON(&req); err != nil { + break + } + reqID, _ := req["id"].(string) + method, _ := req["method"].(string) + + var result any + switch method { + case "agents.list": + result = []map[string]any{ + {"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"}, + {"id": "dex", "name": "Dex", "role": "Backend Dev", "channel": "telegram"}, + } + case "sessions.list": + result = []map[string]any{ + {"sessionKey": "s1", "agentId": "otto", "status": "running", "totalTokens": 500, "lastActivityAt": "2025-05-20T12:00:00Z"}, + } + default: + result = map[string]any{} + } + + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": result, + } + if err := conn.WriteJSON(res); err != nil { + break + } + } + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + done := make(chan struct{}) + go func() { + client.Start(ctx) + close(done) + }() + + // Wait for initial sync to complete by checking repo state. + // The agents should be persisted from the RPC responses. + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + repo.mu.Lock() + _, ottoOK := repo.agents["otto"] + _, dexOK := repo.agents["dex"] + repo.mu.Unlock() + if ottoOK && dexOK { + break + } + time.Sleep(50 * time.Millisecond) + } + + repo.mu.Lock() + _, ottoOK := repo.agents["otto"] + _, dexOK := repo.agents["dex"] + repo.mu.Unlock() + + if !ottoOK { + t.Error("otto not found in repo after initial sync — readLoop may not have been active before Send()") + } + if !dexOK { + t.Error("dex not found in repo after initial sync — readLoop may not have been active before Send()") + } + + cancel() + select { + case <-done: + case <-time.After(3 * time.Second): + t.Fatal("WSClient did not shut down cleanly") + } +} + func TestStrPtr(t *testing.T) { s := "hello" p := strPtr(s) From b7b05bb4e341201a31122a2d0cb2aae397d70fc8 Mon Sep 17 00:00:00 2001 From: rex-bot Date: Wed, 20 May 2026 21:52:39 +0000 Subject: [PATCH 4/4] =?UTF-8?q?CUB-200:=20fix=20event-loss=20race=20?= =?UTF-8?q?=E2=80=94=20register=20handlers=20before=20readLoop=20starts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move registerEventHandlers() call before the readLoop goroutine starts in connectAndRun(). This eliminates the startup window where live gateway events were actively read and dropped as 'unhandled' because handler registration happened only after initialSync completed. The handlers only depend on c.agents and c.broker, which are wired in the constructor — they do not require initialSync to have completed. Also adds TestConnectAndRun_EventNotLostDuringSync regression test that sends a live presence event during initial sync and asserts it is not lost. All gateway tests pass with -race. --- go-backend/internal/gateway/wsclient.go | 21 +-- go-backend/internal/gateway/wsclient_test.go | 132 +++++++++++++++++++ 2 files changed, 145 insertions(+), 8 deletions(-) diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go index db7f3b9..709882e 100644 --- a/go-backend/internal/gateway/wsclient.go +++ b/go-backend/internal/gateway/wsclient.go @@ -229,26 +229,31 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { c.connId = helloOK.ConnID c.connMu.Unlock() - // Step 2b: Start the read loop in a goroutine so that Send() in + // Step 2b: Register live event handlers BEFORE starting the read + // loop. This eliminates the race window where readLoop dispatches + // live events as "unhandled" because no handlers are registered yet. + // The handlers only depend on c.agents and c.broker, which are wired + // in the constructor — they do not need initialSync to have completed. + c.registerEventHandlers() + + // Step 2c: Start the read loop in a goroutine so that Send() in // initialSync can receive responses. The read loop goroutine will // continue running after initialSync completes, routing live events - // and any future RPC responses. + // and any future RPC responses. Because handlers are already + // registered, any events arriving during or after initialSync are + // dispatched correctly. readLoopErrCh := make(chan error, 1) go func() { readLoopErrCh <- c.readLoop(ctx, conn) }() - // Step 2c: Initial sync — fetch agents + sessions from gateway. - // This now works because the read loop is active and will route + // Step 2d: Initial sync — fetch agents + sessions from gateway. + // This works because the read loop is active and will route // response frames back to Send() via handleResponse. if err := c.initialSync(ctx); err != nil { c.logger.Warn("initial sync failed, will continue with read loop", "error", err) } - // Step 2d: Register live event handlers (read loop is already - // active, so events will be dispatched immediately) - c.registerEventHandlers() - // Notify REST client that WS is live so it stands down. // This must happen AFTER initialSync so that the REST poller // doesn't start polling while we're still syncing. diff --git a/go-backend/internal/gateway/wsclient_test.go b/go-backend/internal/gateway/wsclient_test.go index 300a32d..8e37a6a 100644 --- a/go-backend/internal/gateway/wsclient_test.go +++ b/go-backend/internal/gateway/wsclient_test.go @@ -565,6 +565,138 @@ func TestConnectAndRun_InitialSyncOrdering(t *testing.T) { } } +// ── 7. Test: Event not lost during initial sync (regression) ─────────────── + +// TestConnectAndRun_EventNotLostDuringSync verifies that live gateway events +// arriving during initial sync are NOT dropped. This is a regression test +// for the race where readLoop started before registerEventHandlers(), +// causing events read during that window to be logged as "unhandled" and lost. +// +// The mock server sends a live event (sessions.changed) right after the +// handshake, interleaved with the RPC responses for agents.list and +// sessions.list. The test asserts the event is received by the handler. +func TestConnectAndRun_EventNotLostDuringSync(t *testing.T) { + repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)} + broker := handler.NewBroker() + capture := newBroadcastCapture(broker) + defer capture.close() + + // Pre-seed an agent so the event handler can update it. + repo.agents["otto"] = models.AgentCardData{ + ID: "otto", + DisplayName: "Otto", + Status: models.AgentStatusIdle, + } + + srv := newTestWSServer(t, func(conn *websocket.Conn) { + // Handshake + handleHandshake(t, conn) + + // After handshake, process RPCs and inject a live event. + for { + var req map[string]any + if err := conn.ReadJSON(&req); err != nil { + break + } + reqID, _ := req["id"].(string) + method, _ := req["method"].(string) + + // Respond to agents.list RPC + if method == "agents.list" { + // Before responding, inject a live event — simulates + // a gateway pushing a presence update during sync. + evt := map[string]any{ + "type": "event", + "event": "presence", + "params": map[string]any{"agentId": "otto", "connected": true, "lastActivityAt": "2025-05-20T12:30:00Z"}, + } + if err := conn.WriteJSON(evt); err != nil { + break + } + + // Now send the RPC response + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": []map[string]any{ + {"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"}, + }, + } + if err := conn.WriteJSON(res); err != nil { + break + } + continue + } + + // Respond to sessions.list RPC + if method == "sessions.list" { + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": []map[string]any{}, + } + if err := conn.WriteJSON(res); err != nil { + break + } + continue + } + + // Default response for other methods + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": map[string]any{}, + } + if err := conn.WriteJSON(res); err != nil { + break + } + } + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + done := make(chan struct{}) + go func() { + client.Start(ctx) + close(done) + }() + + // Wait for the presence event to be processed by checking the repo. + // The presence handler updates the agent, so we check for the + // lastActivityAt change. + deadline := time.Now().Add(5 * time.Second) + var lastActivity string + for time.Now().Before(deadline) { + repo.mu.Lock() + if a, ok := repo.agents["otto"]; ok { + lastActivity = a.LastActivity + } + repo.mu.Unlock() + if lastActivity == "2025-05-20T12:30:00Z" { + break + } + time.Sleep(50 * time.Millisecond) + } + + if lastActivity != "2025-05-20T12:30:00Z" { + t.Errorf("presence event during sync was lost: lastActivity = %q, want %q", lastActivity, "2025-05-20T12:30:00Z") + } + + cancel() + select { + case <-done: + case <-time.After(3 * time.Second): + t.Fatal("WSClient did not shut down cleanly") + } +} + func TestStrPtr(t *testing.T) { s := "hello" p := strPtr(s)