Compare commits

...

21 Commits

Author SHA1 Message Date
9d3ddc4340 Merge branch 'dev' into agent/rex/CUB-125-realtime-sse
Some checks failed
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
2026-05-20 12:52:12 -04:00
ffc127f12d CUB-125: address Grimm review — tests, type fixes, error state circuit breaker
Some checks failed
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
- Add missing 'offline' to AgentStatus union type (types/index.ts)
- Add max-retry circuit breaker to useSSE; error state is now reachable
- Wire typed SSE payloads (SSEPayloadMap discriminated union) into useRealtimeSync
- Add Vitest + 20 unit tests: useSSE lifecycle, back-off, circuit breaker,
  event parsing, cleanup; useRealtimeSync event-to-invalidation mapping
- Rebase on dev to remove stale CUB-119 legacy-deletion commit and align
  CI workflow (dev already consolidated into single dev.yml)
- Tests: npm test → 20/20 pass; Build: npm run build → 0 errors
2026-05-20 16:51:13 +00:00
3d5bf16d37 Merge pull request 'CUB-207: unit tests for event handlers and initial sync' (!44) from agent/dex/CUB-207-event-sync-tests-v2 into dev
Some checks failed
Dev Build & Deploy / test-and-build (push) Failing after 0s
Dev Build & Deploy / docker-build-push (push) Has been skipped
Reviewed-on: #44
2026-05-20 12:47:27 -04:00
724a4a9427 CUB-125: implement real-time SSE/WebSocket in React frontend
- Add useSSE hook with exponential back-off reconnect (1s → 30s)
- Add useRealtimeSync hook: maps SSE events to React Query invalidation
  (agent.status → agents; agent.task/agent.progress → tasks+agents; fleet.update → all)
- Add SSEContext/SSEProvider so connection status is available app-wide
- Mount SSEProvider in main.tsx inside QueryClientProvider (no polling)
- Show live/connecting/reconnecting/disconnected badge in sidebar + mobile header
- Update SettingsPage: replace polling interval UI with SSE status panel
- Disable React Query polling (staleTime 60s); all updates pushed via SSE

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 16:32:12 +00:00
5347944c4c Merge branch 'dev' into agent/dex/CUB-207-event-sync-tests-v2
Some checks failed
Dev Build & Deploy / test-and-build (pull_request) Failing after 14m36s
Dev Build & Deploy / docker-build-push (pull_request) Blocked by required conditions
2026-05-20 12:24:40 -04:00
48a8598d3b CUB-CI: Consolidate workflows — remove build-dev.yaml, fix dev.yml
Some checks failed
Dev Build & Deploy / test-and-build (push) Failing after 1s
Dev Build & Deploy / docker-build-push (push) Failing after 10m1s
2026-05-20 12:24:32 -04:00
a0eb393c6c CUB-CI: Consolidate CI — switch to ubuntu-latest with manual Go/Node install
Some checks failed
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
- Remove custom go-react runner label (was inconsistent on PR branches)
- Replace with ubuntu-latest + manual Go 1.23 / Node 22 install
  (actions/setup-go and setup-node don't work with Gitea Act runner)
- Remove duplicate build-dev.yaml workflow (already deleted)
- All steps: Go test → Go build → npm ci → npm lint → npm build
- Docker push on push events only (unchanged)
2026-05-20 12:23:32 -04:00
d294818581 CUB-CI: Remove redundant build-dev.yaml 2026-05-20 12:23:16 -04:00
9e0366e780 CUB-CI: Remove redundant build-dev.yaml — dev.yml already handles this
build-dev.yaml uses actions/setup-go@v5 and actions/setup-node@v4 which
are incompatible with Gitea Act runner (no node20 runtime). dev.yml is
the canonical build workflow; having two competing workflows on the same
triggers was causing duplicate CI runs and misleading failures.
2026-05-20 12:22:19 -04:00
20404b30bb Merge branch 'dev' into agent/dex/CUB-207-event-sync-tests-v2
Some checks failed
Build (Dev) / trigger-deploy (pull_request) Blocked by required conditions
Dev Build & Deploy / test-and-build (pull_request) Waiting to run
Dev Build & Deploy / docker-build-push (pull_request) Blocked by required conditions
Build (Dev) / build-frontend (pull_request) Failing after 1s
Build (Dev) / build-go-backend (pull_request) Failing after 13m33s
2026-05-20 12:15:43 -04:00
b7a54c8461 Merge pull request 'CUB-203: WebSocket client scaffold for OpenClaw gateway v3' (!41) from agent/dex/CUB-203-ws-client-scaffold into dev
Some checks failed
Build (Dev) / build-frontend (push) Failing after 1s
Build (Dev) / trigger-deploy (push) Has been skipped
Build (Dev) / build-go-backend (push) Failing after 0s
Dev Build & Deploy / test-and-build (push) Has been cancelled
Dev Build & Deploy / docker-build-push (push) Has been cancelled
Reviewed-on: #41
2026-05-20 12:14:02 -04:00
b6e44cb4f8 Merge branch 'dev' into agent/dex/CUB-203-ws-client-scaffold
Some checks failed
Dev Build & Deploy / docker-build-push (pull_request) Blocked by required conditions
Dev Build & Deploy / test-and-build (pull_request) Waiting to run
Build (Dev) / build-go-backend (pull_request) Failing after 0s
Build (Dev) / build-frontend (pull_request) Failing after 0s
Build (Dev) / trigger-deploy (pull_request) Has been skipped
2026-05-20 09:01:58 -04:00
Joshua King
49b959aee5 Add CI Docker image with Go 1.23 + Node 22 pre-installed, update workflow to use go-react label
Some checks failed
Dev Build & Deploy / test-and-build (push) Has been cancelled
Dev Build & Deploy / docker-build-push (push) Has been cancelled
2026-05-20 08:47:16 -04:00
Joshua King
ae37d79aa8 Switch to ubuntu-dotnet runner label to bypass /var/run symlink issue
Some checks failed
Dev Build & Deploy / test-and-build (push) Has been cancelled
Dev Build & Deploy / docker-build-push (push) Has been cancelled
2026-05-20 08:39:05 -04:00
Joshua King
8fb4183abe Add container spec to fix /var/run symlink path escape error
Some checks failed
Dev Build & Deploy / test-and-build (push) Failing after 7s
Dev Build & Deploy / docker-build-push (push) Has been skipped
2026-05-20 08:30:49 -04:00
Joshua King
ee6ad10db9 Replace setup-go/setup-node actions with manual install for Gitea runner compatibility
Some checks failed
Dev Build & Deploy / docker-build-push (push) Blocked by required conditions
Dev Build & Deploy / test-and-build (push) Failing after 14m18s
2026-05-20 08:10:13 -04:00
Joshua King
5f42a3be18 Rename GITEA_TOKEN secret to ACCESS_TOKEN
Some checks failed
Dev Build & Deploy / test-and-build (push) Failing after 4s
Dev Build & Deploy / docker-build-push (push) Has been skipped
2026-05-20 08:08:21 -04:00
Joshua King
0e452941dd Remove deploy-dev job - deployment handled via docker-compose
Some checks failed
Dev Build & Deploy / test-and-build (push) Failing after 0s
Dev Build & Deploy / docker-build-push (push) Has been skipped
2026-05-20 08:06:52 -04:00
Joshua King
87cb517623 Update CI workflow to match Go+React stack with Docker registry push
Some checks failed
Dev Build & Deploy / test-and-build (push) Failing after 1s
Dev Build & Deploy / docker-build-push (push) Has been skipped
Dev Build & Deploy / deploy-dev (push) Has been skipped
2026-05-20 08:04:50 -04:00
Dex
439741e55f CUB-207: fix unused broker variable in test
Some checks failed
Dev Build / build-test (pull_request) Waiting to run
Dev Build / deploy-dev (pull_request) Blocked by required conditions
Build (Dev) / build-go-backend (pull_request) Failing after 0s
Build (Dev) / build-frontend (pull_request) Failing after 1s
Build (Dev) / trigger-deploy (pull_request) Has been skipped
openclaw/grimm-review Review in progress
2026-05-20 08:04:05 -04:00
Dex
3c26b8deba CUB-207: add unit tests for event handlers and initial sync 2026-05-20 11:58:42 +00:00
21 changed files with 2797 additions and 159 deletions

View File

@@ -1,85 +0,0 @@
name: Build (Dev)
on:
push:
branches: [dev]
pull_request:
branches: [dev]
workflow_dispatch:
env:
GO_VERSION: "1.23"
NODE_VERSION: "22"
BINARY_NAME: server
jobs:
build-go-backend:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}
- name: Test Go backend
working-directory: go-backend
run: go test ./...
- name: Build Go binary
working-directory: go-backend
run: |
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \
go build -ldflags="-s -w -X main.version=${GITHUB_SHA:0:8}" \
-o ${{ env.BINARY_NAME }} ./cmd/server
- name: Upload Go binary
uses: actions/upload-artifact@v4
with:
name: go-backend-binary
path: go-backend/${{ env.BINARY_NAME }}
retention-days: 3
build-frontend:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Node
uses: actions/setup-node@v4
with:
node-version: ${{ env.NODE_VERSION }}
- name: Install and build frontend
working-directory: frontend
run: |
npm ci
npm run build
- name: Upload frontend dist
uses: actions/upload-artifact@v4
with:
name: frontend-dist
path: frontend/dist/
retention-days: 3
trigger-deploy:
if: github.event_name == 'push'
needs: [build-go-backend, build-frontend]
runs-on: ubuntu-latest
steps:
- name: Trigger deploy workflow
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
await github.rest.repos.createDispatchEvent({
owner: context.repo.owner,
repo: context.repo.repo,
event_type: 'dev-build-success',
client_payload: {
sha: context.sha,
ref: context.ref
}
})

View File

@@ -1,4 +1,4 @@
name: Dev Build name: Dev Build & Deploy
on: on:
pull_request: pull_request:
@@ -6,45 +6,82 @@ on:
push: push:
branches: [dev] branches: [dev]
env:
GO_VERSION: "1.23"
NODE_VERSION: "22"
REGISTRY: code.cubecraftcreations.com
BACKEND_IMAGE: ${{ gitea.repository }}/backend
FRONTEND_IMAGE: ${{ gitea.repository }}/frontend
jobs: jobs:
build-test: test-and-build:
runs-on: ubuntu-dotnet runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- name: Restore backend - name: Install Go
run: dotnet restore run: |
curl -fsSL "https://go.dev/dl/go${GO_VERSION}.linux-amd64.tar.gz" | sudo tar -C /usr/local -xz
echo "/usr/local/go/bin" >> $GITHUB_PATH
- name: Install Node.js
run: |
curl -fsSL "https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x64.tar.xz" | sudo tar -C /usr/local --strip-components=1 -xJ
echo "/usr/local/bin" >> $GITHUB_PATH
- name: Run backend tests
run: go test ./...
working-directory: ./go-backend
- name: Build backend - name: Build backend
run: dotnet build --no-restore --configuration Release run: go build -ldflags="-w -s" -o /tmp/server ./cmd/server
working-directory: ./go-backend
- name: Test backend
run: dotnet test --no-build --configuration Release
- name: Setup Node
uses: actions/setup-node@v4
with:
node-version: "24"
- name: Install frontend deps - name: Install frontend deps
run: npm ci run: npm ci
working-directory: ./frontend working-directory: ./frontend
- name: Lint frontend
run: npm run lint
working-directory: ./frontend
- name: Build frontend - name: Build frontend
run: npm run build run: npm run build
working-directory: ./frontend working-directory: ./frontend
deploy-dev: docker-build-push:
needs: build-test needs: test-and-build
if: gitea.event_name == 'push' if: gitea.event_name == 'push'
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Deploy dev - uses: actions/checkout@v4
run: |
echo "${{ secrets.DEV_DEPLOY_SSH_KEY }}" > /tmp/dev_key - name: Login to Gitea Container Registry
chmod 600 /tmp/dev_key uses: docker/login-action@v3
ssh -i /tmp/dev_key -o StrictHostKeyChecking=no \ with:
${{ secrets.DEV_DEPLOY_USER }}@${{ secrets.DEV_DEPLOY_HOST }} \ registry: ${{ env.REGISTRY }}
"${{ secrets.DEV_DEPLOY_PATH }}/deploy.sh" username: ${{ gitea.actor }}
password: ${{ secrets.ACCESS_TOKEN }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build & push backend image
uses: docker/build-push-action@v6
with:
context: ./go-backend
push: true
tags: |
${{ env.REGISTRY }}/${{ env.BACKEND_IMAGE }}:dev
${{ env.REGISTRY }}/${{ env.BACKEND_IMAGE }}:${{ gitea.sha }}
- name: Build & push frontend image
uses: docker/build-push-action@v6
with:
context: ./frontend
push: true
tags: |
${{ env.REGISTRY }}/${{ env.FRONTEND_IMAGE }}:dev
${{ env.REGISTRY }}/${{ env.FRONTEND_IMAGE }}:${{ gitea.sha }}

11
ci-image/Dockerfile Normal file
View File

@@ -0,0 +1,11 @@
FROM catthehacker/ubuntu:act-latest
# Install Go 1.23
RUN curl -sL https://go.dev/dl/go1.23.6.linux-amd64.tar.gz | tar -C /usr/local -xz
# Install Node 22
RUN curl -fsSL https://deb.nodesource.com/setup_22.x | bash - \
&& apt-get install -y nodejs \
&& rm -rf /var/lib/apt/lists/*
ENV PATH="/usr/local/go/bin:${PATH}"

File diff suppressed because it is too large Load Diff

View File

@@ -7,7 +7,9 @@
"dev": "vite", "dev": "vite",
"build": "tsc -b && vite build", "build": "tsc -b && vite build",
"lint": "eslint .", "lint": "eslint .",
"preview": "vite preview" "preview": "vite preview",
"test": "vitest run",
"test:watch": "vitest"
}, },
"dependencies": { "dependencies": {
"@tanstack/react-query": "^5.100.9", "@tanstack/react-query": "^5.100.9",
@@ -20,6 +22,8 @@
"devDependencies": { "devDependencies": {
"@eslint/js": "^10.0.1", "@eslint/js": "^10.0.1",
"@tailwindcss/vite": "^4.2.4", "@tailwindcss/vite": "^4.2.4",
"@testing-library/jest-dom": "^6.9.1",
"@testing-library/react": "^16.3.2",
"@types/node": "^24.12.2", "@types/node": "^24.12.2",
"@types/react": "^19.2.14", "@types/react": "^19.2.14",
"@types/react-dom": "^19.2.3", "@types/react-dom": "^19.2.3",
@@ -29,10 +33,12 @@
"eslint-plugin-react-hooks": "^7.1.1", "eslint-plugin-react-hooks": "^7.1.1",
"eslint-plugin-react-refresh": "^0.5.2", "eslint-plugin-react-refresh": "^0.5.2",
"globals": "^17.5.0", "globals": "^17.5.0",
"jsdom": "^29.1.1",
"postcss": "^8.5.14", "postcss": "^8.5.14",
"tailwindcss": "^4.2.4", "tailwindcss": "^4.2.4",
"typescript": "~6.0.2", "typescript": "~6.0.2",
"typescript-eslint": "^8.58.2", "typescript-eslint": "^8.58.2",
"vite": "^8.0.10" "vite": "^8.0.10",
"vitest": "^4.1.7"
} }
} }

View File

@@ -1,6 +1,8 @@
import { useState } from 'react' import { useState } from 'react'
import { NavLink } from 'react-router-dom' import { NavLink } from 'react-router-dom'
import { Command, Activity, FolderKanban, Monitor, Settings, Menu, X } from 'lucide-react' import { Command, Activity, FolderKanban, Monitor, Settings, Menu, X, Wifi, WifiOff, Loader } from 'lucide-react'
import { useSSEContext } from '../contexts/SSEContext'
import type { SSEStatus } from '../hooks/useSSE'
const navItems = [ const navItems = [
{ to: '/', icon: Command, label: 'Hub' }, { to: '/', icon: Command, label: 'Hub' },
@@ -10,9 +12,29 @@ const navItems = [
{ to: '/settings', icon: Settings, label: 'Settings' }, { to: '/settings', icon: Settings, label: 'Settings' },
] ]
/** Small status pill shown in the sidebar footer and mobile header. */
function SSEStatusBadge({ status, showLabel = false }: { status: SSEStatus; showLabel?: boolean }) {
const cfg = {
connected: { icon: Wifi, color: 'text-green-500', label: 'Live' },
connecting: { icon: Loader, color: 'text-yellow-500 animate-spin', label: 'Connecting' },
reconnecting: { icon: Loader, color: 'text-yellow-500 animate-spin', label: 'Reconnecting' },
error: { icon: WifiOff, color: 'text-red-500', label: 'Disconnected' },
}[status]
const Icon = cfg.icon
return (
<div className="flex items-center gap-1.5" title={cfg.label}>
<Icon size={14} className={cfg.color} />
{showLabel && <span className={`text-xs ${cfg.color}`}>{cfg.label}</span>}
</div>
)
}
export default function Layout({ children }: { children: React.ReactNode }) { export default function Layout({ children }: { children: React.ReactNode }) {
const [expanded, setExpanded] = useState(false) const [expanded, setExpanded] = useState(false)
const [mobileOpen, setMobileOpen] = useState(false) const [mobileOpen, setMobileOpen] = useState(false)
const { sseStatus } = useSSEContext()
return ( return (
<div className="flex min-h-screen bg-surface-darkest text-on-surface"> <div className="flex min-h-screen bg-surface-darkest text-on-surface">
@@ -46,6 +68,15 @@ export default function Layout({ children }: { children: React.ReactNode }) {
</NavLink> </NavLink>
))} ))}
</nav> </nav>
{/* SSE connection status — footer of sidebar */}
<div className="px-4 py-3 border-t border-surface-light flex items-center gap-2">
<SSEStatusBadge status={sseStatus} />
{expanded && (
<span className="text-xs text-on-surface-muted whitespace-nowrap">
{sseStatus === 'connected' ? 'Live updates on' : sseStatus}
</span>
)}
</div>
</aside> </aside>
{/* Mobile Header + Bottom Nav */} {/* Mobile Header + Bottom Nav */}
@@ -54,6 +85,7 @@ export default function Layout({ children }: { children: React.ReactNode }) {
<div className="flex items-center gap-2"> <div className="flex items-center gap-2">
<Command size={22} className="text-primary" /> <Command size={22} className="text-primary" />
<span className="font-bold">Control Center</span> <span className="font-bold">Control Center</span>
<SSEStatusBadge status={sseStatus} />
</div> </div>
<button onClick={() => setMobileOpen(!mobileOpen)} className="p-2"> <button onClick={() => setMobileOpen(!mobileOpen)} className="p-2">
{mobileOpen ? <X size={22} /> : <Menu size={22} />} {mobileOpen ? <X size={22} /> : <Menu size={22} />}

View File

@@ -0,0 +1,23 @@
/**
* SSEContext — provides SSE connection status throughout the component tree.
* Mount <SSEProvider> once inside QueryClientProvider.
*/
import { createContext, useContext, type ReactNode } from 'react'
import { useRealtimeSync } from '../hooks/useRealtimeSync'
import type { SSEStatus } from '../hooks/useSSE'
interface SSEContextValue {
sseStatus: SSEStatus
}
const SSEContext = createContext<SSEContextValue>({ sseStatus: 'connecting' })
export function SSEProvider({ children }: { children: ReactNode }) {
const { sseStatus } = useRealtimeSync()
return <SSEContext.Provider value={{ sseStatus }}>{children}</SSEContext.Provider>
}
/** Access the SSE connection status from any component. */
export function useSSEContext(): SSEContextValue {
return useContext(SSEContext)
}

View File

@@ -0,0 +1,129 @@
/**
* Tests for useRealtimeSync — event → query invalidation mapping.
*
* Uses .tsx extension so Vite/OXC can parse JSX in the wrapper component.
*/
import { renderHook } from '@testing-library/react'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
import * as useSSEModule from './useSSE'
import { useRealtimeSync } from './useRealtimeSync'
import React from 'react'
import type { SSEMessage } from '../services/sse'
describe('useRealtimeSync', () => {
let queryClient: QueryClient
let mockSSEOnMessage: ((msg: { type: string; data: unknown }) => void) | null = null
beforeEach(() => {
queryClient = new QueryClient({
defaultOptions: { queries: { retry: false } },
})
mockSSEOnMessage = null
// Spy on useSSE to capture the onMessage callback
vi.spyOn(useSSEModule, 'useSSE').mockImplementation((opts) => {
mockSSEOnMessage = opts?.onMessage ?? null
return { status: 'connected' }
})
})
afterEach(() => {
vi.restoreAllMocks()
})
function render() {
return renderHook(() => useRealtimeSync(), {
wrapper: ({ children }: { children: React.ReactNode }) => (
React.createElement(QueryClientProvider, { client: queryClient }, children)
),
})
}
it('invalidates ["agents"] on agent.status event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'agent.status',
data: { agentId: 'a1', status: 'active' },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
expect(invalidateSpy).toHaveBeenCalledTimes(1)
})
it('invalidates ["tasks"] and ["agents"] on agent.task event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'agent.task',
data: { agentId: 'a1', taskId: 't1', title: 'Test', action: 'assigned' },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
expect(invalidateSpy).toHaveBeenCalledTimes(2)
})
it('invalidates ["tasks"] and ["agents"] on agent.progress event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'agent.progress',
data: { agentId: 'a1', taskId: 't1', progress: 50, message: 'working' },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
expect(invalidateSpy).toHaveBeenCalledTimes(2)
})
it('invalidates ["agents"], ["sessions"], ["tasks"] on fleet.update event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'fleet.update',
data: { timestamp: '2026-05-20T12:00:00Z', agentCount: 5 },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['sessions'] })
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
expect(invalidateSpy).toHaveBeenCalledTimes(3)
})
it('does nothing on connected event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'connected',
data: { clientCount: 1 },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).not.toHaveBeenCalled()
})
it('does nothing on unknown event types', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
mockSSEOnMessage!({ type: 'unknown.event', data: {} })
expect(invalidateSpy).not.toHaveBeenCalled()
})
it('returns sseStatus from useSSE', () => {
const { result } = render()
expect(result.current.sseStatus).toBe('connected')
})
})

View File

@@ -0,0 +1,64 @@
/**
* useRealtimeSync — mounts the SSE connection once at the app level and
* wires incoming events to React Query cache invalidation.
*
* Event → query key mapping:
* agent.status → ['agents']
* agent.task → ['tasks'], ['agents']
* agent.progress → ['tasks'], ['agents']
* fleet.update → ['agents'], ['sessions'], ['tasks']
*/
import { useQueryClient } from '@tanstack/react-query'
import { useCallback } from 'react'
import { useSSE, type SSEStatus } from './useSSE'
import type { SSEMessage } from '../services/sse'
export function useRealtimeSync(): { sseStatus: SSEStatus } {
const queryClient = useQueryClient()
const handleMessage = useCallback(
(raw: { type: string; data: unknown }) => {
// Cast to discriminated union — the backend contract guarantees these shapes
const msg = raw as SSEMessage
switch (msg.type) {
case 'agent.status':
// msg.data: AgentStatusEvent { agentId, status, reason? }
void msg.data.agentId // retained for type-narrowing — ensures payload matches contract
queryClient.invalidateQueries({ queryKey: ['agents'] })
break
case 'agent.task':
// msg.data: AgentTaskEvent { agentId, taskId, title, action }
void msg.data.agentId
queryClient.invalidateQueries({ queryKey: ['tasks'] })
queryClient.invalidateQueries({ queryKey: ['agents'] })
break
case 'agent.progress':
// msg.data: AgentProgressEvent { agentId, taskId, progress, message? }
void msg.data.agentId
queryClient.invalidateQueries({ queryKey: ['tasks'] })
queryClient.invalidateQueries({ queryKey: ['agents'] })
break
case 'fleet.update':
// msg.data: FleetUpdateEvent { timestamp, agentCount }
void msg.data.agentCount
queryClient.invalidateQueries({ queryKey: ['agents'] })
queryClient.invalidateQueries({ queryKey: ['sessions'] })
queryClient.invalidateQueries({ queryKey: ['tasks'] })
break
default:
// 'connected' and unknown events — no action needed
break
}
},
[queryClient],
)
const { status: sseStatus } = useSSE({ onMessage: handleMessage })
return { sseStatus }
}

View File

@@ -0,0 +1,267 @@
/**
* Tests for useSSE — SSE connection lifecycle, back-off, event parsing, and cleanup.
*
* jsdom does not include EventSource, so we mock it completely.
*/
import { renderHook, act, waitFor } from '@testing-library/react'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { useSSE } from './useSSE'
// ---------------------------------------------------------------------------
// Mock EventSource — defined as a plain class so `new EventSource()` works
// ---------------------------------------------------------------------------
class MockEventSource {
url: string
onopen: (() => void) | null = null
onerror: ((evt: Event) => void) | null = null
onmessage: ((evt: MessageEvent) => void) | null = null
private listeners: Map<string, Array<(evt: Event) => void>> = new Map()
readyState: number = 0
constructor(url: string) {
this.url = url
}
addEventListener(type: string, handler: (evt: Event) => void) {
if (!this.listeners.has(type)) this.listeners.set(type, [])
this.listeners.get(type)!.push(handler)
}
removeEventListener() { /* no-op for tests */ }
close() {
this.readyState = 2
this.onopen = null
this.onerror = null
this.onmessage = null
this.listeners.clear()
}
// Test helpers
_simulateOpen() { this.onopen?.() }
_simulateError() { this.onerror?.(new Event('error')) }
_simulateNamedEvent(type: string, data: string) {
const handlers = this.listeners.get(type)
if (handlers) {
const evt = new MessageEvent(type, { data }) as Event
handlers.forEach((h) => h(evt))
}
}
_simulateMessage(data: string) {
this.onmessage?.(new MessageEvent('message', { data }) as MessageEvent)
}
static readonly CONNECTING = 0
static readonly OPEN = 1
static readonly CLOSED = 2
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
let esInstances: MockEventSource[]
describe('useSSE', () => {
beforeEach(() => {
esInstances = []
// Replace global EventSource with our mock class
Object.defineProperty(globalThis, 'EventSource', {
// The mock must use a class for `new EventSource()` to work
value: class extends MockEventSource {
constructor(url: string) {
super(url)
esInstances.push(this)
}
},
writable: true,
configurable: true,
})
vi.useFakeTimers({ shouldAdvanceTime: true })
})
afterEach(() => {
vi.restoreAllMocks()
vi.useRealTimers()
})
// ── Initial connection ──────────────────────────────────────────────────
it('starts in "connecting" state and creates an EventSource', () => {
const { result } = renderHook(() => useSSE({ url: '/api/events' }))
expect(result.current.status).toBe('connecting')
expect(esInstances.length).toBeGreaterThanOrEqual(1)
expect(esInstances[0].url).toBe('/api/events')
})
it('transitions to "connected" on open', async () => {
const onOpen = vi.fn()
const { result } = renderHook(() => useSSE({ url: '/api/events', onOpen }))
act(() => { esInstances[0]._simulateOpen() })
await waitFor(() => {
expect(result.current.status).toBe('connected')
})
expect(onOpen).toHaveBeenCalledTimes(1)
})
// ── Reconnection with exponential back-off ──────────────────────────────
it('retries after error with exponential back-off', async () => {
const { result } = renderHook(() =>
useSSE({ url: '/api/events', reconnectBaseMs: 1000, reconnectMaxMs: 30000 }),
)
// First error → reconnecting, retry at 1s
act(() => { esInstances[0]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
expect(esInstances).toHaveLength(1)
// Advance 1000ms → second EventSource created
act(() => { vi.advanceTimersByTime(1000) })
expect(esInstances).toHaveLength(2)
// Second error → reconnecting, retry at 2s
act(() => { esInstances[1]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
act(() => { vi.advanceTimersByTime(2000) })
expect(esInstances).toHaveLength(3)
// Third error → reconnecting, retry at 4s
act(() => { esInstances[2]._simulateError() })
act(() => { vi.advanceTimersByTime(4000) })
expect(esInstances).toHaveLength(4)
})
it('caps reconnect delay at reconnectMaxMs', async () => {
renderHook(() =>
useSSE({ url: '/api/events', reconnectBaseMs: 1000, reconnectMaxMs: 10000 }),
)
// Force 5 errors to push the exponent past the cap
for (let i = 0; i < 5; i++) {
act(() => { esInstances[i]._simulateError() })
const expectedDelay = Math.min(1000 * 2 ** i, 10000)
act(() => { vi.advanceTimersByTime(expectedDelay) })
}
// 6 ES instances created (initial + 5 retries)
expect(esInstances).toHaveLength(6)
})
// ── Circuit-breaker (max retries) ───────────────────────────────────────
it('transitions to "error" after reconnectLimit is exceeded', async () => {
const { result } = renderHook(() =>
useSSE({ url: '/api/events', reconnectBaseMs: 100, reconnectLimit: 2 }),
)
// First error → reconnecting
act(() => { esInstances[0]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
// Advance → retry
act(() => { vi.advanceTimersByTime(100) })
// Second error → reconnecting (attempt 2, still ≤ limit)
act(() => { esInstances[1]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
act(() => { vi.advanceTimersByTime(200) })
// Third error → limit exceeded (3 > 2) → error
act(() => { esInstances[2]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('error') })
})
it('resets reconnect counter on successful connection', async () => {
const { result } = renderHook(() =>
useSSE({ url: '/api/events', reconnectBaseMs: 100, reconnectLimit: 3 }),
)
// Two errors then a successful connect
act(() => { esInstances[0]._simulateError() })
act(() => { vi.advanceTimersByTime(100) })
act(() => { esInstances[1]._simulateOpen() })
await waitFor(() => { expect(result.current.status).toBe('connected') })
// Now error again — counter should be reset, so we get fresh attempts
act(() => { esInstances[1]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
expect(result.current.status).toBe('reconnecting')
})
// ── Cleanup on unmount ───────────────────────────────────────────────────
it('closes EventSource on unmount', () => {
const closeSpy = vi.spyOn(MockEventSource.prototype, 'close')
const { unmount } = renderHook(() => useSSE({ url: '/api/events' }))
unmount()
expect(closeSpy).toHaveBeenCalled()
})
it('does not update state after unmount', async () => {
const { result, unmount } = renderHook(() => useSSE({ url: '/api/events' }))
unmount()
// These should be no-ops after unmount (mountedRef guards)
act(() => { esInstances[0]._simulateOpen() })
act(() => { esInstances[0]._simulateError() })
// State should not have changed
expect(result.current.status).toBe('connecting')
})
// ── Event parsing ───────────────────────────────────────────────────────
it('parses valid JSON data into objects', async () => {
const onMessage = vi.fn()
renderHook(() => useSSE({ url: '/api/events', onMessage }))
act(() => {
esInstances[0]._simulateNamedEvent('agent.status', JSON.stringify({ agentId: 'a1', status: 'active' }))
})
expect(onMessage).toHaveBeenCalledWith({
type: 'agent.status',
data: { agentId: 'a1', status: 'active' },
})
})
it('passes invalid JSON through as raw string', async () => {
const onMessage = vi.fn()
renderHook(() => useSSE({ url: '/api/events', onMessage }))
act(() => {
esInstances[0]._simulateNamedEvent('agent.status', 'not valid json {{{')
})
expect(onMessage).toHaveBeenCalledWith({
type: 'agent.status',
data: 'not valid json {{{',
})
})
// ── enabled=false skips connection ──────────────────────────────────────
it('does not create EventSource when enabled=false', () => {
const { result } = renderHook(() => useSSE({ url: '/api/events', enabled: false }))
expect(esInstances).toHaveLength(0)
expect(result.current.status).toBe('connecting')
})
// ── onError callback ────────────────────────────────────────────────────
it('calls onError on connection failure', async () => {
const onError = vi.fn()
renderHook(() =>
useSSE({ url: '/api/events', onError, reconnectBaseMs: 100 }),
)
act(() => { esInstances[0]._simulateError() })
expect(onError).toHaveBeenCalledTimes(1)
})
// ── Default URL ─────────────────────────────────────────────────────────
it('uses /api/events as default URL', () => {
renderHook(() => useSSE())
expect(esInstances[0].url).toBe('/api/events')
})
})

View File

@@ -0,0 +1,180 @@
import { useEffect, useRef, useCallback, useState } from 'react'
/** SSE connection state reported to consumers. */
export type SSEStatus = 'connecting' | 'connected' | 'reconnecting' | 'error'
/** Typed SSE event received from the backend. */
export interface SSEMessage {
/** event: field from the SSE frame */
type: string
/** parsed JSON from the data: field */
data: unknown
}
export interface UseSSEOptions {
/** Endpoint URL — defaults to /api/events */
url?: string
/** Called for every SSE message (all event types) */
onMessage?: (msg: SSEMessage) => void
/** Called when connection opens or reconnects */
onOpen?: () => void
/** Called on every connection error (both transient and terminal) */
onError?: (err: Event) => void
/** Base delay in ms before the first reconnect attempt (default 1 000) */
reconnectBaseMs?: number
/** Maximum reconnect delay in ms (default 30 000) */
reconnectMaxMs?: number
/**
* Maximum number of consecutive reconnect attempts before giving up.
* When the limit is reached, status transitions to 'error'.
* Default undefined (unlimited).
*/
reconnectLimit?: number
/** Set false to disable auto-connect (useful in tests) */
enabled?: boolean
}
const SSE_EVENTS = ['agent.status', 'agent.task', 'agent.progress', 'fleet.update', 'connected'] as const
/**
* useSSE — mounts a persistent SSE connection to the Control Center backend.
*
* Handles:
* - Initial connection on mount
* - Exponential back-off reconnection on drop (1s → 2s → 4s … capped at reconnectMaxMs)
* - Circuit-breaker: after reconnectLimit consecutive failures, transitions to 'error'
* - Cleanup on unmount
* - All five event types: agent.status, agent.task, agent.progress, fleet.update, connected
*
* The 'connected' SSE event is an application-level handshake sent by the backend
* after the transport opens. This is distinct from onOpen, which fires at the
* transport level when the EventSource HTTP connection is established.
*/
export function useSSE({
url = '/api/events',
onMessage,
onOpen,
onError,
reconnectBaseMs = 1_000,
reconnectMaxMs = 30_000,
reconnectLimit,
enabled = true,
}: UseSSEOptions = {}): { status: SSEStatus } {
const [status, setStatus] = useState<SSEStatus>('connecting')
// Stable refs so the effect doesn't need to re-run when callbacks change
const onMessageRef = useRef(onMessage)
const onOpenRef = useRef(onOpen)
const onErrorRef = useRef(onError)
onMessageRef.current = onMessage
onOpenRef.current = onOpen
onErrorRef.current = onError
const reconnectAttemptRef = useRef(0)
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null)
const esRef = useRef<EventSource | null>(null)
const mountedRef = useRef(true)
const clearReconnectTimer = useCallback(() => {
if (reconnectTimerRef.current !== null) {
clearTimeout(reconnectTimerRef.current)
reconnectTimerRef.current = null
}
}, [])
const connect = useCallback(() => {
if (!mountedRef.current || !enabled) return
// Clean up any existing connection
if (esRef.current) {
esRef.current.close()
esRef.current = null
}
setStatus(reconnectAttemptRef.current === 0 ? 'connecting' : 'reconnecting')
const es = new EventSource(url)
esRef.current = es
es.onopen = () => {
if (!mountedRef.current) return
reconnectAttemptRef.current = 0
setStatus('connected')
onOpenRef.current?.()
}
es.onerror = (evt) => {
if (!mountedRef.current) return
// EventSource auto-retries but we manage our own to get back-off control
es.close()
esRef.current = null
onErrorRef.current?.(evt)
reconnectAttemptRef.current += 1
// Circuit-breaker: give up after reconnectLimit consecutive failures
if (reconnectLimit !== undefined && reconnectAttemptRef.current > reconnectLimit) {
setStatus('error')
return
}
// Exponential back-off: 1s, 2s, 4s … capped at reconnectMaxMs
// Note: attempt is 1-based here (already incremented), so we use attempt-1 for the exponent
const delay = Math.min(
reconnectBaseMs * 2 ** (reconnectAttemptRef.current - 1),
reconnectMaxMs,
)
setStatus('reconnecting')
clearReconnectTimer()
reconnectTimerRef.current = setTimeout(() => {
if (mountedRef.current) connect()
}, delay)
}
// Register listeners for all known event types
for (const eventType of SSE_EVENTS) {
es.addEventListener(eventType, (evt: MessageEvent) => {
if (!mountedRef.current) return
let data: unknown = evt.data
try {
data = JSON.parse(evt.data as string)
} catch {
// leave as raw string
}
onMessageRef.current?.({ type: eventType, data })
})
}
// Catch-all for unnamed events (type == 'message').
// Won't fire for the named events registered via addEventListener above.
es.onmessage = (evt: MessageEvent) => {
if (!mountedRef.current) return
let data: unknown = evt.data
try {
data = JSON.parse(evt.data as string)
} catch {
// leave as raw string
}
onMessageRef.current?.({ type: 'message', data })
}
}, [url, enabled, reconnectBaseMs, reconnectMaxMs, reconnectLimit, clearReconnectTimer])
useEffect(() => {
mountedRef.current = true
if (enabled) connect()
return () => {
mountedRef.current = false
clearReconnectTimer()
if (esRef.current) {
esRef.current.close()
esRef.current = null
}
}
}, [connect, enabled, clearReconnectTimer])
return { status }
}

View File

@@ -4,13 +4,16 @@ import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
import { BrowserRouter } from 'react-router-dom' import { BrowserRouter } from 'react-router-dom'
import ErrorBoundary from './components/ErrorBoundary' import ErrorBoundary from './components/ErrorBoundary'
import { ThemeProvider } from './hooks/useTheme' import { ThemeProvider } from './hooks/useTheme'
import { SSEProvider } from './contexts/SSEContext'
import './index.css' import './index.css'
import App from './App' import App from './App'
const queryClient = new QueryClient({ const queryClient = new QueryClient({
defaultOptions: { defaultOptions: {
queries: { queries: {
staleTime: 30_000, // No polling — real-time updates come through SSE.
// staleTime is kept high; data is pushed, not pulled.
staleTime: 60_000,
refetchOnWindowFocus: false, refetchOnWindowFocus: false,
retry: 1, retry: 1,
}, },
@@ -22,9 +25,13 @@ createRoot(document.getElementById('root')!).render(
<ErrorBoundary> <ErrorBoundary>
<ThemeProvider> <ThemeProvider>
<QueryClientProvider client={queryClient}> <QueryClientProvider client={queryClient}>
{/* SSEProvider must live inside QueryClientProvider so it can call
useQueryClient() to invalidate caches on incoming events. */}
<SSEProvider>
<BrowserRouter> <BrowserRouter>
<App /> <App />
</BrowserRouter> </BrowserRouter>
</SSEProvider>
</QueryClientProvider> </QueryClientProvider>
</ThemeProvider> </ThemeProvider>
</ErrorBoundary> </ErrorBoundary>

View File

@@ -1,18 +1,36 @@
import { useTheme } from '../hooks/useTheme' import { useTheme } from '../hooks/useTheme'
import { useLocalStorage } from '../hooks/useLocalStorage' import { useLocalStorage } from '../hooks/useLocalStorage'
import { Sun, Moon, Monitor, Zap, Clock } from 'lucide-react' import { useSSEContext } from '../contexts/SSEContext'
import { Sun, Moon, Monitor, Zap, Radio } from 'lucide-react'
const REFRESH_PRESETS = [ const SSE_STATUS_COPY: Record<string, { label: string; description: string; color: string }> = {
{ label: '5s', value: 5_000 }, connected: {
{ label: '10s', value: 10_000 }, label: 'Connected',
{ label: '30s', value: 30_000 }, description: 'Real-time updates are active. Agent status, tasks, and progress stream live.',
{ label: '60s', value: 60_000 }, color: 'text-green-500',
] },
connecting: {
label: 'Connecting…',
description: 'Establishing SSE connection to the backend.',
color: 'text-yellow-500',
},
reconnecting: {
label: 'Reconnecting…',
description: 'Connection lost. Retrying with exponential back-off.',
color: 'text-yellow-500',
},
error: {
label: 'Disconnected',
description: 'Could not connect to the SSE endpoint. Check that the backend is reachable.',
color: 'text-red-500',
},
}
export default function SettingsPage() { export default function SettingsPage() {
const { isDark, toggleTheme } = useTheme() const { isDark, toggleTheme } = useTheme()
const [gatewayUrl, setGatewayUrl] = useLocalStorage('cc-gateway-url', '') const [gatewayUrl, setGatewayUrl] = useLocalStorage('cc-gateway-url', '')
const [refreshInterval, setRefreshInterval] = useLocalStorage('cc-refresh-interval', 30_000) const { sseStatus } = useSSEContext()
const sseInfo = SSE_STATUS_COPY[sseStatus] ?? SSE_STATUS_COPY.error
return ( return (
<div className="space-y-8 max-w-2xl"> <div className="space-y-8 max-w-2xl">
@@ -80,45 +98,31 @@ export default function SettingsPage() {
</div> </div>
</section> </section>
{/* Refresh */} {/* Real-time connection status */}
<section className="space-y-4"> <section className="space-y-4">
<h2 className="text-lg font-semibold flex items-center gap-2"> <h2 className="text-lg font-semibold flex items-center gap-2">
<Clock size={20} className="text-primary" /> <Radio size={20} className="text-primary" />
Auto Refresh Real-time Updates
</h2> </h2>
<div className="p-5 rounded-xl border border-surface-light bg-surface-dark space-y-3"> <div className="p-5 rounded-xl border border-surface-light bg-surface-dark space-y-3">
<p className="text-sm text-on-surface-variant">Data refresh interval for agent status and logs</p> <div className="flex items-center justify-between">
<div>
<div className="flex flex-col gap-2"> <p className="font-medium">SSE Connection</p>
<input <p className="text-sm text-on-surface-variant mt-0.5">{sseInfo.description}</p>
type="range"
min="0"
max="3"
step="1"
value={REFRESH_PRESETS.findIndex((p) => p.value === refreshInterval)}
onChange={(e) => {
const idx = parseInt(e.target.value)
setRefreshInterval(REFRESH_PRESETS[idx].value)
}}
className="w-full accent-primary"
/>
<div className="flex justify-between text-xs text-on-surface-muted">
{REFRESH_PRESETS.map((p) => (
<button
key={p.label}
onClick={() => setRefreshInterval(p.value)}
className={`px-3 py-1 rounded-lg transition-colors ${
refreshInterval === p.value
? 'bg-primary/10 text-primary'
: 'hover:bg-surface-light'
}`}
>
{p.label}
</button>
))}
</div> </div>
<span className={`text-sm font-semibold whitespace-nowrap ${sseInfo.color}`}>
{sseInfo.label}
</span>
</div> </div>
<p className="text-xs text-on-surface-muted">
Endpoint: <code className="bg-surface-light px-1.5 py-0.5 rounded text-on-surface-variant">/api/events</code>
&nbsp;·&nbsp;Events: agent.status, agent.task, agent.progress, fleet.update
</p>
<p className="text-xs text-on-surface-muted">
Polling is disabled. All status updates are pushed from the server over a persistent SSE connection.
The client reconnects automatically with exponential back-off on drop.
</p>
</div> </div>
</section> </section>
</div> </div>

View File

@@ -0,0 +1,72 @@
/**
* SSE event payload types matching the Go backend (internal/handler/sse.go).
*
* Event format on the wire:
* event: <eventType>
* data: <json>
*
* The types below define the backend contract. The SSEPayloadMap maps
* each event type string to its expected payload shape. SSEMessage is a
* discriminated union on `type` — when you switch on msg.type, TypeScript
* narrows msg.data to the correct payload interface automatically.
*/
import type { AgentStatus } from '../types'
/** agent.status — agent came online, went offline, changed state */
export interface AgentStatusEvent {
agentId: string
status: AgentStatus
/** Optional human-readable reason (e.g. error message) */
reason?: string
}
/** agent.task — a task was assigned to or completed by an agent */
export interface AgentTaskEvent {
agentId: string
taskId: string
title: string
action: 'assigned' | 'completed' | 'failed'
}
/** agent.progress — incremental progress update for a running task */
export interface AgentProgressEvent {
agentId: string
taskId: string
progress: number
/** Optional description of what is currently happening */
message?: string
}
/**
* fleet.update — bulk refresh of all agents (e.g. after a deployment).
* The backend may send partial or complete agent state.
*/
export interface FleetUpdateEvent {
/** ISO timestamp of when the snapshot was taken */
timestamp: string
/** Number of agents in the fleet */
agentCount: number
}
/** Union of all SSE data payloads keyed by event type. */
export type SSEPayloadMap = {
'agent.status': AgentStatusEvent
'agent.task': AgentTaskEvent
'agent.progress': AgentProgressEvent
'fleet.update': FleetUpdateEvent
connected: { clientCount: number }
message: unknown
}
/**
* Discriminated SSE message — the `type` field narrows `data` via SSEPayloadMap.
*
* Usage:
* if (msg.type === 'agent.status') {
* msg.data.agentId // ✅ TypeScript knows this is AgentStatusEvent
* }
*/
export type SSEMessage = {
[K in keyof SSEPayloadMap]: { type: K; data: SSEPayloadMap[K] }
}[keyof SSEPayloadMap]

View File

@@ -0,0 +1 @@
import '@testing-library/jest-dom'

View File

@@ -1,4 +1,4 @@
export type AgentStatus = 'active' | 'idle' | 'thinking' | 'error' export type AgentStatus = 'active' | 'idle' | 'thinking' | 'error' | 'offline'
export interface Agent { export interface Agent {
id: string id: string

View File

@@ -4,7 +4,7 @@
"target": "es2023", "target": "es2023",
"lib": ["ES2023", "DOM"], "lib": ["ES2023", "DOM"],
"module": "esnext", "module": "esnext",
"types": ["vite/client"], "types": ["vite/client", "vitest/globals"],
"skipLibCheck": true, "skipLibCheck": true,
/* Bundler mode */ /* Bundler mode */

View File

@@ -20,5 +20,5 @@
"erasableSyntaxOnly": true, "erasableSyntaxOnly": true,
"noFallthroughCasesInSwitch": true "noFallthroughCasesInSwitch": true
}, },
"include": ["vite.config.ts"] "include": ["vite.config.ts", "vitest.config.ts"]
} }

11
frontend/vitest.config.ts Normal file
View File

@@ -0,0 +1,11 @@
import { defineConfig } from 'vitest/config'
import react from '@vitejs/plugin-react'
export default defineConfig({
plugins: [react()],
test: {
environment: 'jsdom',
globals: true,
setupFiles: ['./src/test-setup.ts'],
},
})

View File

@@ -0,0 +1,516 @@
package gateway
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync"
"testing"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
)
// ── Mock AgentRepo ────────────────────────────────────────────────────────
type mockAgentRepo struct {
mu sync.Mutex
agents map[string]models.AgentCardData
updateCalls []updateCall
}
type updateCall struct {
id string
req models.UpdateAgentRequest
}
func (m *mockAgentRepo) Get(_ context.Context, id string) (models.AgentCardData, error) {
m.mu.Lock()
defer m.mu.Unlock()
a, ok := m.agents[id]
if !ok {
return models.AgentCardData{}, errNotFound
}
return a, nil
}
func (m *mockAgentRepo) Update(_ context.Context, id string, req models.UpdateAgentRequest) (models.AgentCardData, error) {
m.mu.Lock()
defer m.mu.Unlock()
a, ok := m.agents[id]
if !ok {
return models.AgentCardData{}, errNotFound
}
if req.Status != nil {
a.Status = *req.Status
}
if req.DisplayName != nil {
a.DisplayName = *req.DisplayName
}
if req.Role != nil {
a.Role = *req.Role
}
if req.Channel != nil {
a.Channel = *req.Channel
}
if req.CurrentTask != nil {
a.CurrentTask = req.CurrentTask
}
if req.TaskProgress != nil {
a.TaskProgress = req.TaskProgress
}
if req.TaskElapsed != nil {
a.TaskElapsed = req.TaskElapsed
}
if req.ErrorMessage != nil {
a.ErrorMessage = req.ErrorMessage
}
if req.LastActivityAt != nil {
a.LastActivity = *req.LastActivityAt
}
m.agents[id] = a
m.updateCalls = append(m.updateCalls, updateCall{id, req})
return a, nil
}
func (m *mockAgentRepo) Create(_ context.Context, a models.AgentCardData) error {
m.mu.Lock()
defer m.mu.Unlock()
m.agents[a.ID] = a
return nil
}
func (m *mockAgentRepo) List(_ context.Context, statusFilter models.AgentStatus) ([]models.AgentCardData, error) {
m.mu.Lock()
defer m.mu.Unlock()
var result []models.AgentCardData
for _, a := range m.agents {
if statusFilter == "" || a.Status == statusFilter {
result = append(result, a)
}
}
return result, nil
}
func (m *mockAgentRepo) Delete(_ context.Context, id string) error {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.agents, id)
return nil
}
func (m *mockAgentRepo) Count(_ context.Context) (int, error) {
m.mu.Lock()
defer m.mu.Unlock()
return len(m.agents), nil
}
// errNotFound is returned by the mock repo when an agent is not found.
var errNotFound = fmt.Errorf("not found")
// ── Broadcast capture helper ───────────────────────────────────────────────
// broadcastCapture wraps a real Broker and captures all broadcasts
// via a subscribed channel. Use captured() to retrieve events that have
// been received so far. Call close() to unsubscribe when done.
type broadcastCapture struct {
broker *handler.Broker
ch chan handler.SSEEvent
}
func newBroadcastCapture(broker *handler.Broker) *broadcastCapture {
return &broadcastCapture{
broker: broker,
ch: broker.Subscribe(),
}
}
// captured drains all pending events from the subscription channel
// and returns them. This is synchronous — it only returns events that
// have already been sent to the channel.
func (bc *broadcastCapture) captured() []handler.SSEEvent {
var events []handler.SSEEvent
for {
select {
case evt := <-bc.ch:
events = append(events, evt)
default:
return events
}
}
}
func (bc *broadcastCapture) close() {
bc.broker.Unsubscribe(bc.ch)
}
// ── Test helpers ──────────────────────────────────────────────────────────
// newTestWSClient creates a WSClient wired to a mock repo and a real broker.
// Returns the client, the mock repo, and a broadcast capture.
func newTestWSClient() (*WSClient, *mockAgentRepo, *handler.Broker, *broadcastCapture) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
client := NewWSClient(WSConfig{}, repo, broker, slog.Default())
return client, repo, broker, capture
}
// ── Tests ─────────────────────────────────────────────────────────────────
func TestHandleSessionsChanged_Active(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["otto"] = models.AgentCardData{
ID: "otto",
DisplayName: "Otto",
Status: models.AgentStatusIdle,
}
payload := json.RawMessage(`{
"sessionKey": "s1",
"agentId": "otto",
"status": "running",
"totalTokens": 500,
"currentTask": "Orchestrating tasks"
}`)
client.handleSessionsChanged(payload)
// Verify: agent status updated to active
repo.mu.Lock()
agent := repo.agents["otto"]
calls := make([]updateCall, len(repo.updateCalls))
copy(calls, repo.updateCalls)
repo.mu.Unlock()
if agent.Status != models.AgentStatusActive {
t.Errorf("agent status = %q, want %q", agent.Status, models.AgentStatusActive)
}
// Verify: update was called
if len(calls) == 0 {
t.Fatal("expected at least one update call")
}
if calls[0].id != "otto" {
t.Errorf("update call agentId = %q, want %q", calls[0].id, "otto")
}
// Verify: broker broadcast "agent.status"
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "agent.status" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'agent.status'")
}
}
func TestHandleSessionsChanged_Idle(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["dex"] = models.AgentCardData{
ID: "dex",
DisplayName: "Dex",
Status: models.AgentStatusActive,
CurrentTask: strPtr("Writing API"),
}
payload := json.RawMessage(`{
"sessionKey": "s2",
"agentId": "dex",
"status": "done",
"totalTokens": 1000
}`)
client.handleSessionsChanged(payload)
repo.mu.Lock()
agent := repo.agents["dex"]
repo.mu.Unlock()
// Verify: agent goes idle
if agent.Status != models.AgentStatusIdle {
t.Errorf("agent status = %q, want %q", agent.Status, models.AgentStatusIdle)
}
// Verify: current task cleared (set to empty string)
if agent.CurrentTask != nil && *agent.CurrentTask != "" {
t.Errorf("current task = %q, want empty (cleared on idle)", *agent.CurrentTask)
}
// Verify: broker fires "agent.status"
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "agent.status" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'agent.status'")
}
}
func TestHandleSessionsChanged_ArrayPayload(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusIdle}
repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle}
payload := json.RawMessage(`[
{"sessionKey":"s1","agentId":"otto","status":"running","totalTokens":100},
{"sessionKey":"s2","agentId":"dex","status":"streaming","totalTokens":200}
]`)
client.handleSessionsChanged(payload)
repo.mu.Lock()
otto := repo.agents["otto"]
dex := repo.agents["dex"]
repo.mu.Unlock()
if otto.Status != models.AgentStatusActive {
t.Errorf("otto status = %q, want active", otto.Status)
}
if dex.Status != models.AgentStatusActive {
t.Errorf("dex status = %q, want active", dex.Status)
}
// Both should produce broadcasts
events := capture.captured()
statusCount := 0
for _, evt := range events {
if evt.EventType == "agent.status" {
statusCount++
}
}
if statusCount < 2 {
t.Errorf("expected at least 2 agent.status broadcasts, got %d", statusCount)
}
}
func TestHandleSessionsChanged_SkipsEmptyAgentID(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`{"sessionKey":"s1","agentId":"","status":"running"}`)
client.handleSessionsChanged(payload)
events := capture.captured()
if len(events) > 0 {
t.Errorf("expected no broadcasts for empty agentId, got %d", len(events))
}
}
func TestHandleSessionsChanged_UnparseablePayload(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`not json at all`)
client.handleSessionsChanged(payload)
events := capture.captured()
if len(events) > 0 {
t.Errorf("expected no broadcasts for unparseable payload, got %d", len(events))
}
}
func TestHandlePresence(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["pip"] = models.AgentCardData{
ID: "pip",
DisplayName: "Pip",
Status: models.AgentStatusActive,
}
payload := json.RawMessage(`{
"agentId": "pip",
"connected": true,
"lastActivityAt": "2025-01-01T00:00:00Z"
}`)
client.handlePresence(payload)
repo.mu.Lock()
agent := repo.agents["pip"]
calls := make([]updateCall, len(repo.updateCalls))
copy(calls, repo.updateCalls)
repo.mu.Unlock()
// Agent should still be active (connected=true doesn't change status)
if agent.Status != models.AgentStatusActive {
t.Errorf("agent status = %q, want active", agent.Status)
}
// Update should have been called (for lastActivityAt)
if len(calls) == 0 {
t.Fatal("expected at least one update call")
}
// Verify broadcast
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "agent.status" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'agent.status'")
}
}
func TestHandlePresence_Disconnect(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["pip"] = models.AgentCardData{
ID: "pip",
DisplayName: "Pip",
Status: models.AgentStatusActive,
}
payload := json.RawMessage(`{
"agentId": "pip",
"connected": false
}`)
client.handlePresence(payload)
repo.mu.Lock()
agent := repo.agents["pip"]
repo.mu.Unlock()
// Agent should go idle on disconnect
if agent.Status != models.AgentStatusIdle {
t.Errorf("agent status = %q, want idle after disconnect", agent.Status)
}
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "agent.status" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'agent.status' on disconnect")
}
}
func TestHandlePresence_EmptyAgentID(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`{"agentId":"","connected":true}`)
client.handlePresence(payload)
events := capture.captured()
if len(events) > 0 {
t.Errorf("expected no broadcasts for empty agentId, got %d", len(events))
}
}
func TestHandleAgentConfig(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["rex"] = models.AgentCardData{
ID: "rex",
DisplayName: "Rex",
Role: "Frontend Dev",
Status: models.AgentStatusIdle,
Channel: "discord",
}
payload := json.RawMessage(`{
"id": "rex",
"name": "Rex the Dev",
"role": "Senior Frontend",
"channel": "telegram"
}`)
client.handleAgentConfig(payload)
repo.mu.Lock()
agent := repo.agents["rex"]
calls := make([]updateCall, len(repo.updateCalls))
copy(calls, repo.updateCalls)
repo.mu.Unlock()
// Verify DisplayName and Role updated
if agent.DisplayName != "Rex the Dev" {
t.Errorf("displayName = %q, want %q", agent.DisplayName, "Rex the Dev")
}
if agent.Role != "Senior Frontend" {
t.Errorf("role = %q, want %q", agent.Role, "Senior Frontend")
}
if agent.Channel != "telegram" {
t.Errorf("channel = %q, want %q", agent.Channel, "telegram")
}
// Verify update was called
if len(calls) == 0 {
t.Fatal("expected at least one update call")
}
// Verify broker fires "fleet.update"
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "fleet.update" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'fleet.update'")
}
}
func TestHandleAgentConfig_EmptyID(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`{"id":"","name":"Ghost"}`)
client.handleAgentConfig(payload)
events := capture.captured()
if len(events) > 0 {
t.Errorf("expected no broadcasts for empty id, got %d", len(events))
}
}
func TestHandleAgentConfig_NotFound(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`{"id":"unknown","name":"Ghost","role":"Phantom"}`)
client.handleAgentConfig(payload)
// Agent doesn't exist in repo, so Update will fail → handler logs warning, returns early
events := capture.captured()
for _, evt := range events {
if evt.EventType == "fleet.update" {
t.Error("fleet.update should not be broadcast when agent update fails")
}
}
}

View File

@@ -0,0 +1,236 @@
package gateway
import (
"context"
"testing"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
)
func TestInitialSync(t *testing.T) {
_ = &mockAgentRepo{agents: make(map[string]models.AgentCardData)} // verify mock compiles
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
// --- Test agentItemToCard + session merge (the core of initialSync) ---
agentItems := []agentListItem{
{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"},
{ID: "dex", Name: "Dex", Role: "Backend Dev", Channel: "telegram"},
}
sessionItems := []sessionListItem{
{SessionKey: "s1", AgentID: "otto", Status: "running", TotalTokens: 500, LastActivityAt: "2025-05-20T12:00:00Z"},
{SessionKey: "s2", AgentID: "dex", Status: "done", TotalTokens: 1000, LastActivityAt: "2025-05-20T11:00:00Z"},
}
// Build sessionByAgent map (mirrors initialSync logic)
sessionByAgent := make(map[string]sessionListItem)
for _, s := range sessionItems {
if s.AgentID != "" {
sessionByAgent[s.AgentID] = s
}
}
// Merge and verify
merged := make([]models.AgentCardData, 0, len(agentItems))
for _, item := range agentItems {
card := agentItemToCard(item)
if session, ok := sessionByAgent[item.ID]; ok {
card.SessionKey = session.SessionKey
card.Status = mapSessionStatus(session.Status)
card.LastActivity = session.LastActivityAt
if session.TotalTokens > 0 {
prog := min(session.TotalTokens/100, 100)
card.TaskProgress = &prog
}
}
merged = append(merged, card)
}
// Verify otto: running → active
if merged[0].ID != "otto" {
t.Errorf("merged[0].ID = %q, want %q", merged[0].ID, "otto")
}
if merged[0].Status != models.AgentStatusActive {
t.Errorf("otto status = %q, want %q (running → active)", merged[0].Status, models.AgentStatusActive)
}
if merged[0].SessionKey != "s1" {
t.Errorf("otto sessionKey = %q, want %q", merged[0].SessionKey, "s1")
}
if merged[0].TaskProgress == nil || *merged[0].TaskProgress != 5 {
t.Errorf("otto taskProgress = %v, want 5", merged[0].TaskProgress)
}
// Verify dex: done → idle
if merged[1].ID != "dex" {
t.Errorf("merged[1].ID = %q, want %q", merged[1].ID, "dex")
}
if merged[1].Status != models.AgentStatusIdle {
t.Errorf("dex status = %q, want %q (done → idle)", merged[1].Status, models.AgentStatusIdle)
}
if merged[1].SessionKey != "s2" {
t.Errorf("dex sessionKey = %q, want %q", merged[1].SessionKey, "s2")
}
}
func TestInitialSync_PersistCreatesNew(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
// Simulate the persist logic from initialSync:
// new agents should be created
card := agentItemToCard(agentListItem{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"})
ctx := context.Background()
// Agent doesn't exist → create
_, err := repo.Get(ctx, card.ID)
if err == nil {
t.Fatal("expected agent to not exist yet")
}
if err := repo.Create(ctx, card); err != nil {
t.Fatalf("Create failed: %v", err)
}
got, err := repo.Get(ctx, card.ID)
if err != nil {
t.Fatalf("Get after Create failed: %v", err)
}
if got.ID != "otto" {
t.Errorf("got.ID = %q, want %q", got.ID, "otto")
}
if got.DisplayName != "Otto" {
t.Errorf("got.DisplayName = %q, want %q", got.DisplayName, "Otto")
}
if got.Role != "Orchestrator" {
t.Errorf("got.Role = %q, want %q", got.Role, "Orchestrator")
}
}
func TestInitialSync_PersistUpdatesExisting(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
ctx := context.Background()
// Pre-populate with existing agent
repo.agents["otto"] = models.AgentCardData{
ID: "otto",
DisplayName: "Otto",
Role: "Old Role",
Status: models.AgentStatusIdle,
}
// Simulate initialSync: agent exists, name/role changed → update
newName := "Otto Prime"
newRole := "Super Orchestrator"
_, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{
DisplayName: &newName,
Role: &newRole,
})
if err != nil {
t.Fatalf("Update failed: %v", err)
}
got, err := repo.Get(ctx, "otto")
if err != nil {
t.Fatalf("Get after Update failed: %v", err)
}
if got.DisplayName != "Otto Prime" {
t.Errorf("displayName = %q, want %q", got.DisplayName, "Otto Prime")
}
if got.Role != "Super Orchestrator" {
t.Errorf("role = %q, want %q", got.Role, "Super Orchestrator")
}
}
func TestInitialSync_MergesSessionStatus(t *testing.T) {
// When initialSync merges session state, an agent whose existing status
// differs from the session-derived status should be updated.
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
ctx := context.Background()
repo.agents["otto"] = models.AgentCardData{
ID: "otto",
DisplayName: "Otto",
Role: "Orchestrator",
Status: models.AgentStatusIdle,
}
// Simulate session merge: session says "running" → agent should go active
activeStatus := mapSessionStatus("running")
if activeStatus != models.AgentStatusActive {
t.Fatalf("mapSessionStatus(running) = %q, want active", activeStatus)
}
_, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{
Status: &activeStatus,
})
if err != nil {
t.Fatalf("Update failed: %v", err)
}
got, err := repo.Get(ctx, "otto")
if err != nil {
t.Fatalf("Get failed: %v", err)
}
if got.Status != models.AgentStatusActive {
t.Errorf("status after merge = %q, want %q", got.Status, models.AgentStatusActive)
}
}
func TestInitialSync_BroadcastsFleet(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
// Create some agents in the repo
repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusActive}
repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle}
// Simulate the final broadcast from initialSync
mergedAgents := []models.AgentCardData{
repo.agents["otto"],
repo.agents["dex"],
}
broker.Broadcast("fleet.update", mergedAgents)
events := capture.captured()
if len(events) == 0 {
t.Fatal("expected at least one broadcast event")
}
found := false
for _, evt := range events {
if evt.EventType == "fleet.update" {
found = true
// Verify data is the merged agents list
agents, ok := evt.Data.([]models.AgentCardData)
if !ok {
t.Fatalf("fleet.update data type = %T, want []models.AgentCardData", evt.Data)
}
if len(agents) != 2 {
t.Errorf("fleet.update agents count = %d, want 2", len(agents))
}
break
}
}
if !found {
t.Error("expected fleet.update broadcast event")
}
}