Compare commits

12 Commits

Author SHA1 Message Date
Dex 4509b0c217 CUB-119: add GET /api/health endpoint to Chi router
Dev Build & Deploy / test-and-build (pull_request) Failing after 6s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
2026-05-23 20:09:24 -04:00
Otto f3ce08497a docs: add comprehensive project context file (CONTEXT.md) for agent reference
Dev Build & Deploy / test-and-build (push) Failing after 0s
Dev Build & Deploy / docker-build-push (push) Has been skipped
2026-05-21 13:29:24 -04:00
overseer fd60b0bb57 Merge pull request 'CUB-200: Implement WebSocket Gateway Client' (!42) from agent/dex/CUB-200-ws-gateway-client into dev
Dev Build & Deploy / test-and-build (push) Failing after 1s
Dev Build & Deploy / docker-build-push (push) Has been skipped
Reviewed-on: #42
2026-05-21 06:54:55 -04:00
Rex b7b05bb4e3 CUB-200: fix event-loss race — register handlers before readLoop starts
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
Move registerEventHandlers() call before the readLoop goroutine starts
in connectAndRun(). This eliminates the startup window where live gateway
events were actively read and dropped as 'unhandled' because handler
registration happened only after initialSync completed.

The handlers only depend on c.agents and c.broker, which are wired in the
constructor — they do not require initialSync to have completed.

Also adds TestConnectAndRun_EventNotLostDuringSync regression test that
sends a live presence event during initial sync and asserts it is not lost.

All gateway tests pass with -race.
2026-05-20 21:52:39 +00:00
Rex d370d5ec23 CUB-200: fix WS initial sync ordering — start readLoop before initialSync
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
The root cause of the initial sync timeout was that connectAndRun called
initialSync (which uses Send/RPC) before starting readLoop, but Send's
response delivery depends on readLoop→routeFrame→handleResponse. Without
the readLoop running, agents.list and sessions.list would always time out.

Fix: start readLoop in a goroutine before calling initialSync so that
RPC responses are properly routed back to pending Send() calls. After
initialSync completes, event handlers are registered and MarkWSReady
is called. The connectAndRun function then blocks on the readLoop
goroutine's completion.

Also added TestConnectAndRun_InitialSyncOrdering which verifies that
agents are persisted from initial sync (would hang/timeout under the
old ordering).
2026-05-20 21:42:31 +00:00
Dex 1b82e1d3a6 CUB-200: resolve merge conflicts with dev — adopt dev's consolidated workflows and improved Go gateway code
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
2026-05-20 21:26:17 +00:00
overseer 93bf434a47 Merge pull request 'CUB-125: implement real-time SSE/WebSocket in React frontend' (!46) from agent/rex/CUB-125-realtime-sse-clean into dev
Dev Build & Deploy / test-and-build (push) Failing after 0s
Dev Build & Deploy / docker-build-push (push) Has been skipped
Reviewed-on: #46
2026-05-20 13:14:29 -04:00
Rex 010408cc45 CUB-125: address Grimm review — tests, type fixes, error state circuit breaker
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
- Add missing 'offline' to AgentStatus union type (types/index.ts)
- Add max-retry circuit breaker to useSSE; error state is now reachable
- Wire typed SSE payloads (SSEPayloadMap discriminated union) into useRealtimeSync
- Add Vitest + 20 unit tests: useSSE lifecycle, back-off, circuit breaker,
  event parsing, cleanup; useRealtimeSync event-to-invalidation mapping
- Rebase on dev to remove stale CUB-119 legacy-deletion commit and align
  CI workflow (dev already consolidated into single dev.yml)
- Tests: npm test → 20/20 pass; Build: npm run build → 0 errors
2026-05-20 12:58:21 -04:00
overseer 23f9d4a8fb CUB-125: implement real-time SSE/WebSocket in React frontend
- Add useSSE hook with exponential back-off reconnect (1s → 30s)
- Add useRealtimeSync hook: maps SSE events to React Query invalidation
  (agent.status → agents; agent.task/agent.progress → tasks+agents; fleet.update → all)
- Add SSEContext/SSEProvider so connection status is available app-wide
- Mount SSEProvider in main.tsx inside QueryClientProvider (no polling)
- Show live/connecting/reconnecting/disconnected badge in sidebar + mobile header
- Update SettingsPage: replace polling interval UI with SSE status panel
- Disable React Query polling (staleTime 60s); all updates pushed via SSE

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 12:58:21 -04:00
Dex d9a1640b10 CUB-200: sync CI workflows with dev branch
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
- Overwrite dev.yml with dev's consolidated version (parameterized Go/Node versions, cleaner install steps)
- Add deploy-dev.yaml from dev (was missing on this branch)
- build-dev.yaml confirmed absent (was deleted on dev in PR #45)
2026-05-20 16:29:57 +00:00
overseer 6fd2d9bec4 Merge branch 'dev' into agent/dex/CUB-200-ws-gateway-client
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
2026-05-20 08:12:36 -04:00
Dex d28d6e8dac CUB-200: implement WebSocket gateway client with v3 protocol
Dev Build / build-test (pull_request) Has been cancelled
Dev Build / deploy-dev (pull_request) Has been cancelled
Replace REST poller with WebSocket client as primary gateway connection:

- wsclient.go: WebSocket client with v3 handshake (connect.challenge →
  connect → hello-ok), frame routing (req/res/event), JSON-RPC Send(),
  auto-reconnect with exponential backoff (1s → 30s max)
- sync.go: Initial sync via agents.list + sessions.list RPCs, merge
  session runtime state into AgentCardData, broadcast fleet.update
- events.go: Real-time event handlers for sessions.changed, presence,
  and agent.config — DB update first, then SSE broadcast
- client.go: REST poller retained as fallback (WS is primary)
- config.go: Add GATEWAY_WS_URL and OPENCLAW_GATEWAY_TOKEN env vars
- main.go: Wire WS client as primary, REST as fallback
- .env.example: Document new WS config vars

Fallback: If WS connection fails, seeded demo data + REST polling
remain available.
2026-05-20 11:33:17 +00:00
8 changed files with 639 additions and 66 deletions
+1 -1
@@ -32,7 +32,7 @@ GATEWAY_POLL_INTERVAL=5s
# When using docker-compose, these are set in the services section
# See docker-compose.yml for service-specific environment variables
-# ── Database Configuration ─────────────────────────────────────────────
+# ── Database Configuration ───────────────────────────────────────────────
# Set in the db service environment section of docker-compose.yml
# POSTGRES_USER=controlcenter
# POSTGRES_PASSWORD=controlcenter
+304
@@ -0,0 +1,304 @@
# Control Center — Project Context
> **Last updated:** 2026-05-21
> **Repo:** `CubeCraft-Creations/Control-Center` | **Host:** `code.cubecraftcreations.com`
> **Local clone:** `/mnt/ai-storage/projects/Control-Center` | **Default branch:** `dev`
> **Discord:** `DISCORD_DEV_CONTROL_CENTER_CHANNEL_ID`
> **Linear Epic:** [CUB-119](https://linear.app/cubecraft-creations/issue/CUB-119)
---
## Overview
Real-time dashboard for monitoring and controlling the OpenClaw agent fleet. Displays agent statuses, active tasks, sessions, and projects. Uses SSE for live updates from the Go backend, which connects to the OpenClaw gateway via WebSocket for live agent data.
**Completed refactor:** ASP.NET Core + Angular → Go + React is done (CUB-119 epic). All legacy code is removed from git.
## Tech Stack
| Layer | Technology | Notes |
|-------|-----------|-------|
| Backend | Go 1.24+ | Chi router, pgx (PostgreSQL), SSE broker, gorilla/websocket |
| Frontend | React 18 + TypeScript | Vite, Tailwind CSS, React Query, TanStack Router |
| Database | PostgreSQL 16+ | snake_case naming, 2 migrations |
| Real-time | SSE + WebSocket | SSE for browser, WebSocket for OpenClaw gateway |
| Gateway Integration | WebSocket client | OpenClaw gateway `/ws` — live agent + session RPC |
| API Client | TypeScript SDK | `api-client/` — shared models, WS client, HTTP client |
| CI/CD | Gitea Actions | `.gitea/workflows/dev.yml`, `deploy-dev.yaml` |
| Deployment | Docker Compose | PostgreSQL + Go backend + React/nginx |
| Testing | Vitest, Go test | Unit + integration tests for WS client, gateway, handlers |
| Design | Kiosk layout | Bottom nav (mobile), nav rail (desktop), quick-jump drawer |
## Architecture
```
OpenClaw Gateway (WebSocket)
Go Backend (Chi + pgx)
├── Gateway WS Client (connect, reconnect, agents.list, sessions.list RPC)
├── SSE Broker (fan-out: agent.status, agent.task, fleet.update)
├── REST API (/api/agents, /api/sessions, /api/tasks, /api/projects)
└── Repository/Store layers → PostgreSQL
├── SSE /api/events
React Frontend
├── SSEProvider → useRealtimeSync → React Query cache
├── HubPage (dashboard), LogsPage, ProjectsPage, SessionsPage, SettingsPage
└── Layout (header bar + nav rail + bottom nav + quick-jump drawer)
```
### Key Architecture Decisions
1. **Go replaced ASP.NET Core** — lighter runtime, faster cold-start, better concurrency for gateway polling
2. **React replaced Angular** — lighter than Angular for dashboard/kiosk use
3. **SSE over SignalR** — simpler server-side, unidirectional events sufficient for browser updates
4. **WebSocket for gateway integration** — bidirectional needed for RPC (agents.list, sessions.list)
5. **PostgreSQL** — shared with Extrudex pattern; migrations in `go-backend/migrations/`
6. **Agent state seeded on first boot** via `gateway.SeedDemoAgents` for offline dev
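Decision 4 depends on matching each response frame to the request that caused it. A minimal sketch of that correlation follows, assuming responses carry the same `id` as the request. The `pendingCalls` type and its `register` and `deliver` methods are hypothetical names for illustration, not the project's implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// frame mirrors only the fields this sketch needs.
type frame struct {
	Type   string          `json:"type"`
	ID     string          `json:"id,omitempty"`
	Result json.RawMessage `json:"result,omitempty"`
}

// pendingCalls is a hypothetical registry of in-flight requests keyed by frame ID.
type pendingCalls struct {
	mu    sync.Mutex
	calls map[string]chan frame
}

func newPendingCalls() *pendingCalls {
	return &pendingCalls{calls: make(map[string]chan frame)}
}

// register reserves a buffered channel for the response to request id.
func (p *pendingCalls) register(id string) <-chan frame {
	ch := make(chan frame, 1)
	p.mu.Lock()
	p.calls[id] = ch
	p.mu.Unlock()
	return ch
}

// deliver routes a response frame to its waiting caller. It reports
// false when no caller is waiting for that ID.
func (p *pendingCalls) deliver(f frame) bool {
	p.mu.Lock()
	ch, ok := p.calls[f.ID]
	delete(p.calls, f.ID)
	p.mu.Unlock()
	if !ok {
		return false
	}
	ch <- f
	return true
}

func main() {
	p := newPendingCalls()
	wait := p.register("req-1")

	// Stand-in for the read loop: decode an incoming frame and route it.
	var f frame
	if err := json.Unmarshal([]byte(`{"type":"res","id":"req-1","result":[]}`), &f); err != nil {
		panic(err)
	}
	go p.deliver(f)

	res := <-wait
	fmt.Println(res.ID, string(res.Result))
}
```

The caller blocks on its channel until something calls `deliver`, which is why a read loop has to be running before any RPC is sent.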
## Project Structure
```
Control-Center/
├── go-backend/ # Go backend
│ ├── cmd/server/main.go # Entrypoint, wire deps, start gateway poller
│ ├── Dockerfile / go.mod / go.sum
│ ├── migrations/ # 001_initial_schema, 002_add_indexes
│ └── internal/
│ ├── config/config.go # Env vars (DATABASE_URL, GATEWAY_URL, etc.)
│ ├── db/db.go # PostgreSQL pool (pgx)
│ ├── gateway/
│ │ ├── client.go # GW poller → sync DB + SSE fan-out
│ │ ├── events.go # Gateway event handlers (sessions.changed, presence, agent.config)
│ │ ├── events_test.go
│ │ ├── sync.go # Initial sync from gateway
│ │ ├── sync_test.go
│ │ ├── wsclient.go # WebSocket client (handshake, connect, reconnect, RPC)
│ │ └── wsclient_test.go
│ ├── handler/
│ │ ├── agent.go # CRUD + history
│ │ ├── project.go # List projects
│ │ ├── session.go # List sessions
│ │ ├── sse.go # SSE broker: subscribe + broadcast
│ │ ├── task.go # List tasks
│ │ ├── helpers.go
│ │ ├── handler_test.go
│ │ └── mock_repos_test.go
│ ├── models/models.go # Domain types
│ ├── repository/ # DB access layer + interfaces
│ ├── router/router.go # Chi router: REST + SSE mount
│ └── store/ # Agent, Project, Session, Task stores
├── api-client/ # Shared TypeScript SDK
│ └── src/
│ ├── models/types.ts # Agent, Session, Task, Project, SSE event types
│ ├── services/http-client.ts # Axios REST client
│ ├── utils/
│ │ ├── config.ts # Client config
│ │ └── status-mapper.ts # Agent status → display mapping
│ └── websocket/
│ └── ws-client.ts # WebSocket client (handshake, Send, RPC, reconnector)
├── frontend/ # React frontend
│ ├── Dockerfile + nginx.conf
│ ├── package.json + vite.config.ts
│ ├── src/
│ │ ├── App.tsx / main.tsx
│ │ ├── components/
│ │ │ ├── ErrorBoundary.tsx
│ │ │ └── Layout.tsx # Header bar + nav rail + bottom nav + quick-jump
│ │ ├── contexts/
│ │ │ └── SSEContext.tsx # SSEProvider — wraps entire app
│ │ ├── hooks/
│ │ │ ├── useLocalStorage.ts
│ │ │ ├── useRealtimeSync.ts # SSE messages → React Query cache
│ │ │ ├── useRealtimeSync.test.tsx
│ │ │ ├── useSSE.ts # SSE: connect, reconnect, typed events
│ │ │ ├── useSSE.test.ts
│ │ │ └── useTheme.tsx
│ │ ├── pages/
│ │ │ ├── HubPage.tsx # Fleet dashboard (agent grid + stats)
│ │ │ ├── LogsPage.tsx # Agent log viewer
│ │ │ ├── ProjectsPage.tsx # Project list
│ │ │ ├── SessionsPage.tsx # Session list
│ │ │ └── SettingsPage.tsx # Settings + theme toggle
│ │ ├── services/
│ │ │ ├── api.ts # Axios REST client
│ │ │ └── sse.ts # SSE utilities
│ │ └── types/index.ts
│ └── vitest.config.ts
├── frontend-legacy/ # Original Angular frontend (kept for reference, not in git)
├── backend/ # Original ASP.NET backend (kept for reference, not in git)
│ ├── ControlCenter/ # ASP.NET Core project
│ └── Api/ # API layer
├── design/
│ ├── command-hub-spec.md # Detailed design spec
│ └── mockups/ # Desktop kiosk, mobile, quick-jump drawer
├── kiosk/
│ ├── control-center-kiosk.service # Systemd service
│ └── start-kiosk.sh # Kiosk startup script
├── reference/
│ └── CONTROL_CENTER_CONTEXT.md # Older context file (superseded by this one)
├── ci-image/Dockerfile # CI build image
├── docker-compose.yml
└── .env.example
```
## Database Schema (PostgreSQL)
### agents
| Column | Type | Notes |
|--------|------|-------|
| id | UUID PK | |
| display_name | VARCHAR(256) NOT NULL | |
| role | VARCHAR(256) | |
| status | VARCHAR(32) | active, idle, thinking, error |
| current_task | VARCHAR(512) | |
| task_progress | INTEGER DEFAULT 0 | |
| session_key | VARCHAR(256) | |
| channel | VARCHAR(256) | |
| last_activity | TIMESTAMP | |
| error_message | TEXT | |
| created_at | TIMESTAMP | |
| updated_at | TIMESTAMP | |
### sessions
| Column | Type |
|--------|------|
| id | UUID PK |
| session_key | VARCHAR(256) UNIQUE |
| agent_id | UUID FK → agents |
| channel | VARCHAR(256) |
| status | VARCHAR(32) |
| context_tokens | INTEGER |
| total_tokens | INTEGER |
| estimated_cost | NUMERIC |
| model | VARCHAR(256) |
| started_at | TIMESTAMP |
| last_activity_at | TIMESTAMP |
### tasks
| Column | Type |
|--------|------|
| id | UUID PK |
| agent_id | UUID FK → agents |
| title | VARCHAR(512) |
| description | TEXT |
| status | VARCHAR(32) |
| progress | INTEGER DEFAULT 0 |
| session_key | VARCHAR(256) |
| created_at, updated_at | TIMESTAMP |
### projects
| Column | Type |
|--------|------|
| id | UUID PK |
| name | VARCHAR(256) UNIQUE |
| description | TEXT |
| status | VARCHAR(32) |
| agent_ids | TEXT[] |
| created_at, updated_at | TIMESTAMP |
## API Endpoints
| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | /health | Health check (probes DB) |
| GET | /api/agents | List all agents |
| POST | /api/agents | Create agent |
| GET | /api/agents/:id | Get agent detail |
| PUT | /api/agents/:id | Update agent |
| DELETE | /api/agents/:id | Delete agent |
| GET | /api/agents/:id/history | Agent status history |
| GET | /api/sessions | List sessions |
| GET | /api/tasks | List tasks |
| GET | /api/projects | List projects |
| GET | /api/events | SSE event stream |
## SSE Events
| Event Type | Payload |
|------------|---------|
| `agent.status` | Agent status change |
| `agent.task` | Agent current task updated |
| `agent.progress` | Task progress percentage |
| `fleet.update` | Full fleet snapshot |
| `connected` | Connection established |
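On the wire, each of these events is a Server-Sent Events message: an `event:` line, a `data:` line, and a blank line that ends the message. A minimal sketch of that framing follows. The helper name `formatSSE` and the payload fields are assumptions for illustration; the real payload shapes are defined by the backend.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// formatSSE renders one Server-Sent Events message: an "event:" line,
// a "data:" line holding the JSON payload, and a blank line terminator.
func formatSSE(event string, payload any) (string, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("event: %s\ndata: %s\n\n", event, data), nil
}

func main() {
	// The payload shape here is assumed, not taken from the backend.
	msg, err := formatSSE("agent.status", map[string]string{"id": "otto", "status": "idle"})
	if err != nil {
		panic(err)
	}
	fmt.Print(msg)
}
```

Because `json.Marshal` emits no raw newlines, the payload always fits on a single `data:` line.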
## CI/CD Pipeline
### dev.yml
- Lint + typecheck: Go vet + golangci-lint + tsc
- Test: Go test + vitest
- Build: Go build + npm build
- Docker: Build and push images
- Triggers: push to dev/main
### deploy-dev.yaml
- Workflow dispatch
- SCP deploy script to dev host
- systemctl restart with rollback
## Docker Compose
| Service | Image/Build | Ports | Depends On |
|---------|-------------|-------|------------|
| postgres | postgres:16-alpine | 5432 | — |
| backend | ./go-backend/Dockerfile | 8080 | postgres (healthy) |
| frontend | ./frontend/Dockerfile | 3000 | backend (healthy) |
## Linear Issue Map
| CUB | Title | Status |
|-----|-------|--------|
| 119 | **Epic: Control Center Refactor — .NET → Go + React** | Todo |
| 120 | PostgreSQL schema + migrations | Done |
| 121 | React pages wired to real API | Done |
| 122 | React frontend scaffold | Done |
| 123 | Gateway integration + SSE streaming | Done |
| 124 | Go backend scaffold | Done |
| 125 | Real-time SSE frontend | Done |
| 126 | Docker Compose deployment | Done |
| 127 | CRUD API endpoints | Done |
| 200 | Live WebSocket gateway client (CUB-200-207 sub-epic) | In Review |
| 201 | agents.list + sessions.list RPC and data mapping | In Review |
| 202 | Real-time event subscription + SSE fan-out | In Review |
| 203 | WS client scaffold — handshake, connect, reconnect loop | In Review |
| 204 | Config, wiring, and graceful fallback | In Review |
| 205 | Unit tests — gateway utility functions | In Review |
| 206 | Unit tests — WSClient handshake, Send/RPC, frame router, reconnect | In Review |
| 207 | Unit tests — event handlers and initial sync | In Review |
### Legacy Issues (Angular/ASP.NET — all Done)
CUB-19 through CUB-63: All 27 Control Center issues completed, including minion mapping, breakroom UI, dark mode theme, agent cards, quick-jump drawer, adaptive nav, SignalR hub, and status animations.
## Known Limitations / Next Steps
1. Agent detail/history views are scaffolded but not fully implemented
2. 16-bit minion breakroom concept (CUB-59-63) was on Angular — needs React port if desired
3. `.env` must be created from `.env.example` with a valid `GATEWAY_URL` for live agent data
4. Docker containers not currently running — start with `docker compose up --build -d`
## Default Agent Assignments
| Area | Agent | Notes |
|------|-------|-------|
| Backend (Go API, Gateway WS, SSE) | Dex | gitea-dex MCP |
| Database (PostgreSQL schema) | Hex | gitea-hex MCP |
| Frontend (React, Tailwind) | Rex | gitea-rex MCP |
| Design (wireframes, UX) | Sketch | |
## Getting Started
```bash
cd /mnt/ai-storage/projects/Control-Center
git checkout dev
git pull origin dev
# Docker Compose (recommended)
cp .env.example .env # edit GATEWAY_URL first
docker compose up --build -d
# Manual
cd go-backend && go run cmd/server/main.go # → :8080
cd frontend && npm install && npm run dev # → :5173 (Vite proxy to :8080)
```
+1 -1
@@ -112,7 +112,7 @@ func main() {
<-quit
slog.Info("shutting down server...")
-cancel() // stop gateway polling
+cancel() // stop gateway clients
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
defer shutdownCancel()
+18 -18
@@ -10,30 +10,30 @@ import (
// Config holds all application configuration.
type Config struct {
-Port int
-DatabaseURL string
-CORSOrigin string
-LogLevel string
-Environment string
-GatewayRestURL string
-GatewayRestPollInterval time.Duration
-WSGatewayURL string
-WSGatewayToken string
+Port int
+DatabaseURL string
+CORSOrigin string
+LogLevel string
+Environment string
+GatewayRestURL string
+GatewayRestPollInterval time.Duration
+WSGatewayURL string
+WSGatewayToken string
}
// Load reads configuration from environment variables, applying defaults where
// values are not set. All secrets come from the environment — nothing is hardcoded.
func Load() *Config {
return &Config{
-Port: getEnvInt("PORT", 8080),
-DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"),
-CORSOrigin: getEnv("CORS_ORIGIN", "*"),
-LogLevel: getEnv("LOG_LEVEL", "info"),
-Environment: getEnv("ENVIRONMENT", "development"),
-GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
-GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
-WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
-WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
+Port: getEnvInt("PORT", 8080),
+DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"),
+CORSOrigin: getEnv("CORS_ORIGIN", "*"),
+LogLevel: getEnv("LOG_LEVEL", "info"),
+Environment: getEnv("ENVIRONMENT", "development"),
+GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
+GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
+WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
+WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
}
}
+30 -27
@@ -1,6 +1,10 @@
// Package gateway provides an OpenClaw gateway integration client that
// polls agent states, persists them via the repository layer, and broadcasts
// changes through the SSE broker for real-time frontend updates.
+//
+// When a WSClient is wired via SetWSClient, the REST poller becomes a
+// fallback: it waits for the WS client to signal readiness, and only starts
+// polling if WS fails to connect within 30 seconds.
package gateway
import (
@@ -29,7 +33,7 @@ type Client struct {
broker *handler.Broker
wsClient *WSClient // optional WS client; when set, REST is fallback only
wsReady chan struct{} // closed once WS connection is established
-wsReadyOnce sync.Once // protects wsReady close from double-close race
+wsReadyOnce sync.Once // protects wsReady close from double-close race
}
// Config holds gateway client configuration, typically loaded from environment.
@@ -140,7 +144,6 @@ func (c *Client) poll(ctx context.Context) {
}
for _, ga := range agents {
-// Check if agent already exists; if so, update; otherwise create.
existing, err := c.agents.Get(ctx, ga.ID)
if err != nil {
// Not found — create it
@@ -185,51 +188,51 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
slog.Info("seeding demo agents")
demoAgents := []models.AgentCardData{
{
-ID: "otto",
-DisplayName: "Otto",
-Role: "Orchestrator",
-Status: models.AgentStatusActive,
+ID: "otto",
+DisplayName: "Otto",
+Role: "Orchestrator",
+Status: models.AgentStatusActive,
CurrentTask: strPtr("Orchestrating tasks"),
SessionKey: "otto-session",
-Channel: "discord",
+Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339),
},
{
-ID: "rex",
-DisplayName: "Rex",
-Role: "Frontend Dev",
-Status: models.AgentStatusIdle,
+ID: "rex",
+DisplayName: "Rex",
+Role: "Frontend Dev",
+Status: models.AgentStatusIdle,
SessionKey: "rex-session",
-Channel: "discord",
+Channel: "discord",
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
},
{
-ID: "dex",
-DisplayName: "Dex",
-Role: "Backend Dev",
-Status: models.AgentStatusThinking,
+ID: "dex",
+DisplayName: "Dex",
+Role: "Backend Dev",
+Status: models.AgentStatusThinking,
CurrentTask: strPtr("Designing API contracts"),
SessionKey: "dex-session",
-Channel: "discord",
+Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339),
},
{
-ID: "hex",
-DisplayName: "Hex",
-Role: "Database Specialist",
-Status: models.AgentStatusActive,
+ID: "hex",
+DisplayName: "Hex",
+Role: "Database Specialist",
+Status: models.AgentStatusActive,
CurrentTask: strPtr("Reviewing schema migrations"),
SessionKey: "hex-session",
-Channel: "discord",
+Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339),
},
{
-ID: "pip",
-DisplayName: "Pip",
-Role: "Edge Device Dev",
-Status: models.AgentStatusIdle,
+ID: "pip",
+DisplayName: "Pip",
+Role: "Edge Device Dev",
+Status: models.AgentStatusIdle,
SessionKey: "pip-session",
-Channel: "discord",
+Channel: "discord",
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
},
}
+35 -15
@@ -92,10 +92,10 @@ func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
// wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol.
type wsFrame struct {
-Type string `json:"type"` // "req", "res", "event"
-ID string `json:"id,omitempty"` // request/response correlation
-Method string `json:"method,omitempty"` // method name (req frames)
-Event string `json:"event,omitempty"` // event name (event frames)
+Type string `json:"type"` // "req", "res", "event"
+ID string `json:"id,omitempty"` // request/response correlation
+Method string `json:"method,omitempty"` // method name (req frames)
+Event string `json:"event,omitempty"` // event name (event frames)
Params json.RawMessage `json:"params,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
Error *wsError `json:"error,omitempty"`
@@ -229,7 +229,34 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
c.connId = helloOK.ConnID
c.connMu.Unlock()
-// Notify REST client that WS is live so it stands down
+// Step 2b: Register live event handlers BEFORE starting the read
+// loop. This eliminates the race window where readLoop dispatches
+// live events as "unhandled" because no handlers are registered yet.
+// The handlers only depend on c.agents and c.broker, which are wired
+// in the constructor — they do not need initialSync to have completed.
+c.registerEventHandlers()
+// Step 2c: Start the read loop in a goroutine so that Send() in
+// initialSync can receive responses. The read loop goroutine will
+// continue running after initialSync completes, routing live events
+// and any future RPC responses. Because handlers are already
+// registered, any events arriving during or after initialSync are
+// dispatched correctly.
+readLoopErrCh := make(chan error, 1)
+go func() {
+readLoopErrCh <- c.readLoop(ctx, conn)
+}()
+// Step 2d: Initial sync — fetch agents + sessions from gateway.
+// This works because the read loop is active and will route
+// response frames back to Send() via handleResponse.
+if err := c.initialSync(ctx); err != nil {
+c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
+}
+// Notify REST client that WS is live so it stands down.
+// This must happen AFTER initialSync so that the REST poller
+// doesn't start polling while we're still syncing.
if c.restClient != nil {
c.restClient.MarkWSReady()
c.logger.Info("ws client notified REST fallback to stand down")
@@ -238,16 +265,9 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
// Reset wsReadyOnce so MarkWSReady can fire again after a reconnect
c.wsReadyOnce = sync.Once{}
-// Step 2b: Initial sync — fetch agents + sessions from gateway
-if err := c.initialSync(ctx); err != nil {
-c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
-}
-// Step 2c: Register live event handlers
-c.registerEventHandlers()
-// Step 3: Read loop
-return c.readLoop(ctx, conn)
+// Step 3: Wait for the read loop goroutine to finish (blocks
+// until the connection drops or context is cancelled).
+return <-readLoopErrCh
}
// readChallenge reads the first frame from the gateway, which must be a
@@ -11,6 +11,7 @@ import (
"testing"
"time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
"github.com/gorilla/websocket"
@@ -466,6 +467,236 @@ func TestAgentItemToCard(t *testing.T) {
})
}
// ── 6. Test: Initial sync ordering (readLoop active before Send) ──────────
// TestConnectAndRun_InitialSyncOrdering verifies that the WS client
// completes initial sync successfully. This test would hang/timeout if
// readLoop were NOT started before initialSync, because Send() relies on
// readLoop→routeFrame→handleResponse to deliver RPC responses.
func TestConnectAndRun_InitialSyncOrdering(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
srv := newTestWSServer(t, func(conn *websocket.Conn) {
// Handshake
handleHandshake(t, conn)
// After handshake, respond to RPCs
for {
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
break
}
reqID, _ := req["id"].(string)
method, _ := req["method"].(string)
var result any
switch method {
case "agents.list":
result = []map[string]any{
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
{"id": "dex", "name": "Dex", "role": "Backend Dev", "channel": "telegram"},
}
case "sessions.list":
result = []map[string]any{
{"sessionKey": "s1", "agentId": "otto", "status": "running", "totalTokens": 500, "lastActivityAt": "2025-05-20T12:00:00Z"},
}
default:
result = map[string]any{}
}
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": result,
}
if err := conn.WriteJSON(res); err != nil {
break
}
}
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
client.Start(ctx)
close(done)
}()
// Wait for initial sync to complete by checking repo state.
// The agents should be persisted from the RPC responses.
deadline := time.Now().Add(5 * time.Second)
for time.Now().Before(deadline) {
repo.mu.Lock()
_, ottoOK := repo.agents["otto"]
_, dexOK := repo.agents["dex"]
repo.mu.Unlock()
if ottoOK && dexOK {
break
}
time.Sleep(50 * time.Millisecond)
}
repo.mu.Lock()
_, ottoOK := repo.agents["otto"]
_, dexOK := repo.agents["dex"]
repo.mu.Unlock()
if !ottoOK {
t.Error("otto not found in repo after initial sync — readLoop may not have been active before Send()")
}
if !dexOK {
t.Error("dex not found in repo after initial sync — readLoop may not have been active before Send()")
}
cancel()
select {
case <-done:
case <-time.After(3 * time.Second):
t.Fatal("WSClient did not shut down cleanly")
}
}
// ── 7. Test: Event not lost during initial sync (regression) ───────────────
// TestConnectAndRun_EventNotLostDuringSync verifies that live gateway events
// arriving during initial sync are NOT dropped. This is a regression test
// for the race where readLoop started before registerEventHandlers(),
// causing events read during that window to be logged as "unhandled" and lost.
//
// The mock server sends a live event (sessions.changed) right after the
// handshake, interleaved with the RPC responses for agents.list and
// sessions.list. The test asserts the event is received by the handler.
func TestConnectAndRun_EventNotLostDuringSync(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
// Pre-seed an agent so the event handler can update it.
repo.agents["otto"] = models.AgentCardData{
ID: "otto",
DisplayName: "Otto",
Status: models.AgentStatusIdle,
}
srv := newTestWSServer(t, func(conn *websocket.Conn) {
// Handshake
handleHandshake(t, conn)
// After handshake, process RPCs and inject a live event.
for {
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
break
}
reqID, _ := req["id"].(string)
method, _ := req["method"].(string)
// Respond to agents.list RPC
if method == "agents.list" {
// Before responding, inject a live event — simulates
// a gateway pushing a presence update during sync.
evt := map[string]any{
"type": "event",
"event": "presence",
"params": map[string]any{"agentId": "otto", "connected": true, "lastActivityAt": "2025-05-20T12:30:00Z"},
}
if err := conn.WriteJSON(evt); err != nil {
break
}
// Now send the RPC response
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": []map[string]any{
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
},
}
if err := conn.WriteJSON(res); err != nil {
break
}
continue
}
// Respond to sessions.list RPC
if method == "sessions.list" {
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": []map[string]any{},
}
if err := conn.WriteJSON(res); err != nil {
break
}
continue
}
// Default response for other methods
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": map[string]any{},
}
if err := conn.WriteJSON(res); err != nil {
break
}
}
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
client.Start(ctx)
close(done)
}()
// Wait for the presence event to be processed by checking the repo.
// The presence handler updates the agent, so we check for the
// lastActivityAt change.
deadline := time.Now().Add(5 * time.Second)
var lastActivity string
for time.Now().Before(deadline) {
repo.mu.Lock()
if a, ok := repo.agents["otto"]; ok {
lastActivity = a.LastActivity
}
repo.mu.Unlock()
if lastActivity == "2025-05-20T12:30:00Z" {
break
}
time.Sleep(50 * time.Millisecond)
}
if lastActivity != "2025-05-20T12:30:00Z" {
t.Errorf("presence event during sync was lost: lastActivity = %q, want %q", lastActivity, "2025-05-20T12:30:00Z")
}
cancel()
select {
case <-done:
case <-time.After(3 * time.Second):
t.Fatal("WSClient did not shut down cleanly")
}
}
func TestStrPtr(t *testing.T) {
s := "hello"
p := strPtr(s)
+15
@@ -65,6 +65,21 @@ func New(deps *Dependencies) *chi.Mux {
// ── API v1 routes ──────────────────────────────────────────────────────
r.Route("/api", func(api chi.Router) {
// Health check (under /api)
api.Get("/health", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
status := "ok"
if deps.Pool != nil {
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
defer cancel()
if err := deps.Pool.Ping(ctx); err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
status = "db_unhealthy"
}
}
w.Write([]byte(`{"status":"` + status + `"}`))
})
// Agents CRUD
api.Route("/agents", func(agents chi.Router) {
agents.Get("/", deps.Handler.ListAgents) // GET /api/agents