diff --git a/.env.example b/.env.example index 0a76c10..3d4290c 100644 --- a/.env.example +++ b/.env.example @@ -13,9 +13,14 @@ ENVIRONMENT=development DATABASE_URL=postgresql://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable # Gateway (OpenClaw) connection -# URL to the OpenClaw gateway API for polling agent states -GATEWAY_URL=http://localhost:18789/api/agents -# Polling interval for agent state updates +# WebSocket gateway config (primary path) +WS_GATEWAY_URL=ws://host.docker.internal:18789/ +# Gateway auth token — same as OPENCLAW_GATEWAY_TOKEN (set in environment) +GATEWAY_TOKEN= + +# REST poller config (fallback, only used if WS fails to connect) +GATEWAY_URL=http://host.docker.internal:18789/api/agents +# Polling interval for agent state updates (fallback only) GATEWAY_POLL_INTERVAL=5s # ── Frontend Variables (via Vite) ─────────────────────────────────────── diff --git a/.gitea/workflows/build-dev.yaml b/.gitea/workflows/build-dev.yaml new file mode 100644 index 0000000..4bd6b6f --- /dev/null +++ b/.gitea/workflows/build-dev.yaml @@ -0,0 +1,85 @@ +name: Build (Dev) + +on: + push: + branches: [dev] + pull_request: + branches: [dev] + workflow_dispatch: + +env: + GO_VERSION: "1.23" + NODE_VERSION: "22" + BINARY_NAME: server + +jobs: + build-go-backend: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version: ${{ env.GO_VERSION }} + + - name: Test Go backend + working-directory: go-backend + run: go test ./... 
+ + - name: Build Go binary + working-directory: go-backend + run: | + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \ + go build -ldflags="-s -w -X main.version=${GITHUB_SHA:0:8}" \ + -o ${{ env.BINARY_NAME }} ./cmd/server + + - name: Upload Go binary + uses: actions/upload-artifact@v4 + with: + name: go-backend-binary + path: go-backend/${{ env.BINARY_NAME }} + retention-days: 3 + + build-frontend: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Setup Node + uses: actions/setup-node@v4 + with: + node-version: ${{ env.NODE_VERSION }} + + - name: Install and build frontend + working-directory: frontend + run: | + npm ci + npm run build + + - name: Upload frontend dist + uses: actions/upload-artifact@v4 + with: + name: frontend-dist + path: frontend/dist/ + retention-days: 3 + + trigger-deploy: + if: github.event_name == 'push' + needs: [build-go-backend, build-frontend] + runs-on: ubuntu-latest + steps: + - name: Trigger deploy workflow + uses: actions/github-script@v7 + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + script: | + await github.rest.repos.createDispatchEvent({ + owner: context.repo.owner, + repo: context.repo.repo, + event_type: 'dev-build-success', + client_payload: { + sha: context.sha, + ref: context.ref + } + }) \ No newline at end of file diff --git a/.gitea/workflows/deploy-dev.yaml b/.gitea/workflows/deploy-dev.yaml new file mode 100644 index 0000000..72240f9 --- /dev/null +++ b/.gitea/workflows/deploy-dev.yaml @@ -0,0 +1,126 @@ +name: Deploy (Dev) + +on: + repository_dispatch: + types: + - dev-build-success + workflow_dispatch: + +env: + BINARY_NAME: server + DEV_HOST: ${{ secrets.DEV_HOST }} + DEV_USER: ${{ secrets.DEV_USER }} + DEPLOY_BINARY_PATH: /opt/control-center/server + DEPLOY_FRONTEND_PATH: /usr/share/nginx/html + SERVICE_NAME: control-center-server + FRONTEND_SERVICE: nginx + +jobs: + deploy: + runs-on: ubuntu-latest + steps: + - name: Download Go binary + uses: actions/download-artifact@v4 + with: + name: 
go-backend-binary
+
+      - name: Download frontend dist
+        uses: actions/download-artifact@v4
+        with:
+          name: frontend-dist
+          path: dist
+
+      - name: Make binary executable
+        run: chmod +x ${{ env.BINARY_NAME }}
+
+      - name: Generate deploy script
+        run: |
+          cat > deploy.sh <<'SCRIPT'
+          #!/usr/bin/env bash
+          set -euo pipefail
+
+          BINARY="${1}"
+          FRONTEND_DIST="${2:-dist}"
+          BINARY_PATH="${3:-/opt/control-center/server}"
+          FRONTEND_PATH="${4:-/usr/share/nginx/html}"
+          BINARY_SERVICE="${5:-control-center-server}"
+          FRONTEND_SERVICE="${6:-nginx}"
+
+          TIMESTAMP=$(date +%Y%m%d%H%M%S)
+          BACKUP="${BINARY_PATH}.${TIMESTAMP}.bak"
+
+          echo "=== deploy backend ==="
+
+          if [ -f "$BINARY_PATH" ]; then
+            echo "backing up current binary"
+            cp "$BINARY_PATH" "$BACKUP"
+          fi
+
+          echo "installing new binary"
+          install -m 0755 "$BINARY" "${BINARY_PATH}.new"  # stage beside the target; writing over a running executable fails with ETXTBSY
+          mv -f "${BINARY_PATH}.new" "$BINARY_PATH"  # rename is atomic and does not touch the inode the service is executing
+
+          echo "restarting service"
+          systemctl restart "$BINARY_SERVICE"  # a reload would keep the old binary running, so always restart
+
+          sleep 3
+
+          if ! systemctl is-active --quiet "$BINARY_SERVICE"; then
+            echo "FAILED: $BINARY_SERVICE did not start — rolling back"
+            if [ -f "$BACKUP" ]; then
+              install -m 0755 "$BACKUP" "${BINARY_PATH}.new" && mv -f "${BINARY_PATH}.new" "$BINARY_PATH"  # same atomic swap; the unit may be crash-looping on the new binary
+              systemctl restart "$BINARY_SERVICE"
+            fi
+            exit 1
+          fi
+
+          echo "backend deploy ok — keeping last 3 backups"
+          ls -t "${BINARY_PATH}."*.bak 2>/dev/null | tail -n +4 | xargs -r rm -f
+
+          echo "=== deploy frontend ==="
+          if [ -d "$FRONTEND_DIST" ] && [ -n "$(ls -A "$FRONTEND_DIST" 2>/dev/null)" ]; then
+            rsync -a --delete "$FRONTEND_DIST/" "$FRONTEND_PATH/"
+            systemctl reload "$FRONTEND_SERVICE" 2>/dev/null ||:
+            echo "frontend deploy ok"
+          fi
+
+          echo "=== deploy complete ==="
+          SCRIPT
+          chmod +x deploy.sh
+
+      - name: Copy artifacts to dev server
+        uses: appleboy/scp-action@v0.1.7
+        with:
+          host: ${{ env.DEV_HOST }}
+          username: ${{ env.DEV_USER }}
+          key: ${{ secrets.DEV_SSH_KEY }}
+          source: "${{ env.BINARY_NAME }},deploy.sh,dist"
+          target: "/tmp/control-center-deploy"
+
+      - name: Execute deploy on dev server
+        uses: 
appleboy/ssh-action@v1 + with: + host: ${{ env.DEV_HOST }} + username: ${{ env.DEV_USER }} + key: ${{ secrets.DEV_SSH_KEY }} + script: | + set -euo pipefail + cd /tmp/control-center-deploy + sudo ./deploy.sh \ + "${{ env.BINARY_NAME }}" \ + "dist" \ + "${{ env.DEPLOY_BINARY_PATH }}" \ + "${{ env.DEPLOY_FRONTEND_PATH }}" \ + "${{ env.SERVICE_NAME }}" \ + "${{ env.FRONTEND_SERVICE }}" + rm -rf /tmp/control-center-deploy + + - name: Notify on failure + if: failure() + uses: appleboy/ssh-action@v1 + with: + host: ${{ env.DEV_HOST }} + username: ${{ env.DEV_USER }} + key: ${{ secrets.DEV_SSH_KEY }} + script: | + echo "deploy failed — commit ${{ github.sha }}" > /tmp/control-center-deploy-failure.log \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 2e3c5bb..4591b81 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,6 +16,8 @@ services: - ENVIRONMENT=production - PORT=8080 - GATEWAY_URL=http://host.docker.internal:18789/api/agents + - WS_GATEWAY_URL=ws://host.docker.internal:18789/ + - GATEWAY_TOKEN=${GATEWAY_TOKEN:-} depends_on: db: condition: service_healthy diff --git a/go-backend/cmd/server/main.go b/go-backend/cmd/server/main.go index dc760b4..cc96fb1 100644 --- a/go-backend/cmd/server/main.go +++ b/go-backend/cmd/server/main.go @@ -63,15 +63,29 @@ func main() { Broker: broker, }) - // ── Gateway client (polls OpenClaw for agent states) ─────────────────── + // ── Gateway clients (WS primary, REST fallback) ─────────────────── + // WS gateway client (primary path) + wsClient := gateway.NewWSClient(gateway.WSConfig{ + URL: cfg.WSGatewayURL, + AuthToken: cfg.WSGatewayToken, + }, agentRepo, broker, logger) + + // REST gateway client (fallback — only polls if WS fails to connect) gwClient := gateway.NewClient(gateway.Config{ - URL: cfg.GatewayURL, - PollInterval: cfg.GatewayPollInterval, + URL: cfg.GatewayRestURL, + PollInterval: cfg.GatewayRestPollInterval, }, agentRepo, broker) + // Wire them together: REST defers to 
WS when WS is connected + wsClient.SetRESTClient(gwClient) + gwClient.SetWSClient(wsClient) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // Start WS client first (primary) + go wsClient.Start(ctx) + // Start REST client (will wait for WS, then stand down or fall back) go gwClient.Start(ctx) // ── Server ───────────────────────────────────────────────────────────── diff --git a/go-backend/go.mod b/go-backend/go.mod index 4b9db15..91bad3e 100644 --- a/go-backend/go.mod +++ b/go-backend/go.mod @@ -7,6 +7,7 @@ require ( github.com/go-chi/cors v1.2.1 github.com/go-playground/validator/v10 v10.24.0 github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 github.com/jackc/pgx/v5 v5.7.2 ) diff --git a/go-backend/go.sum b/go-backend/go.sum index f4041e3..448723d 100644 --- a/go-backend/go.sum +++ b/go-backend/go.sum @@ -17,6 +17,8 @@ github.com/go-playground/validator/v10 v10.24.0 h1:KHQckvo8G6hlWnrPX4NJJ+aBfWNAE github.com/go-playground/validator/v10 v10.24.0/go.mod h1:GGzBIJMuE98Ic/kJsBXbz1x/7cByt++cQ+YOuDM5wus= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= diff --git a/go-backend/internal/config/config.go b/go-backend/internal/config/config.go index fe6c9f6..c6a26b3 100644 --- a/go-backend/internal/config/config.go +++ b/go-backend/internal/config/config.go @@ -10,13 +10,15 @@ import ( // Config holds all application configuration. 
type Config struct {
-	Port                int
-	DatabaseURL         string
-	CORSOrigin          string
-	LogLevel            string
-	Environment         string
-	GatewayURL          string
-	GatewayPollInterval time.Duration
+	Port                    int
+	DatabaseURL             string
+	CORSOrigin              string
+	LogLevel                string
+	Environment             string
+	GatewayRestURL          string
+	GatewayRestPollInterval time.Duration
+	WSGatewayURL            string
+	WSGatewayToken          string
 }
 
 // Load reads configuration from environment variables, applying defaults where
@@ -28,8 +30,10 @@ func Load() *Config {
 		CORSOrigin:  getEnv("CORS_ORIGIN", "*"),
 		LogLevel:    getEnv("LOG_LEVEL", "info"),
 		Environment: getEnv("ENVIRONMENT", "development"),
-		GatewayURL:          getEnv("GATEWAY_URL", "http://localhost:18789/api/agents"),
-		GatewayPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
+		GatewayRestURL:          getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
+		GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
+		WSGatewayURL:            getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
+		WSGatewayToken:          getEnv("OPENCLAW_GATEWAY_TOKEN", getEnv("GATEWAY_TOKEN", "")), // GATEWAY_TOKEN is the name .env.example and docker-compose.yml actually set; the OPENCLAW_ name keeps precedence
 	}
 }
 
diff --git a/go-backend/internal/gateway/client.go b/go-backend/internal/gateway/client.go
index 4258a84..4b2d520 100644
--- a/go-backend/internal/gateway/client.go
+++ b/go-backend/internal/gateway/client.go
@@ -9,6 +9,7 @@ import (
 	"fmt"
 	"log/slog"
 	"net/http"
+	"sync"
 	"time"
 
 	"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
@@ -17,13 +18,18 @@ import (
 )
 
 // Client polls the OpenClaw gateway for agent status and keeps the database
-// and SSE broker in sync.
+// and SSE broker in sync. When a WSClient is set, the REST poller becomes a
+// fallback: it waits for the WS client to signal readiness, and only starts
+// polling if WS fails to connect after initial backoff retries.
type Client struct { url string pollInterval time.Duration httpClient *http.Client agents repository.AgentRepo broker *handler.Broker + wsClient *WSClient // optional WS client; when set, REST is fallback only + wsReady chan struct{} // closed once WS connection is established + wsReadyOnce sync.Once // protects wsReady close from double-close race } // Config holds gateway client configuration, typically loaded from environment. @@ -48,22 +54,64 @@ func NewClient(cfg Config, agents repository.AgentRepo, broker *handler.Broker) httpClient: &http.Client{Timeout: 10 * time.Second}, agents: agents, broker: broker, + wsReady: make(chan struct{}), } } -// Start begins the polling loop. It runs until ctx is cancelled. -func (c *Client) Start(ctx context.Context) { - slog.Info("gateway client starting", - "url", c.url, - "pollInterval", c.pollInterval.String()) +// SetWSClient wires the WebSocket client so the REST poller knows to defer +// to it. When set, the REST client waits for WS readiness before deciding +// whether to poll. +func (c *Client) SetWSClient(ws *WSClient) { + c.wsClient = ws +} +// MarkWSReady signals that the WS connection is live and the REST poller +// should stand down. Called by WSClient after a successful handshake. +func (c *Client) MarkWSReady() { + c.wsReadyOnce.Do(func() { + close(c.wsReady) + }) +} + +// Start begins the gateway client loop. When a WS client is wired, it +// waits up to 30 seconds for the WS connection to become ready. If WS +// connects, the REST poller stands down and only logs periodically. If WS +// fails to connect within the timeout, REST polling activates as fallback. +func (c *Client) Start(ctx context.Context) { + if c.wsClient != nil { + slog.Info("gateway client waiting for WS connection", "timeout", "30s") + + select { + case <-c.wsReady: + slog.Info("gateway client using WS — REST poller standing down") + // WS is live; keep this goroutine alive but idle. 
If WS + // disconnects later, we could re-enter polling, but for now + // the WS client handles its own reconnection. + <-ctx.Done() + slog.Info("gateway client stopped (WS mode)") + return + case <-time.After(30 * time.Second): + slog.Warn("gateway client: WS not ready after 30s — falling back to REST polling", + "url", c.url, + "pollInterval", c.pollInterval.String()) + case <-ctx.Done(): + slog.Info("gateway client stopped while waiting for WS") + return + } + } else { + slog.Info("gateway client using REST polling (no WS client configured)", + "url", c.url, + "pollInterval", c.pollInterval.String()) + } + + // REST fallback polling ticker := time.NewTicker(c.pollInterval) defer ticker.Stop() for { select { case <-ctx.Done(): - slog.Info("gateway client stopped") + slog.Info("gateway client stopped (REST fallback)") return case <-ticker.C: c.poll(ctx) diff --git a/go-backend/internal/gateway/events.go b/go-backend/internal/gateway/events.go new file mode 100644 index 0000000..a0f660e --- /dev/null +++ b/go-backend/internal/gateway/events.go @@ -0,0 +1,287 @@ +// Package gateway provides real-time event handlers for the Control Center +// WebSocket client. Handlers process gateway events (sessions.changed, +// presence, agent.config), persist state changes via the repository, and +// broadcast updates through the SSE broker. +// +// Rule: DB update first, then SSE broadcast. This keeps REST API responses +// consistent with SSE events. +package gateway + +import ( + "context" + "encoding/json" + "time" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" +) + +// ── Event payload types ────────────────────────────────────────────────── + +// sessionChangedPayload represents a single session delta from a +// sessions.changed event. 
+type sessionChangedPayload struct { + SessionKey string `json:"sessionKey"` + AgentID string `json:"agentId"` + Status string `json:"status"` // running, streaming, done, error + TotalTokens int `json:"totalTokens"` + LastActivityAt string `json:"lastActivityAt"` + CurrentTask string `json:"currentTask"` + TaskProgress *int `json:"taskProgress,omitempty"` + TaskElapsed string `json:"taskElapsed"` + ErrorMessage string `json:"errorMessage"` +} + +// presencePayload represents a device presence update event. +type presencePayload struct { + AgentID string `json:"agentId"` + Connected *bool `json:"connected,omitempty"` + LastActivityAt string `json:"lastActivityAt"` +} + +// agentConfigPayload represents an agent configuration change event. +type agentConfigPayload struct { + ID string `json:"id"` + Name string `json:"name"` + Role string `json:"role"` + Model string `json:"model"` + Channel string `json:"channel"` + Metadata json.RawMessage `json:"metadata"` +} + +// ── Handler registration ───────────────────────────────────────────────── + +// registerEventHandlers sets up all live event handlers on the WSClient. +// Call this once after a successful handshake + initial sync. +func (c *WSClient) registerEventHandlers() { + if c.agents == nil || c.broker == nil { + c.logger.Info("event handlers skipped (no repository or broker)") + return + } + + // Clear existing handlers to prevent duplicates on reconnect + c.mu.Lock() + c.handlers = make(map[string][]eventHandler) + c.mu.Unlock() + + c.OnEvent("sessions.changed", c.handleSessionsChanged) + c.OnEvent("presence", c.handlePresence) + c.OnEvent("agent.config", c.handleAgentConfig) + + c.logger.Info("event handlers registered", + "events", []string{"sessions.changed", "presence", "agent.config"}) +} + +// ── sessions.changed ───────────────────────────────────────────────────── + +// handleSessionsChanged processes sessions.changed events from the gateway. 
+// The payload may be a single session object or an array of session deltas.
+// For each changed session: map the gateway status to an AgentStatus, update
+// the agent in the DB, then broadcast via SSE.
+func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
+	c.logger.Debug("handleSessionsChanged start", "payload", string(payload))
+
+	// The payload is either an array of deltas or a single delta object.
+	// Decode as an array first and fall back to a single object only when
+	// that fails, so an empty array is accepted as "nothing changed"
+	// instead of being logged as unparseable.
+	var deltas []sessionChangedPayload
+	if err := json.Unmarshal(payload, &deltas); err != nil {
+		var single sessionChangedPayload
+		if err := json.Unmarshal(payload, &single); err != nil {
+			c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err)
+			return
+		}
+		deltas = []sessionChangedPayload{single}
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	for _, d := range deltas {
+		if d.AgentID == "" {
+			c.logger.Debug("sessions.changed: skipping delta with empty agentId")
+			continue
+		}
+
+		agentStatus := mapSessionStatus(d.Status)
+
+		// Build partial update
+		update := models.UpdateAgentRequest{
+			Status: &agentStatus,
+		}
+
+		// The session key is not persisted here: SessionKey is not in
+		// UpdateAgentRequest directly, so only the status and task fields
+		// set below are written.
+
+		// Current task
+		if d.CurrentTask != "" {
+			update.CurrentTask = &d.CurrentTask
+		}
+
+		// Task progress
+		if d.TaskProgress != nil {
+			update.TaskProgress = d.TaskProgress
+		} else if d.TotalTokens > 0 {
+			// Derive progress from the token count as a fallback: one point
+			// per 100 tokens, capped at 100. A rough heuristic, not a real
+			// completion measure.
+			prog := min(d.TotalTokens/100, 100)
+			update.TaskProgress = &prog
+		}
+
+		// Task elapsed
+		if d.TaskElapsed != "" {
+			update.TaskElapsed = &d.TaskElapsed
+		}
+
+		// Error message
+		if d.ErrorMessage != "" {
+			update.ErrorMessage = &d.ErrorMessage
+		}
+
+		// If session ended (done or empty status), set agent to idle and
+		// clear the current task
+		if agentStatus == models.AgentStatusIdle {
+			emptyTask := ""
+			update.CurrentTask = &emptyTask
+			zeroProg := 0
+			update.TaskProgress = &zeroProg
+		}
+
+		// Update DB first
+		updated, err := c.agents.Update(ctx, d.AgentID, update)
+		if err != nil {
+			c.logger.Warn("sessions.changed: DB update failed",
+				"agentId", d.AgentID, "error", err)
+			continue
+		}
+
+		// Then broadcast
+		c.broker.Broadcast("agent.status", updated)
+		if d.TaskProgress != nil || d.CurrentTask != "" {
+			c.broker.Broadcast("agent.progress", updated)
+		}
+
+		c.logger.Debug("sessions.changed: agent updated",
+			"agentId", d.AgentID,
+			"status", string(agentStatus))
+	}
+
+	c.logger.Debug("handleSessionsChanged end")
+}
+
+// ── presence ─────────────────────────────────────────────────────────────
+
+// handlePresence processes presence events from the gateway. Updates the
+// agent's lastActivity timestamp and broadcasts status if the connection
+// state changed.
+func (c *WSClient) handlePresence(payload json.RawMessage) { + c.logger.Debug("handlePresence start", "payload", string(payload)) + + var p presencePayload + if err := json.Unmarshal(payload, &p); err != nil { + c.logger.Warn("presence: unparseable payload, skipping", "error", err) + return + } + + if p.AgentID == "" { + c.logger.Debug("presence: skipping event with empty agentId") + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // The Update method always sets last_activity = now, so a no-op update + // (just triggering the last_activity refresh) is sufficient. We send + // an empty-ish update — the repo always bumps last_activity. + // If connection state is reported, also update status. + update := models.UpdateAgentRequest{} + + if p.Connected != nil && !*p.Connected { + // Device disconnected — set agent to idle + idle := models.AgentStatusIdle + update.Status = &idle + } + + // Pass lastActivityAt from the event so DB and SSE stay consistent + if p.LastActivityAt != "" { + update.LastActivityAt = &p.LastActivityAt + } + + // Update DB first + updated, err := c.agents.Update(ctx, p.AgentID, update) + if err != nil { + c.logger.Warn("presence: DB update failed", + "agentId", p.AgentID, "error", err) + return + } + + // Then broadcast + c.broker.Broadcast("agent.status", updated) + + c.logger.Debug("presence: agent updated", + "agentId", p.AgentID, + "connected", p.Connected) +} + +// ── agent.config ───────────────────────────────────────────────────────── + +// handleAgentConfig processes agent.config events from the gateway. Updates +// agent metadata (name, channel) in the DB and broadcasts a fleet.update +// with the full fleet snapshot. 
+func (c *WSClient) handleAgentConfig(payload json.RawMessage) { + c.logger.Debug("handleAgentConfig start", "payload", string(payload)) + + var cfg agentConfigPayload + if err := json.Unmarshal(payload, &cfg); err != nil { + c.logger.Warn("agent.config: unparseable payload, skipping", "error", err) + return + } + + if cfg.ID == "" { + c.logger.Debug("agent.config: skipping event with empty id") + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Build partial update with available fields. + update := models.UpdateAgentRequest{} + + if cfg.Name != "" { + update.DisplayName = &cfg.Name + } + if cfg.Role != "" { + update.Role = &cfg.Role + } + if cfg.Channel != "" { + update.Channel = &cfg.Channel + } + + // Update DB first + updated, err := c.agents.Update(ctx, cfg.ID, update) + if err != nil { + c.logger.Warn("agent.config: DB update failed", + "agentId", cfg.ID, "error", err) + return + } + + // Then broadcast fleet snapshot + allAgents, err := c.agents.List(ctx, "") + if err != nil { + c.logger.Warn("agent.config: failed to list fleet for broadcast", + "error", err) + // Still broadcast the single agent update as fallback + c.broker.Broadcast("agent.status", updated) + return + } + + c.broker.Broadcast("fleet.update", allAgents) + + c.logger.Debug("agent.config: fleet updated", + "agentId", cfg.ID, + "name", cfg.Name) +} \ No newline at end of file diff --git a/go-backend/internal/gateway/sync.go b/go-backend/internal/gateway/sync.go new file mode 100644 index 0000000..3352ed3 --- /dev/null +++ b/go-backend/internal/gateway/sync.go @@ -0,0 +1,196 @@ +// Package gateway provides the initial sync logic that fetches agent and +// session data from the OpenClaw gateway via WS RPCs after handshake, +// persists to the repository, merges session state into agent cards, and +// broadcasts the merged fleet to SSE clients. 
+package gateway + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" +) + +// ── RPC response types ─────────────────────────────────────────────────── + +// agentListItem represents a single agent returned by the agents.list RPC. +// Fields are extracted gracefully from json.RawMessage so unknown fields +// from the gateway are silently ignored. +type agentListItem struct { + ID string `json:"id"` + Name string `json:"name"` + Model string `json:"model"` + Role string `json:"role"` + Channel string `json:"channel"` + Metadata json.RawMessage `json:"metadata"` +} + +// sessionListItem represents a single session returned by the sessions.list RPC. +type sessionListItem struct { + SessionKey string `json:"sessionKey"` + AgentID string `json:"agentId"` + Status string `json:"status"` // running, done, streaming, error + TotalTokens int `json:"totalTokens"` + LastActivityAt string `json:"lastActivityAt"` +} + +// ── Sync logic ────────────────────────────────────────────────────────── + +// initialSync fetches agents and sessions from the gateway via WS RPCs, +// persists them, merges session state into agent cards, and broadcasts +// the merged fleet as a fleet.update event. +func (c *WSClient) initialSync(ctx context.Context) error { + if c.agents == nil { + c.logger.Info("initial sync skipped (no repository)") + return nil + } + + c.logger.Info("initial sync starting") + + // 1. Fetch agents + agentsRaw, err := c.Send("agents.list", nil) + if err != nil { + return fmt.Errorf("agents.list RPC: %w", err) + } + + var agentItems []agentListItem + if err := json.Unmarshal(agentsRaw, &agentItems); err != nil { + return fmt.Errorf("parse agents.list response: %w", err) + } + + c.logger.Info("agents.list received", "count", len(agentItems)) + + // 2. 
Persist each agent + for _, item := range agentItems { + card := agentItemToCard(item) + + existing, err := c.agents.Get(ctx, card.ID) + if err != nil { + // Agent doesn't exist — create it + if createErr := c.agents.Create(ctx, card); createErr != nil { + c.logger.Warn("sync: agent create failed", "id", card.ID, "error", createErr) + continue + } + c.logger.Info("sync: agent created", "id", card.ID) + continue + } + + // Agent exists — update if display name or role changed + if existing.DisplayName != card.DisplayName || existing.Role != card.Role { + newName := card.DisplayName + newRole := card.Role + _, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{ + DisplayName: &newName, + Role: &newRole, + }) + if updateErr != nil { + c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr) + } + } + } + + // 3. Fetch sessions + sessionsRaw, err := c.Send("sessions.list", nil) + if err != nil { + return fmt.Errorf("sessions.list RPC: %w", err) + } + + var sessionItems []sessionListItem + if err := json.Unmarshal(sessionsRaw, &sessionItems); err != nil { + return fmt.Errorf("parse sessions.list response: %w", err) + } + + c.logger.Info("sessions.list received", "count", len(sessionItems)) + + // 4. Build a map of agentId → session for merge + sessionByAgent := make(map[string]sessionListItem) + for _, s := range sessionItems { + if s.AgentID != "" { + sessionByAgent[s.AgentID] = s + } + } + + // 5. 
Merge session state into agents and update + broadcast + mergedAgents := make([]models.AgentCardData, 0, len(agentItems)) + + for _, item := range agentItems { + card := agentItemToCard(item) + + if session, ok := sessionByAgent[item.ID]; ok { + // Merge session state + card.SessionKey = session.SessionKey + card.Status = mapSessionStatus(session.Status) + card.LastActivity = session.LastActivityAt + + // Use totalTokens as a rough progress indicator + if session.TotalTokens > 0 { + prog := min(session.TotalTokens/100, 100) // normalize to 0-100 + card.TaskProgress = &prog + } + } + + // Persist merged state + existing, err := c.agents.Get(ctx, card.ID) + if err == nil && existing.Status != card.Status { + status := card.Status + _, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{ + Status: &status, + }) + if updateErr != nil { + c.logger.Warn("sync: agent status update failed", "id", card.ID, "error", updateErr) + } + } + + mergedAgents = append(mergedAgents, card) + } + + // 6. Broadcast the full merged fleet + c.broker.Broadcast("fleet.update", mergedAgents) + c.logger.Info("initial sync complete", "agents", len(mergedAgents)) + + return nil +} + +// mapSessionStatus converts a gateway session status string to an AgentStatus. +// - "running" / "streaming" → active +// - "error" → error +// - "done" / "" / other → idle +func mapSessionStatus(status string) models.AgentStatus { + switch status { + case "running", "streaming": + return models.AgentStatusActive + case "error": + return models.AgentStatusError + default: + return models.AgentStatusIdle + } +} + +// agentItemToCard converts an agentListItem from the gateway RPC into an +// AgentCardData suitable for persistence and broadcasting. 
+func agentItemToCard(item agentListItem) models.AgentCardData { + role := item.Role + if role == "" { + role = "agent" + } + channel := item.Channel + if channel == "" { + channel = "unknown" + } + name := item.Name + if name == "" { + name = item.ID + } + + return models.AgentCardData{ + ID: item.ID, + DisplayName: name, + Role: role, + Status: models.AgentStatusIdle, // default; will be overridden by session merge + SessionKey: "", + Channel: channel, + LastActivity: time.Now().UTC().Format(time.RFC3339), + } +} \ No newline at end of file diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go new file mode 100644 index 0000000..322d551 --- /dev/null +++ b/go-backend/internal/gateway/wsclient.go @@ -0,0 +1,460 @@ +// Package gateway provides WebSocket client integration with the OpenClaw +// gateway using WS protocol v3. The WSClient handles connection, handshake, +// frame routing, request/response correlation, and automatic reconnection +// with exponential backoff. +package gateway + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "sync" + "time" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler" + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository" + + "github.com/gorilla/websocket" + "github.com/google/uuid" +) + +// WSConfig holds WebSocket client configuration, typically loaded from +// environment variables. AuthToken must be set to a valid OpenClaw gateway +// operator token. +type WSConfig struct { + URL string // e.g. "ws://host.docker.internal:18789/" + AuthToken string // from OPENCLAW_GATEWAY_TOKEN +} + +// DefaultWSConfig returns sensible defaults for local development. +func DefaultWSConfig() WSConfig { + return WSConfig{ + URL: "ws://localhost:18789/", + AuthToken: "", + } +} + +// eventHandler is a callback invoked when a named event arrives from the +// gateway. 
+type eventHandler func(json.RawMessage) + +// WSClient connects to the OpenClaw gateway over WebSocket, completes the +// v3 handshake, routes incoming frames, and automatically reconnects on +// disconnect with exponential backoff. +type WSClient struct { + config WSConfig + conn *websocket.Conn + connMu sync.Mutex // protects conn for writes + pending map[string]chan<- json.RawMessage + mu sync.Mutex // protects pending and handlers + agents repository.AgentRepo + broker *handler.Broker + logger *slog.Logger + + handlers map[string][]eventHandler + connId string // set after successful hello-ok + restClient *Client // optional REST client to notify on WS ready + wsReadyOnce sync.Once // ensures MarkWSReady close is one-shot +} + +// NewWSClient returns a WSClient wired to the given repository and broker. +func NewWSClient(cfg WSConfig, agents repository.AgentRepo, broker *handler.Broker, logger *slog.Logger) *WSClient { + if logger == nil { + logger = slog.Default() + } + return &WSClient{ + config: cfg, + pending: make(map[string]chan<- json.RawMessage), + agents: agents, + broker: broker, + logger: logger, + handlers: make(map[string][]eventHandler), + } +} + +// SetRESTClient wires the REST fallback client so the WS client can notify +// it when the WS connection is ready. Call this before Start. +func (c *WSClient) SetRESTClient(rest *Client) { + c.restClient = rest +} + +// OnEvent registers a handler for the given event name. Handlers are called +// when an incoming frame with type "event" and matching event name is +// received. This is safe to call before Start. +func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) { + c.mu.Lock() + defer c.mu.Unlock() + c.handlers[event] = append(c.handlers[event], handler) +} + +// ── Frame types ────────────────────────────────────────────────────────── + +// wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol. 
+type wsFrame struct { + Type string `json:"type"` // "req", "res", "event" + ID string `json:"id,omitempty"` // request/response correlation + Method string `json:"method,omitempty"` // method name (req frames) + Event string `json:"event,omitempty"` // event name (event frames) + Params json.RawMessage `json:"params,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + Error *wsError `json:"error,omitempty"` +} + +// wsError represents an error in a response frame. +type wsError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// connectRequest builds the initial connect handshake payload. +type connectRequest struct { + MinProtocol int `json:"minProtocol"` + MaxProtocol int `json:"maxProtocol"` + Client connectClientInfo `json:"client"` + Role string `json:"role"` + Scopes []string `json:"scopes"` + Auth connectAuth `json:"auth"` +} + +type connectClientInfo struct { + ID string `json:"id"` + Version string `json:"version"` + Platform string `json:"platform"` + Mode string `json:"mode"` +} + +type connectAuth struct { + Token string `json:"token"` +} + +// helloOKResponse represents the expected response to a successful connect. +type helloOKResponse struct { + ConnID string `json:"connId"` + Features struct { + Methods []string `json:"methods"` + Events []string `json:"events"` + } `json:"features"` +} + +// ── Start loop ─────────────────────────────────────────────────────────── + +// Start connects to the gateway, completes the handshake, and begins the +// read loop. On disconnect it reconnects with exponential backoff. On +// ctx cancellation it performs a clean shutdown. 
+func (c *WSClient) Start(ctx context.Context) { + initialBackoff := 1 * time.Second + maxBackoff := 30 * time.Second + backoff := initialBackoff + + for { + err := c.connectAndRun(ctx) + if err != nil { + if ctx.Err() != nil { + c.logger.Info("ws client stopped (context cancelled)") + return + } + c.logger.Warn("ws client disconnected, reconnecting", + "error", err, + "backoff", backoff) + } else { + // Reset backoff on successful connect+run completion + backoff = initialBackoff + } + + select { + case <-ctx.Done(): + c.logger.Info("ws client stopped during backoff (context cancelled)") + return + case <-time.After(backoff): + // Exponential backoff: 1s, 2s, 4s, 8s, 16s, max 30s + backoff = backoff * 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + } +} + +// connectAndRun dials the gateway, completes the handshake, and runs the +// read loop until an error occurs or ctx is cancelled. +func (c *WSClient) connectAndRun(ctx context.Context) error { + c.logger.Info("ws client connecting", "url", c.config.URL) + + dialer := websocket.Dialer{ + HandshakeTimeout: 10 * time.Second, + } + + conn, _, err := dialer.DialContext(ctx, c.config.URL, nil) + if err != nil { + return fmt.Errorf("dial failed: %w", err) + } + + c.connMu.Lock() + c.conn = conn + c.connMu.Unlock() + + // When context is cancelled, close the conn to unblock ReadJSON in readLoop. 
+ go func() { + <-ctx.Done() + c.connMu.Lock() + if c.conn != nil { + c.conn.Close() + } + c.connMu.Unlock() + }() + + defer func() { + conn.Close() + }() + + // Step 1: Read the connect.challenge frame + if err := c.readChallenge(conn); err != nil { + return fmt.Errorf("handshake challenge: %w", err) + } + + // Step 2: Send connect request + helloOK, err := c.sendConnect(conn) + if err != nil { + return fmt.Errorf("handshake connect: %w", err) + } + + c.logger.Info("ws client handshake complete", + "connId", helloOK.ConnID, + "methods", helloOK.Features.Methods, + "events", helloOK.Features.Events) + + // Store connId for reference + c.connMu.Lock() + c.connId = helloOK.ConnID + c.connMu.Unlock() + + // Notify REST client that WS is live so it stands down + if c.restClient != nil { + c.restClient.MarkWSReady() + c.logger.Info("ws client notified REST fallback to stand down") + } + + // Reset wsReadyOnce so MarkWSReady can fire again after a reconnect + c.wsReadyOnce = sync.Once{} + + // Step 2b: Initial sync — fetch agents + sessions from gateway + if err := c.initialSync(ctx); err != nil { + c.logger.Warn("initial sync failed, will continue with read loop", "error", err) + } + + // Step 2c: Register live event handlers + c.registerEventHandlers() + + // Step 3: Read loop + return c.readLoop(ctx, conn) +} + +// readChallenge reads the first frame from the gateway, which must be a +// connect.challenge event. +func (c *WSClient) readChallenge(conn *websocket.Conn) error { + var frame wsFrame + if err := conn.ReadJSON(&frame); err != nil { + return fmt.Errorf("read challenge: %w", err) + } + + if frame.Type != "event" || frame.Event != "connect.challenge" { + return fmt.Errorf("expected connect.challenge, got type=%s event=%s", frame.Type, frame.Event) + } + + c.logger.Debug("received connect.challenge", "params", string(frame.Params)) + return nil +} + +// sendConnect sends the connect request and waits for the hello-ok response. 
+func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) { + reqID := uuid.New().String() + params := connectRequest{ + MinProtocol: 3, + MaxProtocol: 3, + Client: connectClientInfo{ + ID: "control-center", + Version: "1.0", + Platform: "server", + Mode: "operator", + }, + Role: "operator", + Scopes: []string{"operator.read"}, + Auth: connectAuth{ + Token: c.config.AuthToken, + }, + } + + paramsJSON, err := json.Marshal(params) + if err != nil { + return nil, fmt.Errorf("marshal connect params: %w", err) + } + + reqFrame := wsFrame{ + Type: "req", + ID: reqID, + Method: "connect", + Params: paramsJSON, + } + + if err := conn.WriteJSON(reqFrame); err != nil { + return nil, fmt.Errorf("write connect request: %w", err) + } + + // Read response + var resFrame wsFrame + if err := conn.ReadJSON(&resFrame); err != nil { + return nil, fmt.Errorf("read connect response: %w", err) + } + + if resFrame.Error != nil { + return nil, fmt.Errorf("connect rejected: code=%d msg=%s", resFrame.Error.Code, resFrame.Error.Message) + } + + if resFrame.ID != reqID { + return nil, fmt.Errorf("response id mismatch: expected %s, got %s", reqID, resFrame.ID) + } + + // Check for hello-ok method in the result + // The gateway responds with method "hello-ok" on success + var helloOK helloOKResponse + if err := json.Unmarshal(resFrame.Result, &helloOK); err != nil { + return nil, fmt.Errorf("parse hello-ok: %w", err) + } + + return &helloOK, nil +} + +// readLoop continuously reads frames from the connection and routes them. +// It returns on read error or when the connection is closed by the ctx-done +// goroutine started in connectAndRun. 
+func (c *WSClient) readLoop(ctx context.Context, conn *websocket.Conn) error { + for { + var frame wsFrame + if err := conn.ReadJSON(&frame); err != nil { + if ctx.Err() != nil { + return ctx.Err() + } + // Check if it's a close error + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + c.logger.Info("ws connection closed by server") + return nil + } + if websocket.IsUnexpectedCloseError(err) { + c.logger.Warn("ws connection unexpectedly closed", "error", err) + return err + } + return fmt.Errorf("read frame: %w", err) + } + + c.routeFrame(frame) + } +} + +// routeFrame dispatches a received frame to the appropriate handler. +func (c *WSClient) routeFrame(frame wsFrame) { + switch frame.Type { + case "res": + c.handleResponse(frame) + case "event": + c.handleEvent(frame) + default: + c.logger.Warn("unknown frame type", "type", frame.Type, "id", frame.ID) + } +} + +// handleResponse correlates a response frame to a pending request channel. +func (c *WSClient) handleResponse(frame wsFrame) { + c.mu.Lock() + ch, ok := c.pending[frame.ID] + if ok { + delete(c.pending, frame.ID) + } + c.mu.Unlock() + + if !ok { + c.logger.Warn("received response for unknown request", "id", frame.ID) + return + } + + if frame.Error != nil { + // Send nil to signal error; caller checks via Send return + ch <- nil + return + } + + ch <- frame.Result +} + +// handleEvent dispatches an event frame to registered handlers. +func (c *WSClient) handleEvent(frame wsFrame) { + c.mu.Lock() + handlers := c.handlers[frame.Event] + c.mu.Unlock() + + if len(handlers) == 0 { + c.logger.Debug("unhandled event", "event", frame.Event) + return + } + + for _, h := range handlers { + h(frame.Params) + } +} + +// ── Send ───────────────────────────────────────────────────────────────── + +// Send sends a JSON request to the gateway and returns the response payload. +// It is safe for concurrent use. Returns an error if the client is not +// connected. 
+func (c *WSClient) Send(method string, params any) (json.RawMessage, error) { + reqID := uuid.New().String() + + paramsJSON, err := json.Marshal(params) + if err != nil { + return nil, fmt.Errorf("marshal params: %w", err) + } + + // Register pending response channel + respCh := make(chan json.RawMessage, 1) + c.mu.Lock() + c.pending[reqID] = respCh + c.mu.Unlock() + + defer func() { + c.mu.Lock() + delete(c.pending, reqID) + c.mu.Unlock() + }() + + // Build and send frame + frame := wsFrame{ + Type: "req", + ID: reqID, + Method: method, + Params: paramsJSON, + } + + c.connMu.Lock() + if c.conn == nil { + c.connMu.Unlock() + return nil, fmt.Errorf("gateway: not connected") + } + err = c.conn.WriteJSON(frame) + c.connMu.Unlock() + + if err != nil { + return nil, fmt.Errorf("write request: %w", err) + } + + // Wait for response with timeout + select { + case resp := <-respCh: + if resp == nil { + return nil, fmt.Errorf("gateway returned error for request %s", reqID) + } + return resp, nil + case <-time.After(30 * time.Second): + return nil, fmt.Errorf("request %s timed out", reqID) + } +} \ No newline at end of file diff --git a/go-backend/internal/gateway/wsclient_test.go b/go-backend/internal/gateway/wsclient_test.go new file mode 100644 index 0000000..92a1d66 --- /dev/null +++ b/go-backend/internal/gateway/wsclient_test.go @@ -0,0 +1,484 @@ +package gateway + +import ( + "context" + "encoding/json" + "log/slog" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" + + "github.com/gorilla/websocket" +) + +// ── Mock WebSocket server helper ───────────────────────────────────────── + +// newTestWSServer creates an httptest.Server that upgrades to WebSocket and +// delegates each connection to handler. The server URL can be converted to +// a ws:// URL by replacing "http" with "ws". 
+func newTestWSServer(t *testing.T, handler func(conn *websocket.Conn)) *httptest.Server { + t.Helper() + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + } + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + handler(conn) + })) + return srv +} + +// wsURL converts an httptest.Server http URL to a ws URL. +func wsURL(srv *httptest.Server) string { + return "ws" + strings.TrimPrefix(srv.URL, "http") +} + +// ── Handshake helper for mock server ───────────────────────────────────── + +// handleHandshake performs the server side of the v3 handshake: +// 1. Send connect.challenge +// 2. Read connect request +// 3. Send hello-ok response +// +// Returns the connect request frame for inspection. +func handleHandshake(t *testing.T, conn *websocket.Conn) map[string]any { + t.Helper() + + // 1. Send connect.challenge + challenge := map[string]any{ + "type": "event", + "event": "connect.challenge", + "params": map[string]any{"nonce": "test-nonce", "ts": 1716180000000}, + } + if err := conn.WriteJSON(challenge); err != nil { + t.Fatalf("server: write challenge: %v", err) + } + + // 2. Read connect request + var req map[string]any + if err := conn.ReadJSON(&req); err != nil { + t.Fatalf("server: read connect request: %v", err) + } + + if req["method"] != "connect" { + t.Fatalf("server: expected method=connect, got %v", req["method"]) + } + + // 3. Send hello-ok response + // Note: helloOKResponse expects ConnID at the top level of the result, + // matching the WSClient's JSON struct tags. 
+ result := map[string]any{ + "type": "hello-ok", + "protocol": 3, + "connId": "test-conn-123", + "features": map[string]any{"methods": []string{}, "events": []string{}}, + "auth": map[string]any{"role": "operator", "scopes": []string{"operator.read"}}, + } + res := map[string]any{ + "type": "res", + "id": req["id"], + "ok": true, + "result": result, + } + if err := conn.WriteJSON(res); err != nil { + t.Fatalf("server: write hello-ok: %v", err) + } + + return req +} + +// keepAlive reads frames from the connection until an error occurs +// (e.g., the client disconnects). Used as the default "do nothing" +// server loop after handshake. +func keepAlive(conn *websocket.Conn) { + for { + var m map[string]any + if err := conn.ReadJSON(&m); err != nil { + break + } + } +} + +// ── 1. Test: Full handshake ────────────────────────────────────────────── + +func TestWSClient_Handshake(t *testing.T) { + srv := newTestWSServer(t, func(conn *websocket.Conn) { + handleHandshake(t, conn) + keepAlive(conn) + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default()) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + done := make(chan struct{}) + go func() { + client.Start(ctx) + close(done) + }() + + // Wait briefly for handshake to complete + time.Sleep(200 * time.Millisecond) + + // Verify connId was set + client.connMu.Lock() + connID := client.connId + client.connMu.Unlock() + + if connID != "test-conn-123" { + t.Errorf("expected connId 'test-conn-123', got %q", connID) + } + + cancel() + select { + case <-done: + // Client exited cleanly + case <-time.After(3 * time.Second): + t.Fatal("WSClient did not shut down after context cancellation") + } +} + +// ── 2. 
Test: Send() with response matching ─────────────────────────────── + +func TestWSClient_Send(t *testing.T) { + srv := newTestWSServer(t, func(conn *websocket.Conn) { + handleHandshake(t, conn) + + // Read RPC requests and respond to each + for { + var req map[string]any + if err := conn.ReadJSON(&req); err != nil { + break + } + reqID, _ := req["id"].(string) + method, _ := req["method"].(string) + + var result any + switch method { + case "agents.list": + result = map[string]any{ + "agents": []map[string]any{ + {"id": "otto", "name": "Otto"}, + }, + } + default: + result = map[string]any{} + } + + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": result, + } + if err := conn.WriteJSON(res); err != nil { + break + } + } + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default()) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go client.Start(ctx) + + // Give the client time to complete handshake + time.Sleep(300 * time.Millisecond) + + resp, err := client.Send("agents.list", nil) + if err != nil { + t.Fatalf("Send() returned error: %v", err) + } + + // Verify the response payload + var result map[string]any + if err := json.Unmarshal(resp, &result); err != nil { + t.Fatalf("unmarshal response: %v", err) + } + + agents, ok := result["agents"].([]any) + if !ok || len(agents) != 1 { + t.Errorf("expected 1 agent in response, got %v", result) + } + + cancel() +} + +// ── 3. 
Test: Event handler routing ─────────────────────────────────────── + +func TestWSClient_EventRouting(t *testing.T) { + eventReceived := make(chan json.RawMessage, 1) + + srv := newTestWSServer(t, func(conn *websocket.Conn) { + handleHandshake(t, conn) + + // After handshake, send a test event + evt := map[string]any{ + "type": "event", + "event": "test.event", + "params": map[string]any{"greeting": "hello from server"}, + } + if err := conn.WriteJSON(evt); err != nil { + t.Logf("server: write event: %v", err) + return + } + + keepAlive(conn) + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default()) + + // Register event handler BEFORE starting the client + client.OnEvent("test.event", func(payload json.RawMessage) { + eventReceived <- payload + }) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go client.Start(ctx) + + // Wait for the event handler to fire + select { + case payload := <-eventReceived: + var data map[string]any + if err := json.Unmarshal(payload, &data); err != nil { + t.Fatalf("unmarshal event payload: %v", err) + } + if greeting, _ := data["greeting"].(string); greeting != "hello from server" { + t.Errorf("expected greeting 'hello from server', got %q", greeting) + } + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for event handler to fire") + } + + cancel() +} + +// ── 4. 
Test: Concurrent Send ───────────────────────────────────────────── + +func TestWSClient_ConcurrentSend(t *testing.T) { + var reqCount atomic.Int32 + + srv := newTestWSServer(t, func(conn *websocket.Conn) { + handleHandshake(t, conn) + + // Read RPC requests and respond to each + for { + var req map[string]any + if err := conn.ReadJSON(&req); err != nil { + break + } + reqID, _ := req["id"].(string) + n := reqCount.Add(1) + + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": map[string]any{"index": n, "method": req["method"]}, + } + if err := conn.WriteJSON(res); err != nil { + break + } + } + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + go client.Start(ctx) + + // Give the client time to complete handshake + time.Sleep(300 * time.Millisecond) + + // Fire 3 concurrent Send() calls + type sendResult struct { + method string + payload json.RawMessage + err error + } + results := make(chan sendResult, 3) + + methods := []string{"agents.list", "sessions.list", "agents.config"} + for _, method := range methods { + go func(m string) { + resp, err := client.Send(m, nil) + results <- sendResult{method: m, payload: resp, err: err} + }(method) + } + + // Collect all results + for i := 0; i < 3; i++ { + select { + case r := <-results: + if r.err != nil { + t.Errorf("Send(%q) returned error: %v", r.method, r.err) + continue + } + var result map[string]any + if err := json.Unmarshal(r.payload, &result); err != nil { + t.Errorf("Send(%q) unmarshal error: %v", r.method, err) + continue + } + gotMethod, _ := result["method"].(string) + if gotMethod != r.method { + t.Errorf("Send(%q) got response for %q (mismatched)", r.method, gotMethod) + } + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for concurrent Send results") + } + } + + cancel() +} + +// ── 5. 
Test: Clean shutdown ────────────────────────────────────────────── + +func TestWSClient_CleanShutdown(t *testing.T) { + srv := newTestWSServer(t, func(conn *websocket.Conn) { + handleHandshake(t, conn) + keepAlive(conn) + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default()) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + + done := make(chan struct{}) + go func() { + client.Start(ctx) + close(done) + }() + + // Let the client connect and complete handshake + time.Sleep(200 * time.Millisecond) + + // Cancel context — should trigger clean shutdown + cancel() + + select { + case <-done: + // Client exited cleanly — pass + case <-time.After(3 * time.Second): + t.Fatal("WSClient did not shut down cleanly within timeout") + } +} + +// ── Pure utility tests (from CUB-205) ───────────────────────────────────── + +func TestMapSessionStatus(t *testing.T) { + tests := []struct { + input string + expected models.AgentStatus + }{ + {"running", models.AgentStatusActive}, + {"streaming", models.AgentStatusActive}, + {"done", models.AgentStatusIdle}, + {"error", models.AgentStatusError}, + {"", models.AgentStatusIdle}, + {"garbage", models.AgentStatusIdle}, + } + for _, tt := range tests { + result := mapSessionStatus(tt.input) + if result != tt.expected { + t.Errorf("mapSessionStatus(%q) = %q, want %q", tt.input, result, tt.expected) + } + } +} + +func TestAgentItemToCard(t *testing.T) { + t.Run("full fields", func(t *testing.T) { + item := agentListItem{ + ID: "dex", + Name: "Dex", + Role: "backend", + Channel: "telegram", + } + card := agentItemToCard(item) + if card.ID != "dex" { + t.Errorf("ID = %q, want %q", card.ID, "dex") + } + if card.DisplayName != "Dex" { + t.Errorf("DisplayName = %q, want %q", card.DisplayName, "Dex") + } + if card.Role != "backend" { + t.Errorf("Role = %q, want %q", card.Role, "backend") + } + if card.Channel != "telegram" { + t.Errorf("Channel = 
%q, want %q", card.Channel, "telegram") + } + if card.Status != models.AgentStatusIdle { + t.Errorf("Status = %q, want %q", card.Status, models.AgentStatusIdle) + } + }) + + t.Run("empty fields use defaults", func(t *testing.T) { + item := agentListItem{ + ID: "otto", + } + card := agentItemToCard(item) + if card.ID != "otto" { + t.Errorf("ID = %q, want %q", card.ID, "otto") + } + if card.DisplayName != "otto" { + t.Errorf("DisplayName = %q, want %q (should fallback to ID)", card.DisplayName, "otto") + } + if card.Role != "agent" { + t.Errorf("Role = %q, want %q (default)", card.Role, "agent") + } + if card.Channel != "unknown" { + t.Errorf("Channel = %q, want %q (per Grimm requirement)", card.Channel, "unknown") + } + if card.Status != models.AgentStatusIdle { + t.Errorf("Status = %q, want %q", card.Status, models.AgentStatusIdle) + } + }) + + t.Run("empty name falls back to ID", func(t *testing.T) { + item := agentListItem{ + ID: "hex", + Name: "", + Role: "database", + } + card := agentItemToCard(item) + if card.DisplayName != "hex" { + t.Errorf("DisplayName = %q, want %q (ID fallback)", card.DisplayName, "hex") + } + }) +} + +func TestStrPtr(t *testing.T) { + s := "hello" + p := strPtr(s) + if p == nil { + t.Fatal("strPtr returned nil") + } + if *p != s { + t.Errorf("strPtr(%q) = %q, want %q", s, *p, s) + } + + empty := "" + ep := strPtr(empty) + if *ep != empty { + t.Errorf("strPtr(empty) = %q, want %q", *ep, empty) + } +} \ No newline at end of file diff --git a/go-backend/internal/models/models.go b/go-backend/internal/models/models.go index 8480b41..2844cff 100644 --- a/go-backend/internal/models/models.go +++ b/go-backend/internal/models/models.go @@ -63,12 +63,15 @@ type CreateAgentRequest struct { // UpdateAgentRequest is the payload for PUT /api/agents/{id}. 
type UpdateAgentRequest struct { - Status *AgentStatus `json:"status,omitempty" validate:"omitempty,agentStatus"` - CurrentTask *string `json:"currentTask,omitempty"` - TaskProgress *int `json:"taskProgress,omitempty" validate:"omitempty,min=0,max=100"` - TaskElapsed *string `json:"taskElapsed,omitempty"` - Channel *string `json:"channel,omitempty" validate:"omitempty,min=1,max=32"` - ErrorMessage *string `json:"errorMessage,omitempty"` + Status *AgentStatus `json:"status,omitempty" validate:"omitempty,agentStatus"` + DisplayName *string `json:"displayName,omitempty"` + Role *string `json:"role,omitempty"` + LastActivityAt *string `json:"lastActivityAt,omitempty"` + CurrentTask *string `json:"currentTask,omitempty"` + TaskProgress *int `json:"taskProgress,omitempty" validate:"omitempty,min=0,max=100"` + TaskElapsed *string `json:"taskElapsed,omitempty"` + Channel *string `json:"channel,omitempty" validate:"omitempty,min=1,max=32"` + ErrorMessage *string `json:"errorMessage,omitempty"` } // AgentStatusHistoryEntry represents a point-in-time status change for an agent. diff --git a/reference/CONTROL_CENTER_CONTEXT.md b/reference/CONTROL_CENTER_CONTEXT.md new file mode 100644 index 0000000..81bee91 --- /dev/null +++ b/reference/CONTROL_CENTER_CONTEXT.md @@ -0,0 +1,46 @@ +# Control Center — Architecture Context + +## Current State + +The Control Center backend uses a **dual-path gateway client** architecture: + +- **Primary path**: WebSocket client (`gateway.WSClient`) connects to the OpenClaw gateway using WS protocol v3. It handles handshake, initial sync (agents.list + sessions.list RPCs), live event routing (sessions.changed, presence, agent.config), and automatic reconnection with exponential backoff. +- **Fallback path**: REST poller (`gateway.Client`) polls the gateway `/api/agents` endpoint on an interval. It only activates if the WS client fails to connect within 30 seconds of startup. + +## Live Gateway Connection + +### Startup Sequence +1. 
Both WS client and REST client start concurrently +2. REST client waits up to 30s for the WS readiness signal (`wsReady` channel) +3. If WS connects successfully → REST client stands down (logs "using WS — REST poller standing down") +4. If WS fails to connect within 30s → REST client falls back to polling (logs "WS not ready — falling back to REST polling") +5. If no WS client configured → REST client polls immediately + +### WebSocket Client (Primary) +- Config: `WS_GATEWAY_URL` (`.env.example` value: `ws://host.docker.internal:18789/`; `DefaultWSConfig` in code uses `ws://localhost:18789/`), `OPENCLAW_GATEWAY_TOKEN` +- Protocol: v3 handshake (challenge → connect → hello-ok) +- Initial sync: `agents.list` + `sessions.list` RPCs → persist → merge → broadcast `fleet.update` +- Live events: `sessions.changed`, `presence`, `agent.config` +- Reconnection: exponential backoff (1s → 2s → 4s → ... → 30s max) + +### REST Poller (Fallback) +- Config: `GATEWAY_URL` (default: `http://host.docker.internal:18789/api/agents`), `GATEWAY_POLL_INTERVAL` (default: 5s) +- Only used when WS is unavailable +- Polls the `/api/agents` endpoint and syncs agent status changes + +### Wiring +``` +main.go + ├── wsClient = NewWSClient(...) + ├── restClient = NewClient(...) + ├── wsClient.SetRESTClient(restClient) // WS notifies REST on ready + ├── restClient.SetWSClient(wsClient) // REST defers to WS + ├── go wsClient.Start(ctx) // primary + └── go restClient.Start(ctx) // fallback (waits for WS) +``` + +## Key Design Decisions +- **Push over poll**: WS is preferred for real-time updates; REST is a safety net +- **DB first, then SSE**: All event handlers persist to DB before broadcasting +- **Graceful degradation**: System works without WS; REST provides basic functionality +- **No hard dependency on REST /api/agents**: If WS is connected, the REST endpoint is never called \ No newline at end of file