Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4509b0c217 | |||
| f3ce08497a | |||
| fd60b0bb57 | |||
| b7b05bb4e3 | |||
| d370d5ec23 | |||
| 1b82e1d3a6 | |||
| 93bf434a47 | |||
| 010408cc45 | |||
| 23f9d4a8fb | |||
| d9a1640b10 | |||
| 6fd2d9bec4 | |||
| d28d6e8dac |
+1
-1
@@ -32,7 +32,7 @@ GATEWAY_POLL_INTERVAL=5s
|
|||||||
# When using docker-compose, these are set in the services section
|
# When using docker-compose, these are set in the services section
|
||||||
# See docker-compose.yml for service-specific environment variables
|
# See docker-compose.yml for service-specific environment variables
|
||||||
|
|
||||||
# ── Database Configuration ─────────────────────────────────────────────
|
# ── Database Configuration ───────────────────────────────────────────────
|
||||||
# Set in the db service environment section of docker-compose.yml
|
# Set in the db service environment section of docker-compose.yml
|
||||||
# POSTGRES_USER=controlcenter
|
# POSTGRES_USER=controlcenter
|
||||||
# POSTGRES_PASSWORD=controlcenter
|
# POSTGRES_PASSWORD=controlcenter
|
||||||
|
|||||||
+304
@@ -0,0 +1,304 @@
|
|||||||
|
# Control Center — Project Context
|
||||||
|
|
||||||
|
> **Last updated:** 2026-05-21
|
||||||
|
> **Repo:** `CubeCraft-Creations/Control-Center` | **Host:** `code.cubecraftcreations.com`
|
||||||
|
> **Local clone:** `/mnt/ai-storage/projects/Control-Center` | **Default branch:** `dev`
|
||||||
|
> **Discord:** `DISCORD_DEV_CONTROL_CENTER_CHANNEL_ID`
|
||||||
|
> **Linear Epic:** [CUB-119](https://linear.app/cubecraft-creations/issue/CUB-119)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Overview
|
||||||
|
|
||||||
|
Real-time dashboard for monitoring and controlling the OpenClaw agent fleet. Displays agent statuses, active tasks, sessions, and projects. Uses SSE for live updates from the Go backend, which connects to the OpenClaw gateway via WebSocket for live agent data.
|
||||||
|
|
||||||
|
**Completed refactor:** ASP.NET Core + Angular → Go + React is done (CUB-119 epic). All legacy code is removed from git.
|
||||||
|
|
||||||
|
## Tech Stack
|
||||||
|
|
||||||
|
| Layer | Technology | Notes |
|
||||||
|
|-------|-----------|-------|
|
||||||
|
| Backend | Go 1.24+ | Chi router, pgx (PostgreSQL), SSE broker, gorilla/websocket |
|
||||||
|
| Frontend | React 18 + TypeScript | Vite, Tailwind CSS, React Query, TanStack Router |
|
||||||
|
| Database | PostgreSQL 16+ | snake_case naming, 2 migrations |
|
||||||
|
| Real-time | SSE + WebSocket | SSE for browser, WebSocket for OpenClaw gateway |
|
||||||
|
| Gateway Integration | WebSocket client | OpenClaw gateway `/ws` — live agent + session RPC |
|
||||||
|
| API Client | TypeScript SDK | `api-client/` — shared models, WS client, HTTP client |
|
||||||
|
| CI/CD | Gitea Actions | `.gitea/workflows/dev.yml`, `deploy-dev.yaml` |
|
||||||
|
| Deployment | Docker Compose | PostgreSQL + Go backend + React/nginx |
|
||||||
|
| Testing | Vitest, Go test | Unit + integration tests for WS client, gateway, handlers |
|
||||||
|
| Design | Kiosk layout | Bottom nav (mobile), nav rail (desktop), quick-jump drawer |
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
```
|
||||||
|
OpenClaw Gateway (WebSocket)
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
Go Backend (Chi + pgx)
|
||||||
|
├── Gateway WS Client (connect, reconnect, agents.list, sessions.list RPC)
|
||||||
|
├── SSE Broker (fan-out: agent.status, agent.task, fleet.update)
|
||||||
|
├── REST API (/api/agents, /api/sessions, /api/tasks, /api/projects)
|
||||||
|
└── Repository/Store layers → PostgreSQL
|
||||||
|
│
|
||||||
|
├── SSE /api/events
|
||||||
|
▼
|
||||||
|
React Frontend
|
||||||
|
├── SSEProvider → useRealtimeSync → React Query cache
|
||||||
|
├── HubPage (dashboard), LogsPage, ProjectsPage, SessionsPage, SettingsPage
|
||||||
|
└── Layout (header bar + nav rail + bottom nav + quick-jump drawer)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Key Architecture Decisions
|
||||||
|
1. **Go replaced ASP.NET Core** — lighter runtime, faster cold-start, better concurrency for gateway polling
|
||||||
|
2. **React replaced Angular** — lighter than Angular for dashboard/kiosk use
|
||||||
|
3. **SSE over SignalR** — simpler server-side, unidirectional events sufficient for browser updates
|
||||||
|
4. **WebSocket for gateway integration** — bidirectional needed for RPC (agents.list, sessions.list)
|
||||||
|
5. **PostgreSQL** — shared with Extrudex pattern; migrations in `go-backend/migrations/`
|
||||||
|
6. **Agent state seeded on first boot** via `gateway.SeedDemoAgents` for offline dev
|
||||||
|
|
||||||
|
## Project Structure
|
||||||
|
|
||||||
|
```
|
||||||
|
Control-Center/
|
||||||
|
├── go-backend/ # Go backend
|
||||||
|
│ ├── cmd/server/main.go # Entrypoint, wire deps, start gateway poller
|
||||||
|
│ ├── Dockerfile / go.mod / go.sum
|
||||||
|
│ ├── migrations/ # 001_initial_schema, 002_add_indexes
|
||||||
|
│ └── internal/
|
||||||
|
│ ├── config/config.go # Env vars (DATABASE_URL, GATEWAY_URL, etc.)
|
||||||
|
│ ├── db/db.go # PostgreSQL pool (pgx)
|
||||||
|
│ ├── gateway/
|
||||||
|
│ │ ├── client.go # GW poller → sync DB + SSE fan-out
|
||||||
|
│ │ ├── events.go # SSE event broker
|
||||||
|
│ │ ├── events_test.go
|
||||||
|
│ │ ├── sync.go # Initial sync from gateway
|
||||||
|
│ │ ├── sync_test.go
|
||||||
|
│ │ ├── wsclient.go # WebSocket client (handshake, connect, reconnect, RPC)
|
||||||
|
│ │ └── wsclient_test.go
|
||||||
|
│ ├── handler/
|
||||||
|
│ │ ├── agent.go # CRUD + history
|
||||||
|
│ │ ├── project.go # List projects
|
||||||
|
│ │ ├── session.go # List sessions
|
||||||
|
│ │ ├── sse.go # SSE broker: subscribe + broadcast
|
||||||
|
│ │ ├── task.go # List tasks
|
||||||
|
│ │ ├── helpers.go
|
||||||
|
│ │ ├── handler_test.go
|
||||||
|
│ │ └── mock_repos_test.go
|
||||||
|
│ ├── models/models.go # Domain types
|
||||||
|
│ ├── repository/ # DB access layer + interfaces
|
||||||
|
│ ├── router/router.go # Chi router: REST + SSE mount
|
||||||
|
│ └── store/ # Agent, Project, Session, Task stores
|
||||||
|
├── api-client/ # Shared TypeScript SDK
|
||||||
|
│ └── src/
|
||||||
|
│ ├── models/types.ts # Agent, Session, Task, Project, SSE event types
|
||||||
|
│ ├── services/http-client.ts # Axios REST client
|
||||||
|
│ ├── utils/
|
||||||
|
│ │ ├── config.ts # Client config
|
||||||
|
│ │ └── status-mapper.ts # Agent status → display mapping
|
||||||
|
│ └── websocket/
|
||||||
|
│ └── ws-client.ts # WebSocket client (handshake, Send, RPC, reconnector)
|
||||||
|
├── frontend/ # React frontend
|
||||||
|
│ ├── Dockerfile + nginx.conf
|
||||||
|
│ ├── package.json + vite.config.ts
|
||||||
|
│ ├── src/
|
||||||
|
│ │ ├── App.tsx / main.tsx
|
||||||
|
│ │ ├── components/
|
||||||
|
│ │ │ ├── ErrorBoundary.tsx
|
||||||
|
│ │ │ └── Layout.tsx # Header bar + nav rail + bottom nav + quick-jump
|
||||||
|
│ │ ├── contexts/
|
||||||
|
│ │ │ └── SSEContext.tsx # SSEProvider — wraps entire app
|
||||||
|
│ │ ├── hooks/
|
||||||
|
│ │ │ ├── useLocalStorage.ts
|
||||||
|
│ │ │ ├── useRealtimeSync.ts # SSE messages → React Query cache
|
||||||
|
│ │ │ ├── useRealtimeSync.test.tsx
|
||||||
|
│ │ │ ├── useSSE.ts # SSE: connect, reconnect, typed events
|
||||||
|
│ │ │ ├── useSSE.test.ts
|
||||||
|
│ │ │ └── useTheme.tsx
|
||||||
|
│ │ ├── pages/
|
||||||
|
│ │ │ ├── HubPage.tsx # Fleet dashboard (agent grid + stats)
|
||||||
|
│ │ │ ├── LogsPage.tsx # Agent log viewer
|
||||||
|
│ │ │ ├── ProjectsPage.tsx # Project list
|
||||||
|
│ │ │ ├── SessionsPage.tsx # Session list
|
||||||
|
│ │ │ └── SettingsPage.tsx # Settings + theme toggle
|
||||||
|
│ │ ├── services/
|
||||||
|
│ │ │ ├── api.ts # Axios REST client
|
||||||
|
│ │ │ └── sse.ts # SSE utilities
|
||||||
|
│ │ └── types/index.ts
|
||||||
|
│ └── vitest.config.ts
|
||||||
|
├── frontend-legacy/ # Original Angular frontend (kept for reference, not in git)
|
||||||
|
├── backend/ # Original ASP.NET backend (kept for reference, not in git)
|
||||||
|
│ ├── ControlCenter/ # ASP.NET Core project
|
||||||
|
│ └── Api/ # API layer
|
||||||
|
├── design/
|
||||||
|
│ ├── command-hub-spec.md # Detailed design spec
|
||||||
|
│ └── mockups/ # Desktop kiosk, mobile, quick-jump drawer
|
||||||
|
├── kiosk/
|
||||||
|
│ ├── control-center-kiosk.service # Systemd service
|
||||||
|
│ └── start-kiosk.sh # Kiosk startup script
|
||||||
|
├── reference/
|
||||||
|
│ └── CONTROL_CENTER_CONTEXT.md # Older context file (superseded by this one)
|
||||||
|
├── ci-image/Dockerfile # CI build image
|
||||||
|
├── docker-compose.yml
|
||||||
|
└── .env.example
|
||||||
|
```
|
||||||
|
|
||||||
|
## Database Schema (PostgreSQL)
|
||||||
|
|
||||||
|
### agents
|
||||||
|
| Column | Type | Notes |
|
||||||
|
|--------|------|-------|
|
||||||
|
| id | UUID PK | |
|
||||||
|
| display_name | VARCHAR(256) NOT NULL | |
|
||||||
|
| role | VARCHAR(256) | |
|
||||||
|
| status | VARCHAR(32) | active, idle, thinking, error |
|
||||||
|
| current_task | VARCHAR(512) | |
|
||||||
|
| task_progress | INTEGER DEFAULT 0 | |
|
||||||
|
| session_key | VARCHAR(256) | |
|
||||||
|
| channel | VARCHAR(256) | |
|
||||||
|
| last_activity | TIMESTAMP | |
|
||||||
|
| error_message | TEXT | |
|
||||||
|
| created_at | TIMESTAMP | |
|
||||||
|
| updated_at | TIMESTAMP | |
|
||||||
|
|
||||||
|
### sessions
|
||||||
|
| Column | Type |
|
||||||
|
|--------|------|
|
||||||
|
| id | UUID PK |
|
||||||
|
| session_key | VARCHAR(256) UNIQUE |
|
||||||
|
| agent_id | UUID FK → agents |
|
||||||
|
| channel | VARCHAR(256) |
|
||||||
|
| status | VARCHAR(32) |
|
||||||
|
| context_tokens | INTEGER |
|
||||||
|
| total_tokens | INTEGER |
|
||||||
|
| estimated_cost | NUMERIC |
|
||||||
|
| model | VARCHAR(256) |
|
||||||
|
| started_at | TIMESTAMP |
|
||||||
|
| last_activity_at | TIMESTAMP |
|
||||||
|
|
||||||
|
### tasks
|
||||||
|
| Column | Type |
|
||||||
|
|--------|------|
|
||||||
|
| id | UUID PK |
|
||||||
|
| agent_id | UUID FK → agents |
|
||||||
|
| title | VARCHAR(512) |
|
||||||
|
| description | TEXT |
|
||||||
|
| status | VARCHAR(32) |
|
||||||
|
| progress | INTEGER DEFAULT 0 |
|
||||||
|
| session_key | VARCHAR(256) |
|
||||||
|
| created_at, updated_at | TIMESTAMP |
|
||||||
|
|
||||||
|
### projects
|
||||||
|
| Column | Type |
|
||||||
|
|--------|------|
|
||||||
|
| id | UUID PK |
|
||||||
|
| name | VARCHAR(256) UNIQUE |
|
||||||
|
| description | TEXT |
|
||||||
|
| status | VARCHAR(32) |
|
||||||
|
| agent_ids | TEXT[] |
|
||||||
|
| created_at, updated_at | TIMESTAMP |
|
||||||
|
|
||||||
|
## API Endpoints
|
||||||
|
|
||||||
|
| Method | Endpoint | Description |
|
||||||
|
|--------|----------|-------------|
|
||||||
|
| GET | /health | Health check (probes DB) |
|
||||||
|
| GET | /api/agents | List all agents |
|
||||||
|
| POST | /api/agents | Create agent |
|
||||||
|
| GET | /api/agents/:id | Get agent detail |
|
||||||
|
| PUT | /api/agents/:id | Update agent |
|
||||||
|
| DELETE | /api/agents/:id | Delete agent |
|
||||||
|
| GET | /api/agents/:id/history | Agent status history |
|
||||||
|
| GET | /api/sessions | List sessions |
|
||||||
|
| GET | /api/tasks | List tasks |
|
||||||
|
| GET | /api/projects | List projects |
|
||||||
|
| GET | /api/events | SSE event stream |
|
||||||
|
|
||||||
|
## SSE Events
|
||||||
|
|
||||||
|
| Event Type | Payload |
|
||||||
|
|------------|---------|
|
||||||
|
| `agent.status` | Agent status change |
|
||||||
|
| `agent.task` | Agent current task updated |
|
||||||
|
| `agent.progress` | Task progress percentage |
|
||||||
|
| `fleet.update` | Full fleet snapshot |
|
||||||
|
| `connected` | Connection established |
|
||||||
|
|
||||||
|
## CI/CD Pipeline
|
||||||
|
|
||||||
|
### dev.yml
|
||||||
|
- Lint + typecheck: Go vet + golangci-lint + tsc
|
||||||
|
- Test: Go test + vitest
|
||||||
|
- Build: Go build + npm build
|
||||||
|
- Docker: Build and push images
|
||||||
|
- Triggers: push to dev/main
|
||||||
|
|
||||||
|
### deploy-dev.yaml
|
||||||
|
- Workflow dispatch
|
||||||
|
- SCP deploy script to dev host
|
||||||
|
- systemctl restart with rollback
|
||||||
|
|
||||||
|
## Docker Compose
|
||||||
|
|
||||||
|
| Service | Image/Build | Ports | Depends On |
|
||||||
|
|---------|-------------|-------|------------|
|
||||||
|
| postgres | postgres:16-alpine | 5432 | — |
|
||||||
|
| backend | ./go-backend/Dockerfile | 8080 | postgres (healthy) |
|
||||||
|
| frontend | ./frontend/Dockerfile | 3000 | backend (healthy) |
|
||||||
|
|
||||||
|
## Linear Issue Map
|
||||||
|
|
||||||
|
| CUB | Title | Status |
|
||||||
|
|-----|-------|--------|
|
||||||
|
| 119 | **Epic: Control Center Refactor — .NET → Go + React** | Todo |
|
||||||
|
| 120 | PostgreSQL schema + migrations | Done |
|
||||||
|
| 121 | React pages wired to real API | Done |
|
||||||
|
| 122 | React frontend scaffold | Done |
|
||||||
|
| 123 | Gateway integration + SSE streaming | Done |
|
||||||
|
| 124 | Go backend scaffold | Done |
|
||||||
|
| 125 | Real-time SSE frontend | Done |
|
||||||
|
| 126 | Docker Compose deployment | Done |
|
||||||
|
| 127 | CRUD API endpoints | Done |
|
||||||
|
| 200 | Live WebSocket gateway client (CUB-200-207 sub-epic) | In Review |
|
||||||
|
| 201 | agents.list + sessions.list RPC and data mapping | In Review |
|
||||||
|
| 202 | Real-time event subscription + SSE fan-out | In Review |
|
||||||
|
| 203 | WS client scaffold — handshake, connect, reconnect loop | In Review |
|
||||||
|
| 204 | Config, wiring, and graceful fallback | In Review |
|
||||||
|
| 205 | Unit tests — gateway utility functions | In Review |
|
||||||
|
| 206 | Unit tests — WSClient handshake, Send/RPC, frame router, reconnect | In Review |
|
||||||
|
| 207 | Unit tests — event handlers and initial sync | In Review |
|
||||||
|
|
||||||
|
### Legacy Issues (Angular/ASP.NET — all Done)
|
||||||
|
CUB-19 through CUB-63: All 27 Control Center issues completed, including minion mapping, breakroom UI, dark mode theme, agent cards, quick-jump drawer, adaptive nav, SignalR hub, and status animations.
|
||||||
|
|
||||||
|
## Known Limitations / Next Steps
|
||||||
|
|
||||||
|
1. Agent detail/history views are scaffolded but not fully implemented
|
||||||
|
2. 16-bit minion breakroom concept (CUB-59-63) was on Angular — needs React port if desired
|
||||||
|
3. `.env` must be created from `.env.example` with a valid `GATEWAY_URL` for live agent data
|
||||||
|
4. Docker containers not currently running — start with `docker compose up --build -d`
|
||||||
|
|
||||||
|
## Default Agent Assignments
|
||||||
|
|
||||||
|
| Area | Agent | Notes |
|
||||||
|
|------|-------|-------|
|
||||||
|
| Backend (Go API, Gateway WS, SSE) | Dex | gitea-dex MCP |
|
||||||
|
| Database (PostgreSQL schema) | Hex | gitea-hex MCP |
|
||||||
|
| Frontend (React, Tailwind) | Rex | gitea-rex MCP |
|
||||||
|
| Design (wireframes, UX) | Sketch | |
|
||||||
|
|
||||||
|
## Getting Started
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd /mnt/ai-storage/projects/Control-Center
|
||||||
|
git checkout dev
|
||||||
|
git pull origin dev
|
||||||
|
|
||||||
|
# Docker Compose (recommended)
|
||||||
|
cp .env.example .env # edit GATEWAY_URL first
|
||||||
|
docker compose up --build -d
|
||||||
|
|
||||||
|
# Manual
|
||||||
|
cd go-backend && go run cmd/server/main.go # → :8080
|
||||||
|
cd frontend && npm install && npm run dev # → :5173 (Vite proxy to :8080)
|
||||||
|
```
|
||||||
@@ -112,7 +112,7 @@ func main() {
|
|||||||
<-quit
|
<-quit
|
||||||
slog.Info("shutting down server...")
|
slog.Info("shutting down server...")
|
||||||
|
|
||||||
cancel() // stop gateway polling
|
cancel() // stop gateway clients
|
||||||
|
|
||||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||||
defer shutdownCancel()
|
defer shutdownCancel()
|
||||||
|
|||||||
@@ -1,6 +1,10 @@
|
|||||||
// Package gateway provides an OpenClaw gateway integration client that
|
// Package gateway provides an OpenClaw gateway integration client that
|
||||||
// polls agent states, persists them via the repository layer, and broadcasts
|
// polls agent states, persists them via the repository layer, and broadcasts
|
||||||
// changes through the SSE broker for real-time frontend updates.
|
// changes through the SSE broker for real-time frontend updates.
|
||||||
|
//
|
||||||
|
// When a WSClient is wired via SetWSClient, the REST poller becomes a
|
||||||
|
// fallback: it waits for the WS client to signal readiness, and only starts
|
||||||
|
// polling if WS fails to connect within 30 seconds.
|
||||||
package gateway
|
package gateway
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -140,7 +144,6 @@ func (c *Client) poll(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, ga := range agents {
|
for _, ga := range agents {
|
||||||
// Check if agent already exists; if so, update; otherwise create.
|
|
||||||
existing, err := c.agents.Get(ctx, ga.ID)
|
existing, err := c.agents.Get(ctx, ga.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Not found — create it
|
// Not found — create it
|
||||||
|
|||||||
@@ -229,7 +229,34 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
|||||||
c.connId = helloOK.ConnID
|
c.connId = helloOK.ConnID
|
||||||
c.connMu.Unlock()
|
c.connMu.Unlock()
|
||||||
|
|
||||||
// Notify REST client that WS is live so it stands down
|
// Step 2b: Register live event handlers BEFORE starting the read
|
||||||
|
// loop. This eliminates the race window where readLoop dispatches
|
||||||
|
// live events as "unhandled" because no handlers are registered yet.
|
||||||
|
// The handlers only depend on c.agents and c.broker, which are wired
|
||||||
|
// in the constructor — they do not need initialSync to have completed.
|
||||||
|
c.registerEventHandlers()
|
||||||
|
|
||||||
|
// Step 2c: Start the read loop in a goroutine so that Send() in
|
||||||
|
// initialSync can receive responses. The read loop goroutine will
|
||||||
|
// continue running after initialSync completes, routing live events
|
||||||
|
// and any future RPC responses. Because handlers are already
|
||||||
|
// registered, any events arriving during or after initialSync are
|
||||||
|
// dispatched correctly.
|
||||||
|
readLoopErrCh := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
readLoopErrCh <- c.readLoop(ctx, conn)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Step 2d: Initial sync — fetch agents + sessions from gateway.
|
||||||
|
// This works because the read loop is active and will route
|
||||||
|
// response frames back to Send() via handleResponse.
|
||||||
|
if err := c.initialSync(ctx); err != nil {
|
||||||
|
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify REST client that WS is live so it stands down.
|
||||||
|
// This must happen AFTER initialSync so that the REST poller
|
||||||
|
// doesn't start polling while we're still syncing.
|
||||||
if c.restClient != nil {
|
if c.restClient != nil {
|
||||||
c.restClient.MarkWSReady()
|
c.restClient.MarkWSReady()
|
||||||
c.logger.Info("ws client notified REST fallback to stand down")
|
c.logger.Info("ws client notified REST fallback to stand down")
|
||||||
@@ -238,16 +265,9 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
|||||||
// Reset wsReadyOnce so MarkWSReady can fire again after a reconnect
|
// Reset wsReadyOnce so MarkWSReady can fire again after a reconnect
|
||||||
c.wsReadyOnce = sync.Once{}
|
c.wsReadyOnce = sync.Once{}
|
||||||
|
|
||||||
// Step 2b: Initial sync — fetch agents + sessions from gateway
|
// Step 3: Wait for the read loop goroutine to finish (blocks
|
||||||
if err := c.initialSync(ctx); err != nil {
|
// until the connection drops or context is cancelled).
|
||||||
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
|
return <-readLoopErrCh
|
||||||
}
|
|
||||||
|
|
||||||
// Step 2c: Register live event handlers
|
|
||||||
c.registerEventHandlers()
|
|
||||||
|
|
||||||
// Step 3: Read loop
|
|
||||||
return c.readLoop(ctx, conn)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// readChallenge reads the first frame from the gateway, which must be a
|
// readChallenge reads the first frame from the gateway, which must be a
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
@@ -466,6 +467,236 @@ func TestAgentItemToCard(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── 6. Test: Initial sync ordering (readLoop active before Send) ──────────
|
||||||
|
|
||||||
|
// TestConnectAndRun_InitialSyncOrdering verifies that the WS client
|
||||||
|
// completes initial sync successfully. This test would hang/timeout if
|
||||||
|
// readLoop were NOT started before initialSync, because Send() relies on
|
||||||
|
// readLoop→routeFrame→handleResponse to deliver RPC responses.
|
||||||
|
func TestConnectAndRun_InitialSyncOrdering(t *testing.T) {
|
||||||
|
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||||
|
broker := handler.NewBroker()
|
||||||
|
capture := newBroadcastCapture(broker)
|
||||||
|
defer capture.close()
|
||||||
|
|
||||||
|
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
||||||
|
// Handshake
|
||||||
|
handleHandshake(t, conn)
|
||||||
|
|
||||||
|
// After handshake, respond to RPCs
|
||||||
|
for {
|
||||||
|
var req map[string]any
|
||||||
|
if err := conn.ReadJSON(&req); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
reqID, _ := req["id"].(string)
|
||||||
|
method, _ := req["method"].(string)
|
||||||
|
|
||||||
|
var result any
|
||||||
|
switch method {
|
||||||
|
case "agents.list":
|
||||||
|
result = []map[string]any{
|
||||||
|
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
|
||||||
|
{"id": "dex", "name": "Dex", "role": "Backend Dev", "channel": "telegram"},
|
||||||
|
}
|
||||||
|
case "sessions.list":
|
||||||
|
result = []map[string]any{
|
||||||
|
{"sessionKey": "s1", "agentId": "otto", "status": "running", "totalTokens": 500, "lastActivityAt": "2025-05-20T12:00:00Z"},
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
result = map[string]any{}
|
||||||
|
}
|
||||||
|
|
||||||
|
res := map[string]any{
|
||||||
|
"type": "res",
|
||||||
|
"id": reqID,
|
||||||
|
"ok": true,
|
||||||
|
"result": result,
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(res); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
client.Start(ctx)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for initial sync to complete by checking repo state.
|
||||||
|
// The agents should be persisted from the RPC responses.
|
||||||
|
deadline := time.Now().Add(5 * time.Second)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
repo.mu.Lock()
|
||||||
|
_, ottoOK := repo.agents["otto"]
|
||||||
|
_, dexOK := repo.agents["dex"]
|
||||||
|
repo.mu.Unlock()
|
||||||
|
if ottoOK && dexOK {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
repo.mu.Lock()
|
||||||
|
_, ottoOK := repo.agents["otto"]
|
||||||
|
_, dexOK := repo.agents["dex"]
|
||||||
|
repo.mu.Unlock()
|
||||||
|
|
||||||
|
if !ottoOK {
|
||||||
|
t.Error("otto not found in repo after initial sync — readLoop may not have been active before Send()")
|
||||||
|
}
|
||||||
|
if !dexOK {
|
||||||
|
t.Error("dex not found in repo after initial sync — readLoop may not have been active before Send()")
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Fatal("WSClient did not shut down cleanly")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 7. Test: Event not lost during initial sync (regression) ───────────────
|
||||||
|
|
||||||
|
// TestConnectAndRun_EventNotLostDuringSync verifies that live gateway events
|
||||||
|
// arriving during initial sync are NOT dropped. This is a regression test
|
||||||
|
// for the race where readLoop started before registerEventHandlers(),
|
||||||
|
// causing events read during that window to be logged as "unhandled" and lost.
|
||||||
|
//
|
||||||
|
// The mock server sends a live event (sessions.changed) right after the
|
||||||
|
// handshake, interleaved with the RPC responses for agents.list and
|
||||||
|
// sessions.list. The test asserts the event is received by the handler.
|
||||||
|
func TestConnectAndRun_EventNotLostDuringSync(t *testing.T) {
|
||||||
|
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||||
|
broker := handler.NewBroker()
|
||||||
|
capture := newBroadcastCapture(broker)
|
||||||
|
defer capture.close()
|
||||||
|
|
||||||
|
// Pre-seed an agent so the event handler can update it.
|
||||||
|
repo.agents["otto"] = models.AgentCardData{
|
||||||
|
ID: "otto",
|
||||||
|
DisplayName: "Otto",
|
||||||
|
Status: models.AgentStatusIdle,
|
||||||
|
}
|
||||||
|
|
||||||
|
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
||||||
|
// Handshake
|
||||||
|
handleHandshake(t, conn)
|
||||||
|
|
||||||
|
// After handshake, process RPCs and inject a live event.
|
||||||
|
for {
|
||||||
|
var req map[string]any
|
||||||
|
if err := conn.ReadJSON(&req); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
reqID, _ := req["id"].(string)
|
||||||
|
method, _ := req["method"].(string)
|
||||||
|
|
||||||
|
// Respond to agents.list RPC
|
||||||
|
if method == "agents.list" {
|
||||||
|
// Before responding, inject a live event — simulates
|
||||||
|
// a gateway pushing a presence update during sync.
|
||||||
|
evt := map[string]any{
|
||||||
|
"type": "event",
|
||||||
|
"event": "presence",
|
||||||
|
"params": map[string]any{"agentId": "otto", "connected": true, "lastActivityAt": "2025-05-20T12:30:00Z"},
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(evt); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now send the RPC response
|
||||||
|
res := map[string]any{
|
||||||
|
"type": "res",
|
||||||
|
"id": reqID,
|
||||||
|
"ok": true,
|
||||||
|
"result": []map[string]any{
|
||||||
|
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(res); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Respond to sessions.list RPC
|
||||||
|
if method == "sessions.list" {
|
||||||
|
res := map[string]any{
|
||||||
|
"type": "res",
|
||||||
|
"id": reqID,
|
||||||
|
"ok": true,
|
||||||
|
"result": []map[string]any{},
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(res); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Default response for other methods
|
||||||
|
res := map[string]any{
|
||||||
|
"type": "res",
|
||||||
|
"id": reqID,
|
||||||
|
"ok": true,
|
||||||
|
"result": map[string]any{},
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(res); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
client.Start(ctx)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for the presence event to be processed by checking the repo.
|
||||||
|
// The presence handler updates the agent, so we check for the
|
||||||
|
// lastActivityAt change.
|
||||||
|
deadline := time.Now().Add(5 * time.Second)
|
||||||
|
var lastActivity string
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
repo.mu.Lock()
|
||||||
|
if a, ok := repo.agents["otto"]; ok {
|
||||||
|
lastActivity = a.LastActivity
|
||||||
|
}
|
||||||
|
repo.mu.Unlock()
|
||||||
|
if lastActivity == "2025-05-20T12:30:00Z" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
if lastActivity != "2025-05-20T12:30:00Z" {
|
||||||
|
t.Errorf("presence event during sync was lost: lastActivity = %q, want %q", lastActivity, "2025-05-20T12:30:00Z")
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Fatal("WSClient did not shut down cleanly")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestStrPtr(t *testing.T) {
|
func TestStrPtr(t *testing.T) {
|
||||||
s := "hello"
|
s := "hello"
|
||||||
p := strPtr(s)
|
p := strPtr(s)
|
||||||
|
|||||||
@@ -65,6 +65,21 @@ func New(deps *Dependencies) *chi.Mux {
|
|||||||
|
|
||||||
// ── API v1 routes ──────────────────────────────────────────────────────
|
// ── API v1 routes ──────────────────────────────────────────────────────
|
||||||
r.Route("/api", func(api chi.Router) {
|
r.Route("/api", func(api chi.Router) {
|
||||||
|
// Health check (under /api)
|
||||||
|
api.Get("/health", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
status := "ok"
|
||||||
|
if deps.Pool != nil {
|
||||||
|
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := deps.Pool.Ping(ctx); err != nil {
|
||||||
|
w.WriteHeader(http.StatusServiceUnavailable)
|
||||||
|
status = "db_unhealthy"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.Write([]byte(`{"status":"` + status + `"}`))
|
||||||
|
})
|
||||||
|
|
||||||
// Agents CRUD
|
// Agents CRUD
|
||||||
api.Route("/agents", func(agents chi.Router) {
|
api.Route("/agents", func(agents chi.Router) {
|
||||||
agents.Get("/", deps.Handler.ListAgents) // GET /api/agents
|
agents.Get("/", deps.Handler.ListAgents) // GET /api/agents
|
||||||
|
|||||||
Reference in New Issue
Block a user