Files
Control-Center/frontend/src/hooks/useSSE.ts
rex-bot ffc127f12d
Some checks failed
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
CUB-125: address Grimm review — tests, type fixes, error state circuit breaker
- Add missing 'offline' to AgentStatus union type (types/index.ts)
- Add max-retry circuit breaker to useSSE; error state is now reachable
- Wire typed SSE payloads (SSEPayloadMap discriminated union) into useRealtimeSync
- Add Vitest + 20 unit tests: useSSE lifecycle, back-off, circuit breaker,
  event parsing, cleanup; useRealtimeSync event-to-invalidation mapping
- Rebase on dev to remove stale CUB-119 legacy-deletion commit and align
  CI workflow (dev already consolidated into single dev.yml)
- Tests: npm test → 20/20 pass; Build: npm run build → 0 errors
2026-05-20 16:51:13 +00:00

181 lines
5.6 KiB
TypeScript

import { useEffect, useRef, useCallback, useState } from 'react'
/**
 * Lifecycle state of the SSE connection, as exposed to hook consumers.
 *
 * - `connecting`   — the first connection attempt is in flight
 * - `connected`    — the transport is open
 * - `reconnecting` — the connection dropped and a retry is pending or in flight
 * - `error`        — retries are exhausted; no further attempts are made
 */
export type SSEStatus =
  | 'connecting'
  | 'connected'
  | 'reconnecting'
  | 'error'
/** A single server-sent event after decoding, in the shape handed to `onMessage`. */
export interface SSEMessage {
  /**
   * Event name from the frame's `event:` field; unnamed frames are reported
   * as `'message'`.
   */
  type: string
  /**
   * Payload from the frame's `data:` field, JSON-parsed. If the payload is not
   * valid JSON the raw value is passed through unchanged.
   */
  data: unknown
}
/** Configuration accepted by `useSSE`. Every field is optional. */
export interface UseSSEOptions {
  /** URL of the SSE endpoint. Defaults to `/api/events`. */
  url?: string

  /** Invoked once per received SSE message, whatever its event type. */
  onMessage?: (msg: SSEMessage) => void

  /** Invoked each time the connection opens, including after a reconnect. */
  onOpen?: () => void

  /** Invoked on every connection error, whether or not a retry follows. */
  onError?: (err: Event) => void

  /** Delay in ms before the first reconnect attempt. Defaults to 1 000. */
  reconnectBaseMs?: number

  /** Upper bound in ms for the reconnect delay. Defaults to 30 000. */
  reconnectMaxMs?: number

  /**
   * Cap on consecutive reconnect attempts. Once the cap is exhausted the
   * status becomes 'error' and no more attempts are scheduled.
   * Defaults to undefined, meaning retries never stop.
   */
  reconnectLimit?: number

  /** Pass false to suppress auto-connect (handy in tests). Defaults to true. */
  enabled?: boolean
}
/** Named SSE event types for which a dedicated listener is registered. */
const SSE_EVENTS = [
  'agent.status',
  'agent.task',
  'agent.progress',
  'fleet.update',
  'connected',
] as const
/**
 * Decodes the `data:` field of an SSE frame.
 *
 * Payloads are parsed as JSON; a payload that fails to parse is returned
 * unchanged so consumers still receive non-JSON frames as the raw value.
 */
function parseSSEData(raw: unknown): unknown {
  try {
    return JSON.parse(raw as string)
  } catch {
    return raw
  }
}

/**
 * useSSE — mounts a persistent SSE connection to the Control Center backend.
 *
 * Handles:
 * - Initial connection on mount
 * - Exponential back-off reconnection on drop (base → 2×base → 4×base … capped at reconnectMaxMs)
 * - Circuit-breaker: at most reconnectLimit consecutive reconnect attempts are made;
 *   the failure that follows the last allowed attempt transitions status to 'error'
 * - A fresh retry budget whenever the connection is (re)started because `url`,
 *   `enabled` or one of the reconnect options changed
 * - Cleanup on unmount
 * - All five named event types (agent.status, agent.task, agent.progress,
 *   fleet.update, connected) plus unnamed frames, reported as type 'message'
 *
 * The 'connected' SSE event is an application-level handshake sent by the backend
 * after the transport opens. This is distinct from onOpen, which fires at the
 * transport level when the EventSource HTTP connection is established.
 *
 * NOTE(review): while `enabled` is false nothing updates `status`, so it keeps
 * its previous value ('connecting' on first render). SSEStatus has no idle
 * state to report instead.
 *
 * @param options - See {@link UseSSEOptions}; every field is optional.
 * @returns The current connection status.
 */
export function useSSE({
  url = '/api/events',
  onMessage,
  onOpen,
  onError,
  reconnectBaseMs = 1_000,
  reconnectMaxMs = 30_000,
  reconnectLimit,
  enabled = true,
}: UseSSEOptions = {}): { status: SSEStatus } {
  const [status, setStatus] = useState<SSEStatus>('connecting')

  // Keep the latest callbacks in refs so a new callback identity does not
  // tear down and re-open the connection.
  const onMessageRef = useRef(onMessage)
  const onOpenRef = useRef(onOpen)
  const onErrorRef = useRef(onError)
  onMessageRef.current = onMessage
  onOpenRef.current = onOpen
  onErrorRef.current = onError

  // Consecutive failures since the last successful open (or since the
  // connection was last (re)started by the effect below).
  const reconnectAttemptRef = useRef(0)
  const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null)
  const esRef = useRef<EventSource | null>(null)
  const mountedRef = useRef(true)

  const clearReconnectTimer = useCallback(() => {
    if (reconnectTimerRef.current !== null) {
      clearTimeout(reconnectTimerRef.current)
      reconnectTimerRef.current = null
    }
  }, [])

  const connect = useCallback(() => {
    if (!mountedRef.current || !enabled) return

    // Clean up any existing connection
    if (esRef.current) {
      esRef.current.close()
      esRef.current = null
    }

    setStatus(reconnectAttemptRef.current === 0 ? 'connecting' : 'reconnecting')

    const es = new EventSource(url)
    esRef.current = es

    es.onopen = () => {
      if (!mountedRef.current) return
      reconnectAttemptRef.current = 0
      setStatus('connected')
      onOpenRef.current?.()
    }

    es.onerror = (evt) => {
      if (!mountedRef.current) return

      // EventSource auto-retries but we manage our own to get back-off control
      es.close()
      esRef.current = null
      onErrorRef.current?.(evt)

      reconnectAttemptRef.current += 1

      // Circuit-breaker: give up once the reconnect budget is spent
      if (reconnectLimit !== undefined && reconnectAttemptRef.current > reconnectLimit) {
        setStatus('error')
        return
      }

      // Exponential back-off capped at reconnectMaxMs. The attempt counter is
      // 1-based here (already incremented), hence attempt - 1 as the exponent.
      const delay = Math.min(
        reconnectBaseMs * 2 ** (reconnectAttemptRef.current - 1),
        reconnectMaxMs,
      )

      setStatus('reconnecting')
      clearReconnectTimer()
      reconnectTimerRef.current = setTimeout(() => {
        if (mountedRef.current) connect()
      }, delay)
    }

    // Shared delivery path for named and unnamed events.
    const deliver = (type: string, evt: MessageEvent) => {
      if (!mountedRef.current) return
      onMessageRef.current?.({ type, data: parseSSEData(evt.data) })
    }

    // Register listeners for all known event types
    for (const eventType of SSE_EVENTS) {
      es.addEventListener(eventType, (evt: MessageEvent) => deliver(eventType, evt))
    }

    // Catch-all for unnamed events (type == 'message').
    // Won't fire for the named events registered via addEventListener above.
    es.onmessage = (evt: MessageEvent) => deliver('message', evt)
  }, [url, enabled, reconnectBaseMs, reconnectMaxMs, reconnectLimit, clearReconnectTimer])

  useEffect(() => {
    mountedRef.current = true
    // This effect only re-runs when the connection parameters change, so start
    // from a clean slate: without the reset, failures counted against the
    // previous configuration would make the new connection report
    // 'reconnecting' and could trip the circuit breaker on its first error.
    reconnectAttemptRef.current = 0
    if (enabled) connect()

    return () => {
      mountedRef.current = false
      clearReconnectTimer()
      if (esRef.current) {
        esRef.current.close()
        esRef.current = null
      }
    }
  }, [connect, enabled, clearReconnectTimer])

  return { status }
}