CUB-125: implement real-time SSE/WebSocket in React frontend
Some checks failed
Dev Build / build-test (pull_request) Failing after 58s
Some checks failed
Dev Build / build-test (pull_request) Failing after 58s
- Add useSSE hook with exponential back-off reconnect (1s → 30s) - Add useRealtimeSync hook: maps SSE events to React Query invalidation (agent.status → agents; agent.task/agent.progress → tasks+agents; fleet.update → all) - Add SSEContext/SSEProvider so connection status is available app-wide - Mount SSEProvider in main.tsx inside QueryClientProvider (no polling) - Show live/connecting/reconnecting/disconnected badge in sidebar + mobile header - Update SettingsPage: replace polling interval UI with SSE status panel - Disable React Query polling (staleTime 60s); all updates pushed via SSE Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
import { useState } from 'react'
|
||||
import { NavLink } from 'react-router-dom'
|
||||
import { Command, Activity, FolderKanban, Monitor, Settings, Menu, X } from 'lucide-react'
|
||||
import { Command, Activity, FolderKanban, Monitor, Settings, Menu, X, Wifi, WifiOff, Loader } from 'lucide-react'
|
||||
import { useSSEContext } from '../contexts/SSEContext'
|
||||
import type { SSEStatus } from '../hooks/useSSE'
|
||||
|
||||
const navItems = [
|
||||
{ to: '/', icon: Command, label: 'Hub' },
|
||||
@@ -10,9 +12,29 @@ const navItems = [
|
||||
{ to: '/settings', icon: Settings, label: 'Settings' },
|
||||
]
|
||||
|
||||
/** Small status pill shown in the sidebar footer and mobile header. */
|
||||
function SSEStatusBadge({ status, showLabel = false }: { status: SSEStatus; showLabel?: boolean }) {
|
||||
const cfg = {
|
||||
connected: { icon: Wifi, color: 'text-green-500', label: 'Live' },
|
||||
connecting: { icon: Loader, color: 'text-yellow-500 animate-spin', label: 'Connecting' },
|
||||
reconnecting: { icon: Loader, color: 'text-yellow-500 animate-spin', label: 'Reconnecting' },
|
||||
error: { icon: WifiOff, color: 'text-red-500', label: 'Disconnected' },
|
||||
}[status]
|
||||
|
||||
const Icon = cfg.icon
|
||||
|
||||
return (
|
||||
<div className="flex items-center gap-1.5" title={cfg.label}>
|
||||
<Icon size={14} className={cfg.color} />
|
||||
{showLabel && <span className={`text-xs ${cfg.color}`}>{cfg.label}</span>}
|
||||
</div>
|
||||
)
|
||||
}
|
||||
|
||||
export default function Layout({ children }: { children: React.ReactNode }) {
|
||||
const [expanded, setExpanded] = useState(false)
|
||||
const [mobileOpen, setMobileOpen] = useState(false)
|
||||
const { sseStatus } = useSSEContext()
|
||||
|
||||
return (
|
||||
<div className="flex min-h-screen bg-surface-darkest text-on-surface">
|
||||
@@ -46,6 +68,15 @@ export default function Layout({ children }: { children: React.ReactNode }) {
|
||||
</NavLink>
|
||||
))}
|
||||
</nav>
|
||||
{/* SSE connection status — footer of sidebar */}
|
||||
<div className="px-4 py-3 border-t border-surface-light flex items-center gap-2">
|
||||
<SSEStatusBadge status={sseStatus} />
|
||||
{expanded && (
|
||||
<span className="text-xs text-on-surface-muted whitespace-nowrap">
|
||||
{sseStatus === 'connected' ? 'Live updates on' : sseStatus}
|
||||
</span>
|
||||
)}
|
||||
</div>
|
||||
</aside>
|
||||
|
||||
{/* Mobile Header + Bottom Nav */}
|
||||
@@ -54,6 +85,7 @@ export default function Layout({ children }: { children: React.ReactNode }) {
|
||||
<div className="flex items-center gap-2">
|
||||
<Command size={22} className="text-primary" />
|
||||
<span className="font-bold">Control Center</span>
|
||||
<SSEStatusBadge status={sseStatus} />
|
||||
</div>
|
||||
<button onClick={() => setMobileOpen(!mobileOpen)} className="p-2">
|
||||
{mobileOpen ? <X size={22} /> : <Menu size={22} />}
|
||||
|
||||
23
frontend/src/contexts/SSEContext.tsx
Normal file
23
frontend/src/contexts/SSEContext.tsx
Normal file
@@ -0,0 +1,23 @@
|
||||
/**
|
||||
* SSEContext — provides SSE connection status throughout the component tree.
|
||||
* Mount <SSEProvider> once inside QueryClientProvider.
|
||||
*/
|
||||
import { createContext, useContext, type ReactNode } from 'react'
|
||||
import { useRealtimeSync } from '../hooks/useRealtimeSync'
|
||||
import type { SSEStatus } from '../hooks/useSSE'
|
||||
|
||||
interface SSEContextValue {
|
||||
sseStatus: SSEStatus
|
||||
}
|
||||
|
||||
const SSEContext = createContext<SSEContextValue>({ sseStatus: 'connecting' })
|
||||
|
||||
export function SSEProvider({ children }: { children: ReactNode }) {
|
||||
const { sseStatus } = useRealtimeSync()
|
||||
return <SSEContext.Provider value={{ sseStatus }}>{children}</SSEContext.Provider>
|
||||
}
|
||||
|
||||
/** Access the SSE connection status from any component. */
|
||||
export function useSSEContext(): SSEContextValue {
|
||||
return useContext(SSEContext)
|
||||
}
|
||||
52
frontend/src/hooks/useRealtimeSync.ts
Normal file
52
frontend/src/hooks/useRealtimeSync.ts
Normal file
@@ -0,0 +1,52 @@
|
||||
/**
|
||||
* useRealtimeSync — mounts the SSE connection once at the app level and
|
||||
* wires incoming events to React Query cache invalidation.
|
||||
*
|
||||
* Event → query key mapping:
|
||||
* agent.status → ['agents']
|
||||
* agent.task → ['tasks'], ['agents']
|
||||
* agent.progress → ['tasks'], ['agents']
|
||||
* fleet.update → ['agents'], ['sessions'], ['tasks']
|
||||
*/
|
||||
import { useQueryClient } from '@tanstack/react-query'
|
||||
import { useCallback } from 'react'
|
||||
import { useSSE, type SSEMessage, type SSEStatus } from './useSSE'
|
||||
|
||||
export function useRealtimeSync(): { sseStatus: SSEStatus } {
|
||||
const queryClient = useQueryClient()
|
||||
|
||||
const handleMessage = useCallback(
|
||||
(msg: SSEMessage) => {
|
||||
switch (msg.type) {
|
||||
case 'agent.status':
|
||||
queryClient.invalidateQueries({ queryKey: ['agents'] })
|
||||
break
|
||||
|
||||
case 'agent.task':
|
||||
queryClient.invalidateQueries({ queryKey: ['tasks'] })
|
||||
queryClient.invalidateQueries({ queryKey: ['agents'] })
|
||||
break
|
||||
|
||||
case 'agent.progress':
|
||||
queryClient.invalidateQueries({ queryKey: ['tasks'] })
|
||||
queryClient.invalidateQueries({ queryKey: ['agents'] })
|
||||
break
|
||||
|
||||
case 'fleet.update':
|
||||
queryClient.invalidateQueries({ queryKey: ['agents'] })
|
||||
queryClient.invalidateQueries({ queryKey: ['sessions'] })
|
||||
queryClient.invalidateQueries({ queryKey: ['tasks'] })
|
||||
break
|
||||
|
||||
default:
|
||||
// 'connected' and unknown events — no action needed
|
||||
break
|
||||
}
|
||||
},
|
||||
[queryClient],
|
||||
)
|
||||
|
||||
const { status: sseStatus } = useSSE({ onMessage: handleMessage })
|
||||
|
||||
return { sseStatus }
|
||||
}
|
||||
159
frontend/src/hooks/useSSE.ts
Normal file
159
frontend/src/hooks/useSSE.ts
Normal file
@@ -0,0 +1,159 @@
|
||||
import { useEffect, useRef, useCallback, useState } from 'react'
|
||||
|
||||
/** SSE connection state reported to consumers. */
|
||||
export type SSEStatus = 'connecting' | 'connected' | 'reconnecting' | 'error'
|
||||
|
||||
/** Typed SSE event received from the backend. */
|
||||
export interface SSEMessage {
|
||||
/** event: field from the SSE frame */
|
||||
type: string
|
||||
/** parsed JSON from the data: field */
|
||||
data: unknown
|
||||
}
|
||||
|
||||
export interface UseSSEOptions {
|
||||
/** Endpoint URL — defaults to /api/events */
|
||||
url?: string
|
||||
/** Called for every SSE message (all event types) */
|
||||
onMessage?: (msg: SSEMessage) => void
|
||||
/** Called when connection opens or reconnects */
|
||||
onOpen?: () => void
|
||||
/** Called on unrecoverable error */
|
||||
onError?: (err: Event) => void
|
||||
/** Base delay in ms before the first reconnect attempt (default 1 000) */
|
||||
reconnectBaseMs?: number
|
||||
/** Maximum reconnect delay in ms (default 30 000) */
|
||||
reconnectMaxMs?: number
|
||||
/** Set false to disable auto-connect (useful in tests) */
|
||||
enabled?: boolean
|
||||
}
|
||||
|
||||
const SSE_EVENTS = ['agent.status', 'agent.task', 'agent.progress', 'fleet.update', 'connected'] as const
|
||||
|
||||
/**
|
||||
* useSSE — mounts a persistent SSE connection to the Control Center backend.
|
||||
*
|
||||
* Handles:
|
||||
* - Initial connection on mount
|
||||
* - Exponential back-off reconnection on drop
|
||||
* - Cleanup on unmount
|
||||
* - All four event types: agent.status, agent.task, agent.progress, fleet.update
|
||||
*/
|
||||
export function useSSE({
|
||||
url = '/api/events',
|
||||
onMessage,
|
||||
onOpen,
|
||||
onError,
|
||||
reconnectBaseMs = 1_000,
|
||||
reconnectMaxMs = 30_000,
|
||||
enabled = true,
|
||||
}: UseSSEOptions = {}): { status: SSEStatus } {
|
||||
const [status, setStatus] = useState<SSEStatus>('connecting')
|
||||
|
||||
// Stable refs so the effect doesn't need to re-run when callbacks change
|
||||
const onMessageRef = useRef(onMessage)
|
||||
const onOpenRef = useRef(onOpen)
|
||||
const onErrorRef = useRef(onError)
|
||||
onMessageRef.current = onMessage
|
||||
onOpenRef.current = onOpen
|
||||
onErrorRef.current = onError
|
||||
|
||||
const reconnectAttemptRef = useRef(0)
|
||||
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null)
|
||||
const esRef = useRef<EventSource | null>(null)
|
||||
const mountedRef = useRef(true)
|
||||
|
||||
const clearReconnectTimer = useCallback(() => {
|
||||
if (reconnectTimerRef.current !== null) {
|
||||
clearTimeout(reconnectTimerRef.current)
|
||||
reconnectTimerRef.current = null
|
||||
}
|
||||
}, [])
|
||||
|
||||
const connect = useCallback(() => {
|
||||
if (!mountedRef.current || !enabled) return
|
||||
|
||||
// Clean up any existing connection
|
||||
if (esRef.current) {
|
||||
esRef.current.close()
|
||||
esRef.current = null
|
||||
}
|
||||
|
||||
setStatus(reconnectAttemptRef.current === 0 ? 'connecting' : 'reconnecting')
|
||||
|
||||
const es = new EventSource(url)
|
||||
esRef.current = es
|
||||
|
||||
es.onopen = () => {
|
||||
if (!mountedRef.current) return
|
||||
reconnectAttemptRef.current = 0
|
||||
setStatus('connected')
|
||||
onOpenRef.current?.()
|
||||
}
|
||||
|
||||
es.onerror = (evt) => {
|
||||
if (!mountedRef.current) return
|
||||
|
||||
// EventSource auto-retries but we manage our own to get back-off control
|
||||
es.close()
|
||||
esRef.current = null
|
||||
|
||||
onErrorRef.current?.(evt)
|
||||
|
||||
// Exponential back-off: 1s, 2s, 4s … capped at reconnectMaxMs
|
||||
const delay = Math.min(
|
||||
reconnectBaseMs * 2 ** reconnectAttemptRef.current,
|
||||
reconnectMaxMs,
|
||||
)
|
||||
reconnectAttemptRef.current += 1
|
||||
setStatus('reconnecting')
|
||||
|
||||
clearReconnectTimer()
|
||||
reconnectTimerRef.current = setTimeout(() => {
|
||||
if (mountedRef.current) connect()
|
||||
}, delay)
|
||||
}
|
||||
|
||||
// Register listeners for all known event types
|
||||
for (const eventType of SSE_EVENTS) {
|
||||
es.addEventListener(eventType, (evt: MessageEvent) => {
|
||||
if (!mountedRef.current) return
|
||||
let data: unknown = evt.data
|
||||
try {
|
||||
data = JSON.parse(evt.data as string)
|
||||
} catch {
|
||||
// leave as raw string
|
||||
}
|
||||
onMessageRef.current?.({ type: eventType, data })
|
||||
})
|
||||
}
|
||||
|
||||
// Catch-all for unnamed events (type == 'message')
|
||||
es.onmessage = (evt: MessageEvent) => {
|
||||
if (!mountedRef.current) return
|
||||
let data: unknown = evt.data
|
||||
try {
|
||||
data = JSON.parse(evt.data as string)
|
||||
} catch {
|
||||
// leave as raw string
|
||||
}
|
||||
onMessageRef.current?.({ type: 'message', data })
|
||||
}
|
||||
}, [url, enabled, reconnectBaseMs, reconnectMaxMs, clearReconnectTimer])
|
||||
|
||||
useEffect(() => {
|
||||
mountedRef.current = true
|
||||
if (enabled) connect()
|
||||
|
||||
return () => {
|
||||
mountedRef.current = false
|
||||
clearReconnectTimer()
|
||||
if (esRef.current) {
|
||||
esRef.current.close()
|
||||
esRef.current = null
|
||||
}
|
||||
}
|
||||
}, [connect, enabled, clearReconnectTimer])
|
||||
|
||||
return { status }
|
||||
}
|
||||
@@ -4,13 +4,16 @@ import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
|
||||
import { BrowserRouter } from 'react-router-dom'
|
||||
import ErrorBoundary from './components/ErrorBoundary'
|
||||
import { ThemeProvider } from './hooks/useTheme'
|
||||
import { SSEProvider } from './contexts/SSEContext'
|
||||
import './index.css'
|
||||
import App from './App'
|
||||
|
||||
const queryClient = new QueryClient({
|
||||
defaultOptions: {
|
||||
queries: {
|
||||
staleTime: 30_000,
|
||||
// No polling — real-time updates come through SSE.
|
||||
// staleTime is kept high; data is pushed, not pulled.
|
||||
staleTime: 60_000,
|
||||
refetchOnWindowFocus: false,
|
||||
retry: 1,
|
||||
},
|
||||
@@ -22,9 +25,13 @@ createRoot(document.getElementById('root')!).render(
|
||||
<ErrorBoundary>
|
||||
<ThemeProvider>
|
||||
<QueryClientProvider client={queryClient}>
|
||||
<BrowserRouter>
|
||||
<App />
|
||||
</BrowserRouter>
|
||||
{/* SSEProvider must live inside QueryClientProvider so it can call
|
||||
useQueryClient() to invalidate caches on incoming events. */}
|
||||
<SSEProvider>
|
||||
<BrowserRouter>
|
||||
<App />
|
||||
</BrowserRouter>
|
||||
</SSEProvider>
|
||||
</QueryClientProvider>
|
||||
</ThemeProvider>
|
||||
</ErrorBoundary>
|
||||
|
||||
@@ -1,18 +1,36 @@
|
||||
import { useTheme } from '../hooks/useTheme'
|
||||
import { useLocalStorage } from '../hooks/useLocalStorage'
|
||||
import { Sun, Moon, Monitor, Zap, Clock } from 'lucide-react'
|
||||
import { useSSEContext } from '../contexts/SSEContext'
|
||||
import { Sun, Moon, Monitor, Zap, Radio } from 'lucide-react'
|
||||
|
||||
const REFRESH_PRESETS = [
|
||||
{ label: '5s', value: 5_000 },
|
||||
{ label: '10s', value: 10_000 },
|
||||
{ label: '30s', value: 30_000 },
|
||||
{ label: '60s', value: 60_000 },
|
||||
]
|
||||
const SSE_STATUS_COPY: Record<string, { label: string; description: string; color: string }> = {
|
||||
connected: {
|
||||
label: 'Connected',
|
||||
description: 'Real-time updates are active. Agent status, tasks, and progress stream live.',
|
||||
color: 'text-green-500',
|
||||
},
|
||||
connecting: {
|
||||
label: 'Connecting…',
|
||||
description: 'Establishing SSE connection to the backend.',
|
||||
color: 'text-yellow-500',
|
||||
},
|
||||
reconnecting: {
|
||||
label: 'Reconnecting…',
|
||||
description: 'Connection lost. Retrying with exponential back-off.',
|
||||
color: 'text-yellow-500',
|
||||
},
|
||||
error: {
|
||||
label: 'Disconnected',
|
||||
description: 'Could not connect to the SSE endpoint. Check that the backend is reachable.',
|
||||
color: 'text-red-500',
|
||||
},
|
||||
}
|
||||
|
||||
export default function SettingsPage() {
|
||||
const { isDark, toggleTheme } = useTheme()
|
||||
const [gatewayUrl, setGatewayUrl] = useLocalStorage('cc-gateway-url', '')
|
||||
const [refreshInterval, setRefreshInterval] = useLocalStorage('cc-refresh-interval', 30_000)
|
||||
const { sseStatus } = useSSEContext()
|
||||
const sseInfo = SSE_STATUS_COPY[sseStatus] ?? SSE_STATUS_COPY.error
|
||||
|
||||
return (
|
||||
<div className="space-y-8 max-w-2xl">
|
||||
@@ -80,45 +98,31 @@ export default function SettingsPage() {
|
||||
</div>
|
||||
</section>
|
||||
|
||||
{/* Refresh */}
|
||||
{/* Real-time connection status */}
|
||||
<section className="space-y-4">
|
||||
<h2 className="text-lg font-semibold flex items-center gap-2">
|
||||
<Clock size={20} className="text-primary" />
|
||||
Auto Refresh
|
||||
<Radio size={20} className="text-primary" />
|
||||
Real-time Updates
|
||||
</h2>
|
||||
|
||||
<div className="p-5 rounded-xl border border-surface-light bg-surface-dark space-y-3">
|
||||
<p className="text-sm text-on-surface-variant">Data refresh interval for agent status and logs</p>
|
||||
|
||||
<div className="flex flex-col gap-2">
|
||||
<input
|
||||
type="range"
|
||||
min="0"
|
||||
max="3"
|
||||
step="1"
|
||||
value={REFRESH_PRESETS.findIndex((p) => p.value === refreshInterval)}
|
||||
onChange={(e) => {
|
||||
const idx = parseInt(e.target.value)
|
||||
setRefreshInterval(REFRESH_PRESETS[idx].value)
|
||||
}}
|
||||
className="w-full accent-primary"
|
||||
/>
|
||||
<div className="flex justify-between text-xs text-on-surface-muted">
|
||||
{REFRESH_PRESETS.map((p) => (
|
||||
<button
|
||||
key={p.label}
|
||||
onClick={() => setRefreshInterval(p.value)}
|
||||
className={`px-3 py-1 rounded-lg transition-colors ${
|
||||
refreshInterval === p.value
|
||||
? 'bg-primary/10 text-primary'
|
||||
: 'hover:bg-surface-light'
|
||||
}`}
|
||||
>
|
||||
{p.label}
|
||||
</button>
|
||||
))}
|
||||
<div className="flex items-center justify-between">
|
||||
<div>
|
||||
<p className="font-medium">SSE Connection</p>
|
||||
<p className="text-sm text-on-surface-variant mt-0.5">{sseInfo.description}</p>
|
||||
</div>
|
||||
<span className={`text-sm font-semibold whitespace-nowrap ${sseInfo.color}`}>
|
||||
{sseInfo.label}
|
||||
</span>
|
||||
</div>
|
||||
<p className="text-xs text-on-surface-muted">
|
||||
Endpoint: <code className="bg-surface-light px-1.5 py-0.5 rounded text-on-surface-variant">/api/events</code>
|
||||
· Events: agent.status, agent.task, agent.progress, fleet.update
|
||||
</p>
|
||||
<p className="text-xs text-on-surface-muted">
|
||||
Polling is disabled. All status updates are pushed from the server over a persistent SSE connection.
|
||||
The client reconnects automatically with exponential back-off on drop.
|
||||
</p>
|
||||
</div>
|
||||
</section>
|
||||
</div>
|
||||
|
||||
55
frontend/src/services/sse.ts
Normal file
55
frontend/src/services/sse.ts
Normal file
@@ -0,0 +1,55 @@
|
||||
/**
|
||||
* SSE event payload types matching the Go backend (internal/handler/sse.go).
|
||||
*
|
||||
* Event format on the wire:
|
||||
* event: <eventType>
|
||||
* data: <json>
|
||||
*/
|
||||
|
||||
import type { AgentStatus } from '../types'
|
||||
|
||||
/** agent.status — agent came online, went offline, changed state */
|
||||
export interface AgentStatusEvent {
|
||||
agentId: string
|
||||
status: AgentStatus
|
||||
/** Optional human-readable reason (e.g. error message) */
|
||||
reason?: string
|
||||
}
|
||||
|
||||
/** agent.task — a task was assigned to or completed by an agent */
|
||||
export interface AgentTaskEvent {
|
||||
agentId: string
|
||||
taskId: string
|
||||
title: string
|
||||
action: 'assigned' | 'completed' | 'failed'
|
||||
}
|
||||
|
||||
/** agent.progress — incremental progress update for a running task */
|
||||
export interface AgentProgressEvent {
|
||||
agentId: string
|
||||
taskId: string
|
||||
progress: number
|
||||
/** Optional description of what is currently happening */
|
||||
message?: string
|
||||
}
|
||||
|
||||
/**
|
||||
* fleet.update — bulk refresh of all agents (e.g. after a deployment).
|
||||
* The backend may send partial or complete agent state.
|
||||
*/
|
||||
export interface FleetUpdateEvent {
|
||||
/** ISO timestamp of when the snapshot was taken */
|
||||
timestamp: string
|
||||
/** Number of agents in the fleet */
|
||||
agentCount: number
|
||||
}
|
||||
|
||||
/** Union of all SSE data payloads keyed by event type. */
|
||||
export type SSEPayloadMap = {
|
||||
'agent.status': AgentStatusEvent
|
||||
'agent.task': AgentTaskEvent
|
||||
'agent.progress': AgentProgressEvent
|
||||
'fleet.update': FleetUpdateEvent
|
||||
connected: { clientCount: number }
|
||||
message: unknown
|
||||
}
|
||||
Reference in New Issue
Block a user