From 70d39b87d16efd6363c4968144f883eb7a9887a4 Mon Sep 17 00:00:00 2001 From: Dex Date: Wed, 20 May 2026 11:02:21 +0000 Subject: [PATCH 1/6] CUB-203: add WebSocket client scaffold for OpenClaw gateway v3 --- go-backend/cmd/server/main.go | 9 + go-backend/go.mod | 1 + go-backend/go.sum | 2 + go-backend/internal/config/config.go | 16 +- go-backend/internal/gateway/wsclient.go | 423 ++++++++++++++++++++++++ 5 files changed, 445 insertions(+), 6 deletions(-) create mode 100644 go-backend/internal/gateway/wsclient.go diff --git a/go-backend/cmd/server/main.go b/go-backend/cmd/server/main.go index dc760b4..76b6a2b 100644 --- a/go-backend/cmd/server/main.go +++ b/go-backend/cmd/server/main.go @@ -69,11 +69,20 @@ func main() { PollInterval: cfg.GatewayPollInterval, }, agentRepo, broker) + // ── WebSocket client (connects to OpenClaw gateway WS v3) ───────────── + wsClient := gateway.NewWSClient(gateway.WSConfig{ + URL: cfg.WSGatewayURL, + AuthToken: cfg.WSGatewayToken, + }, agentRepo, broker, logger) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() go gwClient.Start(ctx) + // Start WS client in background; logs connection status + go wsClient.Start(ctx) + // ── Server ───────────────────────────────────────────────────────────── srv := &http.Server{ Addr: fmt.Sprintf(":%d", cfg.Port), diff --git a/go-backend/go.mod b/go-backend/go.mod index 4b9db15..91bad3e 100644 --- a/go-backend/go.mod +++ b/go-backend/go.mod @@ -7,6 +7,7 @@ require ( github.com/go-chi/cors v1.2.1 github.com/go-playground/validator/v10 v10.24.0 github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 github.com/jackc/pgx/v5 v5.7.2 ) diff --git a/go-backend/go.sum b/go-backend/go.sum index f4041e3..448723d 100644 --- a/go-backend/go.sum +++ b/go-backend/go.sum @@ -17,6 +17,8 @@ github.com/go-playground/validator/v10 v10.24.0 h1:KHQckvo8G6hlWnrPX4NJJ+aBfWNAE github.com/go-playground/validator/v10 v10.24.0/go.mod h1:GGzBIJMuE98Ic/kJsBXbz1x/7cByt++cQ+YOuDM5wus= 
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= diff --git a/go-backend/internal/config/config.go b/go-backend/internal/config/config.go index fe6c9f6..3098ba8 100644 --- a/go-backend/internal/config/config.go +++ b/go-backend/internal/config/config.go @@ -10,13 +10,15 @@ import ( // Config holds all application configuration. type Config struct { - Port int - DatabaseURL string - CORSOrigin string - LogLevel string - Environment string - GatewayURL string + Port int + DatabaseURL string + CORSOrigin string + LogLevel string + Environment string + GatewayURL string GatewayPollInterval time.Duration + WSGatewayURL string + WSGatewayToken string } // Load reads configuration from environment variables, applying defaults where @@ -30,6 +32,8 @@ func Load() *Config { Environment: getEnv("ENVIRONMENT", "development"), GatewayURL: getEnv("GATEWAY_URL", "http://localhost:18789/api/agents"), GatewayPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second), + WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://localhost:18789/"), + WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""), } } diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go new file mode 100644 index 0000000..1e64775 --- /dev/null +++ b/go-backend/internal/gateway/wsclient.go @@ -0,0 +1,423 @@ +// Package gateway provides WebSocket client integration with the OpenClaw +// gateway using WS 
protocol v3. The WSClient handles connection, handshake, +// frame routing, request/response correlation, and automatic reconnection +// with exponential backoff. +package gateway + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "sync" + "time" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler" + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository" + + "github.com/gorilla/websocket" + "github.com/google/uuid" +) + +// WSConfig holds WebSocket client configuration, typically loaded from +// environment variables. AuthToken must be set to a valid OpenClaw gateway +// operator token. +type WSConfig struct { + URL string // e.g. "ws://host.docker.internal:18789/" + AuthToken string // from OPENCLAW_GATEWAY_TOKEN +} + +// DefaultWSConfig returns sensible defaults for local development. +func DefaultWSConfig() WSConfig { + return WSConfig{ + URL: "ws://localhost:18789/", + AuthToken: "", + } +} + +// eventHandler is a callback invoked when a named event arrives from the +// gateway. +type eventHandler func(json.RawMessage) + +// WSClient connects to the OpenClaw gateway over WebSocket, completes the +// v3 handshake, routes incoming frames, and automatically reconnects on +// disconnect with exponential backoff. +type WSClient struct { + config WSConfig + conn *websocket.Conn + connMu sync.Mutex // protects conn for writes + pending map[string]chan<- json.RawMessage + mu sync.Mutex // protects pending and handlers + agents repository.AgentRepo + broker *handler.Broker + logger *slog.Logger + + handlers map[string][]eventHandler +} + +// NewWSClient returns a WSClient wired to the given repository and broker. 
+func NewWSClient(cfg WSConfig, agents repository.AgentRepo, broker *handler.Broker, logger *slog.Logger) *WSClient { + if logger == nil { + logger = slog.Default() + } + return &WSClient{ + config: cfg, + pending: make(map[string]chan<- json.RawMessage), + agents: agents, + broker: broker, + logger: logger, + handlers: make(map[string][]eventHandler), + } +} + +// OnEvent registers a handler for the given event name. Handlers are called +// when an incoming frame with type "event" and matching event name is +// received. This is safe to call before Start. +func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) { + c.mu.Lock() + defer c.mu.Unlock() + c.handlers[event] = append(c.handlers[event], handler) +} + +// ── Frame types ────────────────────────────────────────────────────────── + +// wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol. +type wsFrame struct { + Type string `json:"type"` // "req", "res", "event" + ID string `json:"id,omitempty"` // request/response correlation + Method string `json:"method,omitempty"` // method name (req frames) + Event string `json:"event,omitempty"` // event name (event frames) + Params json.RawMessage `json:"params,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + Error *wsError `json:"error,omitempty"` +} + +// wsError represents an error in a response frame. +type wsError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// connectRequest builds the initial connect handshake payload. 
+type connectRequest struct {
+	MinProtocol int               `json:"minProtocol"`
+	MaxProtocol int               `json:"maxProtocol"`
+	Client      connectClientInfo `json:"client"`
+	Role        string            `json:"role"`
+	Scopes      []string          `json:"scopes"`
+	Auth        connectAuth       `json:"auth"`
+}
+
+// connectClientInfo identifies this client inside the connect request.
+type connectClientInfo struct {
+	ID       string `json:"id"`
+	Version  string `json:"version"`
+	Platform string `json:"platform"`
+	Mode     string `json:"mode"`
+}
+
+// connectAuth carries the gateway token sent with the connect request.
+type connectAuth struct {
+	Token string `json:"token"`
+}
+
+// helloOKResponse represents the expected response to a successful connect.
+type helloOKResponse struct {
+	ConnID   string `json:"connId"`
+	Features struct {
+		Methods []string `json:"methods"`
+		Events  []string `json:"events"`
+	} `json:"features"`
+}
+
+// ── Start loop ───────────────────────────────────────────────────────────
+
+// Start connects to the gateway, completes the handshake, and begins the
+// read loop. On disconnect it reconnects with exponential backoff
+// (1s, 2s, 4s, ... capped at 30s). The backoff is reset to its initial value
+// once a connection attempt has lasted at least as long as the cap, so a
+// single outage does not leave every later reconnect waiting the maximum.
+// On ctx cancellation it performs a clean shutdown and returns.
+func (c *WSClient) Start(ctx context.Context) {
+	const (
+		initialBackoff = 1 * time.Second
+		maxBackoff     = 30 * time.Second
+	)
+	backoff := initialBackoff
+
+	for {
+		startedAt := time.Now()
+		err := c.connectAndRun(ctx)
+
+		// connectAndRun does not report whether the handshake succeeded, so
+		// use the attempt's duration as the health signal: anything that
+		// stayed up longer than the maximum backoff was a working session,
+		// while quick failures (dial errors, flapping) keep backing off.
+		if time.Since(startedAt) >= maxBackoff {
+			backoff = initialBackoff
+		}
+
+		if err != nil {
+			if ctx.Err() != nil {
+				c.logger.Info("ws client stopped (context cancelled)")
+				return
+			}
+			c.logger.Warn("ws client disconnected, reconnecting",
+				"error", err,
+				"backoff", backoff)
+		}
+
+		select {
+		case <-ctx.Done():
+			c.logger.Info("ws client stopped during backoff (context cancelled)")
+			return
+		case <-time.After(backoff):
+			// Exponential backoff: 1s, 2s, 4s, 8s, 16s, max 30s
+			backoff *= 2
+			if backoff > maxBackoff {
+				backoff = maxBackoff
+			}
+		}
+	}
+}
+
+// connectAndRun dials the gateway, completes the handshake, and runs the
+// read loop until an error occurs or ctx is cancelled.
+func (c *WSClient) connectAndRun(ctx context.Context) error {
+	// handshakeTimeout bounds both the HTTP upgrade and the protocol-level
+	// challenge/connect exchange that follows it.
+	const handshakeTimeout = 10 * time.Second
+
+	c.logger.Info("ws client connecting", "url", c.config.URL)
+
+	dialer := websocket.Dialer{
+		HandshakeTimeout: handshakeTimeout,
+	}
+
+	conn, _, err := dialer.DialContext(ctx, c.config.URL, nil)
+	if err != nil {
+		return fmt.Errorf("dial failed: %w", err)
+	}
+	// Always release the socket when this connection attempt ends.
+	defer conn.Close()
+
+	c.connMu.Lock()
+	c.conn = conn
+	c.connMu.Unlock()
+
+	// The handshake reads below do not observe ctx, so without a deadline a
+	// gateway that accepts the socket but never speaks would block forever.
+	if err := conn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil {
+		return fmt.Errorf("set handshake deadline: %w", err)
+	}
+
+	// Step 1: Read the connect.challenge frame
+	if err := c.readChallenge(conn); err != nil {
+		return fmt.Errorf("handshake challenge: %w", err)
+	}
+
+	// Step 2: Send connect request
+	helloOK, err := c.sendConnect(conn)
+	if err != nil {
+		return fmt.Errorf("handshake connect: %w", err)
+	}
+
+	// The read loop is long-lived and idle periods are normal: a zero time
+	// removes the deadline again.
+	if err := conn.SetReadDeadline(time.Time{}); err != nil {
+		return fmt.Errorf("clear handshake deadline: %w", err)
+	}
+
+	c.logger.Info("ws client handshake complete",
+		"connId", helloOK.ConnID,
+		"methods", helloOK.Features.Methods,
+		"events", helloOK.Features.Events)
+
+	// Step 3: Read loop
+	return c.readLoop(ctx, conn)
+}
+
+// readChallenge reads the first frame from the gateway, which must be a
+// connect.challenge event. The challenge params are only logged at debug
+// level; nothing in them is validated here.
+func (c *WSClient) readChallenge(conn *websocket.Conn) error {
+	var frame wsFrame
+	if err := conn.ReadJSON(&frame); err != nil {
+		return fmt.Errorf("read challenge: %w", err)
+	}
+
+	if frame.Type != "event" || frame.Event != "connect.challenge" {
+		return fmt.Errorf("expected connect.challenge, got type=%s event=%s", frame.Type, frame.Event)
+	}
+
+	c.logger.Debug("received connect.challenge", "params", string(frame.Params))
+	return nil
+}
+
+// sendConnect sends the connect request and waits for the hello-ok response.
+func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) { + reqID := uuid.New().String() + params := connectRequest{ + MinProtocol: 3, + MaxProtocol: 3, + Client: connectClientInfo{ + ID: "control-center", + Version: "1.0", + Platform: "server", + Mode: "operator", + }, + Role: "operator", + Scopes: []string{"operator.read"}, + Auth: connectAuth{ + Token: c.config.AuthToken, + }, + } + + paramsJSON, err := json.Marshal(params) + if err != nil { + return nil, fmt.Errorf("marshal connect params: %w", err) + } + + reqFrame := wsFrame{ + Type: "req", + ID: reqID, + Method: "connect", + Params: paramsJSON, + } + + if err := conn.WriteJSON(reqFrame); err != nil { + return nil, fmt.Errorf("write connect request: %w", err) + } + + // Read response + var resFrame wsFrame + if err := conn.ReadJSON(&resFrame); err != nil { + return nil, fmt.Errorf("read connect response: %w", err) + } + + if resFrame.Error != nil { + return nil, fmt.Errorf("connect rejected: code=%d msg=%s", resFrame.Error.Code, resFrame.Error.Message) + } + + if resFrame.ID != reqID { + return nil, fmt.Errorf("response id mismatch: expected %s, got %s", reqID, resFrame.ID) + } + + // Check for hello-ok method in the result + // The gateway responds with method "hello-ok" on success + var helloOK helloOKResponse + if err := json.Unmarshal(resFrame.Result, &helloOK); err != nil { + return nil, fmt.Errorf("parse hello-ok: %w", err) + } + + return &helloOK, nil +} + +// readLoop continuously reads frames from the connection and routes them. +// It returns on read error or context cancellation. 
+func (c *WSClient) readLoop(ctx context.Context, conn *websocket.Conn) error {
+	// ReadJSON blocks and does not observe ctx, so polling ctx between reads
+	// would only notice cancellation after the next frame arrives. A watcher
+	// goroutine delivers cancellation instead by closing the socket, which
+	// makes the blocked read return. It exits when readLoop returns.
+	done := make(chan struct{})
+	defer close(done)
+	go func() {
+		select {
+		case <-done:
+		case <-ctx.Done():
+			// Clean shutdown: send close frame, then unblock the reader.
+			// WriteControl and Close may be called concurrently with other
+			// Conn methods, so no write lock is taken here; taking connMu
+			// could stall shutdown behind a Send stuck in WriteJSON.
+			// Both errors are best-effort: the connection is going away.
+			_ = conn.WriteControl(
+				websocket.CloseMessage,
+				websocket.FormatCloseMessage(websocket.CloseNormalClosure, "shutdown"),
+				time.Now().Add(5*time.Second),
+			)
+			_ = conn.Close()
+		}
+	}()
+
+	for {
+		var frame wsFrame
+		if err := conn.ReadJSON(&frame); err != nil {
+			// A read failure after cancellation is the watcher closing the
+			// socket; report the cancellation rather than the I/O error.
+			if ctx.Err() != nil {
+				return ctx.Err()
+			}
+			// Check if it's a close error
+			if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
+				c.logger.Info("ws connection closed by server")
+				return nil
+			}
+			if websocket.IsUnexpectedCloseError(err) {
+				c.logger.Warn("ws connection unexpectedly closed", "error", err)
+				return err
+			}
+			return fmt.Errorf("read frame: %w", err)
+		}
+
+		c.routeFrame(frame)
+	}
+}
+
+// routeFrame dispatches a received frame to the appropriate handler.
+// Frames whose type is neither "res" nor "event" are logged and dropped.
+func (c *WSClient) routeFrame(frame wsFrame) {
+	switch frame.Type {
+	case "res":
+		c.handleResponse(frame)
+	case "event":
+		c.handleEvent(frame)
+	default:
+		c.logger.Warn("unknown frame type", "type", frame.Type, "id", frame.ID)
+	}
+}
+
+// handleResponse correlates a response frame to a pending request channel.
+// The pending entry is removed before delivery, so each channel receives at
+// most one value. An error response is logged here and delivered as nil.
+func (c *WSClient) handleResponse(frame wsFrame) {
+	c.mu.Lock()
+	ch, ok := c.pending[frame.ID]
+	if ok {
+		delete(c.pending, frame.ID)
+	}
+	c.mu.Unlock()
+
+	if !ok {
+		c.logger.Warn("received response for unknown request", "id", frame.ID)
+		return
+	}
+
+	if frame.Error != nil {
+		// The channel only carries the result payload, so the error detail
+		// would otherwise be lost; log it before signalling the caller.
+		c.logger.Warn("gateway returned error response",
+			"id", frame.ID,
+			"code", frame.Error.Code,
+			"message", frame.Error.Message)
+		// Send nil to signal error; caller checks via Send return
+		ch <- nil
+		return
+	}
+
+	ch <- frame.Result
+}
+
+// handleEvent dispatches an event frame to registered handlers.
+func (c *WSClient) handleEvent(frame wsFrame) {
+	// Snapshot the handler list under the lock; the handlers themselves run
+	// outside it so they may call back into the client.
+	c.mu.Lock()
+	handlers := c.handlers[frame.Event]
+	c.mu.Unlock()
+
+	if len(handlers) == 0 {
+		c.logger.Debug("unhandled event", "event", frame.Event)
+		return
+	}
+
+	for _, h := range handlers {
+		h(frame.Params)
+	}
+}
+
+// ── Send ─────────────────────────────────────────────────────────────────
+
+// Send sends a JSON request to the gateway and returns the response payload.
+// It is safe for concurrent use.
+//
+// An error is returned when params cannot be marshalled, when no connection
+// has been established yet, when the write fails, when the gateway answers
+// with an error frame, or when no response arrives within 30 seconds. The
+// response is delivered by the read loop, so Send can only complete while
+// the read loop is running.
+func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
+	reqID := uuid.New().String()
+
+	paramsJSON, err := json.Marshal(params)
+	if err != nil {
+		return nil, fmt.Errorf("marshal params: %w", err)
+	}
+
+	// Register pending response channel. The buffer of one lets the read
+	// loop deliver without blocking even if this call has already timed out.
+	respCh := make(chan json.RawMessage, 1)
+	c.mu.Lock()
+	c.pending[reqID] = respCh
+	c.mu.Unlock()
+
+	defer func() {
+		c.mu.Lock()
+		delete(c.pending, reqID)
+		c.mu.Unlock()
+	}()
+
+	// Build and send frame
+	frame := wsFrame{
+		Type:   "req",
+		ID:     reqID,
+		Method: method,
+		Params: paramsJSON,
+	}
+
+	c.connMu.Lock()
+	conn := c.conn
+	if conn == nil {
+		// Send was called before the first successful dial.
+		c.connMu.Unlock()
+		return nil, fmt.Errorf("write request: not connected")
+	}
+	err = conn.WriteJSON(frame)
+	c.connMu.Unlock()
+
+	if err != nil {
+		return nil, fmt.Errorf("write request: %w", err)
+	}
+
+	// Wait for response with timeout
+	timeout := time.NewTimer(30 * time.Second)
+	defer timeout.Stop()
+
+	select {
+	case resp := <-respCh:
+		if resp == nil {
+			return nil, fmt.Errorf("gateway returned error for request %s", reqID)
+		}
+		return resp, nil
+	case <-timeout.C:
+		return nil, fmt.Errorf("request %s timed out", reqID)
+	}
+}
\ No newline at end of file
-- 
2.53.0

From 60ba3e5b4f2d3fb538e92d1ccffeeb5f2391bb55 Mon Sep 17 00:00:00 2001
From: Dex
Date: Wed, 20 May 2026 11:07:23 +0000
Subject: [PATCH 2/6] CUB-201: add initial sync via agents.list + sessions.list RPCs

- Create gateway/sync.go with initialSync method on WSClient
- Fetch agents
via agents.list RPC, persist to AgentRepo - Fetch sessions via sessions.list RPC, map status to AgentStatus - Merge session state (status, sessionKey, tokens) into AgentCardData - Broadcast merged fleet as fleet.update via SSE broker - Trigger initialSync after hello-ok handshake - Re-sync automatically on reconnect (connectAndRun calls initialSync) - Handle unknown gateway fields gracefully via typed extraction --- go-backend/internal/gateway/sync.go | 191 ++++++++++++++++++++++++ go-backend/internal/gateway/wsclient.go | 11 ++ 2 files changed, 202 insertions(+) create mode 100644 go-backend/internal/gateway/sync.go diff --git a/go-backend/internal/gateway/sync.go b/go-backend/internal/gateway/sync.go new file mode 100644 index 0000000..da4ff02 --- /dev/null +++ b/go-backend/internal/gateway/sync.go @@ -0,0 +1,191 @@ +// Package gateway provides the initial sync logic that fetches agent and +// session data from the OpenClaw gateway via WS RPCs after handshake, +// persists to the repository, merges session state into agent cards, and +// broadcasts the merged fleet to SSE clients. +package gateway + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" +) + +// ── RPC response types ─────────────────────────────────────────────────── + +// agentListItem represents a single agent returned by the agents.list RPC. +// Fields are extracted gracefully from json.RawMessage so unknown fields +// from the gateway are silently ignored. +type agentListItem struct { + ID string `json:"id"` + Name string `json:"name"` + Model string `json:"model"` + Role string `json:"role"` + Channel string `json:"channel"` + Metadata json.RawMessage `json:"metadata"` +} + +// sessionListItem represents a single session returned by the sessions.list RPC. 
+type sessionListItem struct {
+	SessionKey     string `json:"sessionKey"`
+	AgentID        string `json:"agentId"`
+	Status         string `json:"status"` // running, done, streaming, error
+	TotalTokens    int    `json:"totalTokens"`
+	LastActivityAt string `json:"lastActivityAt"`
+}
+
+// ── Sync logic ──────────────────────────────────────────────────────────
+
+// initialSync fetches agents and sessions from the gateway via WS RPCs,
+// persists them, merges session state into agent cards, and broadcasts
+// the merged fleet as a fleet.update event.
+//
+// It returns an error only when one of the two RPCs fails or its response
+// cannot be parsed. Per-agent repository failures are logged and skipped so
+// one bad record does not abort the whole sync. When several sessions name
+// the same agent, the last one in the sessions.list response wins.
+func (c *WSClient) initialSync(ctx context.Context) error {
+	c.logger.Info("initial sync starting")
+
+	// 1. Fetch agents
+	agentsRaw, err := c.Send("agents.list", nil)
+	if err != nil {
+		return fmt.Errorf("agents.list RPC: %w", err)
+	}
+
+	var agentItems []agentListItem
+	if err := json.Unmarshal(agentsRaw, &agentItems); err != nil {
+		return fmt.Errorf("parse agents.list response: %w", err)
+	}
+
+	c.logger.Info("agents.list received", "count", len(agentItems))
+
+	// 2. Persist each agent
+	for _, item := range agentItems {
+		card := agentItemToCard(item)
+
+		existing, err := c.agents.Get(ctx, card.ID)
+		if err != nil {
+			// Any Get error is treated as "agent doesn't exist" — create it
+			if createErr := c.agents.Create(ctx, card); createErr != nil {
+				c.logger.Warn("sync: agent create failed", "id", card.ID, "error", createErr)
+				continue
+			}
+			c.logger.Info("sync: agent created", "id", card.ID)
+			continue
+		}
+
+		// Agent exists. The update request used in this file exposes no
+		// display-name or role field, so drift in either cannot be persisted
+		// yet. Report it instead of writing the name into an unrelated field
+		// (CurrentTask), which would show the agent's name as its task.
+		if existing.DisplayName != card.DisplayName || existing.Role != card.Role {
+			c.logger.Debug("sync: agent name/role differs from gateway, not persisted",
+				"id", card.ID)
+		}
+	}
+
+	// 3. Fetch sessions
+	sessionsRaw, err := c.Send("sessions.list", nil)
+	if err != nil {
+		return fmt.Errorf("sessions.list RPC: %w", err)
+	}
+
+	var sessionItems []sessionListItem
+	if err := json.Unmarshal(sessionsRaw, &sessionItems); err != nil {
+		return fmt.Errorf("parse sessions.list response: %w", err)
+	}
+
+	c.logger.Info("sessions.list received", "count", len(sessionItems))
+
+	// 4. Build a map of agentId → session for merge
+	sessionByAgent := make(map[string]sessionListItem, len(sessionItems))
+	for _, s := range sessionItems {
+		if s.AgentID != "" {
+			sessionByAgent[s.AgentID] = s
+		}
+	}
+
+	// 5. Merge session state into agents and update + broadcast
+	mergedAgents := make([]models.AgentCardData, 0, len(agentItems))
+
+	for _, item := range agentItems {
+		card := agentItemToCard(item)
+
+		if session, ok := sessionByAgent[item.ID]; ok {
+			// Merge session state
+			card.SessionKey = session.SessionKey
+			card.Status = mapSessionStatus(session.Status)
+			card.LastActivity = session.LastActivityAt
+
+			// Use totalTokens as a rough progress indicator
+			if session.TotalTokens > 0 {
+				prog := min(session.TotalTokens/100, 100) // normalize to 0-100
+				card.TaskProgress = &prog
+			}
+		}
+
+		// Persist merged state; only the status is written, and only when
+		// it differs from what the repository already holds.
+		existing, err := c.agents.Get(ctx, card.ID)
+		if err == nil && existing.Status != card.Status {
+			status := card.Status
+			_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
+				Status: &status,
+			})
+			if updateErr != nil {
+				c.logger.Warn("sync: agent status update failed", "id", card.ID, "error", updateErr)
+			}
+		}
+
+		mergedAgents = append(mergedAgents, card)
+	}
+
+	// 6. Broadcast the full merged fleet
+	c.broker.Broadcast("fleet.update", mergedAgents)
+	c.logger.Info("initial sync complete", "agents", len(mergedAgents))
+
+	return nil
+}
+
+// mapSessionStatus converts a gateway session status string to an AgentStatus.
+// - "running" / "streaming" → active +// - "error" → error +// - "done" / "" / other → idle +func mapSessionStatus(status string) models.AgentStatus { + switch status { + case "running", "streaming": + return models.AgentStatusActive + case "error": + return models.AgentStatusError + default: + return models.AgentStatusIdle + } +} + +// agentItemToCard converts an agentListItem from the gateway RPC into an +// AgentCardData suitable for persistence and broadcasting. +func agentItemToCard(item agentListItem) models.AgentCardData { + role := item.Role + if role == "" { + role = "agent" + } + channel := item.Channel + if channel == "" { + channel = "discord" + } + name := item.Name + if name == "" { + name = item.ID + } + + return models.AgentCardData{ + ID: item.ID, + DisplayName: name, + Role: role, + Status: models.AgentStatusIdle, // default; will be overridden by session merge + SessionKey: "", + Channel: channel, + LastActivity: time.Now().UTC().Format(time.RFC3339), + } +} \ No newline at end of file diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go index 1e64775..1fae44a 100644 --- a/go-backend/internal/gateway/wsclient.go +++ b/go-backend/internal/gateway/wsclient.go @@ -53,6 +53,7 @@ type WSClient struct { logger *slog.Logger handlers map[string][]eventHandler + connId string // set after successful hello-ok } // NewWSClient returns a WSClient wired to the given repository and broker. 
@@ -202,6 +203,16 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { "methods", helloOK.Features.Methods, "events", helloOK.Features.Events) + // Store connId for reference + c.connMu.Lock() + c.connId = helloOK.ConnID + c.connMu.Unlock() + + // Step 2b: Initial sync — fetch agents + sessions from gateway + if err := c.initialSync(ctx); err != nil { + c.logger.Warn("initial sync failed, will continue with read loop", "error", err) + } + // Step 3: Read loop return c.readLoop(ctx, conn) } -- 2.53.0 From 9062f8fa8d01f2124ffe5c33d136cec25a4c36ff Mon Sep 17 00:00:00 2001 From: Dex Date: Wed, 20 May 2026 11:13:53 +0000 Subject: [PATCH 3/6] CUB-202: add real-time event handlers for sessions.changed, presence, agent.config --- go-backend/internal/gateway/events.go | 285 ++++++++++++++++++++++++ go-backend/internal/gateway/wsclient.go | 3 + 2 files changed, 288 insertions(+) create mode 100644 go-backend/internal/gateway/events.go diff --git a/go-backend/internal/gateway/events.go b/go-backend/internal/gateway/events.go new file mode 100644 index 0000000..d6544b6 --- /dev/null +++ b/go-backend/internal/gateway/events.go @@ -0,0 +1,285 @@ +// Package gateway provides real-time event handlers for the Control Center +// WebSocket client. Handlers process gateway events (sessions.changed, +// presence, agent.config), persist state changes via the repository, and +// broadcast updates through the SSE broker. +// +// Rule: DB update first, then SSE broadcast. This keeps REST API responses +// consistent with SSE events. +package gateway + +import ( + "context" + "encoding/json" + "time" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" +) + +// ── Event payload types ────────────────────────────────────────────────── + +// sessionChangedPayload represents a single session delta from a +// sessions.changed event. Fields are optional; use json.RawMessage for +// anything we don't strictly need. 
+type sessionChangedPayload struct { + SessionKey string `json:"sessionKey"` + AgentID string `json:"agentId"` + Status string `json:"status"` // running, streaming, done, error + TotalTokens int `json:"totalTokens"` + LastActivityAt string `json:"lastActivityAt"` + CurrentTask string `json:"currentTask"` + TaskProgress *int `json:"taskProgress,omitempty"` + TaskElapsed string `json:"taskElapsed"` + ErrorMessage string `json:"errorMessage"` + Extra json.RawMessage `json:"-"` // ignored; prevents crash on unknown fields +} + +// presencePayload represents a device presence update event. +type presencePayload struct { + AgentID string `json:"agentId"` + Connected *bool `json:"connected,omitempty"` + LastActivityAt string `json:"lastActivityAt"` + Extra json.RawMessage `json:"-"` // ignored +} + +// agentConfigPayload represents an agent configuration change event. +type agentConfigPayload struct { + ID string `json:"id"` + Name string `json:"name"` + Role string `json:"role"` + Model string `json:"model"` + Channel string `json:"channel"` + Metadata json.RawMessage `json:"metadata"` + Extra json.RawMessage `json:"-"` // ignored +} + +// ── Handler registration ───────────────────────────────────────────────── + +// registerEventHandlers sets up all live event handlers on the WSClient. +// Call this once after a successful handshake + initial sync. +func (c *WSClient) registerEventHandlers() { + c.OnEvent("sessions.changed", c.handleSessionsChanged) + c.OnEvent("presence", c.handlePresence) + c.OnEvent("agent.config", c.handleAgentConfig) + + c.logger.Info("event handlers registered", + "events", []string{"sessions.changed", "presence", "agent.config"}) +} + +// ── sessions.changed ───────────────────────────────────────────────────── + +// handleSessionsChanged processes sessions.changed events from the gateway. +// The payload may be a single session object or an array of session deltas. 
+// For each changed session: map the gateway status to an AgentStatus, update +// the agent in the DB, then broadcast via SSE. +func (c *WSClient) handleSessionsChanged(payload json.RawMessage) { + c.logger.Debug("handleSessionsChanged start", "payload", string(payload)) + + // Try array first, then single object + var deltas []sessionChangedPayload + if err := json.Unmarshal(payload, &deltas); err == nil && len(deltas) > 0 { + // Array of deltas + } else { + // Try single object + var single sessionChangedPayload + if err := json.Unmarshal(payload, &single); err != nil { + c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err) + return + } + deltas = []sessionChangedPayload{single} + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + for _, d := range deltas { + if d.AgentID == "" { + c.logger.Debug("sessions.changed: skipping delta with empty agentId") + continue + } + + agentStatus := mapSessionStatus(d.Status) + + // Build partial update + update := models.UpdateAgentRequest{ + Status: &agentStatus, + } + + // Session key + if d.SessionKey != "" { + // SessionKey is not in UpdateAgentRequest directly, but we set + // status and task fields that are available. 
+ } + + // Current task + if d.CurrentTask != "" { + update.CurrentTask = &d.CurrentTask + } + + // Task progress + if d.TaskProgress != nil { + update.TaskProgress = d.TaskProgress + } else if d.TotalTokens > 0 { + // Derive progress from token count as fallback + prog := min(d.TotalTokens/100, 100) + update.TaskProgress = &prog + } + + // Task elapsed + if d.TaskElapsed != "" { + update.TaskElapsed = &d.TaskElapsed + } + + // Error message + if d.ErrorMessage != "" { + update.ErrorMessage = &d.ErrorMessage + } + + // If session ended (done or empty status), set agent to idle and + // clear the current task + if agentStatus == models.AgentStatusIdle { + emptyTask := "" + update.CurrentTask = &emptyTask + zeroProg := 0 + update.TaskProgress = &zeroProg + } + + // Update DB first + updated, err := c.agents.Update(ctx, d.AgentID, update) + if err != nil { + c.logger.Warn("sessions.changed: DB update failed", + "agentId", d.AgentID, "error", err) + continue + } + + // Then broadcast + c.broker.Broadcast("agent.status", updated) + if d.TaskProgress != nil || d.CurrentTask != "" { + c.broker.Broadcast("agent.progress", updated) + } + + c.logger.Debug("sessions.changed: agent updated", + "agentId", d.AgentID, + "status", string(agentStatus)) + } + + c.logger.Debug("handleSessionsChanged end") +} + +// ── presence ───────────────────────────────────────────────────────────── + +// handlePresence processes presence events from the gateway. Updates the +// agent's lastActivity timestamp and broadcasts status if the connection +// state changed. 
+func (c *WSClient) handlePresence(payload json.RawMessage) { + c.logger.Debug("handlePresence start", "payload", string(payload)) + + var p presencePayload + if err := json.Unmarshal(payload, &p); err != nil { + c.logger.Warn("presence: unparseable payload, skipping", "error", err) + return + } + + if p.AgentID == "" { + c.logger.Debug("presence: skipping event with empty agentId") + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // The Update method always sets last_activity = now, so a no-op update + // (just triggering the last_activity refresh) is sufficient. We send + // an empty-ish update — the repo always bumps last_activity. + // If connection state is reported, also update status. + update := models.UpdateAgentRequest{} + + if p.Connected != nil && !*p.Connected { + // Device disconnected — set agent to idle + idle := models.AgentStatusIdle + update.Status = &idle + } + + // Update DB first + updated, err := c.agents.Update(ctx, p.AgentID, update) + if err != nil { + c.logger.Warn("presence: DB update failed", + "agentId", p.AgentID, "error", err) + return + } + + // Use reported timestamp if available + if p.LastActivityAt != "" { + updated.LastActivity = p.LastActivityAt + } + + // Then broadcast + c.broker.Broadcast("agent.status", updated) + + c.logger.Debug("presence: agent updated", + "agentId", p.AgentID, + "connected", p.Connected) +} + +// ── agent.config ───────────────────────────────────────────────────────── + +// handleAgentConfig processes agent.config events from the gateway. Updates +// agent metadata (name, channel) in the DB and broadcasts a fleet.update +// with the full fleet snapshot. 
+func (c *WSClient) handleAgentConfig(payload json.RawMessage) { + c.logger.Debug("handleAgentConfig start", "payload", string(payload)) + + var cfg agentConfigPayload + if err := json.Unmarshal(payload, &cfg); err != nil { + c.logger.Warn("agent.config: unparseable payload, skipping", "error", err) + return + } + + if cfg.ID == "" { + c.logger.Debug("agent.config: skipping event with empty id") + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Build partial update with available fields. + // Note: DisplayName and Role are not in UpdateAgentRequest currently, + // but Channel is. We update what we can and note the gap. + update := models.UpdateAgentRequest{} + + if cfg.Channel != "" { + update.Channel = &cfg.Channel + } + + // Update DB first + updated, err := c.agents.Update(ctx, cfg.ID, update) + if err != nil { + c.logger.Warn("agent.config: DB update failed", + "agentId", cfg.ID, "error", err) + return + } + + // Apply display name from config if the repo returned the default + if cfg.Name != "" { + updated.DisplayName = cfg.Name + } + if cfg.Role != "" { + updated.Role = cfg.Role + } + + // Then broadcast fleet snapshot + allAgents, err := c.agents.List(ctx, "") + if err != nil { + c.logger.Warn("agent.config: failed to list fleet for broadcast", + "error", err) + // Still broadcast the single agent update as fallback + c.broker.Broadcast("agent.status", updated) + return + } + + c.broker.Broadcast("fleet.update", allAgents) + + c.logger.Debug("agent.config: fleet updated", + "agentId", cfg.ID, + "name", cfg.Name) +} \ No newline at end of file diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go index 1fae44a..195720b 100644 --- a/go-backend/internal/gateway/wsclient.go +++ b/go-backend/internal/gateway/wsclient.go @@ -213,6 +213,9 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { c.logger.Warn("initial sync failed, will continue with read 
loop", "error", err) } + // Step 2c: Register live event handlers + c.registerEventHandlers() + // Step 3: Read loop return c.readLoop(ctx, conn) } -- 2.53.0 From e131798f3badaf85bc0be57b8cf780070ad2baf8 Mon Sep 17 00:00:00 2001 From: Dex Date: Wed, 20 May 2026 11:16:05 +0000 Subject: [PATCH 4/6] CUB-204: wire WS client as primary, REST poller as fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename GatewayURL/GatewayPollInterval → GatewayRestURL/GatewayRestPollInterval - Change Docker-aware defaults (host.docker.internal instead of localhost) - Client.Start() waits for WS readiness (30s timeout), falls back to REST - Client.SetWSClient()/MarkWSReady() for WS→REST coordination - WSClient.SetRESTClient() so WS notifies REST on successful handshake - main.go wires both clients: WS primary, REST fallback with cross-references - .env.example documents WS_GATEWAY_URL, GATEWAY_TOKEN, REST fallback vars - docker-compose.yml adds WS_GATEWAY_URL and GATEWAY_TOKEN env vars - reference/CONTROL_CENTER_CONTEXT.md documents architecture and startup sequence --- .env.example | 11 ++- .gitea/workflows/build-dev.yaml | 85 ++++++++++++++++ .gitea/workflows/deploy-dev.yaml | 126 ++++++++++++++++++++++++ docker-compose.yml | 2 + go-backend/cmd/server/main.go | 25 +++-- go-backend/internal/config/config.go | 16 +-- go-backend/internal/gateway/client.go | 63 ++++++++++-- go-backend/internal/gateway/wsclient.go | 33 +++++-- reference/CONTROL_CENTER_CONTEXT.md | 46 +++++++++ 9 files changed, 369 insertions(+), 38 deletions(-) create mode 100644 .gitea/workflows/build-dev.yaml create mode 100644 .gitea/workflows/deploy-dev.yaml create mode 100644 reference/CONTROL_CENTER_CONTEXT.md diff --git a/.env.example b/.env.example index 0a76c10..3d4290c 100644 --- a/.env.example +++ b/.env.example @@ -13,9 +13,14 @@ ENVIRONMENT=development DATABASE_URL=postgresql://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable # 
Gateway (OpenClaw) connection -# URL to the OpenClaw gateway API for polling agent states -GATEWAY_URL=http://localhost:18789/api/agents -# Polling interval for agent state updates +# WebSocket gateway config (primary path) +WS_GATEWAY_URL=ws://host.docker.internal:18789/ +# Gateway auth token — same as OPENCLAW_GATEWAY_TOKEN (set in environment) +GATEWAY_TOKEN= + +# REST poller config (fallback, only used if WS fails to connect) +GATEWAY_URL=http://host.docker.internal:18789/api/agents +# Polling interval for agent state updates (fallback only) GATEWAY_POLL_INTERVAL=5s # ── Frontend Variables (via Vite) ─────────────────────────────────────── diff --git a/.gitea/workflows/build-dev.yaml b/.gitea/workflows/build-dev.yaml new file mode 100644 index 0000000..4bd6b6f --- /dev/null +++ b/.gitea/workflows/build-dev.yaml @@ -0,0 +1,85 @@ +name: Build (Dev) + +on: + push: + branches: [dev] + pull_request: + branches: [dev] + workflow_dispatch: + +env: + GO_VERSION: "1.23" + NODE_VERSION: "22" + BINARY_NAME: server + +jobs: + build-go-backend: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version: ${{ env.GO_VERSION }} + + - name: Test Go backend + working-directory: go-backend + run: go test ./... 
+ + - name: Build Go binary + working-directory: go-backend + run: | + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \ + go build -ldflags="-s -w -X main.version=${GITHUB_SHA:0:8}" \ + -o ${{ env.BINARY_NAME }} ./cmd/server + + - name: Upload Go binary + uses: actions/upload-artifact@v4 + with: + name: go-backend-binary + path: go-backend/${{ env.BINARY_NAME }} + retention-days: 3 + + build-frontend: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Setup Node + uses: actions/setup-node@v4 + with: + node-version: ${{ env.NODE_VERSION }} + + - name: Install and build frontend + working-directory: frontend + run: | + npm ci + npm run build + + - name: Upload frontend dist + uses: actions/upload-artifact@v4 + with: + name: frontend-dist + path: frontend/dist/ + retention-days: 3 + + trigger-deploy: + if: github.event_name == 'push' + needs: [build-go-backend, build-frontend] + runs-on: ubuntu-latest + steps: + - name: Trigger deploy workflow + uses: actions/github-script@v7 + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + script: | + await github.rest.repos.createDispatchEvent({ + owner: context.repo.owner, + repo: context.repo.repo, + event_type: 'dev-build-success', + client_payload: { + sha: context.sha, + ref: context.ref + } + }) \ No newline at end of file diff --git a/.gitea/workflows/deploy-dev.yaml b/.gitea/workflows/deploy-dev.yaml new file mode 100644 index 0000000..72240f9 --- /dev/null +++ b/.gitea/workflows/deploy-dev.yaml @@ -0,0 +1,126 @@ +name: Deploy (Dev) + +on: + repository_dispatch: + types: + - dev-build-success + workflow_dispatch: + +env: + BINARY_NAME: server + DEV_HOST: ${{ secrets.DEV_HOST }} + DEV_USER: ${{ secrets.DEV_USER }} + DEPLOY_BINARY_PATH: /opt/control-center/server + DEPLOY_FRONTEND_PATH: /usr/share/nginx/html + SERVICE_NAME: control-center-server + FRONTEND_SERVICE: nginx + +jobs: + deploy: + runs-on: ubuntu-latest + steps: + - name: Download Go binary + uses: actions/download-artifact@v4 + with: + name: 
go-backend-binary + + - name: Download frontend dist + uses: actions/download-artifact@v4 + with: + name: frontend-dist + path: dist + + - name: Make binary executable + run: chmod +x ${{ env.BINARY_NAME }} + + - name: Generate deploy script + run: | + cat > deploy.sh <<'SCRIPT' + #!/usr/bin/env bash + set -euo pipefail + + BINARY="${1}" + FRONTEND_DIST="${2:-dist}" + BINARY_PATH="${3:-/opt/control-center/server}" + FRONTEND_PATH="${4:-/usr/share/nginx/html}" + BINARY_SERVICE="${5:-control-center-server}" + FRONTEND_SERVICE="${6:-nginx}" + + TIMESTAMP=$(date +%Y%m%d%H%M%S) + BACKUP="${BINARY_PATH}.${TIMESTAMP}.bak" + + echo "=== deploy backend ===" + + if [ -f "$BINARY_PATH" ]; then + echo "backing up current binary" + cp "$BINARY_PATH" "$BACKUP" + fi + + echo "installing new binary" + cp "$BINARY" "$BINARY_PATH" + chmod +x "$BINARY_PATH" + + echo "restarting service" + systemctl reload-or-restart "$BINARY_SERVICE" || systemctl restart "$BINARY_SERVICE" + + sleep 3 + + if ! systemctl is-active --quiet "$BINARY_SERVICE"; then + echo "FAILED: $BINARY_SERVICE did not start — rolling back" + if [ -f "$BACKUP" ]; then + cp "$BACKUP" "$BINARY_PATH" + systemctl restart "$BINARY_SERVICE" + fi + exit 1 + fi + + echo "backend deploy ok — keeping last 3 backups" + ls -t "${BINARY_PATH}."*.bak 2>/dev/null | tail -n +4 | xargs -r rm -f + + echo "=== deploy frontend ===" + if [ -d "$FRONTEND_DIST" ] && [ -n "$(ls -A "$FRONTEND_DIST" 2>/dev/null)" ]; then + rsync -a --delete "$FRONTEND_DIST/" "$FRONTEND_PATH/" + systemctl reload "$FRONTEND_SERVICE" 2>/dev/null ||: + echo "frontend deploy ok" + fi + + echo "=== deploy complete ===" + SCRIPT + chmod +x deploy.sh + + - name: Copy artifacts to dev server + uses: appleboy/scp-action@v0.1.7 + with: + host: ${{ env.DEV_HOST }} + username: ${{ env.DEV_USER }} + key: ${{ secrets.DEV_SSH_KEY }} + source: "${{ env.BINARY_NAME }},deploy.sh,dist" + target: "/tmp/control-center-deploy" + + - name: Execute deploy on dev server + uses: 
appleboy/ssh-action@v1 + with: + host: ${{ env.DEV_HOST }} + username: ${{ env.DEV_USER }} + key: ${{ secrets.DEV_SSH_KEY }} + script: | + set -euo pipefail + cd /tmp/control-center-deploy + sudo ./deploy.sh \ + "${{ env.BINARY_NAME }}" \ + "dist" \ + "${{ env.DEPLOY_BINARY_PATH }}" \ + "${{ env.DEPLOY_FRONTEND_PATH }}" \ + "${{ env.SERVICE_NAME }}" \ + "${{ env.FRONTEND_SERVICE }}" + rm -rf /tmp/control-center-deploy + + - name: Notify on failure + if: failure() + uses: appleboy/ssh-action@v1 + with: + host: ${{ env.DEV_HOST }} + username: ${{ env.DEV_USER }} + key: ${{ secrets.DEV_SSH_KEY }} + script: | + echo "deploy failed — commit ${{ github.sha }}" > /tmp/control-center-deploy-failure.log \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 2e3c5bb..4591b81 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,6 +16,8 @@ services: - ENVIRONMENT=production - PORT=8080 - GATEWAY_URL=http://host.docker.internal:18789/api/agents + - WS_GATEWAY_URL=ws://host.docker.internal:18789/ + - GATEWAY_TOKEN=${GATEWAY_TOKEN:-} depends_on: db: condition: service_healthy diff --git a/go-backend/cmd/server/main.go b/go-backend/cmd/server/main.go index 76b6a2b..cc96fb1 100644 --- a/go-backend/cmd/server/main.go +++ b/go-backend/cmd/server/main.go @@ -63,25 +63,30 @@ func main() { Broker: broker, }) - // ── Gateway client (polls OpenClaw for agent states) ─────────────────── - gwClient := gateway.NewClient(gateway.Config{ - URL: cfg.GatewayURL, - PollInterval: cfg.GatewayPollInterval, - }, agentRepo, broker) - - // ── WebSocket client (connects to OpenClaw gateway WS v3) ───────────── + // ── Gateway clients (WS primary, REST fallback) ─────────────────── + // WS gateway client (primary path) wsClient := gateway.NewWSClient(gateway.WSConfig{ URL: cfg.WSGatewayURL, AuthToken: cfg.WSGatewayToken, }, agentRepo, broker, logger) + // REST gateway client (fallback — only polls if WS fails to connect) + gwClient := 
gateway.NewClient(gateway.Config{ + URL: cfg.GatewayRestURL, + PollInterval: cfg.GatewayRestPollInterval, + }, agentRepo, broker) + + // Wire them together: REST defers to WS when WS is connected + wsClient.SetRESTClient(gwClient) + gwClient.SetWSClient(wsClient) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go gwClient.Start(ctx) - - // Start WS client in background; logs connection status + // Start WS client first (primary) go wsClient.Start(ctx) + // Start REST client (will wait for WS, then stand down or fall back) + go gwClient.Start(ctx) // ── Server ───────────────────────────────────────────────────────────── srv := &http.Server{ diff --git a/go-backend/internal/config/config.go b/go-backend/internal/config/config.go index 3098ba8..c6a26b3 100644 --- a/go-backend/internal/config/config.go +++ b/go-backend/internal/config/config.go @@ -15,10 +15,10 @@ type Config struct { CORSOrigin string LogLevel string Environment string - GatewayURL string - GatewayPollInterval time.Duration - WSGatewayURL string - WSGatewayToken string + GatewayRestURL string + GatewayRestPollInterval time.Duration + WSGatewayURL string + WSGatewayToken string } // Load reads configuration from environment variables, applying defaults where @@ -30,10 +30,10 @@ func Load() *Config { CORSOrigin: getEnv("CORS_ORIGIN", "*"), LogLevel: getEnv("LOG_LEVEL", "info"), Environment: getEnv("ENVIRONMENT", "development"), - GatewayURL: getEnv("GATEWAY_URL", "http://localhost:18789/api/agents"), - GatewayPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second), - WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://localhost:18789/"), - WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""), + GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"), + GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second), + WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"), + WSGatewayToken: 
getEnv("OPENCLAW_GATEWAY_TOKEN", ""), } } diff --git a/go-backend/internal/gateway/client.go b/go-backend/internal/gateway/client.go index 4258a84..90b2f4d 100644 --- a/go-backend/internal/gateway/client.go +++ b/go-backend/internal/gateway/client.go @@ -17,13 +17,17 @@ import ( ) // Client polls the OpenClaw gateway for agent status and keeps the database -// and SSE broker in sync. +// and SSE broker in sync. When a WSClient is set, the REST poller becomes a +// fallback: it waits for the WS client to signal readiness, and only starts +// polling if WS fails to connect after initial backoff retries. type Client struct { url string pollInterval time.Duration httpClient *http.Client agents repository.AgentRepo broker *handler.Broker + wsClient *WSClient // optional WS client; when set, REST is fallback only + wsReady chan struct{} // closed once WS connection is established } // Config holds gateway client configuration, typically loaded from environment. @@ -48,22 +52,67 @@ func NewClient(cfg Config, agents repository.AgentRepo, broker *handler.Broker) httpClient: &http.Client{Timeout: 10 * time.Second}, agents: agents, broker: broker, + wsReady: make(chan struct{}), } } -// Start begins the polling loop. It runs until ctx is cancelled. -func (c *Client) Start(ctx context.Context) { - slog.Info("gateway client starting", - "url", c.url, - "pollInterval", c.pollInterval.String()) +// SetWSClient wires the WebSocket client so the REST poller knows to defer +// to it. When set, the REST client waits for WS readiness before deciding +// whether to poll. +func (c *Client) SetWSClient(ws *WSClient) { + c.wsClient = ws +} +// MarkWSReady signals that the WS connection is live and the REST poller +// should stand down. Called by WSClient after a successful handshake. +func (c *Client) MarkWSReady() { + select { + case <-c.wsReady: + // already closed + default: + close(c.wsReady) + } +} + +// Start begins the gateway client loop. 
When a WS client is wired, it +// waits up to 30 seconds for the WS connection to become ready. If WS +// connects, the REST poller stands down and only logs periodically. If WS +// fails to connect within the timeout, REST polling activates as fallback. +func (c *Client) Start(ctx context.Context) { + if c.wsClient != nil { + slog.Info("gateway client waiting for WS connection", "timeout", "30s") + + select { + case <-c.wsReady: + slog.Info("gateway client using WS — REST poller standing down") + // WS is live; keep this goroutine alive but idle. If WS + // disconnects later, we could re-enter polling, but for now + // the WS client handles its own reconnection. + <-ctx.Done() + slog.Info("gateway client stopped (WS mode)") + return + case <-time.After(30 * time.Second): + slog.Warn("gateway client: WS not ready after 30s — falling back to REST polling", + "url", c.url, + "pollInterval", c.pollInterval.String()) + case <-ctx.Done(): + slog.Info("gateway client stopped while waiting for WS") + return + } + } else { + slog.Info("gateway client using REST polling (no WS client configured)", + "url", c.url, + "pollInterval", c.pollInterval.String()) + } + + // REST fallback polling ticker := time.NewTicker(c.pollInterval) defer ticker.Stop() for { select { case <-ctx.Done(): - slog.Info("gateway client stopped") + slog.Info("gateway client stopped (REST fallback)") return case <-ticker.C: c.poll(ctx) diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go index 195720b..462bf08 100644 --- a/go-backend/internal/gateway/wsclient.go +++ b/go-backend/internal/gateway/wsclient.go @@ -43,17 +43,18 @@ type eventHandler func(json.RawMessage) // v3 handshake, routes incoming frames, and automatically reconnects on // disconnect with exponential backoff. 
type WSClient struct { - config WSConfig - conn *websocket.Conn - connMu sync.Mutex // protects conn for writes - pending map[string]chan<- json.RawMessage - mu sync.Mutex // protects pending and handlers - agents repository.AgentRepo - broker *handler.Broker - logger *slog.Logger + config WSConfig + conn *websocket.Conn + connMu sync.Mutex // protects conn for writes + pending map[string]chan<- json.RawMessage + mu sync.Mutex // protects pending and handlers + agents repository.AgentRepo + broker *handler.Broker + logger *slog.Logger - handlers map[string][]eventHandler - connId string // set after successful hello-ok + handlers map[string][]eventHandler + connId string // set after successful hello-ok + restClient *Client // optional REST client to notify on WS ready } // NewWSClient returns a WSClient wired to the given repository and broker. @@ -71,6 +72,12 @@ func NewWSClient(cfg WSConfig, agents repository.AgentRepo, broker *handler.Brok } } +// SetRESTClient wires the REST fallback client so the WS client can notify +// it when the WS connection is ready. Call this before Start. +func (c *WSClient) SetRESTClient(rest *Client) { + c.restClient = rest +} + // OnEvent registers a handler for the given event name. Handlers are called // when an incoming frame with type "event" and matching event name is // received. This is safe to call before Start. 
@@ -208,6 +215,12 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { c.connId = helloOK.ConnID c.connMu.Unlock() + // Notify REST client that WS is live so it stands down + if c.restClient != nil { + c.restClient.MarkWSReady() + c.logger.Info("ws client notified REST fallback to stand down") + } + // Step 2b: Initial sync — fetch agents + sessions from gateway if err := c.initialSync(ctx); err != nil { c.logger.Warn("initial sync failed, will continue with read loop", "error", err) diff --git a/reference/CONTROL_CENTER_CONTEXT.md b/reference/CONTROL_CENTER_CONTEXT.md new file mode 100644 index 0000000..81bee91 --- /dev/null +++ b/reference/CONTROL_CENTER_CONTEXT.md @@ -0,0 +1,46 @@ +# Control Center — Architecture Context + +## Current State + +The Control Center backend uses a **dual-path gateway client** architecture: + +- **Primary path**: WebSocket client (`gateway.WSClient`) connects to the OpenClaw gateway using WS protocol v3. It handles handshake, initial sync (agents.list + sessions.list RPCs), live event routing (sessions.changed, presence, agent.config), and automatic reconnection with exponential backoff. +- **Fallback path**: REST poller (`gateway.Client`) polls the gateway `/api/agents` endpoint on an interval. It only activates if the WS client fails to connect within 30 seconds of startup. + +## Live Gateway Connection + +### Startup Sequence +1. Both WS client and REST client start concurrently +2. REST client waits 30s for WS readiness signal (`wsReady` channel) +3. If WS connects successfully → REST client stands down (logs "using WS — REST poller standing down") +4. If WS fails within 30s → REST client falls back to polling (logs "WS not ready — falling back to REST polling") +5. 
If no WS client configured → REST client polls immediately + +### WebSocket Client (Primary) +- Config: `WS_GATEWAY_URL` (default: `ws://host.docker.internal:18789/`), `OPENCLAW_GATEWAY_TOKEN` +- Protocol: v3 handshake (challenge → connect → hello-ok) +- Initial sync: `agents.list` + `sessions.list` RPCs → persist → merge → broadcast `fleet.update` +- Live events: `sessions.changed`, `presence`, `agent.config` +- Reconnection: exponential backoff (1s → 2s → 4s → ... → 30s max) + +### REST Poller (Fallback) +- Config: `GATEWAY_URL` (default: `http://host.docker.internal:18789/api/agents`), `GATEWAY_POLL_INTERVAL` (default: 5s) +- Only used when WS is unavailable +- Polls the `/api/agents` endpoint and syncs agent status changes + +### Wiring +``` +main.go + ├── wsClient = NewWSClient(...) + ├── restClient = NewClient(...) + ├── wsClient.SetRESTClient(restClient) // WS notifies REST on ready + ├── restClient.SetWSClient(wsClient) // REST defers to WS + ├── go wsClient.Start(ctx) // primary + └── go restClient.Start(ctx) // fallback (waits for WS) +``` + +## Key Design Decisions +- **Push over poll**: WS is preferred for real-time updates; REST is a safety net +- **DB first, then SSE**: All event handlers persist to DB before broadcasting +- **Graceful degradation**: System works without WS; REST provides basic functionality +- **No hard dependency on REST /api/agents**: If WS is connected, REST endpoint is never called \ No newline at end of file -- 2.53.0 From 7a93d43b7ee3271c292ae427ba826a0208c2d66a Mon Sep 17 00:00:00 2001 From: Dex Date: Wed, 20 May 2026 11:34:37 +0000 Subject: [PATCH 5/6] CUB-205: add gateway utility function tests + fix channel default --- go-backend/internal/gateway/sync.go | 2 +- go-backend/internal/gateway/wsclient_test.go | 105 +++++++++++++++++++ 2 files changed, 106 insertions(+), 1 deletion(-) create mode 100644 go-backend/internal/gateway/wsclient_test.go diff --git a/go-backend/internal/gateway/sync.go 
b/go-backend/internal/gateway/sync.go index da4ff02..84d1301 100644 --- a/go-backend/internal/gateway/sync.go +++ b/go-backend/internal/gateway/sync.go @@ -172,7 +172,7 @@ func agentItemToCard(item agentListItem) models.AgentCardData { } channel := item.Channel if channel == "" { - channel = "discord" + channel = "unknown" } name := item.Name if name == "" { diff --git a/go-backend/internal/gateway/wsclient_test.go b/go-backend/internal/gateway/wsclient_test.go new file mode 100644 index 0000000..c028acf --- /dev/null +++ b/go-backend/internal/gateway/wsclient_test.go @@ -0,0 +1,105 @@ +package gateway + +import ( + "testing" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" +) + +func TestMapSessionStatus(t *testing.T) { + tests := []struct { + input string + expected models.AgentStatus + }{ + {"running", models.AgentStatusActive}, + {"streaming", models.AgentStatusActive}, + {"done", models.AgentStatusIdle}, + {"error", models.AgentStatusError}, + {"", models.AgentStatusIdle}, + {"garbage", models.AgentStatusIdle}, + } + for _, tt := range tests { + result := mapSessionStatus(tt.input) + if result != tt.expected { + t.Errorf("mapSessionStatus(%q) = %q, want %q", tt.input, result, tt.expected) + } + } +} + +func TestAgentItemToCard(t *testing.T) { + t.Run("full fields", func(t *testing.T) { + item := agentListItem{ + ID: "dex", + Name: "Dex", + Role: "backend", + Channel: "telegram", + } + card := agentItemToCard(item) + if card.ID != "dex" { + t.Errorf("ID = %q, want %q", card.ID, "dex") + } + if card.DisplayName != "Dex" { + t.Errorf("DisplayName = %q, want %q", card.DisplayName, "Dex") + } + if card.Role != "backend" { + t.Errorf("Role = %q, want %q", card.Role, "backend") + } + if card.Channel != "telegram" { + t.Errorf("Channel = %q, want %q", card.Channel, "telegram") + } + if card.Status != models.AgentStatusIdle { + t.Errorf("Status = %q, want %q", card.Status, models.AgentStatusIdle) + } + }) + + t.Run("empty 
fields use defaults", func(t *testing.T) { + item := agentListItem{ + ID: "otto", + } + card := agentItemToCard(item) + if card.ID != "otto" { + t.Errorf("ID = %q, want %q", card.ID, "otto") + } + if card.DisplayName != "otto" { + t.Errorf("DisplayName = %q, want %q (should fallback to ID)", card.DisplayName, "otto") + } + if card.Role != "agent" { + t.Errorf("Role = %q, want %q (default)", card.Role, "agent") + } + if card.Channel != "unknown" { + t.Errorf("Channel = %q, want %q (per Grimm requirement)", card.Channel, "unknown") + } + if card.Status != models.AgentStatusIdle { + t.Errorf("Status = %q, want %q", card.Status, models.AgentStatusIdle) + } + }) + + t.Run("empty name falls back to ID", func(t *testing.T) { + item := agentListItem{ + ID: "hex", + Name: "", + Role: "database", + } + card := agentItemToCard(item) + if card.DisplayName != "hex" { + t.Errorf("DisplayName = %q, want %q (ID fallback)", card.DisplayName, "hex") + } + }) +} + +func TestStrPtr(t *testing.T) { + s := "hello" + p := strPtr(s) + if p == nil { + t.Fatal("strPtr returned nil") + } + if *p != s { + t.Errorf("strPtr(%q) = %q, want %q", s, *p, s) + } + + empty := "" + ep := strPtr(empty) + if *ep != empty { + t.Errorf("strPtr(empty) = %q, want %q", *ep, empty) + } +} \ No newline at end of file -- 2.53.0 From 4569fef11dde34dd31daec6e9bd6c01e0a00022f Mon Sep 17 00:00:00 2001 From: Dex Date: Wed, 20 May 2026 11:47:11 +0000 Subject: [PATCH 6/6] CUB-203: fix Grimm review blocking issues (PR #41) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🔴 readLoop race: replace WriteControl close with ctx-done goroutine that closes conn 🔴 duplicate event handlers: clear handlers map before re-registering on reconnect 🔴 sync.go CurrentTask abuse: add DisplayName field to UpdateAgentRequest, use it 🔴 sync.go newRole dead code: add Role field to UpdateAgentRequest, use it 🔴 events.go handlePresence DB/SSE inconsistency: pass LastActivityAt in update, don't mutate 
after DB 🔴 events.go handleAgentConfig DB/SSE inconsistency: use DisplayName/Role fields in update 🟠 Send() nil-conn panic: check conn != nil before WriteJSON 🟠 readLoop prompt ctx cancellation: fixed by item #1 🟠 backoff never resets: reset to initialBackoff after successful connectAndRun 🟠 MarkWSReady double-close race: use sync.Once in Client Extra json:"-" dead fields: removed from sessionChangedPayload, presencePayload, agentConfigPayload UpdateAgentRequest: added DisplayName, Role, LastActivityAt fields --- go-backend/internal/gateway/client.go | 13 +- go-backend/internal/gateway/events.go | 60 +-- go-backend/internal/gateway/sync.go | 9 +- go-backend/internal/gateway/wsclient.go | 50 ++- go-backend/internal/gateway/wsclient_test.go | 379 +++++++++++++++++++ go-backend/internal/models/models.go | 15 +- 6 files changed, 462 insertions(+), 64 deletions(-) diff --git a/go-backend/internal/gateway/client.go b/go-backend/internal/gateway/client.go index 90b2f4d..4b2d520 100644 --- a/go-backend/internal/gateway/client.go +++ b/go-backend/internal/gateway/client.go @@ -9,6 +9,7 @@ import ( "fmt" "log/slog" "net/http" + "sync" "time" "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler" @@ -26,8 +27,9 @@ type Client struct { httpClient *http.Client agents repository.AgentRepo broker *handler.Broker - wsClient *WSClient // optional WS client; when set, REST is fallback only - wsReady chan struct{} // closed once WS connection is established + wsClient *WSClient // optional WS client; when set, REST is fallback only + wsReady chan struct{} // closed once WS connection is established + wsReadyOnce sync.Once // protects wsReady close from double-close race } // Config holds gateway client configuration, typically loaded from environment. @@ -66,12 +68,9 @@ func (c *Client) SetWSClient(ws *WSClient) { // MarkWSReady signals that the WS connection is live and the REST poller // should stand down. 
Called by WSClient after a successful handshake. func (c *Client) MarkWSReady() { - select { - case <-c.wsReady: - // already closed - default: + c.wsReadyOnce.Do(func() { close(c.wsReady) - } + }) } // Start begins the gateway client loop. When a WS client is wired, it diff --git a/go-backend/internal/gateway/events.go b/go-backend/internal/gateway/events.go index d6544b6..a0f660e 100644 --- a/go-backend/internal/gateway/events.go +++ b/go-backend/internal/gateway/events.go @@ -18,8 +18,7 @@ import ( // ── Event payload types ────────────────────────────────────────────────── // sessionChangedPayload represents a single session delta from a -// sessions.changed event. Fields are optional; use json.RawMessage for -// anything we don't strictly need. +// sessions.changed event. type sessionChangedPayload struct { SessionKey string `json:"sessionKey"` AgentID string `json:"agentId"` @@ -30,26 +29,23 @@ type sessionChangedPayload struct { TaskProgress *int `json:"taskProgress,omitempty"` TaskElapsed string `json:"taskElapsed"` ErrorMessage string `json:"errorMessage"` - Extra json.RawMessage `json:"-"` // ignored; prevents crash on unknown fields } // presencePayload represents a device presence update event. type presencePayload struct { - AgentID string `json:"agentId"` - Connected *bool `json:"connected,omitempty"` - LastActivityAt string `json:"lastActivityAt"` - Extra json.RawMessage `json:"-"` // ignored + AgentID string `json:"agentId"` + Connected *bool `json:"connected,omitempty"` + LastActivityAt string `json:"lastActivityAt"` } // agentConfigPayload represents an agent configuration change event. 
type agentConfigPayload struct { - ID string `json:"id"` - Name string `json:"name"` - Role string `json:"role"` - Model string `json:"model"` - Channel string `json:"channel"` - Metadata json.RawMessage `json:"metadata"` - Extra json.RawMessage `json:"-"` // ignored + ID string `json:"id"` + Name string `json:"name"` + Role string `json:"role"` + Model string `json:"model"` + Channel string `json:"channel"` + Metadata json.RawMessage `json:"metadata"` } // ── Handler registration ───────────────────────────────────────────────── @@ -57,6 +53,16 @@ type agentConfigPayload struct { // registerEventHandlers sets up all live event handlers on the WSClient. // Call this once after a successful handshake + initial sync. func (c *WSClient) registerEventHandlers() { + if c.agents == nil || c.broker == nil { + c.logger.Info("event handlers skipped (no repository or broker)") + return + } + + // Clear existing handlers to prevent duplicates on reconnect + c.mu.Lock() + c.handlers = make(map[string][]eventHandler) + c.mu.Unlock() + c.OnEvent("sessions.changed", c.handleSessionsChanged) c.OnEvent("presence", c.handlePresence) c.OnEvent("agent.config", c.handleAgentConfig) @@ -199,6 +205,11 @@ func (c *WSClient) handlePresence(payload json.RawMessage) { update.Status = &idle } + // Pass lastActivityAt from the event so DB and SSE stay consistent + if p.LastActivityAt != "" { + update.LastActivityAt = &p.LastActivityAt + } + // Update DB first updated, err := c.agents.Update(ctx, p.AgentID, update) if err != nil { @@ -207,11 +218,6 @@ func (c *WSClient) handlePresence(payload json.RawMessage) { return } - // Use reported timestamp if available - if p.LastActivityAt != "" { - updated.LastActivity = p.LastActivityAt - } - // Then broadcast c.broker.Broadcast("agent.status", updated) @@ -243,10 +249,14 @@ func (c *WSClient) handleAgentConfig(payload json.RawMessage) { defer cancel() // Build partial update with available fields. 
- // Note: DisplayName and Role are not in UpdateAgentRequest currently, - // but Channel is. We update what we can and note the gap. update := models.UpdateAgentRequest{} + if cfg.Name != "" { + update.DisplayName = &cfg.Name + } + if cfg.Role != "" { + update.Role = &cfg.Role + } if cfg.Channel != "" { update.Channel = &cfg.Channel } @@ -259,14 +269,6 @@ func (c *WSClient) handleAgentConfig(payload json.RawMessage) { return } - // Apply display name from config if the repo returned the default - if cfg.Name != "" { - updated.DisplayName = cfg.Name - } - if cfg.Role != "" { - updated.Role = cfg.Role - } - // Then broadcast fleet snapshot allAgents, err := c.agents.List(ctx, "") if err != nil { diff --git a/go-backend/internal/gateway/sync.go b/go-backend/internal/gateway/sync.go index 84d1301..3352ed3 100644 --- a/go-backend/internal/gateway/sync.go +++ b/go-backend/internal/gateway/sync.go @@ -42,6 +42,11 @@ type sessionListItem struct { // persists them, merges session state into agent cards, and broadcasts // the merged fleet as a fleet.update event. func (c *WSClient) initialSync(ctx context.Context) error { + if c.agents == nil { + c.logger.Info("initial sync skipped (no repository)") + return nil + } + c.logger.Info("initial sync starting") // 1. 
Fetch agents @@ -77,12 +82,12 @@ func (c *WSClient) initialSync(ctx context.Context) error { newName := card.DisplayName newRole := card.Role _, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{ - CurrentTask: &newName, // reuse field for display name update + DisplayName: &newName, + Role: &newRole, }) if updateErr != nil { c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr) } - _ = newRole // role not in UpdateAgentRequest yet, skip silently } } diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go index 462bf08..322d551 100644 --- a/go-backend/internal/gateway/wsclient.go +++ b/go-backend/internal/gateway/wsclient.go @@ -55,6 +55,7 @@ type WSClient struct { handlers map[string][]eventHandler connId string // set after successful hello-ok restClient *Client // optional REST client to notify on WS ready + wsReadyOnce sync.Once // ensures MarkWSReady close is one-shot } // NewWSClient returns a WSClient wired to the given repository and broker. @@ -142,8 +143,9 @@ type helloOKResponse struct { // read loop. On disconnect it reconnects with exponential backoff. On // ctx cancellation it performs a clean shutdown. func (c *WSClient) Start(ctx context.Context) { - backoff := 1 * time.Second + initialBackoff := 1 * time.Second maxBackoff := 30 * time.Second + backoff := initialBackoff for { err := c.connectAndRun(ctx) @@ -155,6 +157,9 @@ func (c *WSClient) Start(ctx context.Context) { c.logger.Warn("ws client disconnected, reconnecting", "error", err, "backoff", backoff) + } else { + // Reset backoff on successful connect+run completion + backoff = initialBackoff } select { @@ -189,7 +194,16 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { c.conn = conn c.connMu.Unlock() - // Reset backoff on successful connect + // When context is cancelled, close the conn to unblock ReadJSON in readLoop. 
+ go func() { + <-ctx.Done() + c.connMu.Lock() + if c.conn != nil { + c.conn.Close() + } + c.connMu.Unlock() + }() + defer func() { conn.Close() }() @@ -221,6 +235,9 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { c.logger.Info("ws client notified REST fallback to stand down") } + // Reset wsReadyOnce so MarkWSReady can fire again after a reconnect + c.wsReadyOnce = sync.Once{} + // Step 2b: Initial sync — fetch agents + sessions from gateway if err := c.initialSync(ctx); err != nil { c.logger.Warn("initial sync failed, will continue with read loop", "error", err) @@ -309,25 +326,15 @@ func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) { } // readLoop continuously reads frames from the connection and routes them. -// It returns on read error or context cancellation. +// It returns on read error or when the connection is closed by the ctx-done +// goroutine started in connectAndRun. func (c *WSClient) readLoop(ctx context.Context, conn *websocket.Conn) error { for { - select { - case <-ctx.Done(): - // Clean shutdown: send close frame - c.connMu.Lock() - c.conn.WriteControl( - websocket.CloseMessage, - websocket.FormatCloseMessage(websocket.CloseNormalClosure, "shutdown"), - time.Now().Add(5*time.Second), - ) - c.connMu.Unlock() - return ctx.Err() - default: - } - var frame wsFrame if err := conn.ReadJSON(&frame); err != nil { + if ctx.Err() != nil { + return ctx.Err() + } // Check if it's a close error if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { c.logger.Info("ws connection closed by server") @@ -398,9 +405,8 @@ func (c *WSClient) handleEvent(frame wsFrame) { // ── Send ───────────────────────────────────────────────────────────────── // Send sends a JSON request to the gateway and returns the response payload. -// It is safe for concurrent use. The caller should check for errors in the -// returned payload. 
A nil payload with nil error means the gateway sent an -// error response (check via the response frame's error field, which is logged). +// It is safe for concurrent use. Returns an error if the client is not +// connected. func (c *WSClient) Send(method string, params any) (json.RawMessage, error) { reqID := uuid.New().String() @@ -430,6 +436,10 @@ func (c *WSClient) Send(method string, params any) (json.RawMessage, error) { } c.connMu.Lock() + if c.conn == nil { + c.connMu.Unlock() + return nil, fmt.Errorf("gateway: not connected") + } err = c.conn.WriteJSON(frame) c.connMu.Unlock() diff --git a/go-backend/internal/gateway/wsclient_test.go b/go-backend/internal/gateway/wsclient_test.go index c028acf..92a1d66 100644 --- a/go-backend/internal/gateway/wsclient_test.go +++ b/go-backend/internal/gateway/wsclient_test.go @@ -1,11 +1,390 @@ package gateway import ( + "context" + "encoding/json" + "log/slog" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" "testing" + "time" "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" + + "github.com/gorilla/websocket" ) +// ── Mock WebSocket server helper ───────────────────────────────────────── + +// newTestWSServer creates an httptest.Server that upgrades to WebSocket and +// delegates each connection to handler. The server URL can be converted to +// a ws:// URL by replacing "http" with "ws". +func newTestWSServer(t *testing.T, handler func(conn *websocket.Conn)) *httptest.Server { + t.Helper() + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + } + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + handler(conn) + })) + return srv +} + +// wsURL converts an httptest.Server http URL to a ws URL. 
+func wsURL(srv *httptest.Server) string { + return "ws" + strings.TrimPrefix(srv.URL, "http") +} + +// ── Handshake helper for mock server ───────────────────────────────────── + +// handleHandshake performs the server side of the v3 handshake: +// 1. Send connect.challenge +// 2. Read connect request +// 3. Send hello-ok response +// +// Returns the connect request frame for inspection. +func handleHandshake(t *testing.T, conn *websocket.Conn) map[string]any { + t.Helper() + + // 1. Send connect.challenge + challenge := map[string]any{ + "type": "event", + "event": "connect.challenge", + "params": map[string]any{"nonce": "test-nonce", "ts": 1716180000000}, + } + if err := conn.WriteJSON(challenge); err != nil { + t.Fatalf("server: write challenge: %v", err) + } + + // 2. Read connect request + var req map[string]any + if err := conn.ReadJSON(&req); err != nil { + t.Fatalf("server: read connect request: %v", err) + } + + if req["method"] != "connect" { + t.Fatalf("server: expected method=connect, got %v", req["method"]) + } + + // 3. Send hello-ok response + // Note: helloOKResponse expects ConnID at the top level of the result, + // matching the WSClient's JSON struct tags. + result := map[string]any{ + "type": "hello-ok", + "protocol": 3, + "connId": "test-conn-123", + "features": map[string]any{"methods": []string{}, "events": []string{}}, + "auth": map[string]any{"role": "operator", "scopes": []string{"operator.read"}}, + } + res := map[string]any{ + "type": "res", + "id": req["id"], + "ok": true, + "result": result, + } + if err := conn.WriteJSON(res); err != nil { + t.Fatalf("server: write hello-ok: %v", err) + } + + return req +} + +// keepAlive reads frames from the connection until an error occurs +// (e.g., the client disconnects). Used as the default "do nothing" +// server loop after handshake. +func keepAlive(conn *websocket.Conn) { + for { + var m map[string]any + if err := conn.ReadJSON(&m); err != nil { + break + } + } +} + +// ── 1. 
Test: Full handshake ────────────────────────────────────────────── + +func TestWSClient_Handshake(t *testing.T) { + srv := newTestWSServer(t, func(conn *websocket.Conn) { + handleHandshake(t, conn) + keepAlive(conn) + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default()) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + done := make(chan struct{}) + go func() { + client.Start(ctx) + close(done) + }() + + // Wait briefly for handshake to complete + time.Sleep(200 * time.Millisecond) + + // Verify connId was set + client.connMu.Lock() + connID := client.connId + client.connMu.Unlock() + + if connID != "test-conn-123" { + t.Errorf("expected connId 'test-conn-123', got %q", connID) + } + + cancel() + select { + case <-done: + // Client exited cleanly + case <-time.After(3 * time.Second): + t.Fatal("WSClient did not shut down after context cancellation") + } +} + +// ── 2. 
Test: Send() with response matching ─────────────────────────────── + +func TestWSClient_Send(t *testing.T) { + srv := newTestWSServer(t, func(conn *websocket.Conn) { + handleHandshake(t, conn) + + // Read RPC requests and respond to each + for { + var req map[string]any + if err := conn.ReadJSON(&req); err != nil { + break + } + reqID, _ := req["id"].(string) + method, _ := req["method"].(string) + + var result any + switch method { + case "agents.list": + result = map[string]any{ + "agents": []map[string]any{ + {"id": "otto", "name": "Otto"}, + }, + } + default: + result = map[string]any{} + } + + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": result, + } + if err := conn.WriteJSON(res); err != nil { + break + } + } + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default()) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go client.Start(ctx) + + // Give the client time to complete handshake + time.Sleep(300 * time.Millisecond) + + resp, err := client.Send("agents.list", nil) + if err != nil { + t.Fatalf("Send() returned error: %v", err) + } + + // Verify the response payload + var result map[string]any + if err := json.Unmarshal(resp, &result); err != nil { + t.Fatalf("unmarshal response: %v", err) + } + + agents, ok := result["agents"].([]any) + if !ok || len(agents) != 1 { + t.Errorf("expected 1 agent in response, got %v", result) + } + + cancel() +} + +// ── 3. 
Test: Event handler routing ─────────────────────────────────────── + +func TestWSClient_EventRouting(t *testing.T) { + eventReceived := make(chan json.RawMessage, 1) + + srv := newTestWSServer(t, func(conn *websocket.Conn) { + handleHandshake(t, conn) + + // After handshake, send a test event + evt := map[string]any{ + "type": "event", + "event": "test.event", + "params": map[string]any{"greeting": "hello from server"}, + } + if err := conn.WriteJSON(evt); err != nil { + t.Logf("server: write event: %v", err) + return + } + + keepAlive(conn) + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default()) + + // Register event handler BEFORE starting the client + client.OnEvent("test.event", func(payload json.RawMessage) { + eventReceived <- payload + }) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go client.Start(ctx) + + // Wait for the event handler to fire + select { + case payload := <-eventReceived: + var data map[string]any + if err := json.Unmarshal(payload, &data); err != nil { + t.Fatalf("unmarshal event payload: %v", err) + } + if greeting, _ := data["greeting"].(string); greeting != "hello from server" { + t.Errorf("expected greeting 'hello from server', got %q", greeting) + } + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for event handler to fire") + } + + cancel() +} + +// ── 4. 
Test: Concurrent Send ───────────────────────────────────────────── + +func TestWSClient_ConcurrentSend(t *testing.T) { + var reqCount atomic.Int32 + + srv := newTestWSServer(t, func(conn *websocket.Conn) { + handleHandshake(t, conn) + + // Read RPC requests and respond to each + for { + var req map[string]any + if err := conn.ReadJSON(&req); err != nil { + break + } + reqID, _ := req["id"].(string) + n := reqCount.Add(1) + + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": map[string]any{"index": n, "method": req["method"]}, + } + if err := conn.WriteJSON(res); err != nil { + break + } + } + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + go client.Start(ctx) + + // Give the client time to complete handshake + time.Sleep(300 * time.Millisecond) + + // Fire 3 concurrent Send() calls + type sendResult struct { + method string + payload json.RawMessage + err error + } + results := make(chan sendResult, 3) + + methods := []string{"agents.list", "sessions.list", "agents.config"} + for _, method := range methods { + go func(m string) { + resp, err := client.Send(m, nil) + results <- sendResult{method: m, payload: resp, err: err} + }(method) + } + + // Collect all results + for i := 0; i < 3; i++ { + select { + case r := <-results: + if r.err != nil { + t.Errorf("Send(%q) returned error: %v", r.method, r.err) + continue + } + var result map[string]any + if err := json.Unmarshal(r.payload, &result); err != nil { + t.Errorf("Send(%q) unmarshal error: %v", r.method, err) + continue + } + gotMethod, _ := result["method"].(string) + if gotMethod != r.method { + t.Errorf("Send(%q) got response for %q (mismatched)", r.method, gotMethod) + } + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for concurrent Send results") + } + } + + cancel() +} + +// ── 5. 
Test: Clean shutdown ────────────────────────────────────────────── + +func TestWSClient_CleanShutdown(t *testing.T) { + srv := newTestWSServer(t, func(conn *websocket.Conn) { + handleHandshake(t, conn) + keepAlive(conn) + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default()) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + + done := make(chan struct{}) + go func() { + client.Start(ctx) + close(done) + }() + + // Let the client connect and complete handshake + time.Sleep(200 * time.Millisecond) + + // Cancel context — should trigger clean shutdown + cancel() + + select { + case <-done: + // Client exited cleanly — pass + case <-time.After(3 * time.Second): + t.Fatal("WSClient did not shut down cleanly within timeout") + } +} + +// ── Pure utility tests (from CUB-205) ───────────────────────────────────── + func TestMapSessionStatus(t *testing.T) { tests := []struct { input string diff --git a/go-backend/internal/models/models.go b/go-backend/internal/models/models.go index 8480b41..2844cff 100644 --- a/go-backend/internal/models/models.go +++ b/go-backend/internal/models/models.go @@ -63,12 +63,15 @@ type CreateAgentRequest struct { // UpdateAgentRequest is the payload for PUT /api/agents/{id}. 
type UpdateAgentRequest struct { - Status *AgentStatus `json:"status,omitempty" validate:"omitempty,agentStatus"` - CurrentTask *string `json:"currentTask,omitempty"` - TaskProgress *int `json:"taskProgress,omitempty" validate:"omitempty,min=0,max=100"` - TaskElapsed *string `json:"taskElapsed,omitempty"` - Channel *string `json:"channel,omitempty" validate:"omitempty,min=1,max=32"` - ErrorMessage *string `json:"errorMessage,omitempty"` + Status *AgentStatus `json:"status,omitempty" validate:"omitempty,agentStatus"` + DisplayName *string `json:"displayName,omitempty"` + Role *string `json:"role,omitempty"` + LastActivityAt *string `json:"lastActivityAt,omitempty"` + CurrentTask *string `json:"currentTask,omitempty"` + TaskProgress *int `json:"taskProgress,omitempty" validate:"omitempty,min=0,max=100"` + TaskElapsed *string `json:"taskElapsed,omitempty"` + Channel *string `json:"channel,omitempty" validate:"omitempty,min=1,max=32"` + ErrorMessage *string `json:"errorMessage,omitempty"` } // AgentStatusHistoryEntry represents a point-in-time status change for an agent. -- 2.53.0