CUB-125: implement real-time SSE/WebSocket in React frontend #46

Merged
overseer merged 2 commits from agent/rex/CUB-125-realtime-sse-clean into dev 2026-05-20 13:14:30 -04:00
12 changed files with 1606 additions and 15 deletions
Showing only changes of commit 010408cc45 - Show all commits

File diff suppressed because it is too large Load Diff

View File

@@ -7,7 +7,9 @@
"dev": "vite", "dev": "vite",
"build": "tsc -b && vite build", "build": "tsc -b && vite build",
"lint": "eslint .", "lint": "eslint .",
"preview": "vite preview" "preview": "vite preview",
"test": "vitest run",
"test:watch": "vitest"
}, },
"dependencies": { "dependencies": {
"@tanstack/react-query": "^5.100.9", "@tanstack/react-query": "^5.100.9",
@@ -20,6 +22,8 @@
"devDependencies": { "devDependencies": {
"@eslint/js": "^10.0.1", "@eslint/js": "^10.0.1",
"@tailwindcss/vite": "^4.2.4", "@tailwindcss/vite": "^4.2.4",
"@testing-library/jest-dom": "^6.9.1",
"@testing-library/react": "^16.3.2",
"@types/node": "^24.12.2", "@types/node": "^24.12.2",
"@types/react": "^19.2.14", "@types/react": "^19.2.14",
"@types/react-dom": "^19.2.3", "@types/react-dom": "^19.2.3",
@@ -29,10 +33,12 @@
"eslint-plugin-react-hooks": "^7.1.1", "eslint-plugin-react-hooks": "^7.1.1",
"eslint-plugin-react-refresh": "^0.5.2", "eslint-plugin-react-refresh": "^0.5.2",
"globals": "^17.5.0", "globals": "^17.5.0",
"jsdom": "^29.1.1",
"postcss": "^8.5.14", "postcss": "^8.5.14",
"tailwindcss": "^4.2.4", "tailwindcss": "^4.2.4",
"typescript": "~6.0.2", "typescript": "~6.0.2",
"typescript-eslint": "^8.58.2", "typescript-eslint": "^8.58.2",
"vite": "^8.0.10" "vite": "^8.0.10",
"vitest": "^4.1.7"
} }
} }

View File

@@ -0,0 +1,129 @@
/**
* Tests for useRealtimeSync — event → query invalidation mapping.
*
* Uses .tsx extension so Vite/OXC can parse JSX in the wrapper component.
*/
import { renderHook } from '@testing-library/react'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
import * as useSSEModule from './useSSE'
import { useRealtimeSync } from './useRealtimeSync'
import React from 'react'
import type { SSEMessage } from '../services/sse'
describe('useRealtimeSync', () => {
let queryClient: QueryClient
let mockSSEOnMessage: ((msg: { type: string; data: unknown }) => void) | null = null
beforeEach(() => {
queryClient = new QueryClient({
defaultOptions: { queries: { retry: false } },
})
mockSSEOnMessage = null
// Spy on useSSE to capture the onMessage callback
vi.spyOn(useSSEModule, 'useSSE').mockImplementation((opts) => {
mockSSEOnMessage = opts?.onMessage ?? null
return { status: 'connected' }
})
})
afterEach(() => {
vi.restoreAllMocks()
})
function render() {
return renderHook(() => useRealtimeSync(), {
wrapper: ({ children }: { children: React.ReactNode }) => (
React.createElement(QueryClientProvider, { client: queryClient }, children)
),
})
}
it('invalidates ["agents"] on agent.status event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'agent.status',
data: { agentId: 'a1', status: 'active' },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
expect(invalidateSpy).toHaveBeenCalledTimes(1)
})
it('invalidates ["tasks"] and ["agents"] on agent.task event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'agent.task',
data: { agentId: 'a1', taskId: 't1', title: 'Test', action: 'assigned' },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
expect(invalidateSpy).toHaveBeenCalledTimes(2)
})
it('invalidates ["tasks"] and ["agents"] on agent.progress event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'agent.progress',
data: { agentId: 'a1', taskId: 't1', progress: 50, message: 'working' },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
expect(invalidateSpy).toHaveBeenCalledTimes(2)
})
it('invalidates ["agents"], ["sessions"], ["tasks"] on fleet.update event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'fleet.update',
data: { timestamp: '2026-05-20T12:00:00Z', agentCount: 5 },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['sessions'] })
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
expect(invalidateSpy).toHaveBeenCalledTimes(3)
})
it('does nothing on connected event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'connected',
data: { clientCount: 1 },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).not.toHaveBeenCalled()
})
it('does nothing on unknown event types', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
mockSSEOnMessage!({ type: 'unknown.event', data: {} })
expect(invalidateSpy).not.toHaveBeenCalled()
})
it('returns sseStatus from useSSE', () => {
const { result } = render()
expect(result.current.sseStatus).toBe('connected')
})
})

View File

@@ -10,29 +10,41 @@
*/ */
import { useQueryClient } from '@tanstack/react-query' import { useQueryClient } from '@tanstack/react-query'
import { useCallback } from 'react' import { useCallback } from 'react'
import { useSSE, type SSEMessage, type SSEStatus } from './useSSE' import { useSSE, type SSEStatus } from './useSSE'
import type { SSEMessage } from '../services/sse'
export function useRealtimeSync(): { sseStatus: SSEStatus } { export function useRealtimeSync(): { sseStatus: SSEStatus } {
const queryClient = useQueryClient() const queryClient = useQueryClient()
const handleMessage = useCallback( const handleMessage = useCallback(
(msg: SSEMessage) => { (raw: { type: string; data: unknown }) => {
// Cast to discriminated union — the backend contract guarantees these shapes
const msg = raw as SSEMessage
switch (msg.type) { switch (msg.type) {
case 'agent.status': case 'agent.status':
// msg.data: AgentStatusEvent { agentId, status, reason? }
void msg.data.agentId // retained for type-narrowing — ensures payload matches contract
queryClient.invalidateQueries({ queryKey: ['agents'] }) queryClient.invalidateQueries({ queryKey: ['agents'] })
break break
case 'agent.task': case 'agent.task':
// msg.data: AgentTaskEvent { agentId, taskId, title, action }
void msg.data.agentId
queryClient.invalidateQueries({ queryKey: ['tasks'] }) queryClient.invalidateQueries({ queryKey: ['tasks'] })
queryClient.invalidateQueries({ queryKey: ['agents'] }) queryClient.invalidateQueries({ queryKey: ['agents'] })
break break
case 'agent.progress': case 'agent.progress':
// msg.data: AgentProgressEvent { agentId, taskId, progress, message? }
void msg.data.agentId
queryClient.invalidateQueries({ queryKey: ['tasks'] }) queryClient.invalidateQueries({ queryKey: ['tasks'] })
queryClient.invalidateQueries({ queryKey: ['agents'] }) queryClient.invalidateQueries({ queryKey: ['agents'] })
break break
case 'fleet.update': case 'fleet.update':
// msg.data: FleetUpdateEvent { timestamp, agentCount }
void msg.data.agentCount
queryClient.invalidateQueries({ queryKey: ['agents'] }) queryClient.invalidateQueries({ queryKey: ['agents'] })
queryClient.invalidateQueries({ queryKey: ['sessions'] }) queryClient.invalidateQueries({ queryKey: ['sessions'] })
queryClient.invalidateQueries({ queryKey: ['tasks'] }) queryClient.invalidateQueries({ queryKey: ['tasks'] })

View File

@@ -0,0 +1,267 @@
/**
* Tests for useSSE — SSE connection lifecycle, back-off, event parsing, and cleanup.
*
* jsdom does not include EventSource, so we mock it completely.
*/
import { renderHook, act, waitFor } from '@testing-library/react'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { useSSE } from './useSSE'
// ---------------------------------------------------------------------------
// Mock EventSource — defined as a plain class so `new EventSource()` works
// ---------------------------------------------------------------------------
class MockEventSource {
url: string
onopen: (() => void) | null = null
onerror: ((evt: Event) => void) | null = null
onmessage: ((evt: MessageEvent) => void) | null = null
private listeners: Map<string, Array<(evt: Event) => void>> = new Map()
readyState: number = 0
constructor(url: string) {
this.url = url
}
addEventListener(type: string, handler: (evt: Event) => void) {
if (!this.listeners.has(type)) this.listeners.set(type, [])
this.listeners.get(type)!.push(handler)
}
removeEventListener() { /* no-op for tests */ }
close() {
this.readyState = 2
this.onopen = null
this.onerror = null
this.onmessage = null
this.listeners.clear()
}
// Test helpers
_simulateOpen() { this.onopen?.() }
_simulateError() { this.onerror?.(new Event('error')) }
_simulateNamedEvent(type: string, data: string) {
const handlers = this.listeners.get(type)
if (handlers) {
const evt = new MessageEvent(type, { data }) as Event
handlers.forEach((h) => h(evt))
}
}
_simulateMessage(data: string) {
this.onmessage?.(new MessageEvent('message', { data }) as MessageEvent)
}
static readonly CONNECTING = 0
static readonly OPEN = 1
static readonly CLOSED = 2
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
let esInstances: MockEventSource[]
describe('useSSE', () => {
beforeEach(() => {
esInstances = []
// Replace global EventSource with our mock class
Object.defineProperty(globalThis, 'EventSource', {
// The mock must use a class for `new EventSource()` to work
value: class extends MockEventSource {
constructor(url: string) {
super(url)
esInstances.push(this)
}
},
writable: true,
configurable: true,
})
vi.useFakeTimers({ shouldAdvanceTime: true })
})
afterEach(() => {
vi.restoreAllMocks()
vi.useRealTimers()
})
// ── Initial connection ──────────────────────────────────────────────────
it('starts in "connecting" state and creates an EventSource', () => {
const { result } = renderHook(() => useSSE({ url: '/api/events' }))
expect(result.current.status).toBe('connecting')
expect(esInstances.length).toBeGreaterThanOrEqual(1)
expect(esInstances[0].url).toBe('/api/events')
})
it('transitions to "connected" on open', async () => {
const onOpen = vi.fn()
const { result } = renderHook(() => useSSE({ url: '/api/events', onOpen }))
act(() => { esInstances[0]._simulateOpen() })
await waitFor(() => {
expect(result.current.status).toBe('connected')
})
expect(onOpen).toHaveBeenCalledTimes(1)
})
// ── Reconnection with exponential back-off ──────────────────────────────
it('retries after error with exponential back-off', async () => {
const { result } = renderHook(() =>
useSSE({ url: '/api/events', reconnectBaseMs: 1000, reconnectMaxMs: 30000 }),
)
// First error → reconnecting, retry at 1s
act(() => { esInstances[0]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
expect(esInstances).toHaveLength(1)
// Advance 1000ms → second EventSource created
act(() => { vi.advanceTimersByTime(1000) })
expect(esInstances).toHaveLength(2)
// Second error → reconnecting, retry at 2s
act(() => { esInstances[1]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
act(() => { vi.advanceTimersByTime(2000) })
expect(esInstances).toHaveLength(3)
// Third error → reconnecting, retry at 4s
act(() => { esInstances[2]._simulateError() })
act(() => { vi.advanceTimersByTime(4000) })
expect(esInstances).toHaveLength(4)
})
it('caps reconnect delay at reconnectMaxMs', async () => {
renderHook(() =>
useSSE({ url: '/api/events', reconnectBaseMs: 1000, reconnectMaxMs: 10000 }),
)
// Force 5 errors to push the exponent past the cap
for (let i = 0; i < 5; i++) {
act(() => { esInstances[i]._simulateError() })
const expectedDelay = Math.min(1000 * 2 ** i, 10000)
act(() => { vi.advanceTimersByTime(expectedDelay) })
}
// 6 ES instances created (initial + 5 retries)
expect(esInstances).toHaveLength(6)
})
// ── Circuit-breaker (max retries) ───────────────────────────────────────
it('transitions to "error" after reconnectLimit is exceeded', async () => {
const { result } = renderHook(() =>
useSSE({ url: '/api/events', reconnectBaseMs: 100, reconnectLimit: 2 }),
)
// First error → reconnecting
act(() => { esInstances[0]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
// Advance → retry
act(() => { vi.advanceTimersByTime(100) })
// Second error → reconnecting (attempt 2, still ≤ limit)
act(() => { esInstances[1]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
act(() => { vi.advanceTimersByTime(200) })
// Third error → limit exceeded (3 > 2) → error
act(() => { esInstances[2]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('error') })
})
it('resets reconnect counter on successful connection', async () => {
const { result } = renderHook(() =>
useSSE({ url: '/api/events', reconnectBaseMs: 100, reconnectLimit: 3 }),
)
// Two errors then a successful connect
act(() => { esInstances[0]._simulateError() })
act(() => { vi.advanceTimersByTime(100) })
act(() => { esInstances[1]._simulateOpen() })
await waitFor(() => { expect(result.current.status).toBe('connected') })
// Now error again — counter should be reset, so we get fresh attempts
act(() => { esInstances[1]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
expect(result.current.status).toBe('reconnecting')
})
// ── Cleanup on unmount ───────────────────────────────────────────────────
it('closes EventSource on unmount', () => {
const closeSpy = vi.spyOn(MockEventSource.prototype, 'close')
const { unmount } = renderHook(() => useSSE({ url: '/api/events' }))
unmount()
expect(closeSpy).toHaveBeenCalled()
})
it('does not update state after unmount', async () => {
const { result, unmount } = renderHook(() => useSSE({ url: '/api/events' }))
unmount()
// These should be no-ops after unmount (mountedRef guards)
act(() => { esInstances[0]._simulateOpen() })
act(() => { esInstances[0]._simulateError() })
// State should not have changed
expect(result.current.status).toBe('connecting')
})
// ── Event parsing ───────────────────────────────────────────────────────
it('parses valid JSON data into objects', async () => {
const onMessage = vi.fn()
renderHook(() => useSSE({ url: '/api/events', onMessage }))
act(() => {
esInstances[0]._simulateNamedEvent('agent.status', JSON.stringify({ agentId: 'a1', status: 'active' }))
})
expect(onMessage).toHaveBeenCalledWith({
type: 'agent.status',
data: { agentId: 'a1', status: 'active' },
})
})
it('passes invalid JSON through as raw string', async () => {
const onMessage = vi.fn()
renderHook(() => useSSE({ url: '/api/events', onMessage }))
act(() => {
esInstances[0]._simulateNamedEvent('agent.status', 'not valid json {{{')
})
expect(onMessage).toHaveBeenCalledWith({
type: 'agent.status',
data: 'not valid json {{{',
})
})
// ── enabled=false skips connection ──────────────────────────────────────
it('does not create EventSource when enabled=false', () => {
const { result } = renderHook(() => useSSE({ url: '/api/events', enabled: false }))
expect(esInstances).toHaveLength(0)
expect(result.current.status).toBe('connecting')
})
// ── onError callback ────────────────────────────────────────────────────
it('calls onError on connection failure', async () => {
const onError = vi.fn()
renderHook(() =>
useSSE({ url: '/api/events', onError, reconnectBaseMs: 100 }),
)
act(() => { esInstances[0]._simulateError() })
expect(onError).toHaveBeenCalledTimes(1)
})
// ── Default URL ─────────────────────────────────────────────────────────
it('uses /api/events as default URL', () => {
renderHook(() => useSSE())
expect(esInstances[0].url).toBe('/api/events')
})
})

View File

@@ -18,12 +18,18 @@ export interface UseSSEOptions {
onMessage?: (msg: SSEMessage) => void onMessage?: (msg: SSEMessage) => void
/** Called when connection opens or reconnects */ /** Called when connection opens or reconnects */
onOpen?: () => void onOpen?: () => void
/** Called on unrecoverable error */ /** Called on every connection error (both transient and terminal) */
onError?: (err: Event) => void onError?: (err: Event) => void
/** Base delay in ms before the first reconnect attempt (default 1 000) */ /** Base delay in ms before the first reconnect attempt (default 1 000) */
reconnectBaseMs?: number reconnectBaseMs?: number
/** Maximum reconnect delay in ms (default 30 000) */ /** Maximum reconnect delay in ms (default 30 000) */
reconnectMaxMs?: number reconnectMaxMs?: number
/**
* Maximum number of consecutive reconnect attempts before giving up.
* When the limit is reached, status transitions to 'error'.
* Default undefined (unlimited).
*/
reconnectLimit?: number
/** Set false to disable auto-connect (useful in tests) */ /** Set false to disable auto-connect (useful in tests) */
enabled?: boolean enabled?: boolean
} }
@@ -35,9 +41,14 @@ const SSE_EVENTS = ['agent.status', 'agent.task', 'agent.progress', 'fleet.updat
* *
* Handles: * Handles:
* - Initial connection on mount * - Initial connection on mount
* - Exponential back-off reconnection on drop * - Exponential back-off reconnection on drop (1s → 2s → 4s … capped at reconnectMaxMs)
* - Circuit-breaker: after reconnectLimit consecutive failures, transitions to 'error'
* - Cleanup on unmount * - Cleanup on unmount
* - All four event types: agent.status, agent.task, agent.progress, fleet.update * - All five event types: agent.status, agent.task, agent.progress, fleet.update, connected
*
* The 'connected' SSE event is an application-level handshake sent by the backend
* after the transport opens. This is distinct from onOpen, which fires at the
* transport level when the EventSource HTTP connection is established.
*/ */
export function useSSE({ export function useSSE({
url = '/api/events', url = '/api/events',
@@ -46,6 +57,7 @@ export function useSSE({
onError, onError,
reconnectBaseMs = 1_000, reconnectBaseMs = 1_000,
reconnectMaxMs = 30_000, reconnectMaxMs = 30_000,
reconnectLimit,
enabled = true, enabled = true,
}: UseSSEOptions = {}): { status: SSEStatus } { }: UseSSEOptions = {}): { status: SSEStatus } {
const [status, setStatus] = useState<SSEStatus>('connecting') const [status, setStatus] = useState<SSEStatus>('connecting')
@@ -100,12 +112,20 @@ export function useSSE({
onErrorRef.current?.(evt) onErrorRef.current?.(evt)
reconnectAttemptRef.current += 1
// Circuit-breaker: give up after reconnectLimit consecutive failures
if (reconnectLimit !== undefined && reconnectAttemptRef.current > reconnectLimit) {
setStatus('error')
return
}
// Exponential back-off: 1s, 2s, 4s … capped at reconnectMaxMs // Exponential back-off: 1s, 2s, 4s … capped at reconnectMaxMs
// Note: attempt is 1-based here (already incremented), so we use attempt-1 for the exponent
const delay = Math.min( const delay = Math.min(
reconnectBaseMs * 2 ** reconnectAttemptRef.current, reconnectBaseMs * 2 ** (reconnectAttemptRef.current - 1),
reconnectMaxMs, reconnectMaxMs,
) )
reconnectAttemptRef.current += 1
setStatus('reconnecting') setStatus('reconnecting')
clearReconnectTimer() clearReconnectTimer()
@@ -128,7 +148,8 @@ export function useSSE({
}) })
} }
// Catch-all for unnamed events (type == 'message') // Catch-all for unnamed events (type == 'message').
// Won't fire for the named events registered via addEventListener above.
es.onmessage = (evt: MessageEvent) => { es.onmessage = (evt: MessageEvent) => {
if (!mountedRef.current) return if (!mountedRef.current) return
let data: unknown = evt.data let data: unknown = evt.data
@@ -139,7 +160,7 @@ export function useSSE({
} }
onMessageRef.current?.({ type: 'message', data }) onMessageRef.current?.({ type: 'message', data })
} }
}, [url, enabled, reconnectBaseMs, reconnectMaxMs, clearReconnectTimer]) }, [url, enabled, reconnectBaseMs, reconnectMaxMs, reconnectLimit, clearReconnectTimer])
useEffect(() => { useEffect(() => {
mountedRef.current = true mountedRef.current = true

View File

@@ -4,6 +4,11 @@
* Event format on the wire: * Event format on the wire:
* event: <eventType> * event: <eventType>
* data: <json> * data: <json>
*
* The types below define the backend contract. The SSEPayloadMap maps
* each event type string to its expected payload shape. SSEMessage is a
* discriminated union on `type` — when you switch on msg.type, TypeScript
* narrows msg.data to the correct payload interface automatically.
*/ */
import type { AgentStatus } from '../types' import type { AgentStatus } from '../types'
@@ -53,3 +58,15 @@ export type SSEPayloadMap = {
connected: { clientCount: number } connected: { clientCount: number }
message: unknown message: unknown
} }
/**
* Discriminated SSE message — the `type` field narrows `data` via SSEPayloadMap.
*
* Usage:
* if (msg.type === 'agent.status') {
* msg.data.agentId // ✅ TypeScript knows this is AgentStatusEvent
* }
*/
export type SSEMessage = {
[K in keyof SSEPayloadMap]: { type: K; data: SSEPayloadMap[K] }
}[keyof SSEPayloadMap]

View File

@@ -0,0 +1 @@
import '@testing-library/jest-dom'

View File

@@ -1,4 +1,4 @@
export type AgentStatus = 'active' | 'idle' | 'thinking' | 'error' export type AgentStatus = 'active' | 'idle' | 'thinking' | 'error' | 'offline'
export interface Agent { export interface Agent {
id: string id: string

View File

@@ -4,7 +4,7 @@
"target": "es2023", "target": "es2023",
"lib": ["ES2023", "DOM"], "lib": ["ES2023", "DOM"],
"module": "esnext", "module": "esnext",
"types": ["vite/client"], "types": ["vite/client", "vitest/globals"],
"skipLibCheck": true, "skipLibCheck": true,
/* Bundler mode */ /* Bundler mode */

View File

@@ -20,5 +20,5 @@
"erasableSyntaxOnly": true, "erasableSyntaxOnly": true,
"noFallthroughCasesInSwitch": true "noFallthroughCasesInSwitch": true
}, },
"include": ["vite.config.ts"] "include": ["vite.config.ts", "vitest.config.ts"]
} }

11
frontend/vitest.config.ts Normal file
View File

@@ -0,0 +1,11 @@
import { defineConfig } from 'vitest/config'
import react from '@vitejs/plugin-react'
export default defineConfig({
plugins: [react()],
test: {
environment: 'jsdom',
globals: true,
setupFiles: ['./src/test-setup.ts'],
},
})