Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4509b0c217 | |||
| f3ce08497a | |||
| fd60b0bb57 | |||
| b7b05bb4e3 | |||
| d370d5ec23 | |||
| 1b82e1d3a6 | |||
| 93bf434a47 | |||
| 010408cc45 | |||
| 23f9d4a8fb | |||
| d9a1640b10 | |||
| 6fd2d9bec4 | |||
| d28d6e8dac |
+1
-1
@@ -32,7 +32,7 @@ GATEWAY_POLL_INTERVAL=5s
|
|||||||
# When using docker-compose, these are set in the services section
|
# When using docker-compose, these are set in the services section
|
||||||
# See docker-compose.yml for service-specific environment variables
|
# See docker-compose.yml for service-specific environment variables
|
||||||
|
|
||||||
# ── Database Configuration ─────────────────────────────────────────────
|
# ── Database Configuration ───────────────────────────────────────────────
|
||||||
# Set in the db service environment section of docker-compose.yml
|
# Set in the db service environment section of docker-compose.yml
|
||||||
# POSTGRES_USER=controlcenter
|
# POSTGRES_USER=controlcenter
|
||||||
# POSTGRES_PASSWORD=controlcenter
|
# POSTGRES_PASSWORD=controlcenter
|
||||||
|
|||||||
+304
@@ -0,0 +1,304 @@
|
|||||||
|
# Control Center — Project Context
|
||||||
|
|
||||||
|
> **Last updated:** 2026-05-21
|
||||||
|
> **Repo:** `CubeCraft-Creations/Control-Center` | **Host:** `code.cubecraftcreations.com`
|
||||||
|
> **Local clone:** `/mnt/ai-storage/projects/Control-Center` | **Default branch:** `dev`
|
||||||
|
> **Discord:** `DISCORD_DEV_CONTROL_CENTER_CHANNEL_ID`
|
||||||
|
> **Linear Epic:** [CUB-119](https://linear.app/cubecraft-creations/issue/CUB-119)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Overview
|
||||||
|
|
||||||
|
Real-time dashboard for monitoring and controlling the OpenClaw agent fleet. Displays agent statuses, active tasks, sessions, and projects. Uses SSE for live updates from the Go backend, which connects to the OpenClaw gateway via WebSocket for live agent data.
|
||||||
|
|
||||||
|
**Completed refactor:** ASP.NET Core + Angular → Go + React is done (CUB-119 epic). All legacy code is removed from git.
|
||||||
|
|
||||||
|
## Tech Stack
|
||||||
|
|
||||||
|
| Layer | Technology | Notes |
|
||||||
|
|-------|-----------|-------|
|
||||||
|
| Backend | Go 1.24+ | Chi router, pgx (PostgreSQL), SSE broker, gorilla/websocket |
|
||||||
|
| Frontend | React 18 + TypeScript | Vite, Tailwind CSS, React Query, TanStack Router |
|
||||||
|
| Database | PostgreSQL 16+ | snake_case naming, 2 migrations |
|
||||||
|
| Real-time | SSE + WebSocket | SSE for browser, WebSocket for OpenClaw gateway |
|
||||||
|
| Gateway Integration | WebSocket client | OpenClaw gateway `/ws` — live agent + session RPC |
|
||||||
|
| API Client | TypeScript SDK | `api-client/` — shared models, WS client, HTTP client |
|
||||||
|
| CI/CD | Gitea Actions | `.gitea/workflows/dev.yml`, `deploy-dev.yaml` |
|
||||||
|
| Deployment | Docker Compose | PostgreSQL + Go backend + React/nginx |
|
||||||
|
| Testing | Vitest, Go test | Unit + integration tests for WS client, gateway, handlers |
|
||||||
|
| Design | Kiosk layout | Bottom nav (mobile), nav rail (desktop), quick-jump drawer |
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
```
|
||||||
|
OpenClaw Gateway (WebSocket)
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
Go Backend (Chi + pgx)
|
||||||
|
├── Gateway WS Client (connect, reconnect, agents.list, sessions.list RPC)
|
||||||
|
├── SSE Broker (fan-out: agent.status, agent.task, fleet.update)
|
||||||
|
├── REST API (/api/agents, /api/sessions, /api/tasks, /api/projects)
|
||||||
|
└── Repository/Store layers → PostgreSQL
|
||||||
|
│
|
||||||
|
├── SSE /api/events
|
||||||
|
▼
|
||||||
|
React Frontend
|
||||||
|
├── SSEProvider → useRealtimeSync → React Query cache
|
||||||
|
├── HubPage (dashboard), LogsPage, ProjectsPage, SessionsPage, SettingsPage
|
||||||
|
└── Layout (header bar + nav rail + bottom nav + quick-jump drawer)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Key Architecture Decisions
|
||||||
|
1. **Go replaced ASP.NET Core** — lighter runtime, faster cold-start, better concurrency for gateway polling
|
||||||
|
2. **React replaced Angular** — lighter than Angular for dashboard/kiosk use
|
||||||
|
3. **SSE over SignalR** — simpler server-side, unidirectional events sufficient for browser updates
|
||||||
|
4. **WebSocket for gateway integration** — bidirectional needed for RPC (agents.list, sessions.list)
|
||||||
|
5. **PostgreSQL** — shared with Extrudex pattern; migrations in `go-backend/migrations/`
|
||||||
|
6. **Agent state seeded on first boot** via `gateway.SeedDemoAgents` for offline dev
|
||||||
|
|
||||||
|
## Project Structure
|
||||||
|
|
||||||
|
```
|
||||||
|
Control-Center/
|
||||||
|
├── go-backend/ # Go backend
|
||||||
|
│ ├── cmd/server/main.go # Entrypoint, wire deps, start gateway poller
|
||||||
|
│ ├── Dockerfile / go.mod / go.sum
|
||||||
|
│ ├── migrations/ # 001_initial_schema, 002_add_indexes
|
||||||
|
│ └── internal/
|
||||||
|
│ ├── config/config.go # Env vars (DATABASE_URL, GATEWAY_URL, etc.)
|
||||||
|
│ ├── db/db.go # PostgreSQL pool (pgx)
|
||||||
|
│ ├── gateway/
|
||||||
|
│ │ ├── client.go # GW poller → sync DB + SSE fan-out
|
||||||
|
│ │ ├── events.go # SSE event broker
|
||||||
|
│ │ ├── events_test.go
|
||||||
|
│ │ ├── sync.go # Initial sync from gateway
|
||||||
|
│ │ ├── sync_test.go
|
||||||
|
│ │ ├── wsclient.go # WebSocket client (handshake, connect, reconnect, RPC)
|
||||||
|
│ │ └── wsclient_test.go
|
||||||
|
│ ├── handler/
|
||||||
|
│ │ ├── agent.go # CRUD + history
|
||||||
|
│ │ ├── project.go # List projects
|
||||||
|
│ │ ├── session.go # List sessions
|
||||||
|
│ │ ├── sse.go # SSE broker: subscribe + broadcast
|
||||||
|
│ │ ├── task.go # List tasks
|
||||||
|
│ │ ├── helpers.go
|
||||||
|
│ │ ├── handler_test.go
|
||||||
|
│ │ └── mock_repos_test.go
|
||||||
|
│ ├── models/models.go # Domain types
|
||||||
|
│ ├── repository/ # DB access layer + interfaces
|
||||||
|
│ ├── router/router.go # Chi router: REST + SSE mount
|
||||||
|
│ └── store/ # Agent, Project, Session, Task stores
|
||||||
|
├── api-client/ # Shared TypeScript SDK
|
||||||
|
│ └── src/
|
||||||
|
│ ├── models/types.ts # Agent, Session, Task, Project, SSE event types
|
||||||
|
│ ├── services/http-client.ts # Axios REST client
|
||||||
|
│ ├── utils/
|
||||||
|
│ │ ├── config.ts # Client config
|
||||||
|
│ │ └── status-mapper.ts # Agent status → display mapping
|
||||||
|
│ └── websocket/
|
||||||
|
│ └── ws-client.ts # WebSocket client (handshake, Send, RPC, reconnector)
|
||||||
|
├── frontend/ # React frontend
|
||||||
|
│ ├── Dockerfile + nginx.conf
|
||||||
|
│ ├── package.json + vite.config.ts
|
||||||
|
│ ├── src/
|
||||||
|
│ │ ├── App.tsx / main.tsx
|
||||||
|
│ │ ├── components/
|
||||||
|
│ │ │ ├── ErrorBoundary.tsx
|
||||||
|
│ │ │ └── Layout.tsx # Header bar + nav rail + bottom nav + quick-jump
|
||||||
|
│ │ ├── contexts/
|
||||||
|
│ │ │ └── SSEContext.tsx # SSEProvider — wraps entire app
|
||||||
|
│ │ ├── hooks/
|
||||||
|
│ │ │ ├── useLocalStorage.ts
|
||||||
|
│ │ │ ├── useRealtimeSync.ts # SSE messages → React Query cache
|
||||||
|
│ │ │ ├── useRealtimeSync.test.tsx
|
||||||
|
│ │ │ ├── useSSE.ts # SSE: connect, reconnect, typed events
|
||||||
|
│ │ │ ├── useSSE.test.ts
|
||||||
|
│ │ │ └── useTheme.tsx
|
||||||
|
│ │ ├── pages/
|
||||||
|
│ │ │ ├── HubPage.tsx # Fleet dashboard (agent grid + stats)
|
||||||
|
│ │ │ ├── LogsPage.tsx # Agent log viewer
|
||||||
|
│ │ │ ├── ProjectsPage.tsx # Project list
|
||||||
|
│ │ │ ├── SessionsPage.tsx # Session list
|
||||||
|
│ │ │ └── SettingsPage.tsx # Settings + theme toggle
|
||||||
|
│ │ ├── services/
|
||||||
|
│ │ │ ├── api.ts # Axios REST client
|
||||||
|
│ │ │ └── sse.ts # SSE utilities
|
||||||
|
│ │ └── types/index.ts
|
||||||
|
│ └── vitest.config.ts
|
||||||
|
├── frontend-legacy/ # Original Angular frontend (kept for reference, not in git)
|
||||||
|
├── backend/ # Original ASP.NET backend (kept for reference, not in git)
|
||||||
|
│ ├── ControlCenter/ # ASP.NET Core project
|
||||||
|
│ └── Api/ # API layer
|
||||||
|
├── design/
|
||||||
|
│ ├── command-hub-spec.md # Detailed design spec
|
||||||
|
│ └── mockups/ # Desktop kiosk, mobile, quick-jump drawer
|
||||||
|
├── kiosk/
|
||||||
|
│ ├── control-center-kiosk.service # Systemd service
|
||||||
|
│ └── start-kiosk.sh # Kiosk startup script
|
||||||
|
├── reference/
|
||||||
|
│ └── CONTROL_CENTER_CONTEXT.md # Older context file (superseded by this one)
|
||||||
|
├── ci-image/Dockerfile # CI build image
|
||||||
|
├── docker-compose.yml
|
||||||
|
└── .env.example
|
||||||
|
```
|
||||||
|
|
||||||
|
## Database Schema (PostgreSQL)
|
||||||
|
|
||||||
|
### agents
|
||||||
|
| Column | Type | Notes |
|
||||||
|
|--------|------|-------|
|
||||||
|
| id | UUID PK | |
|
||||||
|
| display_name | VARCHAR(256) NOT NULL | |
|
||||||
|
| role | VARCHAR(256) | |
|
||||||
|
| status | VARCHAR(32) | active, idle, thinking, error |
|
||||||
|
| current_task | VARCHAR(512) | |
|
||||||
|
| task_progress | INTEGER DEFAULT 0 | |
|
||||||
|
| session_key | VARCHAR(256) | |
|
||||||
|
| channel | VARCHAR(256) | |
|
||||||
|
| last_activity | TIMESTAMP | |
|
||||||
|
| error_message | TEXT | |
|
||||||
|
| created_at | TIMESTAMP | |
|
||||||
|
| updated_at | TIMESTAMP | |
|
||||||
|
|
||||||
|
### sessions
|
||||||
|
| Column | Type |
|
||||||
|
|--------|------|
|
||||||
|
| id | UUID PK |
|
||||||
|
| session_key | VARCHAR(256) UNIQUE |
|
||||||
|
| agent_id | UUID FK → agents |
|
||||||
|
| channel | VARCHAR(256) |
|
||||||
|
| status | VARCHAR(32) |
|
||||||
|
| context_tokens | INTEGER |
|
||||||
|
| total_tokens | INTEGER |
|
||||||
|
| estimated_cost | NUMERIC |
|
||||||
|
| model | VARCHAR(256) |
|
||||||
|
| started_at | TIMESTAMP |
|
||||||
|
| last_activity_at | TIMESTAMP |
|
||||||
|
|
||||||
|
### tasks
|
||||||
|
| Column | Type |
|
||||||
|
|--------|------|
|
||||||
|
| id | UUID PK |
|
||||||
|
| agent_id | UUID FK → agents |
|
||||||
|
| title | VARCHAR(512) |
|
||||||
|
| description | TEXT |
|
||||||
|
| status | VARCHAR(32) |
|
||||||
|
| progress | INTEGER DEFAULT 0 |
|
||||||
|
| session_key | VARCHAR(256) |
|
||||||
|
| created_at, updated_at | TIMESTAMP |
|
||||||
|
|
||||||
|
### projects
|
||||||
|
| Column | Type |
|
||||||
|
|--------|------|
|
||||||
|
| id | UUID PK |
|
||||||
|
| name | VARCHAR(256) UNIQUE |
|
||||||
|
| description | TEXT |
|
||||||
|
| status | VARCHAR(32) |
|
||||||
|
| agent_ids | TEXT[] |
|
||||||
|
| created_at, updated_at | TIMESTAMP |
|
||||||
|
|
||||||
|
## API Endpoints
|
||||||
|
|
||||||
|
| Method | Endpoint | Description |
|
||||||
|
|--------|----------|-------------|
|
||||||
|
| GET | /health | Health check (probes DB) |
|
||||||
|
| GET | /api/agents | List all agents |
|
||||||
|
| POST | /api/agents | Create agent |
|
||||||
|
| GET | /api/agents/:id | Get agent detail |
|
||||||
|
| PUT | /api/agents/:id | Update agent |
|
||||||
|
| DELETE | /api/agents/:id | Delete agent |
|
||||||
|
| GET | /api/agents/:id/history | Agent status history |
|
||||||
|
| GET | /api/sessions | List sessions |
|
||||||
|
| GET | /api/tasks | List tasks |
|
||||||
|
| GET | /api/projects | List projects |
|
||||||
|
| GET | /api/events | SSE event stream |
|
||||||
|
|
||||||
|
## SSE Events
|
||||||
|
|
||||||
|
| Event Type | Payload |
|
||||||
|
|------------|---------|
|
||||||
|
| `agent.status` | Agent status change |
|
||||||
|
| `agent.task` | Agent current task updated |
|
||||||
|
| `agent.progress` | Task progress percentage |
|
||||||
|
| `fleet.update` | Full fleet snapshot |
|
||||||
|
| `connected` | Connection established |
|
||||||
|
|
||||||
|
## CI/CD Pipeline
|
||||||
|
|
||||||
|
### dev.yml
|
||||||
|
- Lint + typecheck: Go vet + golangci-lint + tsc
|
||||||
|
- Test: Go test + vitest
|
||||||
|
- Build: Go build + npm build
|
||||||
|
- Docker: Build and push images
|
||||||
|
- Triggers: push to dev/main
|
||||||
|
|
||||||
|
### deploy-dev.yaml
|
||||||
|
- Workflow dispatch
|
||||||
|
- SCP deploy script to dev host
|
||||||
|
- systemctl restart with rollback
|
||||||
|
|
||||||
|
## Docker Compose
|
||||||
|
|
||||||
|
| Service | Image/Build | Ports | Depends On |
|
||||||
|
|---------|-------------|-------|------------|
|
||||||
|
| postgres | postgres:16-alpine | 5432 | — |
|
||||||
|
| backend | ./go-backend/Dockerfile | 8080 | postgres (healthy) |
|
||||||
|
| frontend | ./frontend/Dockerfile | 3000 | backend (healthy) |
|
||||||
|
|
||||||
|
## Linear Issue Map
|
||||||
|
|
||||||
|
| CUB | Title | Status |
|
||||||
|
|-----|-------|--------|
|
||||||
|
| 119 | **Epic: Control Center Refactor — .NET → Go + React** | Todo |
|
||||||
|
| 120 | PostgreSQL schema + migrations | Done |
|
||||||
|
| 121 | React pages wired to real API | Done |
|
||||||
|
| 122 | React frontend scaffold | Done |
|
||||||
|
| 123 | Gateway integration + SSE streaming | Done |
|
||||||
|
| 124 | Go backend scaffold | Done |
|
||||||
|
| 125 | Real-time SSE frontend | Done |
|
||||||
|
| 126 | Docker Compose deployment | Done |
|
||||||
|
| 127 | CRUD API endpoints | Done |
|
||||||
|
| 200 | Live WebSocket gateway client (CUB-200-207 sub-epic) | In Review |
|
||||||
|
| 201 | agents.list + sessions.list RPC and data mapping | In Review |
|
||||||
|
| 202 | Real-time event subscription + SSE fan-out | In Review |
|
||||||
|
| 203 | WS client scaffold — handshake, connect, reconnect loop | In Review |
|
||||||
|
| 204 | Config, wiring, and graceful fallback | In Review |
|
||||||
|
| 205 | Unit tests — gateway utility functions | In Review |
|
||||||
|
| 206 | Unit tests — WSClient handshake, Send/RPC, frame router, reconnect | In Review |
|
||||||
|
| 207 | Unit tests — event handlers and initial sync | In Review |
|
||||||
|
|
||||||
|
### Legacy Issues (Angular/ASP.NET — all Done)
|
||||||
|
CUB-19 through CUB-63: All 27 Control Center issues completed, including minion mapping, breakroom UI, dark mode theme, agent cards, quick-jump drawer, adaptive nav, SignalR hub, and status animations.
|
||||||
|
|
||||||
|
## Known Limitations / Next Steps
|
||||||
|
|
||||||
|
1. Agent detail/history views are scaffolded but not fully implemented
|
||||||
|
2. 16-bit minion breakroom concept (CUB-59-63) was on Angular — needs React port if desired
|
||||||
|
3. `.env` must be created from `.env.example` with a valid `GATEWAY_URL` for live agent data
|
||||||
|
4. Docker containers not currently running — start with `docker compose up --build -d`
|
||||||
|
|
||||||
|
## Default Agent Assignments
|
||||||
|
|
||||||
|
| Area | Agent | Notes |
|
||||||
|
|------|-------|-------|
|
||||||
|
| Backend (Go API, Gateway WS, SSE) | Dex | gitea-dex MCP |
|
||||||
|
| Database (PostgreSQL schema) | Hex | gitea-hex MCP |
|
||||||
|
| Frontend (React, Tailwind) | Rex | gitea-rex MCP |
|
||||||
|
| Design (wireframes, UX) | Sketch | |
|
||||||
|
|
||||||
|
## Getting Started
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd /mnt/ai-storage/projects/Control-Center
|
||||||
|
git checkout dev
|
||||||
|
git pull origin dev
|
||||||
|
|
||||||
|
# Docker Compose (recommended)
|
||||||
|
cp .env.example .env # edit GATEWAY_URL first
|
||||||
|
docker compose up --build -d
|
||||||
|
|
||||||
|
# Manual
|
||||||
|
cd go-backend && go run cmd/server/main.go # → :8080
|
||||||
|
cd frontend && npm install && npm run dev # → :5173 (Vite proxy to :8080)
|
||||||
|
```
|
||||||
Generated
+1128
-1
File diff suppressed because it is too large
Load Diff
@@ -7,7 +7,9 @@
|
|||||||
"dev": "vite",
|
"dev": "vite",
|
||||||
"build": "tsc -b && vite build",
|
"build": "tsc -b && vite build",
|
||||||
"lint": "eslint .",
|
"lint": "eslint .",
|
||||||
"preview": "vite preview"
|
"preview": "vite preview",
|
||||||
|
"test": "vitest run",
|
||||||
|
"test:watch": "vitest"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@tanstack/react-query": "^5.100.9",
|
"@tanstack/react-query": "^5.100.9",
|
||||||
@@ -20,6 +22,8 @@
|
|||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@eslint/js": "^10.0.1",
|
"@eslint/js": "^10.0.1",
|
||||||
"@tailwindcss/vite": "^4.2.4",
|
"@tailwindcss/vite": "^4.2.4",
|
||||||
|
"@testing-library/jest-dom": "^6.9.1",
|
||||||
|
"@testing-library/react": "^16.3.2",
|
||||||
"@types/node": "^24.12.2",
|
"@types/node": "^24.12.2",
|
||||||
"@types/react": "^19.2.14",
|
"@types/react": "^19.2.14",
|
||||||
"@types/react-dom": "^19.2.3",
|
"@types/react-dom": "^19.2.3",
|
||||||
@@ -29,10 +33,12 @@
|
|||||||
"eslint-plugin-react-hooks": "^7.1.1",
|
"eslint-plugin-react-hooks": "^7.1.1",
|
||||||
"eslint-plugin-react-refresh": "^0.5.2",
|
"eslint-plugin-react-refresh": "^0.5.2",
|
||||||
"globals": "^17.5.0",
|
"globals": "^17.5.0",
|
||||||
|
"jsdom": "^29.1.1",
|
||||||
"postcss": "^8.5.14",
|
"postcss": "^8.5.14",
|
||||||
"tailwindcss": "^4.2.4",
|
"tailwindcss": "^4.2.4",
|
||||||
"typescript": "~6.0.2",
|
"typescript": "~6.0.2",
|
||||||
"typescript-eslint": "^8.58.2",
|
"typescript-eslint": "^8.58.2",
|
||||||
"vite": "^8.0.10"
|
"vite": "^8.0.10",
|
||||||
|
"vitest": "^4.1.7"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
import { useState } from 'react'
|
import { useState } from 'react'
|
||||||
import { NavLink } from 'react-router-dom'
|
import { NavLink } from 'react-router-dom'
|
||||||
import { Command, Activity, FolderKanban, Monitor, Settings, Menu, X } from 'lucide-react'
|
import { Command, Activity, FolderKanban, Monitor, Settings, Menu, X, Wifi, WifiOff, Loader } from 'lucide-react'
|
||||||
|
import { useSSEContext } from '../contexts/SSEContext'
|
||||||
|
import type { SSEStatus } from '../hooks/useSSE'
|
||||||
|
|
||||||
const navItems = [
|
const navItems = [
|
||||||
{ to: '/', icon: Command, label: 'Hub' },
|
{ to: '/', icon: Command, label: 'Hub' },
|
||||||
@@ -10,9 +12,29 @@ const navItems = [
|
|||||||
{ to: '/settings', icon: Settings, label: 'Settings' },
|
{ to: '/settings', icon: Settings, label: 'Settings' },
|
||||||
]
|
]
|
||||||
|
|
||||||
|
/** Small status pill shown in the sidebar footer and mobile header. */
|
||||||
|
function SSEStatusBadge({ status, showLabel = false }: { status: SSEStatus; showLabel?: boolean }) {
|
||||||
|
const cfg = {
|
||||||
|
connected: { icon: Wifi, color: 'text-green-500', label: 'Live' },
|
||||||
|
connecting: { icon: Loader, color: 'text-yellow-500 animate-spin', label: 'Connecting' },
|
||||||
|
reconnecting: { icon: Loader, color: 'text-yellow-500 animate-spin', label: 'Reconnecting' },
|
||||||
|
error: { icon: WifiOff, color: 'text-red-500', label: 'Disconnected' },
|
||||||
|
}[status]
|
||||||
|
|
||||||
|
const Icon = cfg.icon
|
||||||
|
|
||||||
|
return (
|
||||||
|
<div className="flex items-center gap-1.5" title={cfg.label}>
|
||||||
|
<Icon size={14} className={cfg.color} />
|
||||||
|
{showLabel && <span className={`text-xs ${cfg.color}`}>{cfg.label}</span>}
|
||||||
|
</div>
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
export default function Layout({ children }: { children: React.ReactNode }) {
|
export default function Layout({ children }: { children: React.ReactNode }) {
|
||||||
const [expanded, setExpanded] = useState(false)
|
const [expanded, setExpanded] = useState(false)
|
||||||
const [mobileOpen, setMobileOpen] = useState(false)
|
const [mobileOpen, setMobileOpen] = useState(false)
|
||||||
|
const { sseStatus } = useSSEContext()
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<div className="flex min-h-screen bg-surface-darkest text-on-surface">
|
<div className="flex min-h-screen bg-surface-darkest text-on-surface">
|
||||||
@@ -46,6 +68,15 @@ export default function Layout({ children }: { children: React.ReactNode }) {
|
|||||||
</NavLink>
|
</NavLink>
|
||||||
))}
|
))}
|
||||||
</nav>
|
</nav>
|
||||||
|
{/* SSE connection status — footer of sidebar */}
|
||||||
|
<div className="px-4 py-3 border-t border-surface-light flex items-center gap-2">
|
||||||
|
<SSEStatusBadge status={sseStatus} />
|
||||||
|
{expanded && (
|
||||||
|
<span className="text-xs text-on-surface-muted whitespace-nowrap">
|
||||||
|
{sseStatus === 'connected' ? 'Live updates on' : sseStatus}
|
||||||
|
</span>
|
||||||
|
)}
|
||||||
|
</div>
|
||||||
</aside>
|
</aside>
|
||||||
|
|
||||||
{/* Mobile Header + Bottom Nav */}
|
{/* Mobile Header + Bottom Nav */}
|
||||||
@@ -54,6 +85,7 @@ export default function Layout({ children }: { children: React.ReactNode }) {
|
|||||||
<div className="flex items-center gap-2">
|
<div className="flex items-center gap-2">
|
||||||
<Command size={22} className="text-primary" />
|
<Command size={22} className="text-primary" />
|
||||||
<span className="font-bold">Control Center</span>
|
<span className="font-bold">Control Center</span>
|
||||||
|
<SSEStatusBadge status={sseStatus} />
|
||||||
</div>
|
</div>
|
||||||
<button onClick={() => setMobileOpen(!mobileOpen)} className="p-2">
|
<button onClick={() => setMobileOpen(!mobileOpen)} className="p-2">
|
||||||
{mobileOpen ? <X size={22} /> : <Menu size={22} />}
|
{mobileOpen ? <X size={22} /> : <Menu size={22} />}
|
||||||
|
|||||||
@@ -0,0 +1,23 @@
|
|||||||
|
/**
|
||||||
|
* SSEContext — provides SSE connection status throughout the component tree.
|
||||||
|
* Mount <SSEProvider> once inside QueryClientProvider.
|
||||||
|
*/
|
||||||
|
import { createContext, useContext, type ReactNode } from 'react'
|
||||||
|
import { useRealtimeSync } from '../hooks/useRealtimeSync'
|
||||||
|
import type { SSEStatus } from '../hooks/useSSE'
|
||||||
|
|
||||||
|
interface SSEContextValue {
|
||||||
|
sseStatus: SSEStatus
|
||||||
|
}
|
||||||
|
|
||||||
|
const SSEContext = createContext<SSEContextValue>({ sseStatus: 'connecting' })
|
||||||
|
|
||||||
|
export function SSEProvider({ children }: { children: ReactNode }) {
|
||||||
|
const { sseStatus } = useRealtimeSync()
|
||||||
|
return <SSEContext.Provider value={{ sseStatus }}>{children}</SSEContext.Provider>
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Access the SSE connection status from any component. */
|
||||||
|
export function useSSEContext(): SSEContextValue {
|
||||||
|
return useContext(SSEContext)
|
||||||
|
}
|
||||||
@@ -0,0 +1,129 @@
|
|||||||
|
/**
|
||||||
|
* Tests for useRealtimeSync — event → query invalidation mapping.
|
||||||
|
*
|
||||||
|
* Uses .tsx extension so Vite/OXC can parse JSX in the wrapper component.
|
||||||
|
*/
|
||||||
|
import { renderHook } from '@testing-library/react'
|
||||||
|
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
||||||
|
import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
|
||||||
|
import * as useSSEModule from './useSSE'
|
||||||
|
import { useRealtimeSync } from './useRealtimeSync'
|
||||||
|
import React from 'react'
|
||||||
|
import type { SSEMessage } from '../services/sse'
|
||||||
|
|
||||||
|
describe('useRealtimeSync', () => {
|
||||||
|
let queryClient: QueryClient
|
||||||
|
let mockSSEOnMessage: ((msg: { type: string; data: unknown }) => void) | null = null
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
queryClient = new QueryClient({
|
||||||
|
defaultOptions: { queries: { retry: false } },
|
||||||
|
})
|
||||||
|
mockSSEOnMessage = null
|
||||||
|
|
||||||
|
// Spy on useSSE to capture the onMessage callback
|
||||||
|
vi.spyOn(useSSEModule, 'useSSE').mockImplementation((opts) => {
|
||||||
|
mockSSEOnMessage = opts?.onMessage ?? null
|
||||||
|
return { status: 'connected' }
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
vi.restoreAllMocks()
|
||||||
|
})
|
||||||
|
|
||||||
|
function render() {
|
||||||
|
return renderHook(() => useRealtimeSync(), {
|
||||||
|
wrapper: ({ children }: { children: React.ReactNode }) => (
|
||||||
|
React.createElement(QueryClientProvider, { client: queryClient }, children)
|
||||||
|
),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
it('invalidates ["agents"] on agent.status event', async () => {
|
||||||
|
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
||||||
|
render()
|
||||||
|
|
||||||
|
const msg: SSEMessage = {
|
||||||
|
type: 'agent.status',
|
||||||
|
data: { agentId: 'a1', status: 'active' },
|
||||||
|
}
|
||||||
|
mockSSEOnMessage!(msg)
|
||||||
|
|
||||||
|
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
|
||||||
|
expect(invalidateSpy).toHaveBeenCalledTimes(1)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('invalidates ["tasks"] and ["agents"] on agent.task event', async () => {
|
||||||
|
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
||||||
|
render()
|
||||||
|
|
||||||
|
const msg: SSEMessage = {
|
||||||
|
type: 'agent.task',
|
||||||
|
data: { agentId: 'a1', taskId: 't1', title: 'Test', action: 'assigned' },
|
||||||
|
}
|
||||||
|
mockSSEOnMessage!(msg)
|
||||||
|
|
||||||
|
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
|
||||||
|
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
|
||||||
|
expect(invalidateSpy).toHaveBeenCalledTimes(2)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('invalidates ["tasks"] and ["agents"] on agent.progress event', async () => {
|
||||||
|
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
||||||
|
render()
|
||||||
|
|
||||||
|
const msg: SSEMessage = {
|
||||||
|
type: 'agent.progress',
|
||||||
|
data: { agentId: 'a1', taskId: 't1', progress: 50, message: 'working' },
|
||||||
|
}
|
||||||
|
mockSSEOnMessage!(msg)
|
||||||
|
|
||||||
|
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
|
||||||
|
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
|
||||||
|
expect(invalidateSpy).toHaveBeenCalledTimes(2)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('invalidates ["agents"], ["sessions"], ["tasks"] on fleet.update event', async () => {
|
||||||
|
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
||||||
|
render()
|
||||||
|
|
||||||
|
const msg: SSEMessage = {
|
||||||
|
type: 'fleet.update',
|
||||||
|
data: { timestamp: '2026-05-20T12:00:00Z', agentCount: 5 },
|
||||||
|
}
|
||||||
|
mockSSEOnMessage!(msg)
|
||||||
|
|
||||||
|
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
|
||||||
|
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['sessions'] })
|
||||||
|
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
|
||||||
|
expect(invalidateSpy).toHaveBeenCalledTimes(3)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('does nothing on connected event', async () => {
|
||||||
|
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
||||||
|
render()
|
||||||
|
|
||||||
|
const msg: SSEMessage = {
|
||||||
|
type: 'connected',
|
||||||
|
data: { clientCount: 1 },
|
||||||
|
}
|
||||||
|
mockSSEOnMessage!(msg)
|
||||||
|
|
||||||
|
expect(invalidateSpy).not.toHaveBeenCalled()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('does nothing on unknown event types', async () => {
|
||||||
|
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
||||||
|
render()
|
||||||
|
|
||||||
|
mockSSEOnMessage!({ type: 'unknown.event', data: {} })
|
||||||
|
|
||||||
|
expect(invalidateSpy).not.toHaveBeenCalled()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('returns sseStatus from useSSE', () => {
|
||||||
|
const { result } = render()
|
||||||
|
expect(result.current.sseStatus).toBe('connected')
|
||||||
|
})
|
||||||
|
})
|
||||||
@@ -0,0 +1,64 @@
|
|||||||
|
/**
|
||||||
|
* useRealtimeSync — mounts the SSE connection once at the app level and
|
||||||
|
* wires incoming events to React Query cache invalidation.
|
||||||
|
*
|
||||||
|
* Event → query key mapping:
|
||||||
|
* agent.status → ['agents']
|
||||||
|
* agent.task → ['tasks'], ['agents']
|
||||||
|
* agent.progress → ['tasks'], ['agents']
|
||||||
|
* fleet.update → ['agents'], ['sessions'], ['tasks']
|
||||||
|
*/
|
||||||
|
import { useQueryClient } from '@tanstack/react-query'
|
||||||
|
import { useCallback } from 'react'
|
||||||
|
import { useSSE, type SSEStatus } from './useSSE'
|
||||||
|
import type { SSEMessage } from '../services/sse'
|
||||||
|
|
||||||
|
export function useRealtimeSync(): { sseStatus: SSEStatus } {
|
||||||
|
const queryClient = useQueryClient()
|
||||||
|
|
||||||
|
const handleMessage = useCallback(
|
||||||
|
(raw: { type: string; data: unknown }) => {
|
||||||
|
// Cast to discriminated union — the backend contract guarantees these shapes
|
||||||
|
const msg = raw as SSEMessage
|
||||||
|
|
||||||
|
switch (msg.type) {
|
||||||
|
case 'agent.status':
|
||||||
|
// msg.data: AgentStatusEvent { agentId, status, reason? }
|
||||||
|
void msg.data.agentId // retained for type-narrowing — ensures payload matches contract
|
||||||
|
queryClient.invalidateQueries({ queryKey: ['agents'] })
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'agent.task':
|
||||||
|
// msg.data: AgentTaskEvent { agentId, taskId, title, action }
|
||||||
|
void msg.data.agentId
|
||||||
|
queryClient.invalidateQueries({ queryKey: ['tasks'] })
|
||||||
|
queryClient.invalidateQueries({ queryKey: ['agents'] })
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'agent.progress':
|
||||||
|
// msg.data: AgentProgressEvent { agentId, taskId, progress, message? }
|
||||||
|
void msg.data.agentId
|
||||||
|
queryClient.invalidateQueries({ queryKey: ['tasks'] })
|
||||||
|
queryClient.invalidateQueries({ queryKey: ['agents'] })
|
||||||
|
break
|
||||||
|
|
||||||
|
case 'fleet.update':
|
||||||
|
// msg.data: FleetUpdateEvent { timestamp, agentCount }
|
||||||
|
void msg.data.agentCount
|
||||||
|
queryClient.invalidateQueries({ queryKey: ['agents'] })
|
||||||
|
queryClient.invalidateQueries({ queryKey: ['sessions'] })
|
||||||
|
queryClient.invalidateQueries({ queryKey: ['tasks'] })
|
||||||
|
break
|
||||||
|
|
||||||
|
default:
|
||||||
|
// 'connected' and unknown events — no action needed
|
||||||
|
break
|
||||||
|
}
|
||||||
|
},
|
||||||
|
[queryClient],
|
||||||
|
)
|
||||||
|
|
||||||
|
const { status: sseStatus } = useSSE({ onMessage: handleMessage })
|
||||||
|
|
||||||
|
return { sseStatus }
|
||||||
|
}
|
||||||
@@ -0,0 +1,267 @@
|
|||||||
|
/**
|
||||||
|
* Tests for useSSE — SSE connection lifecycle, back-off, event parsing, and cleanup.
|
||||||
|
*
|
||||||
|
* jsdom does not include EventSource, so we mock it completely.
|
||||||
|
*/
|
||||||
|
import { renderHook, act, waitFor } from '@testing-library/react'
|
||||||
|
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
||||||
|
import { useSSE } from './useSSE'
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Mock EventSource — defined as a plain class so `new EventSource()` works
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
class MockEventSource {
|
||||||
|
url: string
|
||||||
|
onopen: (() => void) | null = null
|
||||||
|
onerror: ((evt: Event) => void) | null = null
|
||||||
|
onmessage: ((evt: MessageEvent) => void) | null = null
|
||||||
|
private listeners: Map<string, Array<(evt: Event) => void>> = new Map()
|
||||||
|
readyState: number = 0
|
||||||
|
|
||||||
|
constructor(url: string) {
|
||||||
|
this.url = url
|
||||||
|
}
|
||||||
|
|
||||||
|
addEventListener(type: string, handler: (evt: Event) => void) {
|
||||||
|
if (!this.listeners.has(type)) this.listeners.set(type, [])
|
||||||
|
this.listeners.get(type)!.push(handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
removeEventListener() { /* no-op for tests */ }
|
||||||
|
|
||||||
|
close() {
|
||||||
|
this.readyState = 2
|
||||||
|
this.onopen = null
|
||||||
|
this.onerror = null
|
||||||
|
this.onmessage = null
|
||||||
|
this.listeners.clear()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test helpers
|
||||||
|
_simulateOpen() { this.onopen?.() }
|
||||||
|
_simulateError() { this.onerror?.(new Event('error')) }
|
||||||
|
_simulateNamedEvent(type: string, data: string) {
|
||||||
|
const handlers = this.listeners.get(type)
|
||||||
|
if (handlers) {
|
||||||
|
const evt = new MessageEvent(type, { data }) as Event
|
||||||
|
handlers.forEach((h) => h(evt))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_simulateMessage(data: string) {
|
||||||
|
this.onmessage?.(new MessageEvent('message', { data }) as MessageEvent)
|
||||||
|
}
|
||||||
|
|
||||||
|
static readonly CONNECTING = 0
|
||||||
|
static readonly OPEN = 1
|
||||||
|
static readonly CLOSED = 2
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Tests
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
let esInstances: MockEventSource[]
|
||||||
|
|
||||||
|
describe('useSSE', () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
esInstances = []
|
||||||
|
// Replace global EventSource with our mock class
|
||||||
|
Object.defineProperty(globalThis, 'EventSource', {
|
||||||
|
// The mock must use a class for `new EventSource()` to work
|
||||||
|
value: class extends MockEventSource {
|
||||||
|
constructor(url: string) {
|
||||||
|
super(url)
|
||||||
|
esInstances.push(this)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
writable: true,
|
||||||
|
configurable: true,
|
||||||
|
})
|
||||||
|
vi.useFakeTimers({ shouldAdvanceTime: true })
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
vi.restoreAllMocks()
|
||||||
|
vi.useRealTimers()
|
||||||
|
})
|
||||||
|
|
||||||
|
// ── Initial connection ──────────────────────────────────────────────────
|
||||||
|
it('starts in "connecting" state and creates an EventSource', () => {
|
||||||
|
const { result } = renderHook(() => useSSE({ url: '/api/events' }))
|
||||||
|
|
||||||
|
expect(result.current.status).toBe('connecting')
|
||||||
|
expect(esInstances.length).toBeGreaterThanOrEqual(1)
|
||||||
|
expect(esInstances[0].url).toBe('/api/events')
|
||||||
|
})
|
||||||
|
|
||||||
|
it('transitions to "connected" on open', async () => {
|
||||||
|
const onOpen = vi.fn()
|
||||||
|
const { result } = renderHook(() => useSSE({ url: '/api/events', onOpen }))
|
||||||
|
|
||||||
|
act(() => { esInstances[0]._simulateOpen() })
|
||||||
|
|
||||||
|
await waitFor(() => {
|
||||||
|
expect(result.current.status).toBe('connected')
|
||||||
|
})
|
||||||
|
expect(onOpen).toHaveBeenCalledTimes(1)
|
||||||
|
})
|
||||||
|
|
||||||
|
// ── Reconnection with exponential back-off ──────────────────────────────
|
||||||
|
it('retries after error with exponential back-off', async () => {
|
||||||
|
const { result } = renderHook(() =>
|
||||||
|
useSSE({ url: '/api/events', reconnectBaseMs: 1000, reconnectMaxMs: 30000 }),
|
||||||
|
)
|
||||||
|
|
||||||
|
// First error → reconnecting, retry at 1s
|
||||||
|
act(() => { esInstances[0]._simulateError() })
|
||||||
|
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
|
||||||
|
expect(esInstances).toHaveLength(1)
|
||||||
|
|
||||||
|
// Advance 1000ms → second EventSource created
|
||||||
|
act(() => { vi.advanceTimersByTime(1000) })
|
||||||
|
expect(esInstances).toHaveLength(2)
|
||||||
|
|
||||||
|
// Second error → reconnecting, retry at 2s
|
||||||
|
act(() => { esInstances[1]._simulateError() })
|
||||||
|
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
|
||||||
|
act(() => { vi.advanceTimersByTime(2000) })
|
||||||
|
expect(esInstances).toHaveLength(3)
|
||||||
|
|
||||||
|
// Third error → reconnecting, retry at 4s
|
||||||
|
act(() => { esInstances[2]._simulateError() })
|
||||||
|
act(() => { vi.advanceTimersByTime(4000) })
|
||||||
|
expect(esInstances).toHaveLength(4)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('caps reconnect delay at reconnectMaxMs', async () => {
|
||||||
|
renderHook(() =>
|
||||||
|
useSSE({ url: '/api/events', reconnectBaseMs: 1000, reconnectMaxMs: 10000 }),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Force 5 errors to push the exponent past the cap
|
||||||
|
for (let i = 0; i < 5; i++) {
|
||||||
|
act(() => { esInstances[i]._simulateError() })
|
||||||
|
const expectedDelay = Math.min(1000 * 2 ** i, 10000)
|
||||||
|
act(() => { vi.advanceTimersByTime(expectedDelay) })
|
||||||
|
}
|
||||||
|
|
||||||
|
// 6 ES instances created (initial + 5 retries)
|
||||||
|
expect(esInstances).toHaveLength(6)
|
||||||
|
})
|
||||||
|
|
||||||
|
// ── Circuit-breaker (max retries) ───────────────────────────────────────
|
||||||
|
it('transitions to "error" after reconnectLimit is exceeded', async () => {
|
||||||
|
const { result } = renderHook(() =>
|
||||||
|
useSSE({ url: '/api/events', reconnectBaseMs: 100, reconnectLimit: 2 }),
|
||||||
|
)
|
||||||
|
|
||||||
|
// First error → reconnecting
|
||||||
|
act(() => { esInstances[0]._simulateError() })
|
||||||
|
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
|
||||||
|
|
||||||
|
// Advance → retry
|
||||||
|
act(() => { vi.advanceTimersByTime(100) })
|
||||||
|
|
||||||
|
// Second error → reconnecting (attempt 2, still ≤ limit)
|
||||||
|
act(() => { esInstances[1]._simulateError() })
|
||||||
|
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
|
||||||
|
act(() => { vi.advanceTimersByTime(200) })
|
||||||
|
|
||||||
|
// Third error → limit exceeded (3 > 2) → error
|
||||||
|
act(() => { esInstances[2]._simulateError() })
|
||||||
|
await waitFor(() => { expect(result.current.status).toBe('error') })
|
||||||
|
})
|
||||||
|
|
||||||
|
it('resets reconnect counter on successful connection', async () => {
|
||||||
|
const { result } = renderHook(() =>
|
||||||
|
useSSE({ url: '/api/events', reconnectBaseMs: 100, reconnectLimit: 3 }),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Two errors then a successful connect
|
||||||
|
act(() => { esInstances[0]._simulateError() })
|
||||||
|
act(() => { vi.advanceTimersByTime(100) })
|
||||||
|
|
||||||
|
act(() => { esInstances[1]._simulateOpen() })
|
||||||
|
await waitFor(() => { expect(result.current.status).toBe('connected') })
|
||||||
|
|
||||||
|
// Now error again — counter should be reset, so we get fresh attempts
|
||||||
|
act(() => { esInstances[1]._simulateError() })
|
||||||
|
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
|
||||||
|
expect(result.current.status).toBe('reconnecting')
|
||||||
|
})
|
||||||
|
|
||||||
|
// ── Cleanup on unmount ───────────────────────────────────────────────────
|
||||||
|
it('closes EventSource on unmount', () => {
|
||||||
|
const closeSpy = vi.spyOn(MockEventSource.prototype, 'close')
|
||||||
|
const { unmount } = renderHook(() => useSSE({ url: '/api/events' }))
|
||||||
|
|
||||||
|
unmount()
|
||||||
|
expect(closeSpy).toHaveBeenCalled()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('does not update state after unmount', async () => {
|
||||||
|
const { result, unmount } = renderHook(() => useSSE({ url: '/api/events' }))
|
||||||
|
|
||||||
|
unmount()
|
||||||
|
|
||||||
|
// These should be no-ops after unmount (mountedRef guards)
|
||||||
|
act(() => { esInstances[0]._simulateOpen() })
|
||||||
|
act(() => { esInstances[0]._simulateError() })
|
||||||
|
|
||||||
|
// State should not have changed
|
||||||
|
expect(result.current.status).toBe('connecting')
|
||||||
|
})
|
||||||
|
|
||||||
|
// ── Event parsing ───────────────────────────────────────────────────────
|
||||||
|
it('parses valid JSON data into objects', async () => {
|
||||||
|
const onMessage = vi.fn()
|
||||||
|
renderHook(() => useSSE({ url: '/api/events', onMessage }))
|
||||||
|
|
||||||
|
act(() => {
|
||||||
|
esInstances[0]._simulateNamedEvent('agent.status', JSON.stringify({ agentId: 'a1', status: 'active' }))
|
||||||
|
})
|
||||||
|
|
||||||
|
expect(onMessage).toHaveBeenCalledWith({
|
||||||
|
type: 'agent.status',
|
||||||
|
data: { agentId: 'a1', status: 'active' },
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
it('passes invalid JSON through as raw string', async () => {
|
||||||
|
const onMessage = vi.fn()
|
||||||
|
renderHook(() => useSSE({ url: '/api/events', onMessage }))
|
||||||
|
|
||||||
|
act(() => {
|
||||||
|
esInstances[0]._simulateNamedEvent('agent.status', 'not valid json {{{')
|
||||||
|
})
|
||||||
|
|
||||||
|
expect(onMessage).toHaveBeenCalledWith({
|
||||||
|
type: 'agent.status',
|
||||||
|
data: 'not valid json {{{',
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
// ── enabled=false skips connection ──────────────────────────────────────
|
||||||
|
it('does not create EventSource when enabled=false', () => {
|
||||||
|
const { result } = renderHook(() => useSSE({ url: '/api/events', enabled: false }))
|
||||||
|
|
||||||
|
expect(esInstances).toHaveLength(0)
|
||||||
|
expect(result.current.status).toBe('connecting')
|
||||||
|
})
|
||||||
|
|
||||||
|
// ── onError callback ────────────────────────────────────────────────────
|
||||||
|
it('calls onError on connection failure', async () => {
|
||||||
|
const onError = vi.fn()
|
||||||
|
renderHook(() =>
|
||||||
|
useSSE({ url: '/api/events', onError, reconnectBaseMs: 100 }),
|
||||||
|
)
|
||||||
|
|
||||||
|
act(() => { esInstances[0]._simulateError() })
|
||||||
|
expect(onError).toHaveBeenCalledTimes(1)
|
||||||
|
})
|
||||||
|
|
||||||
|
// ── Default URL ─────────────────────────────────────────────────────────
|
||||||
|
it('uses /api/events as default URL', () => {
|
||||||
|
renderHook(() => useSSE())
|
||||||
|
expect(esInstances[0].url).toBe('/api/events')
|
||||||
|
})
|
||||||
|
})
|
||||||
@@ -0,0 +1,180 @@
|
|||||||
|
import { useEffect, useRef, useCallback, useState } from 'react'
|
||||||
|
|
||||||
|
/** SSE connection state reported to consumers. */
|
||||||
|
export type SSEStatus = 'connecting' | 'connected' | 'reconnecting' | 'error'
|
||||||
|
|
||||||
|
/** Typed SSE event received from the backend. */
|
||||||
|
export interface SSEMessage {
|
||||||
|
/** event: field from the SSE frame */
|
||||||
|
type: string
|
||||||
|
/** parsed JSON from the data: field */
|
||||||
|
data: unknown
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface UseSSEOptions {
|
||||||
|
/** Endpoint URL — defaults to /api/events */
|
||||||
|
url?: string
|
||||||
|
/** Called for every SSE message (all event types) */
|
||||||
|
onMessage?: (msg: SSEMessage) => void
|
||||||
|
/** Called when connection opens or reconnects */
|
||||||
|
onOpen?: () => void
|
||||||
|
/** Called on every connection error (both transient and terminal) */
|
||||||
|
onError?: (err: Event) => void
|
||||||
|
/** Base delay in ms before the first reconnect attempt (default 1 000) */
|
||||||
|
reconnectBaseMs?: number
|
||||||
|
/** Maximum reconnect delay in ms (default 30 000) */
|
||||||
|
reconnectMaxMs?: number
|
||||||
|
/**
|
||||||
|
* Maximum number of consecutive reconnect attempts before giving up.
|
||||||
|
* When the limit is reached, status transitions to 'error'.
|
||||||
|
* Default undefined (unlimited).
|
||||||
|
*/
|
||||||
|
reconnectLimit?: number
|
||||||
|
/** Set false to disable auto-connect (useful in tests) */
|
||||||
|
enabled?: boolean
|
||||||
|
}
|
||||||
|
|
||||||
|
const SSE_EVENTS = ['agent.status', 'agent.task', 'agent.progress', 'fleet.update', 'connected'] as const
|
||||||
|
|
||||||
|
/**
|
||||||
|
* useSSE — mounts a persistent SSE connection to the Control Center backend.
|
||||||
|
*
|
||||||
|
* Handles:
|
||||||
|
* - Initial connection on mount
|
||||||
|
* - Exponential back-off reconnection on drop (1s → 2s → 4s … capped at reconnectMaxMs)
|
||||||
|
* - Circuit-breaker: after reconnectLimit consecutive failures, transitions to 'error'
|
||||||
|
* - Cleanup on unmount
|
||||||
|
* - All five event types: agent.status, agent.task, agent.progress, fleet.update, connected
|
||||||
|
*
|
||||||
|
* The 'connected' SSE event is an application-level handshake sent by the backend
|
||||||
|
* after the transport opens. This is distinct from onOpen, which fires at the
|
||||||
|
* transport level when the EventSource HTTP connection is established.
|
||||||
|
*/
|
||||||
|
export function useSSE({
|
||||||
|
url = '/api/events',
|
||||||
|
onMessage,
|
||||||
|
onOpen,
|
||||||
|
onError,
|
||||||
|
reconnectBaseMs = 1_000,
|
||||||
|
reconnectMaxMs = 30_000,
|
||||||
|
reconnectLimit,
|
||||||
|
enabled = true,
|
||||||
|
}: UseSSEOptions = {}): { status: SSEStatus } {
|
||||||
|
const [status, setStatus] = useState<SSEStatus>('connecting')
|
||||||
|
|
||||||
|
// Stable refs so the effect doesn't need to re-run when callbacks change
|
||||||
|
const onMessageRef = useRef(onMessage)
|
||||||
|
const onOpenRef = useRef(onOpen)
|
||||||
|
const onErrorRef = useRef(onError)
|
||||||
|
onMessageRef.current = onMessage
|
||||||
|
onOpenRef.current = onOpen
|
||||||
|
onErrorRef.current = onError
|
||||||
|
|
||||||
|
const reconnectAttemptRef = useRef(0)
|
||||||
|
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null)
|
||||||
|
const esRef = useRef<EventSource | null>(null)
|
||||||
|
const mountedRef = useRef(true)
|
||||||
|
|
||||||
|
const clearReconnectTimer = useCallback(() => {
|
||||||
|
if (reconnectTimerRef.current !== null) {
|
||||||
|
clearTimeout(reconnectTimerRef.current)
|
||||||
|
reconnectTimerRef.current = null
|
||||||
|
}
|
||||||
|
}, [])
|
||||||
|
|
||||||
|
const connect = useCallback(() => {
|
||||||
|
if (!mountedRef.current || !enabled) return
|
||||||
|
|
||||||
|
// Clean up any existing connection
|
||||||
|
if (esRef.current) {
|
||||||
|
esRef.current.close()
|
||||||
|
esRef.current = null
|
||||||
|
}
|
||||||
|
|
||||||
|
setStatus(reconnectAttemptRef.current === 0 ? 'connecting' : 'reconnecting')
|
||||||
|
|
||||||
|
const es = new EventSource(url)
|
||||||
|
esRef.current = es
|
||||||
|
|
||||||
|
es.onopen = () => {
|
||||||
|
if (!mountedRef.current) return
|
||||||
|
reconnectAttemptRef.current = 0
|
||||||
|
setStatus('connected')
|
||||||
|
onOpenRef.current?.()
|
||||||
|
}
|
||||||
|
|
||||||
|
es.onerror = (evt) => {
|
||||||
|
if (!mountedRef.current) return
|
||||||
|
|
||||||
|
// EventSource auto-retries but we manage our own to get back-off control
|
||||||
|
es.close()
|
||||||
|
esRef.current = null
|
||||||
|
|
||||||
|
onErrorRef.current?.(evt)
|
||||||
|
|
||||||
|
reconnectAttemptRef.current += 1
|
||||||
|
|
||||||
|
// Circuit-breaker: give up after reconnectLimit consecutive failures
|
||||||
|
if (reconnectLimit !== undefined && reconnectAttemptRef.current > reconnectLimit) {
|
||||||
|
setStatus('error')
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exponential back-off: 1s, 2s, 4s … capped at reconnectMaxMs
|
||||||
|
// Note: attempt is 1-based here (already incremented), so we use attempt-1 for the exponent
|
||||||
|
const delay = Math.min(
|
||||||
|
reconnectBaseMs * 2 ** (reconnectAttemptRef.current - 1),
|
||||||
|
reconnectMaxMs,
|
||||||
|
)
|
||||||
|
setStatus('reconnecting')
|
||||||
|
|
||||||
|
clearReconnectTimer()
|
||||||
|
reconnectTimerRef.current = setTimeout(() => {
|
||||||
|
if (mountedRef.current) connect()
|
||||||
|
}, delay)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register listeners for all known event types
|
||||||
|
for (const eventType of SSE_EVENTS) {
|
||||||
|
es.addEventListener(eventType, (evt: MessageEvent) => {
|
||||||
|
if (!mountedRef.current) return
|
||||||
|
let data: unknown = evt.data
|
||||||
|
try {
|
||||||
|
data = JSON.parse(evt.data as string)
|
||||||
|
} catch {
|
||||||
|
// leave as raw string
|
||||||
|
}
|
||||||
|
onMessageRef.current?.({ type: eventType, data })
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Catch-all for unnamed events (type == 'message').
|
||||||
|
// Won't fire for the named events registered via addEventListener above.
|
||||||
|
es.onmessage = (evt: MessageEvent) => {
|
||||||
|
if (!mountedRef.current) return
|
||||||
|
let data: unknown = evt.data
|
||||||
|
try {
|
||||||
|
data = JSON.parse(evt.data as string)
|
||||||
|
} catch {
|
||||||
|
// leave as raw string
|
||||||
|
}
|
||||||
|
onMessageRef.current?.({ type: 'message', data })
|
||||||
|
}
|
||||||
|
}, [url, enabled, reconnectBaseMs, reconnectMaxMs, reconnectLimit, clearReconnectTimer])
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
mountedRef.current = true
|
||||||
|
if (enabled) connect()
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
mountedRef.current = false
|
||||||
|
clearReconnectTimer()
|
||||||
|
if (esRef.current) {
|
||||||
|
esRef.current.close()
|
||||||
|
esRef.current = null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, [connect, enabled, clearReconnectTimer])
|
||||||
|
|
||||||
|
return { status }
|
||||||
|
}
|
||||||
+11
-4
@@ -4,13 +4,16 @@ import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
|
|||||||
import { BrowserRouter } from 'react-router-dom'
|
import { BrowserRouter } from 'react-router-dom'
|
||||||
import ErrorBoundary from './components/ErrorBoundary'
|
import ErrorBoundary from './components/ErrorBoundary'
|
||||||
import { ThemeProvider } from './hooks/useTheme'
|
import { ThemeProvider } from './hooks/useTheme'
|
||||||
|
import { SSEProvider } from './contexts/SSEContext'
|
||||||
import './index.css'
|
import './index.css'
|
||||||
import App from './App'
|
import App from './App'
|
||||||
|
|
||||||
const queryClient = new QueryClient({
|
const queryClient = new QueryClient({
|
||||||
defaultOptions: {
|
defaultOptions: {
|
||||||
queries: {
|
queries: {
|
||||||
staleTime: 30_000,
|
// No polling — real-time updates come through SSE.
|
||||||
|
// staleTime is kept high; data is pushed, not pulled.
|
||||||
|
staleTime: 60_000,
|
||||||
refetchOnWindowFocus: false,
|
refetchOnWindowFocus: false,
|
||||||
retry: 1,
|
retry: 1,
|
||||||
},
|
},
|
||||||
@@ -22,9 +25,13 @@ createRoot(document.getElementById('root')!).render(
|
|||||||
<ErrorBoundary>
|
<ErrorBoundary>
|
||||||
<ThemeProvider>
|
<ThemeProvider>
|
||||||
<QueryClientProvider client={queryClient}>
|
<QueryClientProvider client={queryClient}>
|
||||||
<BrowserRouter>
|
{/* SSEProvider must live inside QueryClientProvider so it can call
|
||||||
<App />
|
useQueryClient() to invalidate caches on incoming events. */}
|
||||||
</BrowserRouter>
|
<SSEProvider>
|
||||||
|
<BrowserRouter>
|
||||||
|
<App />
|
||||||
|
</BrowserRouter>
|
||||||
|
</SSEProvider>
|
||||||
</QueryClientProvider>
|
</QueryClientProvider>
|
||||||
</ThemeProvider>
|
</ThemeProvider>
|
||||||
</ErrorBoundary>
|
</ErrorBoundary>
|
||||||
|
|||||||
@@ -1,18 +1,36 @@
|
|||||||
import { useTheme } from '../hooks/useTheme'
|
import { useTheme } from '../hooks/useTheme'
|
||||||
import { useLocalStorage } from '../hooks/useLocalStorage'
|
import { useLocalStorage } from '../hooks/useLocalStorage'
|
||||||
import { Sun, Moon, Monitor, Zap, Clock } from 'lucide-react'
|
import { useSSEContext } from '../contexts/SSEContext'
|
||||||
|
import { Sun, Moon, Monitor, Zap, Radio } from 'lucide-react'
|
||||||
|
|
||||||
const REFRESH_PRESETS = [
|
const SSE_STATUS_COPY: Record<string, { label: string; description: string; color: string }> = {
|
||||||
{ label: '5s', value: 5_000 },
|
connected: {
|
||||||
{ label: '10s', value: 10_000 },
|
label: 'Connected',
|
||||||
{ label: '30s', value: 30_000 },
|
description: 'Real-time updates are active. Agent status, tasks, and progress stream live.',
|
||||||
{ label: '60s', value: 60_000 },
|
color: 'text-green-500',
|
||||||
]
|
},
|
||||||
|
connecting: {
|
||||||
|
label: 'Connecting…',
|
||||||
|
description: 'Establishing SSE connection to the backend.',
|
||||||
|
color: 'text-yellow-500',
|
||||||
|
},
|
||||||
|
reconnecting: {
|
||||||
|
label: 'Reconnecting…',
|
||||||
|
description: 'Connection lost. Retrying with exponential back-off.',
|
||||||
|
color: 'text-yellow-500',
|
||||||
|
},
|
||||||
|
error: {
|
||||||
|
label: 'Disconnected',
|
||||||
|
description: 'Could not connect to the SSE endpoint. Check that the backend is reachable.',
|
||||||
|
color: 'text-red-500',
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
export default function SettingsPage() {
|
export default function SettingsPage() {
|
||||||
const { isDark, toggleTheme } = useTheme()
|
const { isDark, toggleTheme } = useTheme()
|
||||||
const [gatewayUrl, setGatewayUrl] = useLocalStorage('cc-gateway-url', '')
|
const [gatewayUrl, setGatewayUrl] = useLocalStorage('cc-gateway-url', '')
|
||||||
const [refreshInterval, setRefreshInterval] = useLocalStorage('cc-refresh-interval', 30_000)
|
const { sseStatus } = useSSEContext()
|
||||||
|
const sseInfo = SSE_STATUS_COPY[sseStatus] ?? SSE_STATUS_COPY.error
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<div className="space-y-8 max-w-2xl">
|
<div className="space-y-8 max-w-2xl">
|
||||||
@@ -80,45 +98,31 @@ export default function SettingsPage() {
|
|||||||
</div>
|
</div>
|
||||||
</section>
|
</section>
|
||||||
|
|
||||||
{/* Refresh */}
|
{/* Real-time connection status */}
|
||||||
<section className="space-y-4">
|
<section className="space-y-4">
|
||||||
<h2 className="text-lg font-semibold flex items-center gap-2">
|
<h2 className="text-lg font-semibold flex items-center gap-2">
|
||||||
<Clock size={20} className="text-primary" />
|
<Radio size={20} className="text-primary" />
|
||||||
Auto Refresh
|
Real-time Updates
|
||||||
</h2>
|
</h2>
|
||||||
|
|
||||||
<div className="p-5 rounded-xl border border-surface-light bg-surface-dark space-y-3">
|
<div className="p-5 rounded-xl border border-surface-light bg-surface-dark space-y-3">
|
||||||
<p className="text-sm text-on-surface-variant">Data refresh interval for agent status and logs</p>
|
<div className="flex items-center justify-between">
|
||||||
|
<div>
|
||||||
<div className="flex flex-col gap-2">
|
<p className="font-medium">SSE Connection</p>
|
||||||
<input
|
<p className="text-sm text-on-surface-variant mt-0.5">{sseInfo.description}</p>
|
||||||
type="range"
|
|
||||||
min="0"
|
|
||||||
max="3"
|
|
||||||
step="1"
|
|
||||||
value={REFRESH_PRESETS.findIndex((p) => p.value === refreshInterval)}
|
|
||||||
onChange={(e) => {
|
|
||||||
const idx = parseInt(e.target.value)
|
|
||||||
setRefreshInterval(REFRESH_PRESETS[idx].value)
|
|
||||||
}}
|
|
||||||
className="w-full accent-primary"
|
|
||||||
/>
|
|
||||||
<div className="flex justify-between text-xs text-on-surface-muted">
|
|
||||||
{REFRESH_PRESETS.map((p) => (
|
|
||||||
<button
|
|
||||||
key={p.label}
|
|
||||||
onClick={() => setRefreshInterval(p.value)}
|
|
||||||
className={`px-3 py-1 rounded-lg transition-colors ${
|
|
||||||
refreshInterval === p.value
|
|
||||||
? 'bg-primary/10 text-primary'
|
|
||||||
: 'hover:bg-surface-light'
|
|
||||||
}`}
|
|
||||||
>
|
|
||||||
{p.label}
|
|
||||||
</button>
|
|
||||||
))}
|
|
||||||
</div>
|
</div>
|
||||||
|
<span className={`text-sm font-semibold whitespace-nowrap ${sseInfo.color}`}>
|
||||||
|
{sseInfo.label}
|
||||||
|
</span>
|
||||||
</div>
|
</div>
|
||||||
|
<p className="text-xs text-on-surface-muted">
|
||||||
|
Endpoint: <code className="bg-surface-light px-1.5 py-0.5 rounded text-on-surface-variant">/api/events</code>
|
||||||
|
· Events: agent.status, agent.task, agent.progress, fleet.update
|
||||||
|
</p>
|
||||||
|
<p className="text-xs text-on-surface-muted">
|
||||||
|
Polling is disabled. All status updates are pushed from the server over a persistent SSE connection.
|
||||||
|
The client reconnects automatically with exponential back-off on drop.
|
||||||
|
</p>
|
||||||
</div>
|
</div>
|
||||||
</section>
|
</section>
|
||||||
</div>
|
</div>
|
||||||
|
|||||||
@@ -0,0 +1,72 @@
|
|||||||
|
/**
|
||||||
|
* SSE event payload types matching the Go backend (internal/handler/sse.go).
|
||||||
|
*
|
||||||
|
* Event format on the wire:
|
||||||
|
* event: <eventType>
|
||||||
|
* data: <json>
|
||||||
|
*
|
||||||
|
* The types below define the backend contract. The SSEPayloadMap maps
|
||||||
|
* each event type string to its expected payload shape. SSEMessage is a
|
||||||
|
* discriminated union on `type` — when you switch on msg.type, TypeScript
|
||||||
|
* narrows msg.data to the correct payload interface automatically.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type { AgentStatus } from '../types'
|
||||||
|
|
||||||
|
/** agent.status — agent came online, went offline, changed state */
|
||||||
|
export interface AgentStatusEvent {
|
||||||
|
agentId: string
|
||||||
|
status: AgentStatus
|
||||||
|
/** Optional human-readable reason (e.g. error message) */
|
||||||
|
reason?: string
|
||||||
|
}
|
||||||
|
|
||||||
|
/** agent.task — a task was assigned to or completed by an agent */
|
||||||
|
export interface AgentTaskEvent {
|
||||||
|
agentId: string
|
||||||
|
taskId: string
|
||||||
|
title: string
|
||||||
|
action: 'assigned' | 'completed' | 'failed'
|
||||||
|
}
|
||||||
|
|
||||||
|
/** agent.progress — incremental progress update for a running task */
|
||||||
|
export interface AgentProgressEvent {
|
||||||
|
agentId: string
|
||||||
|
taskId: string
|
||||||
|
progress: number
|
||||||
|
/** Optional description of what is currently happening */
|
||||||
|
message?: string
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* fleet.update — bulk refresh of all agents (e.g. after a deployment).
|
||||||
|
* The backend may send partial or complete agent state.
|
||||||
|
*/
|
||||||
|
export interface FleetUpdateEvent {
|
||||||
|
/** ISO timestamp of when the snapshot was taken */
|
||||||
|
timestamp: string
|
||||||
|
/** Number of agents in the fleet */
|
||||||
|
agentCount: number
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Union of all SSE data payloads keyed by event type. */
|
||||||
|
export type SSEPayloadMap = {
|
||||||
|
'agent.status': AgentStatusEvent
|
||||||
|
'agent.task': AgentTaskEvent
|
||||||
|
'agent.progress': AgentProgressEvent
|
||||||
|
'fleet.update': FleetUpdateEvent
|
||||||
|
connected: { clientCount: number }
|
||||||
|
message: unknown
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Discriminated SSE message — the `type` field narrows `data` via SSEPayloadMap.
|
||||||
|
*
|
||||||
|
* Usage:
|
||||||
|
* if (msg.type === 'agent.status') {
|
||||||
|
* msg.data.agentId // ✅ TypeScript knows this is AgentStatusEvent
|
||||||
|
* }
|
||||||
|
*/
|
||||||
|
export type SSEMessage = {
|
||||||
|
[K in keyof SSEPayloadMap]: { type: K; data: SSEPayloadMap[K] }
|
||||||
|
}[keyof SSEPayloadMap]
|
||||||
@@ -0,0 +1 @@
|
|||||||
|
import '@testing-library/jest-dom'
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
export type AgentStatus = 'active' | 'idle' | 'thinking' | 'error'
|
export type AgentStatus = 'active' | 'idle' | 'thinking' | 'error' | 'offline'
|
||||||
|
|
||||||
export interface Agent {
|
export interface Agent {
|
||||||
id: string
|
id: string
|
||||||
|
|||||||
@@ -4,7 +4,7 @@
|
|||||||
"target": "es2023",
|
"target": "es2023",
|
||||||
"lib": ["ES2023", "DOM"],
|
"lib": ["ES2023", "DOM"],
|
||||||
"module": "esnext",
|
"module": "esnext",
|
||||||
"types": ["vite/client"],
|
"types": ["vite/client", "vitest/globals"],
|
||||||
"skipLibCheck": true,
|
"skipLibCheck": true,
|
||||||
|
|
||||||
/* Bundler mode */
|
/* Bundler mode */
|
||||||
|
|||||||
@@ -20,5 +20,5 @@
|
|||||||
"erasableSyntaxOnly": true,
|
"erasableSyntaxOnly": true,
|
||||||
"noFallthroughCasesInSwitch": true
|
"noFallthroughCasesInSwitch": true
|
||||||
},
|
},
|
||||||
"include": ["vite.config.ts"]
|
"include": ["vite.config.ts", "vitest.config.ts"]
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,11 @@
|
|||||||
|
import { defineConfig } from 'vitest/config'
|
||||||
|
import react from '@vitejs/plugin-react'
|
||||||
|
|
||||||
|
export default defineConfig({
|
||||||
|
plugins: [react()],
|
||||||
|
test: {
|
||||||
|
environment: 'jsdom',
|
||||||
|
globals: true,
|
||||||
|
setupFiles: ['./src/test-setup.ts'],
|
||||||
|
},
|
||||||
|
})
|
||||||
@@ -112,7 +112,7 @@ func main() {
|
|||||||
<-quit
|
<-quit
|
||||||
slog.Info("shutting down server...")
|
slog.Info("shutting down server...")
|
||||||
|
|
||||||
cancel() // stop gateway polling
|
cancel() // stop gateway clients
|
||||||
|
|
||||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||||
defer shutdownCancel()
|
defer shutdownCancel()
|
||||||
|
|||||||
@@ -10,30 +10,30 @@ import (
|
|||||||
|
|
||||||
// Config holds all application configuration.
|
// Config holds all application configuration.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Port int
|
Port int
|
||||||
DatabaseURL string
|
DatabaseURL string
|
||||||
CORSOrigin string
|
CORSOrigin string
|
||||||
LogLevel string
|
LogLevel string
|
||||||
Environment string
|
Environment string
|
||||||
GatewayRestURL string
|
GatewayRestURL string
|
||||||
GatewayRestPollInterval time.Duration
|
GatewayRestPollInterval time.Duration
|
||||||
WSGatewayURL string
|
WSGatewayURL string
|
||||||
WSGatewayToken string
|
WSGatewayToken string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load reads configuration from environment variables, applying defaults where
|
// Load reads configuration from environment variables, applying defaults where
|
||||||
// values are not set. All secrets come from the environment — nothing is hardcoded.
|
// values are not set. All secrets come from the environment — nothing is hardcoded.
|
||||||
func Load() *Config {
|
func Load() *Config {
|
||||||
return &Config{
|
return &Config{
|
||||||
Port: getEnvInt("PORT", 8080),
|
Port: getEnvInt("PORT", 8080),
|
||||||
DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"),
|
DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"),
|
||||||
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
|
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
|
||||||
LogLevel: getEnv("LOG_LEVEL", "info"),
|
LogLevel: getEnv("LOG_LEVEL", "info"),
|
||||||
Environment: getEnv("ENVIRONMENT", "development"),
|
Environment: getEnv("ENVIRONMENT", "development"),
|
||||||
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
|
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
|
||||||
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
|
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
|
||||||
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
|
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
|
||||||
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
|
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,10 @@
|
|||||||
// Package gateway provides an OpenClaw gateway integration client that
|
// Package gateway provides an OpenClaw gateway integration client that
|
||||||
// polls agent states, persists them via the repository layer, and broadcasts
|
// polls agent states, persists them via the repository layer, and broadcasts
|
||||||
// changes through the SSE broker for real-time frontend updates.
|
// changes through the SSE broker for real-time frontend updates.
|
||||||
|
//
|
||||||
|
// When a WSClient is wired via SetWSClient, the REST poller becomes a
|
||||||
|
// fallback: it waits for the WS client to signal readiness, and only starts
|
||||||
|
// polling if WS fails to connect within 30 seconds.
|
||||||
package gateway
|
package gateway
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -29,7 +33,7 @@ type Client struct {
|
|||||||
broker *handler.Broker
|
broker *handler.Broker
|
||||||
wsClient *WSClient // optional WS client; when set, REST is fallback only
|
wsClient *WSClient // optional WS client; when set, REST is fallback only
|
||||||
wsReady chan struct{} // closed once WS connection is established
|
wsReady chan struct{} // closed once WS connection is established
|
||||||
wsReadyOnce sync.Once // protects wsReady close from double-close race
|
wsReadyOnce sync.Once // protects wsReady close from double-close race
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config holds gateway client configuration, typically loaded from environment.
|
// Config holds gateway client configuration, typically loaded from environment.
|
||||||
@@ -140,7 +144,6 @@ func (c *Client) poll(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, ga := range agents {
|
for _, ga := range agents {
|
||||||
// Check if agent already exists; if so, update; otherwise create.
|
|
||||||
existing, err := c.agents.Get(ctx, ga.ID)
|
existing, err := c.agents.Get(ctx, ga.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Not found — create it
|
// Not found — create it
|
||||||
@@ -185,51 +188,51 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
|
|||||||
slog.Info("seeding demo agents")
|
slog.Info("seeding demo agents")
|
||||||
demoAgents := []models.AgentCardData{
|
demoAgents := []models.AgentCardData{
|
||||||
{
|
{
|
||||||
ID: "otto",
|
ID: "otto",
|
||||||
DisplayName: "Otto",
|
DisplayName: "Otto",
|
||||||
Role: "Orchestrator",
|
Role: "Orchestrator",
|
||||||
Status: models.AgentStatusActive,
|
Status: models.AgentStatusActive,
|
||||||
CurrentTask: strPtr("Orchestrating tasks"),
|
CurrentTask: strPtr("Orchestrating tasks"),
|
||||||
SessionKey: "otto-session",
|
SessionKey: "otto-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "rex",
|
ID: "rex",
|
||||||
DisplayName: "Rex",
|
DisplayName: "Rex",
|
||||||
Role: "Frontend Dev",
|
Role: "Frontend Dev",
|
||||||
Status: models.AgentStatusIdle,
|
Status: models.AgentStatusIdle,
|
||||||
SessionKey: "rex-session",
|
SessionKey: "rex-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "dex",
|
ID: "dex",
|
||||||
DisplayName: "Dex",
|
DisplayName: "Dex",
|
||||||
Role: "Backend Dev",
|
Role: "Backend Dev",
|
||||||
Status: models.AgentStatusThinking,
|
Status: models.AgentStatusThinking,
|
||||||
CurrentTask: strPtr("Designing API contracts"),
|
CurrentTask: strPtr("Designing API contracts"),
|
||||||
SessionKey: "dex-session",
|
SessionKey: "dex-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "hex",
|
ID: "hex",
|
||||||
DisplayName: "Hex",
|
DisplayName: "Hex",
|
||||||
Role: "Database Specialist",
|
Role: "Database Specialist",
|
||||||
Status: models.AgentStatusActive,
|
Status: models.AgentStatusActive,
|
||||||
CurrentTask: strPtr("Reviewing schema migrations"),
|
CurrentTask: strPtr("Reviewing schema migrations"),
|
||||||
SessionKey: "hex-session",
|
SessionKey: "hex-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "pip",
|
ID: "pip",
|
||||||
DisplayName: "Pip",
|
DisplayName: "Pip",
|
||||||
Role: "Edge Device Dev",
|
Role: "Edge Device Dev",
|
||||||
Status: models.AgentStatusIdle,
|
Status: models.AgentStatusIdle,
|
||||||
SessionKey: "pip-session",
|
SessionKey: "pip-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -92,10 +92,10 @@ func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
|
|||||||
|
|
||||||
// wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol.
|
// wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol.
|
||||||
type wsFrame struct {
|
type wsFrame struct {
|
||||||
Type string `json:"type"` // "req", "res", "event"
|
Type string `json:"type"` // "req", "res", "event"
|
||||||
ID string `json:"id,omitempty"` // request/response correlation
|
ID string `json:"id,omitempty"` // request/response correlation
|
||||||
Method string `json:"method,omitempty"` // method name (req frames)
|
Method string `json:"method,omitempty"` // method name (req frames)
|
||||||
Event string `json:"event,omitempty"` // event name (event frames)
|
Event string `json:"event,omitempty"` // event name (event frames)
|
||||||
Params json.RawMessage `json:"params,omitempty"`
|
Params json.RawMessage `json:"params,omitempty"`
|
||||||
Result json.RawMessage `json:"result,omitempty"`
|
Result json.RawMessage `json:"result,omitempty"`
|
||||||
Error *wsError `json:"error,omitempty"`
|
Error *wsError `json:"error,omitempty"`
|
||||||
@@ -229,7 +229,34 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
|||||||
c.connId = helloOK.ConnID
|
c.connId = helloOK.ConnID
|
||||||
c.connMu.Unlock()
|
c.connMu.Unlock()
|
||||||
|
|
||||||
// Notify REST client that WS is live so it stands down
|
// Step 2b: Register live event handlers BEFORE starting the read
|
||||||
|
// loop. This eliminates the race window where readLoop dispatches
|
||||||
|
// live events as "unhandled" because no handlers are registered yet.
|
||||||
|
// The handlers only depend on c.agents and c.broker, which are wired
|
||||||
|
// in the constructor — they do not need initialSync to have completed.
|
||||||
|
c.registerEventHandlers()
|
||||||
|
|
||||||
|
// Step 2c: Start the read loop in a goroutine so that Send() in
|
||||||
|
// initialSync can receive responses. The read loop goroutine will
|
||||||
|
// continue running after initialSync completes, routing live events
|
||||||
|
// and any future RPC responses. Because handlers are already
|
||||||
|
// registered, any events arriving during or after initialSync are
|
||||||
|
// dispatched correctly.
|
||||||
|
readLoopErrCh := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
readLoopErrCh <- c.readLoop(ctx, conn)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Step 2d: Initial sync — fetch agents + sessions from gateway.
|
||||||
|
// This works because the read loop is active and will route
|
||||||
|
// response frames back to Send() via handleResponse.
|
||||||
|
if err := c.initialSync(ctx); err != nil {
|
||||||
|
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify REST client that WS is live so it stands down.
|
||||||
|
// This must happen AFTER initialSync so that the REST poller
|
||||||
|
// doesn't start polling while we're still syncing.
|
||||||
if c.restClient != nil {
|
if c.restClient != nil {
|
||||||
c.restClient.MarkWSReady()
|
c.restClient.MarkWSReady()
|
||||||
c.logger.Info("ws client notified REST fallback to stand down")
|
c.logger.Info("ws client notified REST fallback to stand down")
|
||||||
@@ -238,16 +265,9 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
|||||||
// Reset wsReadyOnce so MarkWSReady can fire again after a reconnect
|
// Reset wsReadyOnce so MarkWSReady can fire again after a reconnect
|
||||||
c.wsReadyOnce = sync.Once{}
|
c.wsReadyOnce = sync.Once{}
|
||||||
|
|
||||||
// Step 2b: Initial sync — fetch agents + sessions from gateway
|
// Step 3: Wait for the read loop goroutine to finish (blocks
|
||||||
if err := c.initialSync(ctx); err != nil {
|
// until the connection drops or context is cancelled).
|
||||||
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
|
return <-readLoopErrCh
|
||||||
}
|
|
||||||
|
|
||||||
// Step 2c: Register live event handlers
|
|
||||||
c.registerEventHandlers()
|
|
||||||
|
|
||||||
// Step 3: Read loop
|
|
||||||
return c.readLoop(ctx, conn)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// readChallenge reads the first frame from the gateway, which must be a
|
// readChallenge reads the first frame from the gateway, which must be a
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
@@ -466,6 +467,236 @@ func TestAgentItemToCard(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── 6. Test: Initial sync ordering (readLoop active before Send) ──────────
|
||||||
|
|
||||||
|
// TestConnectAndRun_InitialSyncOrdering verifies that the WS client
|
||||||
|
// completes initial sync successfully. This test would hang/timeout if
|
||||||
|
// readLoop were NOT started before initialSync, because Send() relies on
|
||||||
|
// readLoop→routeFrame→handleResponse to deliver RPC responses.
|
||||||
|
func TestConnectAndRun_InitialSyncOrdering(t *testing.T) {
|
||||||
|
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||||
|
broker := handler.NewBroker()
|
||||||
|
capture := newBroadcastCapture(broker)
|
||||||
|
defer capture.close()
|
||||||
|
|
||||||
|
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
||||||
|
// Handshake
|
||||||
|
handleHandshake(t, conn)
|
||||||
|
|
||||||
|
// After handshake, respond to RPCs
|
||||||
|
for {
|
||||||
|
var req map[string]any
|
||||||
|
if err := conn.ReadJSON(&req); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
reqID, _ := req["id"].(string)
|
||||||
|
method, _ := req["method"].(string)
|
||||||
|
|
||||||
|
var result any
|
||||||
|
switch method {
|
||||||
|
case "agents.list":
|
||||||
|
result = []map[string]any{
|
||||||
|
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
|
||||||
|
{"id": "dex", "name": "Dex", "role": "Backend Dev", "channel": "telegram"},
|
||||||
|
}
|
||||||
|
case "sessions.list":
|
||||||
|
result = []map[string]any{
|
||||||
|
{"sessionKey": "s1", "agentId": "otto", "status": "running", "totalTokens": 500, "lastActivityAt": "2025-05-20T12:00:00Z"},
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
result = map[string]any{}
|
||||||
|
}
|
||||||
|
|
||||||
|
res := map[string]any{
|
||||||
|
"type": "res",
|
||||||
|
"id": reqID,
|
||||||
|
"ok": true,
|
||||||
|
"result": result,
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(res); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
client.Start(ctx)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for initial sync to complete by checking repo state.
|
||||||
|
// The agents should be persisted from the RPC responses.
|
||||||
|
deadline := time.Now().Add(5 * time.Second)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
repo.mu.Lock()
|
||||||
|
_, ottoOK := repo.agents["otto"]
|
||||||
|
_, dexOK := repo.agents["dex"]
|
||||||
|
repo.mu.Unlock()
|
||||||
|
if ottoOK && dexOK {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
repo.mu.Lock()
|
||||||
|
_, ottoOK := repo.agents["otto"]
|
||||||
|
_, dexOK := repo.agents["dex"]
|
||||||
|
repo.mu.Unlock()
|
||||||
|
|
||||||
|
if !ottoOK {
|
||||||
|
t.Error("otto not found in repo after initial sync — readLoop may not have been active before Send()")
|
||||||
|
}
|
||||||
|
if !dexOK {
|
||||||
|
t.Error("dex not found in repo after initial sync — readLoop may not have been active before Send()")
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Fatal("WSClient did not shut down cleanly")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 7. Test: Event not lost during initial sync (regression) ───────────────
|
||||||
|
|
||||||
|
// TestConnectAndRun_EventNotLostDuringSync verifies that live gateway events
|
||||||
|
// arriving during initial sync are NOT dropped. This is a regression test
|
||||||
|
// for the race where readLoop started before registerEventHandlers(),
|
||||||
|
// causing events read during that window to be logged as "unhandled" and lost.
|
||||||
|
//
|
||||||
|
// The mock server sends a live event (sessions.changed) right after the
|
||||||
|
// handshake, interleaved with the RPC responses for agents.list and
|
||||||
|
// sessions.list. The test asserts the event is received by the handler.
|
||||||
|
func TestConnectAndRun_EventNotLostDuringSync(t *testing.T) {
|
||||||
|
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||||
|
broker := handler.NewBroker()
|
||||||
|
capture := newBroadcastCapture(broker)
|
||||||
|
defer capture.close()
|
||||||
|
|
||||||
|
// Pre-seed an agent so the event handler can update it.
|
||||||
|
repo.agents["otto"] = models.AgentCardData{
|
||||||
|
ID: "otto",
|
||||||
|
DisplayName: "Otto",
|
||||||
|
Status: models.AgentStatusIdle,
|
||||||
|
}
|
||||||
|
|
||||||
|
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
||||||
|
// Handshake
|
||||||
|
handleHandshake(t, conn)
|
||||||
|
|
||||||
|
// After handshake, process RPCs and inject a live event.
|
||||||
|
for {
|
||||||
|
var req map[string]any
|
||||||
|
if err := conn.ReadJSON(&req); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
reqID, _ := req["id"].(string)
|
||||||
|
method, _ := req["method"].(string)
|
||||||
|
|
||||||
|
// Respond to agents.list RPC
|
||||||
|
if method == "agents.list" {
|
||||||
|
// Before responding, inject a live event — simulates
|
||||||
|
// a gateway pushing a presence update during sync.
|
||||||
|
evt := map[string]any{
|
||||||
|
"type": "event",
|
||||||
|
"event": "presence",
|
||||||
|
"params": map[string]any{"agentId": "otto", "connected": true, "lastActivityAt": "2025-05-20T12:30:00Z"},
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(evt); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now send the RPC response
|
||||||
|
res := map[string]any{
|
||||||
|
"type": "res",
|
||||||
|
"id": reqID,
|
||||||
|
"ok": true,
|
||||||
|
"result": []map[string]any{
|
||||||
|
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(res); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Respond to sessions.list RPC
|
||||||
|
if method == "sessions.list" {
|
||||||
|
res := map[string]any{
|
||||||
|
"type": "res",
|
||||||
|
"id": reqID,
|
||||||
|
"ok": true,
|
||||||
|
"result": []map[string]any{},
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(res); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Default response for other methods
|
||||||
|
res := map[string]any{
|
||||||
|
"type": "res",
|
||||||
|
"id": reqID,
|
||||||
|
"ok": true,
|
||||||
|
"result": map[string]any{},
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(res); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
client.Start(ctx)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for the presence event to be processed by checking the repo.
|
||||||
|
// The presence handler updates the agent, so we check for the
|
||||||
|
// lastActivityAt change.
|
||||||
|
deadline := time.Now().Add(5 * time.Second)
|
||||||
|
var lastActivity string
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
repo.mu.Lock()
|
||||||
|
if a, ok := repo.agents["otto"]; ok {
|
||||||
|
lastActivity = a.LastActivity
|
||||||
|
}
|
||||||
|
repo.mu.Unlock()
|
||||||
|
if lastActivity == "2025-05-20T12:30:00Z" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
if lastActivity != "2025-05-20T12:30:00Z" {
|
||||||
|
t.Errorf("presence event during sync was lost: lastActivity = %q, want %q", lastActivity, "2025-05-20T12:30:00Z")
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Fatal("WSClient did not shut down cleanly")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestStrPtr(t *testing.T) {
|
func TestStrPtr(t *testing.T) {
|
||||||
s := "hello"
|
s := "hello"
|
||||||
p := strPtr(s)
|
p := strPtr(s)
|
||||||
|
|||||||
@@ -65,6 +65,21 @@ func New(deps *Dependencies) *chi.Mux {
|
|||||||
|
|
||||||
// ── API v1 routes ──────────────────────────────────────────────────────
|
// ── API v1 routes ──────────────────────────────────────────────────────
|
||||||
r.Route("/api", func(api chi.Router) {
|
r.Route("/api", func(api chi.Router) {
|
||||||
|
// Health check (under /api)
|
||||||
|
api.Get("/health", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
status := "ok"
|
||||||
|
if deps.Pool != nil {
|
||||||
|
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := deps.Pool.Ping(ctx); err != nil {
|
||||||
|
w.WriteHeader(http.StatusServiceUnavailable)
|
||||||
|
status = "db_unhealthy"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.Write([]byte(`{"status":"` + status + `"}`))
|
||||||
|
})
|
||||||
|
|
||||||
// Agents CRUD
|
// Agents CRUD
|
||||||
api.Route("/agents", func(agents chi.Router) {
|
api.Route("/agents", func(agents chi.Router) {
|
||||||
agents.Get("/", deps.Handler.ListAgents) // GET /api/agents
|
agents.Get("/", deps.Handler.ListAgents) // GET /api/agents
|
||||||
|
|||||||
Reference in New Issue
Block a user