Compare commits

...

1 Commits

Author SHA1 Message Date
5147997764 CUB-125: implement real-time SSE/WebSocket in React frontend
Some checks failed
Dev Build / build-test (pull_request) Failing after 58s
- Add useSSE hook with exponential back-off reconnect (1s → 30s)
- Add useRealtimeSync hook: maps SSE events to React Query invalidation
  (agent.status → agents; agent.task/agent.progress → tasks+agents; fleet.update → all)
- Add SSEContext/SSEProvider so connection status is available app-wide
- Mount SSEProvider in main.tsx inside QueryClientProvider (no polling)
- Show live/connecting/reconnecting/disconnected badge in sidebar + mobile header
- Update SettingsPage: replace polling interval UI with SSE status panel
- Disable React Query polling (staleTime 60s); all updates pushed via SSE

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 18:10:38 -04:00
7 changed files with 377 additions and 45 deletions

View File

@@ -1,6 +1,8 @@
import { useState } from 'react'
import { NavLink } from 'react-router-dom'
import { Command, Activity, FolderKanban, Monitor, Settings, Menu, X } from 'lucide-react'
import { Command, Activity, FolderKanban, Monitor, Settings, Menu, X, Wifi, WifiOff, Loader } from 'lucide-react'
import { useSSEContext } from '../contexts/SSEContext'
import type { SSEStatus } from '../hooks/useSSE'
const navItems = [
{ to: '/', icon: Command, label: 'Hub' },
@@ -10,9 +12,29 @@ const navItems = [
{ to: '/settings', icon: Settings, label: 'Settings' },
]
/** Small status pill shown in the sidebar footer and mobile header. */
function SSEStatusBadge({ status, showLabel = false }: { status: SSEStatus; showLabel?: boolean }) {
const cfg = {
connected: { icon: Wifi, color: 'text-green-500', label: 'Live' },
connecting: { icon: Loader, color: 'text-yellow-500 animate-spin', label: 'Connecting' },
reconnecting: { icon: Loader, color: 'text-yellow-500 animate-spin', label: 'Reconnecting' },
error: { icon: WifiOff, color: 'text-red-500', label: 'Disconnected' },
}[status]
const Icon = cfg.icon
return (
<div className="flex items-center gap-1.5" title={cfg.label}>
<Icon size={14} className={cfg.color} />
{showLabel && <span className={`text-xs ${cfg.color}`}>{cfg.label}</span>}
</div>
)
}
export default function Layout({ children }: { children: React.ReactNode }) {
const [expanded, setExpanded] = useState(false)
const [mobileOpen, setMobileOpen] = useState(false)
const { sseStatus } = useSSEContext()
return (
<div className="flex min-h-screen bg-surface-darkest text-on-surface">
@@ -46,6 +68,15 @@ export default function Layout({ children }: { children: React.ReactNode }) {
</NavLink>
))}
</nav>
{/* SSE connection status — footer of sidebar */}
<div className="px-4 py-3 border-t border-surface-light flex items-center gap-2">
<SSEStatusBadge status={sseStatus} />
{expanded && (
<span className="text-xs text-on-surface-muted whitespace-nowrap">
{sseStatus === 'connected' ? 'Live updates on' : sseStatus}
</span>
)}
</div>
</aside>
{/* Mobile Header + Bottom Nav */}
@@ -54,6 +85,7 @@ export default function Layout({ children }: { children: React.ReactNode }) {
<div className="flex items-center gap-2">
<Command size={22} className="text-primary" />
<span className="font-bold">Control Center</span>
<SSEStatusBadge status={sseStatus} />
</div>
<button onClick={() => setMobileOpen(!mobileOpen)} className="p-2">
{mobileOpen ? <X size={22} /> : <Menu size={22} />}

View File

@@ -0,0 +1,23 @@
/**
* SSEContext — provides SSE connection status throughout the component tree.
* Mount <SSEProvider> once inside QueryClientProvider.
*/
import { createContext, useContext, type ReactNode } from 'react'
import { useRealtimeSync } from '../hooks/useRealtimeSync'
import type { SSEStatus } from '../hooks/useSSE'
interface SSEContextValue {
sseStatus: SSEStatus
}
const SSEContext = createContext<SSEContextValue>({ sseStatus: 'connecting' })
export function SSEProvider({ children }: { children: ReactNode }) {
const { sseStatus } = useRealtimeSync()
return <SSEContext.Provider value={{ sseStatus }}>{children}</SSEContext.Provider>
}
/** Access the SSE connection status from any component. */
export function useSSEContext(): SSEContextValue {
return useContext(SSEContext)
}

View File

@@ -0,0 +1,52 @@
/**
* useRealtimeSync — mounts the SSE connection once at the app level and
* wires incoming events to React Query cache invalidation.
*
* Event → query key mapping:
* agent.status → ['agents']
* agent.task → ['tasks'], ['agents']
* agent.progress → ['tasks'], ['agents']
* fleet.update → ['agents'], ['sessions'], ['tasks']
*/
import { useQueryClient } from '@tanstack/react-query'
import { useCallback } from 'react'
import { useSSE, type SSEMessage, type SSEStatus } from './useSSE'
export function useRealtimeSync(): { sseStatus: SSEStatus } {
const queryClient = useQueryClient()
const handleMessage = useCallback(
(msg: SSEMessage) => {
switch (msg.type) {
case 'agent.status':
queryClient.invalidateQueries({ queryKey: ['agents'] })
break
case 'agent.task':
queryClient.invalidateQueries({ queryKey: ['tasks'] })
queryClient.invalidateQueries({ queryKey: ['agents'] })
break
case 'agent.progress':
queryClient.invalidateQueries({ queryKey: ['tasks'] })
queryClient.invalidateQueries({ queryKey: ['agents'] })
break
case 'fleet.update':
queryClient.invalidateQueries({ queryKey: ['agents'] })
queryClient.invalidateQueries({ queryKey: ['sessions'] })
queryClient.invalidateQueries({ queryKey: ['tasks'] })
break
default:
// 'connected' and unknown events — no action needed
break
}
},
[queryClient],
)
const { status: sseStatus } = useSSE({ onMessage: handleMessage })
return { sseStatus }
}

View File

@@ -0,0 +1,159 @@
import { useEffect, useRef, useCallback, useState } from 'react'
/** SSE connection state reported to consumers. */
export type SSEStatus = 'connecting' | 'connected' | 'reconnecting' | 'error'
/** Typed SSE event received from the backend. */
export interface SSEMessage {
/** event: field from the SSE frame */
type: string
/** parsed JSON from the data: field */
data: unknown
}
export interface UseSSEOptions {
/** Endpoint URL — defaults to /api/events */
url?: string
/** Called for every SSE message (all event types) */
onMessage?: (msg: SSEMessage) => void
/** Called when connection opens or reconnects */
onOpen?: () => void
/** Called on unrecoverable error */
onError?: (err: Event) => void
/** Base delay in ms before the first reconnect attempt (default 1 000) */
reconnectBaseMs?: number
/** Maximum reconnect delay in ms (default 30 000) */
reconnectMaxMs?: number
/** Set false to disable auto-connect (useful in tests) */
enabled?: boolean
}
const SSE_EVENTS = ['agent.status', 'agent.task', 'agent.progress', 'fleet.update', 'connected'] as const
/**
* useSSE — mounts a persistent SSE connection to the Control Center backend.
*
* Handles:
* - Initial connection on mount
* - Exponential back-off reconnection on drop
* - Cleanup on unmount
* - All four event types: agent.status, agent.task, agent.progress, fleet.update
*/
export function useSSE({
url = '/api/events',
onMessage,
onOpen,
onError,
reconnectBaseMs = 1_000,
reconnectMaxMs = 30_000,
enabled = true,
}: UseSSEOptions = {}): { status: SSEStatus } {
const [status, setStatus] = useState<SSEStatus>('connecting')
// Stable refs so the effect doesn't need to re-run when callbacks change
const onMessageRef = useRef(onMessage)
const onOpenRef = useRef(onOpen)
const onErrorRef = useRef(onError)
onMessageRef.current = onMessage
onOpenRef.current = onOpen
onErrorRef.current = onError
const reconnectAttemptRef = useRef(0)
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null)
const esRef = useRef<EventSource | null>(null)
const mountedRef = useRef(true)
const clearReconnectTimer = useCallback(() => {
if (reconnectTimerRef.current !== null) {
clearTimeout(reconnectTimerRef.current)
reconnectTimerRef.current = null
}
}, [])
const connect = useCallback(() => {
if (!mountedRef.current || !enabled) return
// Clean up any existing connection
if (esRef.current) {
esRef.current.close()
esRef.current = null
}
setStatus(reconnectAttemptRef.current === 0 ? 'connecting' : 'reconnecting')
const es = new EventSource(url)
esRef.current = es
es.onopen = () => {
if (!mountedRef.current) return
reconnectAttemptRef.current = 0
setStatus('connected')
onOpenRef.current?.()
}
es.onerror = (evt) => {
if (!mountedRef.current) return
// EventSource auto-retries but we manage our own to get back-off control
es.close()
esRef.current = null
onErrorRef.current?.(evt)
// Exponential back-off: 1s, 2s, 4s … capped at reconnectMaxMs
const delay = Math.min(
reconnectBaseMs * 2 ** reconnectAttemptRef.current,
reconnectMaxMs,
)
reconnectAttemptRef.current += 1
setStatus('reconnecting')
clearReconnectTimer()
reconnectTimerRef.current = setTimeout(() => {
if (mountedRef.current) connect()
}, delay)
}
// Register listeners for all known event types
for (const eventType of SSE_EVENTS) {
es.addEventListener(eventType, (evt: MessageEvent) => {
if (!mountedRef.current) return
let data: unknown = evt.data
try {
data = JSON.parse(evt.data as string)
} catch {
// leave as raw string
}
onMessageRef.current?.({ type: eventType, data })
})
}
// Catch-all for unnamed events (type == 'message')
es.onmessage = (evt: MessageEvent) => {
if (!mountedRef.current) return
let data: unknown = evt.data
try {
data = JSON.parse(evt.data as string)
} catch {
// leave as raw string
}
onMessageRef.current?.({ type: 'message', data })
}
}, [url, enabled, reconnectBaseMs, reconnectMaxMs, clearReconnectTimer])
useEffect(() => {
mountedRef.current = true
if (enabled) connect()
return () => {
mountedRef.current = false
clearReconnectTimer()
if (esRef.current) {
esRef.current.close()
esRef.current = null
}
}
}, [connect, enabled, clearReconnectTimer])
return { status }
}

View File

@@ -4,13 +4,16 @@ import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
import { BrowserRouter } from 'react-router-dom'
import ErrorBoundary from './components/ErrorBoundary'
import { ThemeProvider } from './hooks/useTheme'
import { SSEProvider } from './contexts/SSEContext'
import './index.css'
import App from './App'
const queryClient = new QueryClient({
defaultOptions: {
queries: {
staleTime: 30_000,
// No polling — real-time updates come through SSE.
// staleTime is kept high; data is pushed, not pulled.
staleTime: 60_000,
refetchOnWindowFocus: false,
retry: 1,
},
@@ -22,9 +25,13 @@ createRoot(document.getElementById('root')!).render(
<ErrorBoundary>
<ThemeProvider>
<QueryClientProvider client={queryClient}>
<BrowserRouter>
<App />
</BrowserRouter>
{/* SSEProvider must live inside QueryClientProvider so it can call
useQueryClient() to invalidate caches on incoming events. */}
<SSEProvider>
<BrowserRouter>
<App />
</BrowserRouter>
</SSEProvider>
</QueryClientProvider>
</ThemeProvider>
</ErrorBoundary>

View File

@@ -1,18 +1,36 @@
import { useTheme } from '../hooks/useTheme'
import { useLocalStorage } from '../hooks/useLocalStorage'
import { Sun, Moon, Monitor, Zap, Clock } from 'lucide-react'
import { useSSEContext } from '../contexts/SSEContext'
import { Sun, Moon, Monitor, Zap, Radio } from 'lucide-react'
const REFRESH_PRESETS = [
{ label: '5s', value: 5_000 },
{ label: '10s', value: 10_000 },
{ label: '30s', value: 30_000 },
{ label: '60s', value: 60_000 },
]
const SSE_STATUS_COPY: Record<string, { label: string; description: string; color: string }> = {
connected: {
label: 'Connected',
description: 'Real-time updates are active. Agent status, tasks, and progress stream live.',
color: 'text-green-500',
},
connecting: {
label: 'Connecting…',
description: 'Establishing SSE connection to the backend.',
color: 'text-yellow-500',
},
reconnecting: {
label: 'Reconnecting…',
description: 'Connection lost. Retrying with exponential back-off.',
color: 'text-yellow-500',
},
error: {
label: 'Disconnected',
description: 'Could not connect to the SSE endpoint. Check that the backend is reachable.',
color: 'text-red-500',
},
}
export default function SettingsPage() {
const { isDark, toggleTheme } = useTheme()
const [gatewayUrl, setGatewayUrl] = useLocalStorage('cc-gateway-url', '')
const [refreshInterval, setRefreshInterval] = useLocalStorage('cc-refresh-interval', 30_000)
const { sseStatus } = useSSEContext()
const sseInfo = SSE_STATUS_COPY[sseStatus] ?? SSE_STATUS_COPY.error
return (
<div className="space-y-8 max-w-2xl">
@@ -80,45 +98,31 @@ export default function SettingsPage() {
</div>
</section>
{/* Refresh */}
{/* Real-time connection status */}
<section className="space-y-4">
<h2 className="text-lg font-semibold flex items-center gap-2">
<Clock size={20} className="text-primary" />
Auto Refresh
<Radio size={20} className="text-primary" />
Real-time Updates
</h2>
<div className="p-5 rounded-xl border border-surface-light bg-surface-dark space-y-3">
<p className="text-sm text-on-surface-variant">Data refresh interval for agent status and logs</p>
<div className="flex flex-col gap-2">
<input
type="range"
min="0"
max="3"
step="1"
value={REFRESH_PRESETS.findIndex((p) => p.value === refreshInterval)}
onChange={(e) => {
const idx = parseInt(e.target.value)
setRefreshInterval(REFRESH_PRESETS[idx].value)
}}
className="w-full accent-primary"
/>
<div className="flex justify-between text-xs text-on-surface-muted">
{REFRESH_PRESETS.map((p) => (
<button
key={p.label}
onClick={() => setRefreshInterval(p.value)}
className={`px-3 py-1 rounded-lg transition-colors ${
refreshInterval === p.value
? 'bg-primary/10 text-primary'
: 'hover:bg-surface-light'
}`}
>
{p.label}
</button>
))}
<div className="flex items-center justify-between">
<div>
<p className="font-medium">SSE Connection</p>
<p className="text-sm text-on-surface-variant mt-0.5">{sseInfo.description}</p>
</div>
<span className={`text-sm font-semibold whitespace-nowrap ${sseInfo.color}`}>
{sseInfo.label}
</span>
</div>
<p className="text-xs text-on-surface-muted">
Endpoint: <code className="bg-surface-light px-1.5 py-0.5 rounded text-on-surface-variant">/api/events</code>
&nbsp;·&nbsp;Events: agent.status, agent.task, agent.progress, fleet.update
</p>
<p className="text-xs text-on-surface-muted">
Polling is disabled. All status updates are pushed from the server over a persistent SSE connection.
The client reconnects automatically with exponential back-off on drop.
</p>
</div>
</section>
</div>

View File

@@ -0,0 +1,55 @@
/**
* SSE event payload types matching the Go backend (internal/handler/sse.go).
*
* Event format on the wire:
* event: <eventType>
* data: <json>
*/
import type { AgentStatus } from '../types'
/** agent.status — agent came online, went offline, changed state */
export interface AgentStatusEvent {
agentId: string
status: AgentStatus
/** Optional human-readable reason (e.g. error message) */
reason?: string
}
/** agent.task — a task was assigned to or completed by an agent */
export interface AgentTaskEvent {
agentId: string
taskId: string
title: string
action: 'assigned' | 'completed' | 'failed'
}
/** agent.progress — incremental progress update for a running task */
export interface AgentProgressEvent {
agentId: string
taskId: string
progress: number
/** Optional description of what is currently happening */
message?: string
}
/**
* fleet.update — bulk refresh of all agents (e.g. after a deployment).
* The backend may send partial or complete agent state.
*/
export interface FleetUpdateEvent {
/** ISO timestamp of when the snapshot was taken */
timestamp: string
/** Number of agents in the fleet */
agentCount: number
}
/** Union of all SSE data payloads keyed by event type. */
export type SSEPayloadMap = {
'agent.status': AgentStatusEvent
'agent.task': AgentTaskEvent
'agent.progress': AgentProgressEvent
'fleet.update': FleetUpdateEvent
connected: { clientCount: number }
message: unknown
}