diff --git a/.env.example b/.env.example index 0a76c10..3d4290c 100644 --- a/.env.example +++ b/.env.example @@ -13,9 +13,14 @@ ENVIRONMENT=development DATABASE_URL=postgresql://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable # Gateway (OpenClaw) connection -# URL to the OpenClaw gateway API for polling agent states -GATEWAY_URL=http://localhost:18789/api/agents -# Polling interval for agent state updates +# WebSocket gateway config (primary path) +WS_GATEWAY_URL=ws://host.docker.internal:18789/ +# Gateway auth token — same as OPENCLAW_GATEWAY_TOKEN (set in environment) +GATEWAY_TOKEN= + +# REST poller config (fallback, only used if WS fails to connect) +GATEWAY_URL=http://host.docker.internal:18789/api/agents +# Polling interval for agent state updates (fallback only) GATEWAY_POLL_INTERVAL=5s # ── Frontend Variables (via Vite) ─────────────────────────────────────── diff --git a/.gitea/workflows/build-dev.yaml b/.gitea/workflows/build-dev.yaml new file mode 100644 index 0000000..4bd6b6f --- /dev/null +++ b/.gitea/workflows/build-dev.yaml @@ -0,0 +1,85 @@ +name: Build (Dev) + +on: + push: + branches: [dev] + pull_request: + branches: [dev] + workflow_dispatch: + +env: + GO_VERSION: "1.23" + NODE_VERSION: "22" + BINARY_NAME: server + +jobs: + build-go-backend: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version: ${{ env.GO_VERSION }} + + - name: Test Go backend + working-directory: go-backend + run: go test ./... 
+ + - name: Build Go binary + working-directory: go-backend + run: | + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \ + go build -ldflags="-s -w -X main.version=${GITHUB_SHA:0:8}" \ + -o ${{ env.BINARY_NAME }} ./cmd/server + + - name: Upload Go binary + uses: actions/upload-artifact@v4 + with: + name: go-backend-binary + path: go-backend/${{ env.BINARY_NAME }} + retention-days: 3 + + build-frontend: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Setup Node + uses: actions/setup-node@v4 + with: + node-version: ${{ env.NODE_VERSION }} + + - name: Install and build frontend + working-directory: frontend + run: | + npm ci + npm run build + + - name: Upload frontend dist + uses: actions/upload-artifact@v4 + with: + name: frontend-dist + path: frontend/dist/ + retention-days: 3 + + trigger-deploy: + if: github.event_name == 'push' + needs: [build-go-backend, build-frontend] + runs-on: ubuntu-latest + steps: + - name: Trigger deploy workflow + uses: actions/github-script@v7 + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + script: | + await github.rest.repos.createDispatchEvent({ + owner: context.repo.owner, + repo: context.repo.repo, + event_type: 'dev-build-success', + client_payload: { + sha: context.sha, + ref: context.ref + } + }) \ No newline at end of file diff --git a/.gitea/workflows/deploy-dev.yaml b/.gitea/workflows/deploy-dev.yaml new file mode 100644 index 0000000..72240f9 --- /dev/null +++ b/.gitea/workflows/deploy-dev.yaml @@ -0,0 +1,126 @@ +name: Deploy (Dev) + +on: + repository_dispatch: + types: + - dev-build-success + workflow_dispatch: + +env: + BINARY_NAME: server + DEV_HOST: ${{ secrets.DEV_HOST }} + DEV_USER: ${{ secrets.DEV_USER }} + DEPLOY_BINARY_PATH: /opt/control-center/server + DEPLOY_FRONTEND_PATH: /usr/share/nginx/html + SERVICE_NAME: control-center-server + FRONTEND_SERVICE: nginx + +jobs: + deploy: + runs-on: ubuntu-latest + steps: + - name: Download Go binary + uses: actions/download-artifact@v4 + with: + name: 
go-backend-binary + + - name: Download frontend dist + uses: actions/download-artifact@v4 + with: + name: frontend-dist + path: dist + + - name: Make binary executable + run: chmod +x ${{ env.BINARY_NAME }} + + - name: Generate deploy script + run: | + cat > deploy.sh <<'SCRIPT' + #!/usr/bin/env bash + set -euo pipefail + + BINARY="${1}" + FRONTEND_DIST="${2:-dist}" + BINARY_PATH="${3:-/opt/control-center/server}" + FRONTEND_PATH="${4:-/usr/share/nginx/html}" + BINARY_SERVICE="${5:-control-center-server}" + FRONTEND_SERVICE="${6:-nginx}" + + TIMESTAMP=$(date +%Y%m%d%H%M%S) + BACKUP="${BINARY_PATH}.${TIMESTAMP}.bak" + + echo "=== deploy backend ===" + + if [ -f "$BINARY_PATH" ]; then + echo "backing up current binary" + cp "$BINARY_PATH" "$BACKUP" + fi + + echo "installing new binary" + cp "$BINARY" "$BINARY_PATH" + chmod +x "$BINARY_PATH" + + echo "restarting service" + systemctl reload-or-restart "$BINARY_SERVICE" || systemctl restart "$BINARY_SERVICE" + + sleep 3 + + if ! systemctl is-active --quiet "$BINARY_SERVICE"; then + echo "FAILED: $BINARY_SERVICE did not start — rolling back" + if [ -f "$BACKUP" ]; then + cp "$BACKUP" "$BINARY_PATH" + systemctl restart "$BINARY_SERVICE" + fi + exit 1 + fi + + echo "backend deploy ok — keeping last 3 backups" + ls -t "${BINARY_PATH}."*.bak 2>/dev/null | tail -n +4 | xargs -r rm -f + + echo "=== deploy frontend ===" + if [ -d "$FRONTEND_DIST" ] && [ -n "$(ls -A "$FRONTEND_DIST" 2>/dev/null)" ]; then + rsync -a --delete "$FRONTEND_DIST/" "$FRONTEND_PATH/" + systemctl reload "$FRONTEND_SERVICE" 2>/dev/null ||: + echo "frontend deploy ok" + fi + + echo "=== deploy complete ===" + SCRIPT + chmod +x deploy.sh + + - name: Copy artifacts to dev server + uses: appleboy/scp-action@v0.1.7 + with: + host: ${{ env.DEV_HOST }} + username: ${{ env.DEV_USER }} + key: ${{ secrets.DEV_SSH_KEY }} + source: "${{ env.BINARY_NAME }},deploy.sh,dist" + target: "/tmp/control-center-deploy" + + - name: Execute deploy on dev server + uses: 
appleboy/ssh-action@v1 + with: + host: ${{ env.DEV_HOST }} + username: ${{ env.DEV_USER }} + key: ${{ secrets.DEV_SSH_KEY }} + script: | + set -euo pipefail + cd /tmp/control-center-deploy + sudo ./deploy.sh \ + "${{ env.BINARY_NAME }}" \ + "dist" \ + "${{ env.DEPLOY_BINARY_PATH }}" \ + "${{ env.DEPLOY_FRONTEND_PATH }}" \ + "${{ env.SERVICE_NAME }}" \ + "${{ env.FRONTEND_SERVICE }}" + rm -rf /tmp/control-center-deploy + + - name: Notify on failure + if: failure() + uses: appleboy/ssh-action@v1 + with: + host: ${{ env.DEV_HOST }} + username: ${{ env.DEV_USER }} + key: ${{ secrets.DEV_SSH_KEY }} + script: | + echo "deploy failed — commit ${{ github.sha }}" > /tmp/control-center-deploy-failure.log \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 2e3c5bb..4591b81 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,6 +16,8 @@ services: - ENVIRONMENT=production - PORT=8080 - GATEWAY_URL=http://host.docker.internal:18789/api/agents + - WS_GATEWAY_URL=ws://host.docker.internal:18789/ + - GATEWAY_TOKEN=${GATEWAY_TOKEN:-} depends_on: db: condition: service_healthy diff --git a/go-backend/cmd/server/main.go b/go-backend/cmd/server/main.go index 76b6a2b..cc96fb1 100644 --- a/go-backend/cmd/server/main.go +++ b/go-backend/cmd/server/main.go @@ -63,25 +63,30 @@ func main() { Broker: broker, }) - // ── Gateway client (polls OpenClaw for agent states) ─────────────────── - gwClient := gateway.NewClient(gateway.Config{ - URL: cfg.GatewayURL, - PollInterval: cfg.GatewayPollInterval, - }, agentRepo, broker) - - // ── WebSocket client (connects to OpenClaw gateway WS v3) ───────────── + // ── Gateway clients (WS primary, REST fallback) ─────────────────── + // WS gateway client (primary path) wsClient := gateway.NewWSClient(gateway.WSConfig{ URL: cfg.WSGatewayURL, AuthToken: cfg.WSGatewayToken, }, agentRepo, broker, logger) + // REST gateway client (fallback — only polls if WS fails to connect) + gwClient := 
gateway.NewClient(gateway.Config{ + URL: cfg.GatewayRestURL, + PollInterval: cfg.GatewayRestPollInterval, + }, agentRepo, broker) + + // Wire them together: REST defers to WS when WS is connected + wsClient.SetRESTClient(gwClient) + gwClient.SetWSClient(wsClient) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go gwClient.Start(ctx) - - // Start WS client in background; logs connection status + // Start WS client first (primary) go wsClient.Start(ctx) + // Start REST client (will wait for WS, then stand down or fall back) + go gwClient.Start(ctx) // ── Server ───────────────────────────────────────────────────────────── srv := &http.Server{ diff --git a/go-backend/internal/config/config.go b/go-backend/internal/config/config.go index 3098ba8..c6a26b3 100644 --- a/go-backend/internal/config/config.go +++ b/go-backend/internal/config/config.go @@ -15,10 +15,10 @@ type Config struct { CORSOrigin string LogLevel string Environment string - GatewayURL string - GatewayPollInterval time.Duration - WSGatewayURL string - WSGatewayToken string + GatewayRestURL string + GatewayRestPollInterval time.Duration + WSGatewayURL string + WSGatewayToken string } // Load reads configuration from environment variables, applying defaults where @@ -30,10 +30,10 @@ func Load() *Config { CORSOrigin: getEnv("CORS_ORIGIN", "*"), LogLevel: getEnv("LOG_LEVEL", "info"), Environment: getEnv("ENVIRONMENT", "development"), - GatewayURL: getEnv("GATEWAY_URL", "http://localhost:18789/api/agents"), - GatewayPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second), - WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://localhost:18789/"), - WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""), + GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"), + GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second), + WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"), + WSGatewayToken: 
getEnv("OPENCLAW_GATEWAY_TOKEN", ""), } } diff --git a/go-backend/internal/gateway/client.go b/go-backend/internal/gateway/client.go index 4258a84..90b2f4d 100644 --- a/go-backend/internal/gateway/client.go +++ b/go-backend/internal/gateway/client.go @@ -17,13 +17,17 @@ import ( ) // Client polls the OpenClaw gateway for agent status and keeps the database -// and SSE broker in sync. +// and SSE broker in sync. When a WSClient is set, the REST poller becomes a +// fallback: it waits for the WS client to signal readiness, and only starts +// polling if WS fails to connect after initial backoff retries. type Client struct { url string pollInterval time.Duration httpClient *http.Client agents repository.AgentRepo broker *handler.Broker + wsClient *WSClient // optional WS client; when set, REST is fallback only + wsReady chan struct{} // closed once WS connection is established } // Config holds gateway client configuration, typically loaded from environment. @@ -48,22 +52,67 @@ func NewClient(cfg Config, agents repository.AgentRepo, broker *handler.Broker) httpClient: &http.Client{Timeout: 10 * time.Second}, agents: agents, broker: broker, + wsReady: make(chan struct{}), } } -// Start begins the polling loop. It runs until ctx is cancelled. -func (c *Client) Start(ctx context.Context) { - slog.Info("gateway client starting", - "url", c.url, - "pollInterval", c.pollInterval.String()) +// SetWSClient wires the WebSocket client so the REST poller knows to defer +// to it. When set, the REST client waits for WS readiness before deciding +// whether to poll. +func (c *Client) SetWSClient(ws *WSClient) { + c.wsClient = ws +} +// MarkWSReady signals that the WS connection is live and the REST poller +// should stand down. Called by WSClient after a successful handshake. +func (c *Client) MarkWSReady() { + select { + case <-c.wsReady: + // already closed + default: + close(c.wsReady) + } +} + +// Start begins the gateway client loop. 
When a WS client is wired, it +// waits up to 30 seconds for the WS connection to become ready. If WS +// connects, the REST poller stands down and only logs periodically. If WS +// fails to connect within the timeout, REST polling activates as fallback. +func (c *Client) Start(ctx context.Context) { + if c.wsClient != nil { + slog.Info("gateway client waiting for WS connection", "timeout", "30s") + + select { + case <-c.wsReady: + slog.Info("gateway client using WS — REST poller standing down") + // WS is live; keep this goroutine alive but idle. If WS + // disconnects later, we could re-enter polling, but for now + // the WS client handles its own reconnection. + <-ctx.Done() + slog.Info("gateway client stopped (WS mode)") + return + case <-time.After(30 * time.Second): + slog.Warn("gateway client: WS not ready after 30s — falling back to REST polling", + "url", c.url, + "pollInterval", c.pollInterval.String()) + case <-ctx.Done(): + slog.Info("gateway client stopped while waiting for WS") + return + } + } else { + slog.Info("gateway client using REST polling (no WS client configured)", + "url", c.url, + "pollInterval", c.pollInterval.String()) + } + + // REST fallback polling ticker := time.NewTicker(c.pollInterval) defer ticker.Stop() for { select { case <-ctx.Done(): - slog.Info("gateway client stopped") + slog.Info("gateway client stopped (REST fallback)") return case <-ticker.C: c.poll(ctx) diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go index 195720b..462bf08 100644 --- a/go-backend/internal/gateway/wsclient.go +++ b/go-backend/internal/gateway/wsclient.go @@ -43,17 +43,18 @@ type eventHandler func(json.RawMessage) // v3 handshake, routes incoming frames, and automatically reconnects on // disconnect with exponential backoff. 
type WSClient struct { - config WSConfig - conn *websocket.Conn - connMu sync.Mutex // protects conn for writes - pending map[string]chan<- json.RawMessage - mu sync.Mutex // protects pending and handlers - agents repository.AgentRepo - broker *handler.Broker - logger *slog.Logger + config WSConfig + conn *websocket.Conn + connMu sync.Mutex // protects conn for writes + pending map[string]chan<- json.RawMessage + mu sync.Mutex // protects pending and handlers + agents repository.AgentRepo + broker *handler.Broker + logger *slog.Logger - handlers map[string][]eventHandler - connId string // set after successful hello-ok + handlers map[string][]eventHandler + connId string // set after successful hello-ok + restClient *Client // optional REST client to notify on WS ready } // NewWSClient returns a WSClient wired to the given repository and broker. @@ -71,6 +72,12 @@ func NewWSClient(cfg WSConfig, agents repository.AgentRepo, broker *handler.Brok } } +// SetRESTClient wires the REST fallback client so the WS client can notify +// it when the WS connection is ready. Call this before Start. +func (c *WSClient) SetRESTClient(rest *Client) { + c.restClient = rest +} + // OnEvent registers a handler for the given event name. Handlers are called // when an incoming frame with type "event" and matching event name is // received. This is safe to call before Start. 
@@ -208,6 +215,12 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { c.connId = helloOK.ConnID c.connMu.Unlock() + // Notify REST client that WS is live so it stands down + if c.restClient != nil { + c.restClient.MarkWSReady() + c.logger.Info("ws client notified REST fallback to stand down") + } + // Step 2b: Initial sync — fetch agents + sessions from gateway if err := c.initialSync(ctx); err != nil { c.logger.Warn("initial sync failed, will continue with read loop", "error", err) diff --git a/reference/CONTROL_CENTER_CONTEXT.md b/reference/CONTROL_CENTER_CONTEXT.md new file mode 100644 index 0000000..81bee91 --- /dev/null +++ b/reference/CONTROL_CENTER_CONTEXT.md @@ -0,0 +1,46 @@ +# Control Center — Architecture Context + +## Current State + +The Control Center backend uses a **dual-path gateway client** architecture: + +- **Primary path**: WebSocket client (`gateway.WSClient`) connects to the OpenClaw gateway using WS protocol v3. It handles handshake, initial sync (agents.list + sessions.list RPCs), live event routing (sessions.changed, presence, agent.config), and automatic reconnection with exponential backoff. +- **Fallback path**: REST poller (`gateway.Client`) polls the gateway `/api/agents` endpoint on an interval. It only activates if the WS client fails to connect within 30 seconds of startup. + +## Live Gateway Connection + +### Startup Sequence +1. Both WS client and REST client start concurrently +2. REST client waits 30s for WS readiness signal (`wsReady` channel) +3. If WS connects successfully → REST client stands down (logs "using WS — REST poller standing down") +4. If WS fails within 30s → REST client falls back to polling (logs "WS not ready — falling back to REST polling") +5. 
If no WS client configured → REST client polls immediately + +### WebSocket Client (Primary) +- Config: `WS_GATEWAY_URL` (default: `ws://host.docker.internal:18789/`), auth token read from `OPENCLAW_GATEWAY_TOKEN` (the backend config does not read `GATEWAY_TOKEN`) +- Protocol: v3 handshake (challenge → connect → hello-ok) +- Initial sync: `agents.list` + `sessions.list` RPCs → persist → merge → broadcast `fleet.update` +- Live events: `sessions.changed`, `presence`, `agent.config` +- Reconnection: exponential backoff (1s → 2s → 4s → ... → 30s max) + +### REST Poller (Fallback) +- Config: `GATEWAY_URL` (default: `http://host.docker.internal:18789/api/agents`), `GATEWAY_POLL_INTERVAL` (default: 5s) +- Only used when the WS client has not connected within 30s of startup; it is not re-activated if WS drops later (the WS client handles its own reconnection) +- Polls the `/api/agents` endpoint and syncs agent status changes + +### Wiring +``` +main.go + ├── wsClient = NewWSClient(...) + ├── restClient = NewClient(...) + ├── wsClient.SetRESTClient(restClient) // WS notifies REST on ready + ├── restClient.SetWSClient(wsClient) // REST defers to WS + ├── go wsClient.Start(ctx) // primary + └── go restClient.Start(ctx) // fallback (waits for WS) +``` + +## Key Design Decisions +- **Push over poll**: WS is preferred for real-time updates; REST is a safety net +- **DB first, then SSE**: All event handlers persist to DB before broadcasting +- **Graceful degradation**: System works without WS at startup; REST provides basic functionality +- **No hard dependency on REST /api/agents**: If WS connects within the 30s startup window, the REST endpoint is never called; if WS connects only after the fallback has started, REST polling keeps running alongside it \ No newline at end of file