Compare commits

..

7 Commits

Author SHA1 Message Date
Joshua King
49b959aee5 Add CI Docker image with Go 1.23 + Node 22 pre-installed, update workflow to use go-react label
Some checks are pending
Dev Build & Deploy / test-and-build (push) Waiting to run
Dev Build & Deploy / docker-build-push (push) Blocked by required conditions
2026-05-20 08:47:16 -04:00
Joshua King
ae37d79aa8 Switch to ubuntu-dotnet runner label to bypass /var/run symlink issue
Some checks failed
Dev Build & Deploy / test-and-build (push) Has been cancelled
Dev Build & Deploy / docker-build-push (push) Has been cancelled
2026-05-20 08:39:05 -04:00
Joshua King
8fb4183abe Add container spec to fix /var/run symlink path escape error
Some checks failed
Dev Build & Deploy / test-and-build (push) Failing after 7s
Dev Build & Deploy / docker-build-push (push) Has been skipped
2026-05-20 08:30:49 -04:00
Joshua King
ee6ad10db9 Replace setup-go/setup-node actions with manual install for Gitea runner compatibility
Some checks failed
Dev Build & Deploy / docker-build-push (push) Blocked by required conditions
Dev Build & Deploy / test-and-build (push) Failing after 14m18s
2026-05-20 08:10:13 -04:00
Joshua King
5f42a3be18 Rename GITEA_TOKEN secret to ACCESS_TOKEN
Some checks failed
Dev Build & Deploy / test-and-build (push) Failing after 4s
Dev Build & Deploy / docker-build-push (push) Has been skipped
2026-05-20 08:08:21 -04:00
Joshua King
0e452941dd Remove deploy-dev job - deployment handled via docker-compose
Some checks failed
Dev Build & Deploy / test-and-build (push) Failing after 0s
Dev Build & Deploy / docker-build-push (push) Has been skipped
2026-05-20 08:06:52 -04:00
Joshua King
87cb517623 Update CI workflow to match Go+React stack with Docker registry push
Some checks failed
Dev Build & Deploy / test-and-build (push) Failing after 1s
Dev Build & Deploy / docker-build-push (push) Has been skipped
Dev Build & Deploy / deploy-dev (push) Has been skipped
2026-05-20 08:04:50 -04:00
19 changed files with 87 additions and 2566 deletions

View File

@@ -13,14 +13,9 @@ ENVIRONMENT=development
DATABASE_URL=postgresql://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable
# Gateway (OpenClaw) connection
# WebSocket gateway config (primary path)
WS_GATEWAY_URL=ws://host.docker.internal:18789/
# Gateway auth token — same as OPENCLAW_GATEWAY_TOKEN (set in environment)
GATEWAY_TOKEN=
# REST poller config (fallback, only used if WS fails to connect)
GATEWAY_URL=http://host.docker.internal:18789/api/agents
# Polling interval for agent state updates (fallback only)
# URL to the OpenClaw gateway API for polling agent states
GATEWAY_URL=http://localhost:18789/api/agents
# Polling interval for agent state updates
GATEWAY_POLL_INTERVAL=5s
# ── Frontend Variables (via Vite) ───────────────────────────────────────

View File

@@ -1,85 +0,0 @@
name: Build (Dev)
on:
push:
branches: [dev]
pull_request:
branches: [dev]
workflow_dispatch:
env:
GO_VERSION: "1.23"
NODE_VERSION: "22"
BINARY_NAME: server
jobs:
build-go-backend:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}
- name: Test Go backend
working-directory: go-backend
run: go test ./...
- name: Build Go binary
working-directory: go-backend
run: |
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \
go build -ldflags="-s -w -X main.version=${GITHUB_SHA:0:8}" \
-o ${{ env.BINARY_NAME }} ./cmd/server
- name: Upload Go binary
uses: actions/upload-artifact@v4
with:
name: go-backend-binary
path: go-backend/${{ env.BINARY_NAME }}
retention-days: 3
build-frontend:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Node
uses: actions/setup-node@v4
with:
node-version: ${{ env.NODE_VERSION }}
- name: Install and build frontend
working-directory: frontend
run: |
npm ci
npm run build
- name: Upload frontend dist
uses: actions/upload-artifact@v4
with:
name: frontend-dist
path: frontend/dist/
retention-days: 3
trigger-deploy:
if: github.event_name == 'push'
needs: [build-go-backend, build-frontend]
runs-on: ubuntu-latest
steps:
- name: Trigger deploy workflow
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
await github.rest.repos.createDispatchEvent({
owner: context.repo.owner,
repo: context.repo.repo,
event_type: 'dev-build-success',
client_payload: {
sha: context.sha,
ref: context.ref
}
})

View File

@@ -1,126 +0,0 @@
name: Deploy (Dev)
on:
repository_dispatch:
types:
- dev-build-success
workflow_dispatch:
env:
BINARY_NAME: server
DEV_HOST: ${{ secrets.DEV_HOST }}
DEV_USER: ${{ secrets.DEV_USER }}
DEPLOY_BINARY_PATH: /opt/control-center/server
DEPLOY_FRONTEND_PATH: /usr/share/nginx/html
SERVICE_NAME: control-center-server
FRONTEND_SERVICE: nginx
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- name: Download Go binary
uses: actions/download-artifact@v4
with:
name: go-backend-binary
- name: Download frontend dist
uses: actions/download-artifact@v4
with:
name: frontend-dist
path: dist
- name: Make binary executable
run: chmod +x ${{ env.BINARY_NAME }}
- name: Generate deploy script
run: |
cat > deploy.sh <<'SCRIPT'
#!/usr/bin/env bash
set -euo pipefail
BINARY="${1}"
FRONTEND_DIST="${2:-dist}"
BINARY_PATH="${3:-/opt/control-center/server}"
FRONTEND_PATH="${4:-/usr/share/nginx/html}"
BINARY_SERVICE="${5:-control-center-server}"
FRONTEND_SERVICE="${6:-nginx}"
TIMESTAMP=$(date +%Y%m%d%H%M%S)
BACKUP="${BINARY_PATH}.${TIMESTAMP}.bak"
echo "=== deploy backend ==="
if [ -f "$BINARY_PATH" ]; then
echo "backing up current binary"
cp "$BINARY_PATH" "$BACKUP"
fi
echo "installing new binary"
cp "$BINARY" "$BINARY_PATH"
chmod +x "$BINARY_PATH"
echo "restarting service"
systemctl reload-or-restart "$BINARY_SERVICE" || systemctl restart "$BINARY_SERVICE"
sleep 3
if ! systemctl is-active --quiet "$BINARY_SERVICE"; then
echo "FAILED: $BINARY_SERVICE did not start — rolling back"
if [ -f "$BACKUP" ]; then
cp "$BACKUP" "$BINARY_PATH"
systemctl restart "$BINARY_SERVICE"
fi
exit 1
fi
echo "backend deploy ok — keeping last 3 backups"
ls -t "${BINARY_PATH}."*.bak 2>/dev/null | tail -n +4 | xargs -r rm -f
echo "=== deploy frontend ==="
if [ -d "$FRONTEND_DIST" ] && [ -n "$(ls -A "$FRONTEND_DIST" 2>/dev/null)" ]; then
rsync -a --delete "$FRONTEND_DIST/" "$FRONTEND_PATH/"
systemctl reload "$FRONTEND_SERVICE" 2>/dev/null ||:
echo "frontend deploy ok"
fi
echo "=== deploy complete ==="
SCRIPT
chmod +x deploy.sh
- name: Copy artifacts to dev server
uses: appleboy/scp-action@v0.1.7
with:
host: ${{ env.DEV_HOST }}
username: ${{ env.DEV_USER }}
key: ${{ secrets.DEV_SSH_KEY }}
source: "${{ env.BINARY_NAME }},deploy.sh,dist"
target: "/tmp/control-center-deploy"
- name: Execute deploy on dev server
uses: appleboy/ssh-action@v1
with:
host: ${{ env.DEV_HOST }}
username: ${{ env.DEV_USER }}
key: ${{ secrets.DEV_SSH_KEY }}
script: |
set -euo pipefail
cd /tmp/control-center-deploy
sudo ./deploy.sh \
"${{ env.BINARY_NAME }}" \
"dist" \
"${{ env.DEPLOY_BINARY_PATH }}" \
"${{ env.DEPLOY_FRONTEND_PATH }}" \
"${{ env.SERVICE_NAME }}" \
"${{ env.FRONTEND_SERVICE }}"
rm -rf /tmp/control-center-deploy
- name: Notify on failure
if: failure()
uses: appleboy/ssh-action@v1
with:
host: ${{ env.DEV_HOST }}
username: ${{ env.DEV_USER }}
key: ${{ secrets.DEV_SSH_KEY }}
script: |
echo "deploy failed — commit ${{ github.sha }}" > /tmp/control-center-deploy-failure.log

View File

@@ -1,4 +1,4 @@
name: Dev Build
name: Dev Build & Deploy
on:
pull_request:
@@ -6,45 +6,70 @@ on:
push:
branches: [dev]
env:
REGISTRY: code.cubecraftcreations.com
BACKEND_IMAGE: ${{ gitea.repository }}/backend
FRONTEND_IMAGE: ${{ gitea.repository }}/frontend
jobs:
build-test:
runs-on: ubuntu-dotnet
test-and-build:
runs-on: go-react
steps:
- uses: actions/checkout@v4
- name: Restore backend
run: dotnet restore
- name: Run backend tests
run: go test ./...
working-directory: ./go-backend
- name: Build backend
run: dotnet build --no-restore --configuration Release
- name: Test backend
run: dotnet test --no-build --configuration Release
- name: Setup Node
uses: actions/setup-node@v4
with:
node-version: "24"
run: go build -ldflags="-w -s" -o /tmp/server ./cmd/server
working-directory: ./go-backend
- name: Install frontend deps
run: npm ci
working-directory: ./frontend
- name: Lint frontend
run: npm run lint
working-directory: ./frontend
- name: Build frontend
run: npm run build
working-directory: ./frontend
deploy-dev:
needs: build-test
docker-build-push:
needs: test-and-build
if: gitea.event_name == 'push'
runs-on: ubuntu-latest
runs-on: go-react
steps:
- name: Deploy dev
run: |
echo "${{ secrets.DEV_DEPLOY_SSH_KEY }}" > /tmp/dev_key
chmod 600 /tmp/dev_key
ssh -i /tmp/dev_key -o StrictHostKeyChecking=no \
${{ secrets.DEV_DEPLOY_USER }}@${{ secrets.DEV_DEPLOY_HOST }} \
"${{ secrets.DEV_DEPLOY_PATH }}/deploy.sh"
- uses: actions/checkout@v4
- name: Login to Gitea Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ gitea.actor }}
password: ${{ secrets.ACCESS_TOKEN }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build & push backend image
uses: docker/build-push-action@v6
with:
context: ./go-backend
push: true
tags: |
${{ env.REGISTRY }}/${{ env.BACKEND_IMAGE }}:dev
${{ env.REGISTRY }}/${{ env.BACKEND_IMAGE }}:${{ gitea.sha }}
- name: Build & push frontend image
uses: docker/build-push-action@v6
with:
context: ./frontend
push: true
tags: |
${{ env.REGISTRY }}/${{ env.FRONTEND_IMAGE }}:dev
${{ env.REGISTRY }}/${{ env.FRONTEND_IMAGE }}:${{ gitea.sha }}

11
ci-image/Dockerfile Normal file
View File

@@ -0,0 +1,11 @@
FROM catthehacker/ubuntu:act-latest
# Install Go 1.23
RUN curl -sL https://go.dev/dl/go1.23.6.linux-amd64.tar.gz | tar -C /usr/local -xz
# Install Node 22
RUN curl -fsSL https://deb.nodesource.com/setup_22.x | bash - \
&& apt-get install -y nodejs \
&& rm -rf /var/lib/apt/lists/*
ENV PATH="/usr/local/go/bin:${PATH}"

View File

@@ -16,8 +16,6 @@ services:
- ENVIRONMENT=production
- PORT=8080
- GATEWAY_URL=http://host.docker.internal:18789/api/agents
- WS_GATEWAY_URL=ws://host.docker.internal:18789/
- GATEWAY_TOKEN=${GATEWAY_TOKEN:-}
depends_on:
db:
condition: service_healthy

View File

@@ -63,29 +63,15 @@ func main() {
Broker: broker,
})
// ── Gateway clients (WS primary, REST fallback) ───────────────────
// WS gateway client (primary path)
wsClient := gateway.NewWSClient(gateway.WSConfig{
URL: cfg.WSGatewayURL,
AuthToken: cfg.WSGatewayToken,
}, agentRepo, broker, logger)
// REST gateway client (fallback — only polls if WS fails to connect)
// ── Gateway client (polls OpenClaw for agent states) ───────────────────
gwClient := gateway.NewClient(gateway.Config{
URL: cfg.GatewayRestURL,
PollInterval: cfg.GatewayRestPollInterval,
URL: cfg.GatewayURL,
PollInterval: cfg.GatewayPollInterval,
}, agentRepo, broker)
// Wire them together: REST defers to WS when WS is connected
wsClient.SetRESTClient(gwClient)
gwClient.SetWSClient(wsClient)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start WS client first (primary)
go wsClient.Start(ctx)
// Start REST client (will wait for WS, then stand down or fall back)
go gwClient.Start(ctx)
// ── Server ─────────────────────────────────────────────────────────────

View File

@@ -7,7 +7,6 @@ require (
github.com/go-chi/cors v1.2.1
github.com/go-playground/validator/v10 v10.24.0
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/jackc/pgx/v5 v5.7.2
)

View File

@@ -17,8 +17,6 @@ github.com/go-playground/validator/v10 v10.24.0 h1:KHQckvo8G6hlWnrPX4NJJ+aBfWNAE
github.com/go-playground/validator/v10 v10.24.0/go.mod h1:GGzBIJMuE98Ic/kJsBXbz1x/7cByt++cQ+YOuDM5wus=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=

View File

@@ -15,10 +15,8 @@ type Config struct {
CORSOrigin string
LogLevel string
Environment string
GatewayRestURL string
GatewayRestPollInterval time.Duration
WSGatewayURL string
WSGatewayToken string
GatewayURL string
GatewayPollInterval time.Duration
}
// Load reads configuration from environment variables, applying defaults where
@@ -30,10 +28,8 @@ func Load() *Config {
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
LogLevel: getEnv("LOG_LEVEL", "info"),
Environment: getEnv("ENVIRONMENT", "development"),
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
GatewayURL: getEnv("GATEWAY_URL", "http://localhost:18789/api/agents"),
GatewayPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
}
}

View File

@@ -9,7 +9,6 @@ import (
"fmt"
"log/slog"
"net/http"
"sync"
"time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
@@ -18,18 +17,13 @@ import (
)
// Client polls the OpenClaw gateway for agent status and keeps the database
// and SSE broker in sync. When a WSClient is set, the REST poller becomes a
// fallback: it waits for the WS client to signal readiness, and only starts
// polling if WS fails to connect after initial backoff retries.
// and SSE broker in sync.
type Client struct {
url string
pollInterval time.Duration
httpClient *http.Client
agents repository.AgentRepo
broker *handler.Broker
wsClient *WSClient // optional WS client; when set, REST is fallback only
wsReady chan struct{} // closed once WS connection is established
wsReadyOnce sync.Once // protects wsReady close from double-close race
}
// Config holds gateway client configuration, typically loaded from environment.
@@ -54,64 +48,22 @@ func NewClient(cfg Config, agents repository.AgentRepo, broker *handler.Broker)
httpClient: &http.Client{Timeout: 10 * time.Second},
agents: agents,
broker: broker,
wsReady: make(chan struct{}),
}
}
// SetWSClient wires the WebSocket client so the REST poller knows to defer
// to it. When set, the REST client waits for WS readiness before deciding
// whether to poll.
func (c *Client) SetWSClient(ws *WSClient) {
c.wsClient = ws
}
// MarkWSReady signals that the WS connection is live and the REST poller
// should stand down. Called by WSClient after a successful handshake.
func (c *Client) MarkWSReady() {
c.wsReadyOnce.Do(func() {
close(c.wsReady)
})
}
// Start begins the gateway client loop. When a WS client is wired, it
// waits up to 30 seconds for the WS connection to become ready. If WS
// connects, the REST poller stands down and only logs periodically. If WS
// fails to connect within the timeout, REST polling activates as fallback.
// Start begins the polling loop. It runs until ctx is cancelled.
func (c *Client) Start(ctx context.Context) {
if c.wsClient != nil {
slog.Info("gateway client waiting for WS connection", "timeout", "30s")
select {
case <-c.wsReady:
slog.Info("gateway client using WS — REST poller standing down")
// WS is live; keep this goroutine alive but idle. If WS
// disconnects later, we could re-enter polling, but for now
// the WS client handles its own reconnection.
<-ctx.Done()
slog.Info("gateway client stopped (WS mode)")
return
case <-time.After(30 * time.Second):
slog.Warn("gateway client: WS not ready after 30s — falling back to REST polling",
slog.Info("gateway client starting",
"url", c.url,
"pollInterval", c.pollInterval.String())
case <-ctx.Done():
slog.Info("gateway client stopped while waiting for WS")
return
}
} else {
slog.Info("gateway client using REST polling (no WS client configured)",
"url", c.url,
"pollInterval", c.pollInterval.String())
}
// REST fallback polling
ticker := time.NewTicker(c.pollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
slog.Info("gateway client stopped (REST fallback)")
slog.Info("gateway client stopped")
return
case <-ticker.C:
c.poll(ctx)

View File

@@ -1,287 +0,0 @@
// Package gateway provides real-time event handlers for the Control Center
// WebSocket client. Handlers process gateway events (sessions.changed,
// presence, agent.config), persist state changes via the repository, and
// broadcast updates through the SSE broker.
//
// Rule: DB update first, then SSE broadcast. This keeps REST API responses
// consistent with SSE events.
package gateway
import (
"context"
"encoding/json"
"time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
)
// ── Event payload types ──────────────────────────────────────────────────
// sessionChangedPayload represents a single session delta from a
// sessions.changed event.
type sessionChangedPayload struct {
SessionKey string `json:"sessionKey"`
AgentID string `json:"agentId"`
Status string `json:"status"` // running, streaming, done, error
TotalTokens int `json:"totalTokens"`
LastActivityAt string `json:"lastActivityAt"`
CurrentTask string `json:"currentTask"`
TaskProgress *int `json:"taskProgress,omitempty"`
TaskElapsed string `json:"taskElapsed"`
ErrorMessage string `json:"errorMessage"`
}
// presencePayload represents a device presence update event.
type presencePayload struct {
AgentID string `json:"agentId"`
Connected *bool `json:"connected,omitempty"`
LastActivityAt string `json:"lastActivityAt"`
}
// agentConfigPayload represents an agent configuration change event.
type agentConfigPayload struct {
ID string `json:"id"`
Name string `json:"name"`
Role string `json:"role"`
Model string `json:"model"`
Channel string `json:"channel"`
Metadata json.RawMessage `json:"metadata"`
}
// ── Handler registration ─────────────────────────────────────────────────
// registerEventHandlers sets up all live event handlers on the WSClient.
// Call this once after a successful handshake + initial sync.
func (c *WSClient) registerEventHandlers() {
if c.agents == nil || c.broker == nil {
c.logger.Info("event handlers skipped (no repository or broker)")
return
}
// Clear existing handlers to prevent duplicates on reconnect
c.mu.Lock()
c.handlers = make(map[string][]eventHandler)
c.mu.Unlock()
c.OnEvent("sessions.changed", c.handleSessionsChanged)
c.OnEvent("presence", c.handlePresence)
c.OnEvent("agent.config", c.handleAgentConfig)
c.logger.Info("event handlers registered",
"events", []string{"sessions.changed", "presence", "agent.config"})
}
// ── sessions.changed ─────────────────────────────────────────────────────
// handleSessionsChanged processes sessions.changed events from the gateway.
// The payload may be a single session object or an array of session deltas.
// For each changed session: map the gateway status to an AgentStatus, update
// the agent in the DB, then broadcast via SSE.
func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
c.logger.Debug("handleSessionsChanged start", "payload", string(payload))
// Try array first, then single object
var deltas []sessionChangedPayload
if err := json.Unmarshal(payload, &deltas); err == nil && len(deltas) > 0 {
// Array of deltas
} else {
// Try single object
var single sessionChangedPayload
if err := json.Unmarshal(payload, &single); err != nil {
c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err)
return
}
deltas = []sessionChangedPayload{single}
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for _, d := range deltas {
if d.AgentID == "" {
c.logger.Debug("sessions.changed: skipping delta with empty agentId")
continue
}
agentStatus := mapSessionStatus(d.Status)
// Build partial update
update := models.UpdateAgentRequest{
Status: &agentStatus,
}
// Session key
if d.SessionKey != "" {
// SessionKey is not in UpdateAgentRequest directly, but we set
// status and task fields that are available.
}
// Current task
if d.CurrentTask != "" {
update.CurrentTask = &d.CurrentTask
}
// Task progress
if d.TaskProgress != nil {
update.TaskProgress = d.TaskProgress
} else if d.TotalTokens > 0 {
// Derive progress from token count as fallback
prog := min(d.TotalTokens/100, 100)
update.TaskProgress = &prog
}
// Task elapsed
if d.TaskElapsed != "" {
update.TaskElapsed = &d.TaskElapsed
}
// Error message
if d.ErrorMessage != "" {
update.ErrorMessage = &d.ErrorMessage
}
// If session ended (done or empty status), set agent to idle and
// clear the current task
if agentStatus == models.AgentStatusIdle {
emptyTask := ""
update.CurrentTask = &emptyTask
zeroProg := 0
update.TaskProgress = &zeroProg
}
// Update DB first
updated, err := c.agents.Update(ctx, d.AgentID, update)
if err != nil {
c.logger.Warn("sessions.changed: DB update failed",
"agentId", d.AgentID, "error", err)
continue
}
// Then broadcast
c.broker.Broadcast("agent.status", updated)
if d.TaskProgress != nil || d.CurrentTask != "" {
c.broker.Broadcast("agent.progress", updated)
}
c.logger.Debug("sessions.changed: agent updated",
"agentId", d.AgentID,
"status", string(agentStatus))
}
c.logger.Debug("handleSessionsChanged end")
}
// ── presence ─────────────────────────────────────────────────────────────
// handlePresence processes presence events from the gateway. Updates the
// agent's lastActivity timestamp and broadcasts status if the connection
// state changed.
func (c *WSClient) handlePresence(payload json.RawMessage) {
c.logger.Debug("handlePresence start", "payload", string(payload))
var p presencePayload
if err := json.Unmarshal(payload, &p); err != nil {
c.logger.Warn("presence: unparseable payload, skipping", "error", err)
return
}
if p.AgentID == "" {
c.logger.Debug("presence: skipping event with empty agentId")
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// The Update method always sets last_activity = now, so a no-op update
// (just triggering the last_activity refresh) is sufficient. We send
// an empty-ish update — the repo always bumps last_activity.
// If connection state is reported, also update status.
update := models.UpdateAgentRequest{}
if p.Connected != nil && !*p.Connected {
// Device disconnected — set agent to idle
idle := models.AgentStatusIdle
update.Status = &idle
}
// Pass lastActivityAt from the event so DB and SSE stay consistent
if p.LastActivityAt != "" {
update.LastActivityAt = &p.LastActivityAt
}
// Update DB first
updated, err := c.agents.Update(ctx, p.AgentID, update)
if err != nil {
c.logger.Warn("presence: DB update failed",
"agentId", p.AgentID, "error", err)
return
}
// Then broadcast
c.broker.Broadcast("agent.status", updated)
c.logger.Debug("presence: agent updated",
"agentId", p.AgentID,
"connected", p.Connected)
}
// ── agent.config ─────────────────────────────────────────────────────────
// handleAgentConfig processes agent.config events from the gateway. Updates
// agent metadata (name, channel) in the DB and broadcasts a fleet.update
// with the full fleet snapshot.
func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
c.logger.Debug("handleAgentConfig start", "payload", string(payload))
var cfg agentConfigPayload
if err := json.Unmarshal(payload, &cfg); err != nil {
c.logger.Warn("agent.config: unparseable payload, skipping", "error", err)
return
}
if cfg.ID == "" {
c.logger.Debug("agent.config: skipping event with empty id")
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Build partial update with available fields.
update := models.UpdateAgentRequest{}
if cfg.Name != "" {
update.DisplayName = &cfg.Name
}
if cfg.Role != "" {
update.Role = &cfg.Role
}
if cfg.Channel != "" {
update.Channel = &cfg.Channel
}
// Update DB first
updated, err := c.agents.Update(ctx, cfg.ID, update)
if err != nil {
c.logger.Warn("agent.config: DB update failed",
"agentId", cfg.ID, "error", err)
return
}
// Then broadcast fleet snapshot
allAgents, err := c.agents.List(ctx, "")
if err != nil {
c.logger.Warn("agent.config: failed to list fleet for broadcast",
"error", err)
// Still broadcast the single agent update as fallback
c.broker.Broadcast("agent.status", updated)
return
}
c.broker.Broadcast("fleet.update", allAgents)
c.logger.Debug("agent.config: fleet updated",
"agentId", cfg.ID,
"name", cfg.Name)
}

View File

@@ -1,516 +0,0 @@
package gateway
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync"
"testing"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
)
// ── Mock AgentRepo ────────────────────────────────────────────────────────
type mockAgentRepo struct {
mu sync.Mutex
agents map[string]models.AgentCardData
updateCalls []updateCall
}
type updateCall struct {
id string
req models.UpdateAgentRequest
}
func (m *mockAgentRepo) Get(_ context.Context, id string) (models.AgentCardData, error) {
m.mu.Lock()
defer m.mu.Unlock()
a, ok := m.agents[id]
if !ok {
return models.AgentCardData{}, errNotFound
}
return a, nil
}
func (m *mockAgentRepo) Update(_ context.Context, id string, req models.UpdateAgentRequest) (models.AgentCardData, error) {
m.mu.Lock()
defer m.mu.Unlock()
a, ok := m.agents[id]
if !ok {
return models.AgentCardData{}, errNotFound
}
if req.Status != nil {
a.Status = *req.Status
}
if req.DisplayName != nil {
a.DisplayName = *req.DisplayName
}
if req.Role != nil {
a.Role = *req.Role
}
if req.Channel != nil {
a.Channel = *req.Channel
}
if req.CurrentTask != nil {
a.CurrentTask = req.CurrentTask
}
if req.TaskProgress != nil {
a.TaskProgress = req.TaskProgress
}
if req.TaskElapsed != nil {
a.TaskElapsed = req.TaskElapsed
}
if req.ErrorMessage != nil {
a.ErrorMessage = req.ErrorMessage
}
if req.LastActivityAt != nil {
a.LastActivity = *req.LastActivityAt
}
m.agents[id] = a
m.updateCalls = append(m.updateCalls, updateCall{id, req})
return a, nil
}
func (m *mockAgentRepo) Create(_ context.Context, a models.AgentCardData) error {
m.mu.Lock()
defer m.mu.Unlock()
m.agents[a.ID] = a
return nil
}
func (m *mockAgentRepo) List(_ context.Context, statusFilter models.AgentStatus) ([]models.AgentCardData, error) {
m.mu.Lock()
defer m.mu.Unlock()
var result []models.AgentCardData
for _, a := range m.agents {
if statusFilter == "" || a.Status == statusFilter {
result = append(result, a)
}
}
return result, nil
}
func (m *mockAgentRepo) Delete(_ context.Context, id string) error {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.agents, id)
return nil
}
func (m *mockAgentRepo) Count(_ context.Context) (int, error) {
m.mu.Lock()
defer m.mu.Unlock()
return len(m.agents), nil
}
// errNotFound is returned by the mock repo when an agent is not found.
var errNotFound = fmt.Errorf("not found")
// ── Broadcast capture helper ───────────────────────────────────────────────
// broadcastCapture wraps a real Broker and captures all broadcasts
// via a subscribed channel. Use captured() to retrieve events that have
// been received so far. Call close() to unsubscribe when done.
type broadcastCapture struct {
broker *handler.Broker
ch chan handler.SSEEvent
}
func newBroadcastCapture(broker *handler.Broker) *broadcastCapture {
return &broadcastCapture{
broker: broker,
ch: broker.Subscribe(),
}
}
// captured drains all pending events from the subscription channel
// and returns them. This is synchronous — it only returns events that
// have already been sent to the channel.
func (bc *broadcastCapture) captured() []handler.SSEEvent {
var events []handler.SSEEvent
for {
select {
case evt := <-bc.ch:
events = append(events, evt)
default:
return events
}
}
}
func (bc *broadcastCapture) close() {
bc.broker.Unsubscribe(bc.ch)
}
// ── Test helpers ──────────────────────────────────────────────────────────
// newTestWSClient creates a WSClient wired to a mock repo and a real broker.
// Returns the client, the mock repo, and a broadcast capture.
func newTestWSClient() (*WSClient, *mockAgentRepo, *handler.Broker, *broadcastCapture) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
client := NewWSClient(WSConfig{}, repo, broker, slog.Default())
return client, repo, broker, capture
}
// ── Tests ─────────────────────────────────────────────────────────────────
func TestHandleSessionsChanged_Active(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["otto"] = models.AgentCardData{
ID: "otto",
DisplayName: "Otto",
Status: models.AgentStatusIdle,
}
payload := json.RawMessage(`{
"sessionKey": "s1",
"agentId": "otto",
"status": "running",
"totalTokens": 500,
"currentTask": "Orchestrating tasks"
}`)
client.handleSessionsChanged(payload)
// Verify: agent status updated to active
repo.mu.Lock()
agent := repo.agents["otto"]
calls := make([]updateCall, len(repo.updateCalls))
copy(calls, repo.updateCalls)
repo.mu.Unlock()
if agent.Status != models.AgentStatusActive {
t.Errorf("agent status = %q, want %q", agent.Status, models.AgentStatusActive)
}
// Verify: update was called
if len(calls) == 0 {
t.Fatal("expected at least one update call")
}
if calls[0].id != "otto" {
t.Errorf("update call agentId = %q, want %q", calls[0].id, "otto")
}
// Verify: broker broadcast "agent.status"
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "agent.status" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'agent.status'")
}
}
func TestHandleSessionsChanged_Idle(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["dex"] = models.AgentCardData{
ID: "dex",
DisplayName: "Dex",
Status: models.AgentStatusActive,
CurrentTask: strPtr("Writing API"),
}
payload := json.RawMessage(`{
"sessionKey": "s2",
"agentId": "dex",
"status": "done",
"totalTokens": 1000
}`)
client.handleSessionsChanged(payload)
repo.mu.Lock()
agent := repo.agents["dex"]
repo.mu.Unlock()
// Verify: agent goes idle
if agent.Status != models.AgentStatusIdle {
t.Errorf("agent status = %q, want %q", agent.Status, models.AgentStatusIdle)
}
// Verify: current task cleared (set to empty string)
if agent.CurrentTask != nil && *agent.CurrentTask != "" {
t.Errorf("current task = %q, want empty (cleared on idle)", *agent.CurrentTask)
}
// Verify: broker fires "agent.status"
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "agent.status" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'agent.status'")
}
}
func TestHandleSessionsChanged_ArrayPayload(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusIdle}
repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle}
payload := json.RawMessage(`[
{"sessionKey":"s1","agentId":"otto","status":"running","totalTokens":100},
{"sessionKey":"s2","agentId":"dex","status":"streaming","totalTokens":200}
]`)
client.handleSessionsChanged(payload)
repo.mu.Lock()
otto := repo.agents["otto"]
dex := repo.agents["dex"]
repo.mu.Unlock()
if otto.Status != models.AgentStatusActive {
t.Errorf("otto status = %q, want active", otto.Status)
}
if dex.Status != models.AgentStatusActive {
t.Errorf("dex status = %q, want active", dex.Status)
}
// Both should produce broadcasts
events := capture.captured()
statusCount := 0
for _, evt := range events {
if evt.EventType == "agent.status" {
statusCount++
}
}
if statusCount < 2 {
t.Errorf("expected at least 2 agent.status broadcasts, got %d", statusCount)
}
}
func TestHandleSessionsChanged_SkipsEmptyAgentID(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`{"sessionKey":"s1","agentId":"","status":"running"}`)
client.handleSessionsChanged(payload)
events := capture.captured()
if len(events) > 0 {
t.Errorf("expected no broadcasts for empty agentId, got %d", len(events))
}
}
func TestHandleSessionsChanged_UnparseablePayload(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`not json at all`)
client.handleSessionsChanged(payload)
events := capture.captured()
if len(events) > 0 {
t.Errorf("expected no broadcasts for unparseable payload, got %d", len(events))
}
}
func TestHandlePresence(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["pip"] = models.AgentCardData{
ID: "pip",
DisplayName: "Pip",
Status: models.AgentStatusActive,
}
payload := json.RawMessage(`{
"agentId": "pip",
"connected": true,
"lastActivityAt": "2025-01-01T00:00:00Z"
}`)
client.handlePresence(payload)
repo.mu.Lock()
agent := repo.agents["pip"]
calls := make([]updateCall, len(repo.updateCalls))
copy(calls, repo.updateCalls)
repo.mu.Unlock()
// Agent should still be active (connected=true doesn't change status)
if agent.Status != models.AgentStatusActive {
t.Errorf("agent status = %q, want active", agent.Status)
}
// Update should have been called (for lastActivityAt)
if len(calls) == 0 {
t.Fatal("expected at least one update call")
}
// Verify broadcast
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "agent.status" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'agent.status'")
}
}
func TestHandlePresence_Disconnect(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["pip"] = models.AgentCardData{
ID: "pip",
DisplayName: "Pip",
Status: models.AgentStatusActive,
}
payload := json.RawMessage(`{
"agentId": "pip",
"connected": false
}`)
client.handlePresence(payload)
repo.mu.Lock()
agent := repo.agents["pip"]
repo.mu.Unlock()
// Agent should go idle on disconnect
if agent.Status != models.AgentStatusIdle {
t.Errorf("agent status = %q, want idle after disconnect", agent.Status)
}
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "agent.status" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'agent.status' on disconnect")
}
}
func TestHandlePresence_EmptyAgentID(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`{"agentId":"","connected":true}`)
client.handlePresence(payload)
events := capture.captured()
if len(events) > 0 {
t.Errorf("expected no broadcasts for empty agentId, got %d", len(events))
}
}
func TestHandleAgentConfig(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["rex"] = models.AgentCardData{
ID: "rex",
DisplayName: "Rex",
Role: "Frontend Dev",
Status: models.AgentStatusIdle,
Channel: "discord",
}
payload := json.RawMessage(`{
"id": "rex",
"name": "Rex the Dev",
"role": "Senior Frontend",
"channel": "telegram"
}`)
client.handleAgentConfig(payload)
repo.mu.Lock()
agent := repo.agents["rex"]
calls := make([]updateCall, len(repo.updateCalls))
copy(calls, repo.updateCalls)
repo.mu.Unlock()
// Verify DisplayName and Role updated
if agent.DisplayName != "Rex the Dev" {
t.Errorf("displayName = %q, want %q", agent.DisplayName, "Rex the Dev")
}
if agent.Role != "Senior Frontend" {
t.Errorf("role = %q, want %q", agent.Role, "Senior Frontend")
}
if agent.Channel != "telegram" {
t.Errorf("channel = %q, want %q", agent.Channel, "telegram")
}
// Verify update was called
if len(calls) == 0 {
t.Fatal("expected at least one update call")
}
// Verify broker fires "fleet.update"
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "fleet.update" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'fleet.update'")
}
}
func TestHandleAgentConfig_EmptyID(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`{"id":"","name":"Ghost"}`)
client.handleAgentConfig(payload)
events := capture.captured()
if len(events) > 0 {
t.Errorf("expected no broadcasts for empty id, got %d", len(events))
}
}
func TestHandleAgentConfig_NotFound(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`{"id":"unknown","name":"Ghost","role":"Phantom"}`)
client.handleAgentConfig(payload)
// Agent doesn't exist in repo, so Update will fail → handler logs warning, returns early
events := capture.captured()
for _, evt := range events {
if evt.EventType == "fleet.update" {
t.Error("fleet.update should not be broadcast when agent update fails")
}
}
}

View File

@@ -1,196 +0,0 @@
// Package gateway provides the initial sync logic that fetches agent and
// session data from the OpenClaw gateway via WS RPCs after handshake,
// persists to the repository, merges session state into agent cards, and
// broadcasts the merged fleet to SSE clients.
package gateway
import (
"context"
"encoding/json"
"fmt"
"time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
)
// ── RPC response types ───────────────────────────────────────────────────
// agentListItem represents a single agent returned by the agents.list RPC.
// Fields are extracted gracefully from json.RawMessage so unknown fields
// from the gateway are silently ignored.
type agentListItem struct {
ID string `json:"id"`
Name string `json:"name"`
Model string `json:"model"`
Role string `json:"role"`
Channel string `json:"channel"`
Metadata json.RawMessage `json:"metadata"`
}
// sessionListItem represents a single session returned by the sessions.list RPC.
type sessionListItem struct {
SessionKey string `json:"sessionKey"`
AgentID string `json:"agentId"`
Status string `json:"status"` // running, done, streaming, error
TotalTokens int `json:"totalTokens"`
LastActivityAt string `json:"lastActivityAt"`
}
// ── Sync logic ──────────────────────────────────────────────────────────
// initialSync fetches agents and sessions from the gateway via WS RPCs,
// persists them, merges session state into agent cards, and broadcasts
// the merged fleet as a fleet.update event.
func (c *WSClient) initialSync(ctx context.Context) error {
if c.agents == nil {
c.logger.Info("initial sync skipped (no repository)")
return nil
}
c.logger.Info("initial sync starting")
// 1. Fetch agents
agentsRaw, err := c.Send("agents.list", nil)
if err != nil {
return fmt.Errorf("agents.list RPC: %w", err)
}
var agentItems []agentListItem
if err := json.Unmarshal(agentsRaw, &agentItems); err != nil {
return fmt.Errorf("parse agents.list response: %w", err)
}
c.logger.Info("agents.list received", "count", len(agentItems))
// 2. Persist each agent
for _, item := range agentItems {
card := agentItemToCard(item)
existing, err := c.agents.Get(ctx, card.ID)
if err != nil {
// Agent doesn't exist — create it
if createErr := c.agents.Create(ctx, card); createErr != nil {
c.logger.Warn("sync: agent create failed", "id", card.ID, "error", createErr)
continue
}
c.logger.Info("sync: agent created", "id", card.ID)
continue
}
// Agent exists — update if display name or role changed
if existing.DisplayName != card.DisplayName || existing.Role != card.Role {
newName := card.DisplayName
newRole := card.Role
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
DisplayName: &newName,
Role: &newRole,
})
if updateErr != nil {
c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr)
}
}
}
// 3. Fetch sessions
sessionsRaw, err := c.Send("sessions.list", nil)
if err != nil {
return fmt.Errorf("sessions.list RPC: %w", err)
}
var sessionItems []sessionListItem
if err := json.Unmarshal(sessionsRaw, &sessionItems); err != nil {
return fmt.Errorf("parse sessions.list response: %w", err)
}
c.logger.Info("sessions.list received", "count", len(sessionItems))
// 4. Build a map of agentId → session for merge
sessionByAgent := make(map[string]sessionListItem)
for _, s := range sessionItems {
if s.AgentID != "" {
sessionByAgent[s.AgentID] = s
}
}
// 5. Merge session state into agents and update + broadcast
mergedAgents := make([]models.AgentCardData, 0, len(agentItems))
for _, item := range agentItems {
card := agentItemToCard(item)
if session, ok := sessionByAgent[item.ID]; ok {
// Merge session state
card.SessionKey = session.SessionKey
card.Status = mapSessionStatus(session.Status)
card.LastActivity = session.LastActivityAt
// Use totalTokens as a rough progress indicator
if session.TotalTokens > 0 {
prog := min(session.TotalTokens/100, 100) // normalize to 0-100
card.TaskProgress = &prog
}
}
// Persist merged state
existing, err := c.agents.Get(ctx, card.ID)
if err == nil && existing.Status != card.Status {
status := card.Status
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
Status: &status,
})
if updateErr != nil {
c.logger.Warn("sync: agent status update failed", "id", card.ID, "error", updateErr)
}
}
mergedAgents = append(mergedAgents, card)
}
// 6. Broadcast the full merged fleet
c.broker.Broadcast("fleet.update", mergedAgents)
c.logger.Info("initial sync complete", "agents", len(mergedAgents))
return nil
}
// mapSessionStatus converts a gateway session status string to an AgentStatus.
// - "running" / "streaming" → active
// - "error" → error
// - "done" / "" / other → idle
func mapSessionStatus(status string) models.AgentStatus {
switch status {
case "running", "streaming":
return models.AgentStatusActive
case "error":
return models.AgentStatusError
default:
return models.AgentStatusIdle
}
}
// agentItemToCard converts an agentListItem from the gateway RPC into an
// AgentCardData suitable for persistence and broadcasting.
func agentItemToCard(item agentListItem) models.AgentCardData {
role := item.Role
if role == "" {
role = "agent"
}
channel := item.Channel
if channel == "" {
channel = "unknown"
}
name := item.Name
if name == "" {
name = item.ID
}
return models.AgentCardData{
ID: item.ID,
DisplayName: name,
Role: role,
Status: models.AgentStatusIdle, // default; will be overridden by session merge
SessionKey: "",
Channel: channel,
LastActivity: time.Now().UTC().Format(time.RFC3339),
}
}

View File

@@ -1,236 +0,0 @@
package gateway
import (
"context"
"testing"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
)
func TestInitialSync(t *testing.T) {
_ = &mockAgentRepo{agents: make(map[string]models.AgentCardData)} // verify mock compiles
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
// --- Test agentItemToCard + session merge (the core of initialSync) ---
agentItems := []agentListItem{
{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"},
{ID: "dex", Name: "Dex", Role: "Backend Dev", Channel: "telegram"},
}
sessionItems := []sessionListItem{
{SessionKey: "s1", AgentID: "otto", Status: "running", TotalTokens: 500, LastActivityAt: "2025-05-20T12:00:00Z"},
{SessionKey: "s2", AgentID: "dex", Status: "done", TotalTokens: 1000, LastActivityAt: "2025-05-20T11:00:00Z"},
}
// Build sessionByAgent map (mirrors initialSync logic)
sessionByAgent := make(map[string]sessionListItem)
for _, s := range sessionItems {
if s.AgentID != "" {
sessionByAgent[s.AgentID] = s
}
}
// Merge and verify
merged := make([]models.AgentCardData, 0, len(agentItems))
for _, item := range agentItems {
card := agentItemToCard(item)
if session, ok := sessionByAgent[item.ID]; ok {
card.SessionKey = session.SessionKey
card.Status = mapSessionStatus(session.Status)
card.LastActivity = session.LastActivityAt
if session.TotalTokens > 0 {
prog := min(session.TotalTokens/100, 100)
card.TaskProgress = &prog
}
}
merged = append(merged, card)
}
// Verify otto: running → active
if merged[0].ID != "otto" {
t.Errorf("merged[0].ID = %q, want %q", merged[0].ID, "otto")
}
if merged[0].Status != models.AgentStatusActive {
t.Errorf("otto status = %q, want %q (running → active)", merged[0].Status, models.AgentStatusActive)
}
if merged[0].SessionKey != "s1" {
t.Errorf("otto sessionKey = %q, want %q", merged[0].SessionKey, "s1")
}
if merged[0].TaskProgress == nil || *merged[0].TaskProgress != 5 {
t.Errorf("otto taskProgress = %v, want 5", merged[0].TaskProgress)
}
// Verify dex: done → idle
if merged[1].ID != "dex" {
t.Errorf("merged[1].ID = %q, want %q", merged[1].ID, "dex")
}
if merged[1].Status != models.AgentStatusIdle {
t.Errorf("dex status = %q, want %q (done → idle)", merged[1].Status, models.AgentStatusIdle)
}
if merged[1].SessionKey != "s2" {
t.Errorf("dex sessionKey = %q, want %q", merged[1].SessionKey, "s2")
}
}
func TestInitialSync_PersistCreatesNew(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
// Simulate the persist logic from initialSync:
// new agents should be created
card := agentItemToCard(agentListItem{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"})
ctx := context.Background()
// Agent doesn't exist → create
_, err := repo.Get(ctx, card.ID)
if err == nil {
t.Fatal("expected agent to not exist yet")
}
if err := repo.Create(ctx, card); err != nil {
t.Fatalf("Create failed: %v", err)
}
got, err := repo.Get(ctx, card.ID)
if err != nil {
t.Fatalf("Get after Create failed: %v", err)
}
if got.ID != "otto" {
t.Errorf("got.ID = %q, want %q", got.ID, "otto")
}
if got.DisplayName != "Otto" {
t.Errorf("got.DisplayName = %q, want %q", got.DisplayName, "Otto")
}
if got.Role != "Orchestrator" {
t.Errorf("got.Role = %q, want %q", got.Role, "Orchestrator")
}
}
func TestInitialSync_PersistUpdatesExisting(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
ctx := context.Background()
// Pre-populate with existing agent
repo.agents["otto"] = models.AgentCardData{
ID: "otto",
DisplayName: "Otto",
Role: "Old Role",
Status: models.AgentStatusIdle,
}
// Simulate initialSync: agent exists, name/role changed → update
newName := "Otto Prime"
newRole := "Super Orchestrator"
_, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{
DisplayName: &newName,
Role: &newRole,
})
if err != nil {
t.Fatalf("Update failed: %v", err)
}
got, err := repo.Get(ctx, "otto")
if err != nil {
t.Fatalf("Get after Update failed: %v", err)
}
if got.DisplayName != "Otto Prime" {
t.Errorf("displayName = %q, want %q", got.DisplayName, "Otto Prime")
}
if got.Role != "Super Orchestrator" {
t.Errorf("role = %q, want %q", got.Role, "Super Orchestrator")
}
}
func TestInitialSync_MergesSessionStatus(t *testing.T) {
// When initialSync merges session state, an agent whose existing status
// differs from the session-derived status should be updated.
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
ctx := context.Background()
repo.agents["otto"] = models.AgentCardData{
ID: "otto",
DisplayName: "Otto",
Role: "Orchestrator",
Status: models.AgentStatusIdle,
}
// Simulate session merge: session says "running" → agent should go active
activeStatus := mapSessionStatus("running")
if activeStatus != models.AgentStatusActive {
t.Fatalf("mapSessionStatus(running) = %q, want active", activeStatus)
}
_, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{
Status: &activeStatus,
})
if err != nil {
t.Fatalf("Update failed: %v", err)
}
got, err := repo.Get(ctx, "otto")
if err != nil {
t.Fatalf("Get failed: %v", err)
}
if got.Status != models.AgentStatusActive {
t.Errorf("status after merge = %q, want %q", got.Status, models.AgentStatusActive)
}
}
func TestInitialSync_BroadcastsFleet(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
// Create some agents in the repo
repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusActive}
repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle}
// Simulate the final broadcast from initialSync
mergedAgents := []models.AgentCardData{
repo.agents["otto"],
repo.agents["dex"],
}
broker.Broadcast("fleet.update", mergedAgents)
events := capture.captured()
if len(events) == 0 {
t.Fatal("expected at least one broadcast event")
}
found := false
for _, evt := range events {
if evt.EventType == "fleet.update" {
found = true
// Verify data is the merged agents list
agents, ok := evt.Data.([]models.AgentCardData)
if !ok {
t.Fatalf("fleet.update data type = %T, want []models.AgentCardData", evt.Data)
}
if len(agents) != 2 {
t.Errorf("fleet.update agents count = %d, want 2", len(agents))
}
break
}
}
if !found {
t.Error("expected fleet.update broadcast event")
}
}

View File

@@ -1,460 +0,0 @@
// Package gateway provides WebSocket client integration with the OpenClaw
// gateway using WS protocol v3. The WSClient handles connection, handshake,
// frame routing, request/response correlation, and automatic reconnection
// with exponential backoff.
package gateway
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync"
"time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository"
"github.com/gorilla/websocket"
"github.com/google/uuid"
)
// WSConfig holds WebSocket client configuration, typically loaded from
// environment variables. AuthToken must be set to a valid OpenClaw gateway
// operator token.
type WSConfig struct {
URL string // e.g. "ws://host.docker.internal:18789/"
AuthToken string // from OPENCLAW_GATEWAY_TOKEN
}
// DefaultWSConfig returns sensible defaults for local development.
func DefaultWSConfig() WSConfig {
return WSConfig{
URL: "ws://localhost:18789/",
AuthToken: "",
}
}
// eventHandler is a callback invoked when a named event arrives from the
// gateway.
type eventHandler func(json.RawMessage)
// WSClient connects to the OpenClaw gateway over WebSocket, completes the
// v3 handshake, routes incoming frames, and automatically reconnects on
// disconnect with exponential backoff.
type WSClient struct {
config WSConfig
conn *websocket.Conn
connMu sync.Mutex // protects conn for writes
pending map[string]chan<- json.RawMessage
mu sync.Mutex // protects pending and handlers
agents repository.AgentRepo
broker *handler.Broker
logger *slog.Logger
handlers map[string][]eventHandler
connId string // set after successful hello-ok
restClient *Client // optional REST client to notify on WS ready
wsReadyOnce sync.Once // ensures MarkWSReady close is one-shot
}
// NewWSClient returns a WSClient wired to the given repository and broker.
func NewWSClient(cfg WSConfig, agents repository.AgentRepo, broker *handler.Broker, logger *slog.Logger) *WSClient {
if logger == nil {
logger = slog.Default()
}
return &WSClient{
config: cfg,
pending: make(map[string]chan<- json.RawMessage),
agents: agents,
broker: broker,
logger: logger,
handlers: make(map[string][]eventHandler),
}
}
// SetRESTClient wires the REST fallback client so the WS client can notify
// it when the WS connection is ready. Call this before Start.
func (c *WSClient) SetRESTClient(rest *Client) {
c.restClient = rest
}
// OnEvent registers a handler for the given event name. Handlers are called
// when an incoming frame with type "event" and matching event name is
// received. This is safe to call before Start.
func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
c.mu.Lock()
defer c.mu.Unlock()
c.handlers[event] = append(c.handlers[event], handler)
}
// ── Frame types ──────────────────────────────────────────────────────────
// wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol.
type wsFrame struct {
Type string `json:"type"` // "req", "res", "event"
ID string `json:"id,omitempty"` // request/response correlation
Method string `json:"method,omitempty"` // method name (req frames)
Event string `json:"event,omitempty"` // event name (event frames)
Params json.RawMessage `json:"params,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
Error *wsError `json:"error,omitempty"`
}
// wsError represents an error in a response frame.
type wsError struct {
Code int `json:"code"`
Message string `json:"message"`
}
// connectRequest builds the initial connect handshake payload.
type connectRequest struct {
MinProtocol int `json:"minProtocol"`
MaxProtocol int `json:"maxProtocol"`
Client connectClientInfo `json:"client"`
Role string `json:"role"`
Scopes []string `json:"scopes"`
Auth connectAuth `json:"auth"`
}
type connectClientInfo struct {
ID string `json:"id"`
Version string `json:"version"`
Platform string `json:"platform"`
Mode string `json:"mode"`
}
type connectAuth struct {
Token string `json:"token"`
}
// helloOKResponse represents the expected response to a successful connect.
type helloOKResponse struct {
ConnID string `json:"connId"`
Features struct {
Methods []string `json:"methods"`
Events []string `json:"events"`
} `json:"features"`
}
// ── Start loop ───────────────────────────────────────────────────────────
// Start connects to the gateway, completes the handshake, and begins the
// read loop. On disconnect it reconnects with exponential backoff. On
// ctx cancellation it performs a clean shutdown.
func (c *WSClient) Start(ctx context.Context) {
initialBackoff := 1 * time.Second
maxBackoff := 30 * time.Second
backoff := initialBackoff
for {
err := c.connectAndRun(ctx)
if err != nil {
if ctx.Err() != nil {
c.logger.Info("ws client stopped (context cancelled)")
return
}
c.logger.Warn("ws client disconnected, reconnecting",
"error", err,
"backoff", backoff)
} else {
// Reset backoff on successful connect+run completion
backoff = initialBackoff
}
select {
case <-ctx.Done():
c.logger.Info("ws client stopped during backoff (context cancelled)")
return
case <-time.After(backoff):
// Exponential backoff: 1s, 2s, 4s, 8s, 16s, max 30s
backoff = backoff * 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}
}
// connectAndRun dials the gateway, completes the handshake, and runs the
// read loop until an error occurs or ctx is cancelled.
func (c *WSClient) connectAndRun(ctx context.Context) error {
c.logger.Info("ws client connecting", "url", c.config.URL)
dialer := websocket.Dialer{
HandshakeTimeout: 10 * time.Second,
}
conn, _, err := dialer.DialContext(ctx, c.config.URL, nil)
if err != nil {
return fmt.Errorf("dial failed: %w", err)
}
c.connMu.Lock()
c.conn = conn
c.connMu.Unlock()
// When context is cancelled, close the conn to unblock ReadJSON in readLoop.
go func() {
<-ctx.Done()
c.connMu.Lock()
if c.conn != nil {
c.conn.Close()
}
c.connMu.Unlock()
}()
defer func() {
conn.Close()
}()
// Step 1: Read the connect.challenge frame
if err := c.readChallenge(conn); err != nil {
return fmt.Errorf("handshake challenge: %w", err)
}
// Step 2: Send connect request
helloOK, err := c.sendConnect(conn)
if err != nil {
return fmt.Errorf("handshake connect: %w", err)
}
c.logger.Info("ws client handshake complete",
"connId", helloOK.ConnID,
"methods", helloOK.Features.Methods,
"events", helloOK.Features.Events)
// Store connId for reference
c.connMu.Lock()
c.connId = helloOK.ConnID
c.connMu.Unlock()
// Notify REST client that WS is live so it stands down
if c.restClient != nil {
c.restClient.MarkWSReady()
c.logger.Info("ws client notified REST fallback to stand down")
}
// Reset wsReadyOnce so MarkWSReady can fire again after a reconnect
c.wsReadyOnce = sync.Once{}
// Step 2b: Initial sync — fetch agents + sessions from gateway
if err := c.initialSync(ctx); err != nil {
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
}
// Step 2c: Register live event handlers
c.registerEventHandlers()
// Step 3: Read loop
return c.readLoop(ctx, conn)
}
// readChallenge reads the first frame from the gateway, which must be a
// connect.challenge event.
func (c *WSClient) readChallenge(conn *websocket.Conn) error {
var frame wsFrame
if err := conn.ReadJSON(&frame); err != nil {
return fmt.Errorf("read challenge: %w", err)
}
if frame.Type != "event" || frame.Event != "connect.challenge" {
return fmt.Errorf("expected connect.challenge, got type=%s event=%s", frame.Type, frame.Event)
}
c.logger.Debug("received connect.challenge", "params", string(frame.Params))
return nil
}
// sendConnect sends the connect request and waits for the hello-ok response.
func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) {
reqID := uuid.New().String()
params := connectRequest{
MinProtocol: 3,
MaxProtocol: 3,
Client: connectClientInfo{
ID: "control-center",
Version: "1.0",
Platform: "server",
Mode: "operator",
},
Role: "operator",
Scopes: []string{"operator.read"},
Auth: connectAuth{
Token: c.config.AuthToken,
},
}
paramsJSON, err := json.Marshal(params)
if err != nil {
return nil, fmt.Errorf("marshal connect params: %w", err)
}
reqFrame := wsFrame{
Type: "req",
ID: reqID,
Method: "connect",
Params: paramsJSON,
}
if err := conn.WriteJSON(reqFrame); err != nil {
return nil, fmt.Errorf("write connect request: %w", err)
}
// Read response
var resFrame wsFrame
if err := conn.ReadJSON(&resFrame); err != nil {
return nil, fmt.Errorf("read connect response: %w", err)
}
if resFrame.Error != nil {
return nil, fmt.Errorf("connect rejected: code=%d msg=%s", resFrame.Error.Code, resFrame.Error.Message)
}
if resFrame.ID != reqID {
return nil, fmt.Errorf("response id mismatch: expected %s, got %s", reqID, resFrame.ID)
}
// Check for hello-ok method in the result
// The gateway responds with method "hello-ok" on success
var helloOK helloOKResponse
if err := json.Unmarshal(resFrame.Result, &helloOK); err != nil {
return nil, fmt.Errorf("parse hello-ok: %w", err)
}
return &helloOK, nil
}
// readLoop continuously reads frames from the connection and routes them.
// It returns on read error or when the connection is closed by the ctx-done
// goroutine started in connectAndRun.
func (c *WSClient) readLoop(ctx context.Context, conn *websocket.Conn) error {
for {
var frame wsFrame
if err := conn.ReadJSON(&frame); err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
// Check if it's a close error
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
c.logger.Info("ws connection closed by server")
return nil
}
if websocket.IsUnexpectedCloseError(err) {
c.logger.Warn("ws connection unexpectedly closed", "error", err)
return err
}
return fmt.Errorf("read frame: %w", err)
}
c.routeFrame(frame)
}
}
// routeFrame dispatches a received frame to the appropriate handler.
func (c *WSClient) routeFrame(frame wsFrame) {
switch frame.Type {
case "res":
c.handleResponse(frame)
case "event":
c.handleEvent(frame)
default:
c.logger.Warn("unknown frame type", "type", frame.Type, "id", frame.ID)
}
}
// handleResponse correlates a response frame to a pending request channel.
func (c *WSClient) handleResponse(frame wsFrame) {
c.mu.Lock()
ch, ok := c.pending[frame.ID]
if ok {
delete(c.pending, frame.ID)
}
c.mu.Unlock()
if !ok {
c.logger.Warn("received response for unknown request", "id", frame.ID)
return
}
if frame.Error != nil {
// Send nil to signal error; caller checks via Send return
ch <- nil
return
}
ch <- frame.Result
}
// handleEvent dispatches an event frame to registered handlers.
func (c *WSClient) handleEvent(frame wsFrame) {
c.mu.Lock()
handlers := c.handlers[frame.Event]
c.mu.Unlock()
if len(handlers) == 0 {
c.logger.Debug("unhandled event", "event", frame.Event)
return
}
for _, h := range handlers {
h(frame.Params)
}
}
// ── Send ─────────────────────────────────────────────────────────────────
// Send sends a JSON request to the gateway and returns the response payload.
// It is safe for concurrent use. Returns an error if the client is not
// connected.
func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
reqID := uuid.New().String()
paramsJSON, err := json.Marshal(params)
if err != nil {
return nil, fmt.Errorf("marshal params: %w", err)
}
// Register pending response channel
respCh := make(chan json.RawMessage, 1)
c.mu.Lock()
c.pending[reqID] = respCh
c.mu.Unlock()
defer func() {
c.mu.Lock()
delete(c.pending, reqID)
c.mu.Unlock()
}()
// Build and send frame
frame := wsFrame{
Type: "req",
ID: reqID,
Method: method,
Params: paramsJSON,
}
c.connMu.Lock()
if c.conn == nil {
c.connMu.Unlock()
return nil, fmt.Errorf("gateway: not connected")
}
err = c.conn.WriteJSON(frame)
c.connMu.Unlock()
if err != nil {
return nil, fmt.Errorf("write request: %w", err)
}
// Wait for response with timeout
select {
case resp := <-respCh:
if resp == nil {
return nil, fmt.Errorf("gateway returned error for request %s", reqID)
}
return resp, nil
case <-time.After(30 * time.Second):
return nil, fmt.Errorf("request %s timed out", reqID)
}
}

View File

@@ -1,484 +0,0 @@
package gateway
import (
"context"
"encoding/json"
"log/slog"
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
"github.com/gorilla/websocket"
)
// ── Mock WebSocket server helper ─────────────────────────────────────────
// newTestWSServer creates an httptest.Server that upgrades to WebSocket and
// delegates each connection to handler. The server URL can be converted to
// a ws:// URL by replacing "http" with "ws".
func newTestWSServer(t *testing.T, handler func(conn *websocket.Conn)) *httptest.Server {
t.Helper()
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
handler(conn)
}))
return srv
}
// wsURL converts an httptest.Server http URL to a ws URL.
func wsURL(srv *httptest.Server) string {
return "ws" + strings.TrimPrefix(srv.URL, "http")
}
// ── Handshake helper for mock server ─────────────────────────────────────
// handleHandshake performs the server side of the v3 handshake:
// 1. Send connect.challenge
// 2. Read connect request
// 3. Send hello-ok response
//
// Returns the connect request frame for inspection.
func handleHandshake(t *testing.T, conn *websocket.Conn) map[string]any {
t.Helper()
// 1. Send connect.challenge
challenge := map[string]any{
"type": "event",
"event": "connect.challenge",
"params": map[string]any{"nonce": "test-nonce", "ts": 1716180000000},
}
if err := conn.WriteJSON(challenge); err != nil {
t.Fatalf("server: write challenge: %v", err)
}
// 2. Read connect request
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
t.Fatalf("server: read connect request: %v", err)
}
if req["method"] != "connect" {
t.Fatalf("server: expected method=connect, got %v", req["method"])
}
// 3. Send hello-ok response
// Note: helloOKResponse expects ConnID at the top level of the result,
// matching the WSClient's JSON struct tags.
result := map[string]any{
"type": "hello-ok",
"protocol": 3,
"connId": "test-conn-123",
"features": map[string]any{"methods": []string{}, "events": []string{}},
"auth": map[string]any{"role": "operator", "scopes": []string{"operator.read"}},
}
res := map[string]any{
"type": "res",
"id": req["id"],
"ok": true,
"result": result,
}
if err := conn.WriteJSON(res); err != nil {
t.Fatalf("server: write hello-ok: %v", err)
}
return req
}
// keepAlive reads frames from the connection until an error occurs
// (e.g., the client disconnects). Used as the default "do nothing"
// server loop after handshake.
func keepAlive(conn *websocket.Conn) {
for {
var m map[string]any
if err := conn.ReadJSON(&m); err != nil {
break
}
}
}
// ── 1. Test: Full handshake ──────────────────────────────────────────────
func TestWSClient_Handshake(t *testing.T) {
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
keepAlive(conn)
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
client.Start(ctx)
close(done)
}()
// Wait briefly for handshake to complete
time.Sleep(200 * time.Millisecond)
// Verify connId was set
client.connMu.Lock()
connID := client.connId
client.connMu.Unlock()
if connID != "test-conn-123" {
t.Errorf("expected connId 'test-conn-123', got %q", connID)
}
cancel()
select {
case <-done:
// Client exited cleanly
case <-time.After(3 * time.Second):
t.Fatal("WSClient did not shut down after context cancellation")
}
}
// ── 2. Test: Send() with response matching ───────────────────────────────
func TestWSClient_Send(t *testing.T) {
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
// Read RPC requests and respond to each
for {
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
break
}
reqID, _ := req["id"].(string)
method, _ := req["method"].(string)
var result any
switch method {
case "agents.list":
result = map[string]any{
"agents": []map[string]any{
{"id": "otto", "name": "Otto"},
},
}
default:
result = map[string]any{}
}
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": result,
}
if err := conn.WriteJSON(res); err != nil {
break
}
}
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
go client.Start(ctx)
// Give the client time to complete handshake
time.Sleep(300 * time.Millisecond)
resp, err := client.Send("agents.list", nil)
if err != nil {
t.Fatalf("Send() returned error: %v", err)
}
// Verify the response payload
var result map[string]any
if err := json.Unmarshal(resp, &result); err != nil {
t.Fatalf("unmarshal response: %v", err)
}
agents, ok := result["agents"].([]any)
if !ok || len(agents) != 1 {
t.Errorf("expected 1 agent in response, got %v", result)
}
cancel()
}
// ── 3. Test: Event handler routing ───────────────────────────────────────
func TestWSClient_EventRouting(t *testing.T) {
eventReceived := make(chan json.RawMessage, 1)
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
// After handshake, send a test event
evt := map[string]any{
"type": "event",
"event": "test.event",
"params": map[string]any{"greeting": "hello from server"},
}
if err := conn.WriteJSON(evt); err != nil {
t.Logf("server: write event: %v", err)
return
}
keepAlive(conn)
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
// Register event handler BEFORE starting the client
client.OnEvent("test.event", func(payload json.RawMessage) {
eventReceived <- payload
})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
go client.Start(ctx)
// Wait for the event handler to fire
select {
case payload := <-eventReceived:
var data map[string]any
if err := json.Unmarshal(payload, &data); err != nil {
t.Fatalf("unmarshal event payload: %v", err)
}
if greeting, _ := data["greeting"].(string); greeting != "hello from server" {
t.Errorf("expected greeting 'hello from server', got %q", greeting)
}
case <-time.After(3 * time.Second):
t.Fatal("timed out waiting for event handler to fire")
}
cancel()
}
// ── 4. Test: Concurrent Send ─────────────────────────────────────────────
func TestWSClient_ConcurrentSend(t *testing.T) {
var reqCount atomic.Int32
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
// Read RPC requests and respond to each
for {
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
break
}
reqID, _ := req["id"].(string)
n := reqCount.Add(1)
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": map[string]any{"index": n, "method": req["method"]},
}
if err := conn.WriteJSON(res); err != nil {
break
}
}
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go client.Start(ctx)
// Give the client time to complete handshake
time.Sleep(300 * time.Millisecond)
// Fire 3 concurrent Send() calls
type sendResult struct {
method string
payload json.RawMessage
err error
}
results := make(chan sendResult, 3)
methods := []string{"agents.list", "sessions.list", "agents.config"}
for _, method := range methods {
go func(m string) {
resp, err := client.Send(m, nil)
results <- sendResult{method: m, payload: resp, err: err}
}(method)
}
// Collect all results
for i := 0; i < 3; i++ {
select {
case r := <-results:
if r.err != nil {
t.Errorf("Send(%q) returned error: %v", r.method, r.err)
continue
}
var result map[string]any
if err := json.Unmarshal(r.payload, &result); err != nil {
t.Errorf("Send(%q) unmarshal error: %v", r.method, err)
continue
}
gotMethod, _ := result["method"].(string)
if gotMethod != r.method {
t.Errorf("Send(%q) got response for %q (mismatched)", r.method, gotMethod)
}
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for concurrent Send results")
}
}
cancel()
}
// ── 5. Test: Clean shutdown ──────────────────────────────────────────────
func TestWSClient_CleanShutdown(t *testing.T) {
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
keepAlive(conn)
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
done := make(chan struct{})
go func() {
client.Start(ctx)
close(done)
}()
// Let the client connect and complete handshake
time.Sleep(200 * time.Millisecond)
// Cancel context — should trigger clean shutdown
cancel()
select {
case <-done:
// Client exited cleanly — pass
case <-time.After(3 * time.Second):
t.Fatal("WSClient did not shut down cleanly within timeout")
}
}
// ── Pure utility tests (from CUB-205) ─────────────────────────────────────
func TestMapSessionStatus(t *testing.T) {
tests := []struct {
input string
expected models.AgentStatus
}{
{"running", models.AgentStatusActive},
{"streaming", models.AgentStatusActive},
{"done", models.AgentStatusIdle},
{"error", models.AgentStatusError},
{"", models.AgentStatusIdle},
{"garbage", models.AgentStatusIdle},
}
for _, tt := range tests {
result := mapSessionStatus(tt.input)
if result != tt.expected {
t.Errorf("mapSessionStatus(%q) = %q, want %q", tt.input, result, tt.expected)
}
}
}
func TestAgentItemToCard(t *testing.T) {
t.Run("full fields", func(t *testing.T) {
item := agentListItem{
ID: "dex",
Name: "Dex",
Role: "backend",
Channel: "telegram",
}
card := agentItemToCard(item)
if card.ID != "dex" {
t.Errorf("ID = %q, want %q", card.ID, "dex")
}
if card.DisplayName != "Dex" {
t.Errorf("DisplayName = %q, want %q", card.DisplayName, "Dex")
}
if card.Role != "backend" {
t.Errorf("Role = %q, want %q", card.Role, "backend")
}
if card.Channel != "telegram" {
t.Errorf("Channel = %q, want %q", card.Channel, "telegram")
}
if card.Status != models.AgentStatusIdle {
t.Errorf("Status = %q, want %q", card.Status, models.AgentStatusIdle)
}
})
t.Run("empty fields use defaults", func(t *testing.T) {
item := agentListItem{
ID: "otto",
}
card := agentItemToCard(item)
if card.ID != "otto" {
t.Errorf("ID = %q, want %q", card.ID, "otto")
}
if card.DisplayName != "otto" {
t.Errorf("DisplayName = %q, want %q (should fallback to ID)", card.DisplayName, "otto")
}
if card.Role != "agent" {
t.Errorf("Role = %q, want %q (default)", card.Role, "agent")
}
if card.Channel != "unknown" {
t.Errorf("Channel = %q, want %q (per Grimm requirement)", card.Channel, "unknown")
}
if card.Status != models.AgentStatusIdle {
t.Errorf("Status = %q, want %q", card.Status, models.AgentStatusIdle)
}
})
t.Run("empty name falls back to ID", func(t *testing.T) {
item := agentListItem{
ID: "hex",
Name: "",
Role: "database",
}
card := agentItemToCard(item)
if card.DisplayName != "hex" {
t.Errorf("DisplayName = %q, want %q (ID fallback)", card.DisplayName, "hex")
}
})
}
func TestStrPtr(t *testing.T) {
s := "hello"
p := strPtr(s)
if p == nil {
t.Fatal("strPtr returned nil")
}
if *p != s {
t.Errorf("strPtr(%q) = %q, want %q", s, *p, s)
}
empty := ""
ep := strPtr(empty)
if *ep != empty {
t.Errorf("strPtr(empty) = %q, want %q", *ep, empty)
}
}

View File

@@ -64,9 +64,6 @@ type CreateAgentRequest struct {
// UpdateAgentRequest is the payload for PUT /api/agents/{id}.
type UpdateAgentRequest struct {
Status *AgentStatus `json:"status,omitempty" validate:"omitempty,agentStatus"`
DisplayName *string `json:"displayName,omitempty"`
Role *string `json:"role,omitempty"`
LastActivityAt *string `json:"lastActivityAt,omitempty"`
CurrentTask *string `json:"currentTask,omitempty"`
TaskProgress *int `json:"taskProgress,omitempty" validate:"omitempty,min=0,max=100"`
TaskElapsed *string `json:"taskElapsed,omitempty"`

View File

@@ -1,46 +0,0 @@
# Control Center — Architecture Context
## Current State
The Control Center backend uses a **dual-path gateway client** architecture:
- **Primary path**: WebSocket client (`gateway.WSClient`) connects to the OpenClaw gateway using WS protocol v3. It handles handshake, initial sync (agents.list + sessions.list RPCs), live event routing (sessions.changed, presence, agent.config), and automatic reconnection with exponential backoff.
- **Fallback path**: REST poller (`gateway.Client`) polls the gateway `/api/agents` endpoint on an interval. It only activates if the WS client fails to connect within 30 seconds of startup.
## Live Gateway Connection
### Startup Sequence
1. Both WS client and REST client start concurrently
2. REST client waits 30s for WS readiness signal (`wsReady` channel)
3. If WS connects successfully → REST client stands down (logs "using WS — REST poller standing down")
4. If WS fails within 30s → REST client falls back to polling (logs "WS not ready — falling back to REST polling")
5. If no WS client configured → REST client polls immediately
### WebSocket Client (Primary)
- Config: `WS_GATEWAY_URL` (default: `ws://host.docker.internal:18789/`), `OPENCLAW_GATEWAY_TOKEN`
- Protocol: v3 handshake (challenge → connect → hello-ok)
- Initial sync: `agents.list` + `sessions.list` RPCs → persist → merge → broadcast `fleet.update`
- Live events: `sessions.changed`, `presence`, `agent.config`
- Reconnection: exponential backoff (1s → 2s → 4s → ... → 30s max)
### REST Poller (Fallback)
- Config: `GATEWAY_URL` (default: `http://host.docker.internal:18789/api/agents`), `GATEWAY_POLL_INTERVAL` (default: 5s)
- Only used when WS is unavailable
- Polls the `/api/agents` endpoint and syncs agent status changes
### Wiring
```
main.go
├── wsClient = NewWSClient(...)
├── restClient = NewClient(...)
├── wsClient.SetRESTClient(restClient) // WS notifies REST on ready
├── restClient.SetWSClient(wsClient) // REST defers to WS
├── go wsClient.Start(ctx) // primary
└── go restClient.Start(ctx) // fallback (waits for WS)
```
## Key Design Decisions
- **Push over poll**: WS is preferred for real-time updates; REST is a safety net
- **DB first, then SSE**: All event handlers persist to DB before broadcasting
- **Graceful degradation**: System works without WS; REST provides basic functionality
- **No hard dependency on REST /api/agents**: If WS is connected, REST endpoint is never called