CUB-125: implement real-time SSE/WebSocket in React frontend
- Add useSSE hook with exponential back-off reconnect (1s → 30s) - Add useRealtimeSync hook: maps SSE events to React Query invalidation (agent.status → agents; agent.task/agent.progress → tasks+agents; fleet.update → all) - Add SSEContext/SSEProvider so connection status is available app-wide - Mount SSEProvider in main.tsx inside QueryClientProvider (no polling) - Show live/connecting/reconnecting/disconnected badge in sidebar + mobile header - Update SettingsPage: replace polling interval UI with SSE status panel - Disable React Query polling (staleTime 60s); all updates pushed via SSE Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
159
frontend/src/hooks/useSSE.ts
Normal file
159
frontend/src/hooks/useSSE.ts
Normal file
@@ -0,0 +1,159 @@
|
||||
import { useEffect, useRef, useCallback, useState } from 'react'
|
||||
|
||||
/** SSE connection state reported to consumers. */
|
||||
export type SSEStatus = 'connecting' | 'connected' | 'reconnecting' | 'error'
|
||||
|
||||
/** Typed SSE event received from the backend. */
|
||||
export interface SSEMessage {
|
||||
/** event: field from the SSE frame */
|
||||
type: string
|
||||
/** parsed JSON from the data: field */
|
||||
data: unknown
|
||||
}
|
||||
|
||||
export interface UseSSEOptions {
|
||||
/** Endpoint URL — defaults to /api/events */
|
||||
url?: string
|
||||
/** Called for every SSE message (all event types) */
|
||||
onMessage?: (msg: SSEMessage) => void
|
||||
/** Called when connection opens or reconnects */
|
||||
onOpen?: () => void
|
||||
/** Called on unrecoverable error */
|
||||
onError?: (err: Event) => void
|
||||
/** Base delay in ms before the first reconnect attempt (default 1 000) */
|
||||
reconnectBaseMs?: number
|
||||
/** Maximum reconnect delay in ms (default 30 000) */
|
||||
reconnectMaxMs?: number
|
||||
/** Set false to disable auto-connect (useful in tests) */
|
||||
enabled?: boolean
|
||||
}
|
||||
|
||||
const SSE_EVENTS = ['agent.status', 'agent.task', 'agent.progress', 'fleet.update', 'connected'] as const
|
||||
|
||||
/**
|
||||
* useSSE — mounts a persistent SSE connection to the Control Center backend.
|
||||
*
|
||||
* Handles:
|
||||
* - Initial connection on mount
|
||||
* - Exponential back-off reconnection on drop
|
||||
* - Cleanup on unmount
|
||||
* - All four event types: agent.status, agent.task, agent.progress, fleet.update
|
||||
*/
|
||||
export function useSSE({
|
||||
url = '/api/events',
|
||||
onMessage,
|
||||
onOpen,
|
||||
onError,
|
||||
reconnectBaseMs = 1_000,
|
||||
reconnectMaxMs = 30_000,
|
||||
enabled = true,
|
||||
}: UseSSEOptions = {}): { status: SSEStatus } {
|
||||
const [status, setStatus] = useState<SSEStatus>('connecting')
|
||||
|
||||
// Stable refs so the effect doesn't need to re-run when callbacks change
|
||||
const onMessageRef = useRef(onMessage)
|
||||
const onOpenRef = useRef(onOpen)
|
||||
const onErrorRef = useRef(onError)
|
||||
onMessageRef.current = onMessage
|
||||
onOpenRef.current = onOpen
|
||||
onErrorRef.current = onError
|
||||
|
||||
const reconnectAttemptRef = useRef(0)
|
||||
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null)
|
||||
const esRef = useRef<EventSource | null>(null)
|
||||
const mountedRef = useRef(true)
|
||||
|
||||
const clearReconnectTimer = useCallback(() => {
|
||||
if (reconnectTimerRef.current !== null) {
|
||||
clearTimeout(reconnectTimerRef.current)
|
||||
reconnectTimerRef.current = null
|
||||
}
|
||||
}, [])
|
||||
|
||||
const connect = useCallback(() => {
|
||||
if (!mountedRef.current || !enabled) return
|
||||
|
||||
// Clean up any existing connection
|
||||
if (esRef.current) {
|
||||
esRef.current.close()
|
||||
esRef.current = null
|
||||
}
|
||||
|
||||
setStatus(reconnectAttemptRef.current === 0 ? 'connecting' : 'reconnecting')
|
||||
|
||||
const es = new EventSource(url)
|
||||
esRef.current = es
|
||||
|
||||
es.onopen = () => {
|
||||
if (!mountedRef.current) return
|
||||
reconnectAttemptRef.current = 0
|
||||
setStatus('connected')
|
||||
onOpenRef.current?.()
|
||||
}
|
||||
|
||||
es.onerror = (evt) => {
|
||||
if (!mountedRef.current) return
|
||||
|
||||
// EventSource auto-retries but we manage our own to get back-off control
|
||||
es.close()
|
||||
esRef.current = null
|
||||
|
||||
onErrorRef.current?.(evt)
|
||||
|
||||
// Exponential back-off: 1s, 2s, 4s … capped at reconnectMaxMs
|
||||
const delay = Math.min(
|
||||
reconnectBaseMs * 2 ** reconnectAttemptRef.current,
|
||||
reconnectMaxMs,
|
||||
)
|
||||
reconnectAttemptRef.current += 1
|
||||
setStatus('reconnecting')
|
||||
|
||||
clearReconnectTimer()
|
||||
reconnectTimerRef.current = setTimeout(() => {
|
||||
if (mountedRef.current) connect()
|
||||
}, delay)
|
||||
}
|
||||
|
||||
// Register listeners for all known event types
|
||||
for (const eventType of SSE_EVENTS) {
|
||||
es.addEventListener(eventType, (evt: MessageEvent) => {
|
||||
if (!mountedRef.current) return
|
||||
let data: unknown = evt.data
|
||||
try {
|
||||
data = JSON.parse(evt.data as string)
|
||||
} catch {
|
||||
// leave as raw string
|
||||
}
|
||||
onMessageRef.current?.({ type: eventType, data })
|
||||
})
|
||||
}
|
||||
|
||||
// Catch-all for unnamed events (type == 'message')
|
||||
es.onmessage = (evt: MessageEvent) => {
|
||||
if (!mountedRef.current) return
|
||||
let data: unknown = evt.data
|
||||
try {
|
||||
data = JSON.parse(evt.data as string)
|
||||
} catch {
|
||||
// leave as raw string
|
||||
}
|
||||
onMessageRef.current?.({ type: 'message', data })
|
||||
}
|
||||
}, [url, enabled, reconnectBaseMs, reconnectMaxMs, clearReconnectTimer])
|
||||
|
||||
useEffect(() => {
|
||||
mountedRef.current = true
|
||||
if (enabled) connect()
|
||||
|
||||
return () => {
|
||||
mountedRef.current = false
|
||||
clearReconnectTimer()
|
||||
if (esRef.current) {
|
||||
esRef.current.close()
|
||||
esRef.current = null
|
||||
}
|
||||
}
|
||||
}, [connect, enabled, clearReconnectTimer])
|
||||
|
||||
return { status }
|
||||
}
|
||||
Reference in New Issue
Block a user