Compare commits
15 Commits
agent/otto
...
agent/dex/
| Author | SHA1 | Date | |
|---|---|---|---|
| b7b05bb4e3 | |||
| d370d5ec23 | |||
|
|
1b82e1d3a6 | ||
| 93bf434a47 | |||
| 010408cc45 | |||
| 23f9d4a8fb | |||
| 3d5bf16d37 | |||
| d9a1640b10 | |||
| 5347944c4c | |||
| 48a8598d3b | |||
| 20404b30bb | |||
| 6fd2d9bec4 | |||
|
|
439741e55f | ||
|
|
3c26b8deba | ||
|
|
d28d6e8dac |
@@ -32,7 +32,7 @@ GATEWAY_POLL_INTERVAL=5s
|
||||
# When using docker-compose, these are set in the services section
|
||||
# See docker-compose.yml for service-specific environment variables
|
||||
|
||||
# ── Database Configuration ─────────────────────────────────────────────
|
||||
# ── Database Configuration ───────────────────────────────────────────────
|
||||
# Set in the db service environment section of docker-compose.yml
|
||||
# POSTGRES_USER=controlcenter
|
||||
# POSTGRES_PASSWORD=controlcenter
|
||||
@@ -47,4 +47,4 @@ GATEWAY_POLL_INTERVAL=5s
|
||||
# For Docker deployment:
|
||||
# 1. Copy .env.example to .env (backend only)
|
||||
# 2. Run: docker compose up -d
|
||||
# 3. Access frontend at http://localhost:3000
|
||||
# 3. Access frontend at http://localhost:3000
|
||||
1129
frontend/package-lock.json
generated
1129
frontend/package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -7,7 +7,9 @@
|
||||
"dev": "vite",
|
||||
"build": "tsc -b && vite build",
|
||||
"lint": "eslint .",
|
||||
"preview": "vite preview"
|
||||
"preview": "vite preview",
|
||||
"test": "vitest run",
|
||||
"test:watch": "vitest"
|
||||
},
|
||||
"dependencies": {
|
||||
"@tanstack/react-query": "^5.100.9",
|
||||
@@ -20,6 +22,8 @@
|
||||
"devDependencies": {
|
||||
"@eslint/js": "^10.0.1",
|
||||
"@tailwindcss/vite": "^4.2.4",
|
||||
"@testing-library/jest-dom": "^6.9.1",
|
||||
"@testing-library/react": "^16.3.2",
|
||||
"@types/node": "^24.12.2",
|
||||
"@types/react": "^19.2.14",
|
||||
"@types/react-dom": "^19.2.3",
|
||||
@@ -29,10 +33,12 @@
|
||||
"eslint-plugin-react-hooks": "^7.1.1",
|
||||
"eslint-plugin-react-refresh": "^0.5.2",
|
||||
"globals": "^17.5.0",
|
||||
"jsdom": "^29.1.1",
|
||||
"postcss": "^8.5.14",
|
||||
"tailwindcss": "^4.2.4",
|
||||
"typescript": "~6.0.2",
|
||||
"typescript-eslint": "^8.58.2",
|
||||
"vite": "^8.0.10"
|
||||
"vite": "^8.0.10",
|
||||
"vitest": "^4.1.7"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import { useState } from 'react'
|
||||
import { NavLink } from 'react-router-dom'
|
||||
import { Command, Activity, FolderKanban, Monitor, Settings, Menu, X } from 'lucide-react'
|
||||
import { Command, Activity, FolderKanban, Monitor, Settings, Menu, X, Wifi, WifiOff, Loader } from 'lucide-react'
|
||||
import { useSSEContext } from '../contexts/SSEContext'
|
||||
import type { SSEStatus } from '../hooks/useSSE'
|
||||
|
||||
const navItems = [
|
||||
{ to: '/', icon: Command, label: 'Hub' },
|
||||
@@ -10,9 +12,29 @@ const navItems = [
|
||||
{ to: '/settings', icon: Settings, label: 'Settings' },
|
||||
]
|
||||
|
||||
/** Small status pill shown in the sidebar footer and mobile header. */
|
||||
function SSEStatusBadge({ status, showLabel = false }: { status: SSEStatus; showLabel?: boolean }) {
|
||||
const cfg = {
|
||||
connected: { icon: Wifi, color: 'text-green-500', label: 'Live' },
|
||||
connecting: { icon: Loader, color: 'text-yellow-500 animate-spin', label: 'Connecting' },
|
||||
reconnecting: { icon: Loader, color: 'text-yellow-500 animate-spin', label: 'Reconnecting' },
|
||||
error: { icon: WifiOff, color: 'text-red-500', label: 'Disconnected' },
|
||||
}[status]
|
||||
|
||||
const Icon = cfg.icon
|
||||
|
||||
return (
|
||||
<div className="flex items-center gap-1.5" title={cfg.label}>
|
||||
<Icon size={14} className={cfg.color} />
|
||||
{showLabel && <span className={`text-xs ${cfg.color}`}>{cfg.label}</span>}
|
||||
</div>
|
||||
)
|
||||
}
|
||||
|
||||
export default function Layout({ children }: { children: React.ReactNode }) {
|
||||
const [expanded, setExpanded] = useState(false)
|
||||
const [mobileOpen, setMobileOpen] = useState(false)
|
||||
const { sseStatus } = useSSEContext()
|
||||
|
||||
return (
|
||||
<div className="flex min-h-screen bg-surface-darkest text-on-surface">
|
||||
@@ -46,6 +68,15 @@ export default function Layout({ children }: { children: React.ReactNode }) {
|
||||
</NavLink>
|
||||
))}
|
||||
</nav>
|
||||
{/* SSE connection status — footer of sidebar */}
|
||||
<div className="px-4 py-3 border-t border-surface-light flex items-center gap-2">
|
||||
<SSEStatusBadge status={sseStatus} />
|
||||
{expanded && (
|
||||
<span className="text-xs text-on-surface-muted whitespace-nowrap">
|
||||
{sseStatus === 'connected' ? 'Live updates on' : sseStatus}
|
||||
</span>
|
||||
)}
|
||||
</div>
|
||||
</aside>
|
||||
|
||||
{/* Mobile Header + Bottom Nav */}
|
||||
@@ -54,6 +85,7 @@ export default function Layout({ children }: { children: React.ReactNode }) {
|
||||
<div className="flex items-center gap-2">
|
||||
<Command size={22} className="text-primary" />
|
||||
<span className="font-bold">Control Center</span>
|
||||
<SSEStatusBadge status={sseStatus} />
|
||||
</div>
|
||||
<button onClick={() => setMobileOpen(!mobileOpen)} className="p-2">
|
||||
{mobileOpen ? <X size={22} /> : <Menu size={22} />}
|
||||
|
||||
23
frontend/src/contexts/SSEContext.tsx
Normal file
23
frontend/src/contexts/SSEContext.tsx
Normal file
@@ -0,0 +1,23 @@
|
||||
/**
|
||||
* SSEContext — provides SSE connection status throughout the component tree.
|
||||
* Mount <SSEProvider> once inside QueryClientProvider.
|
||||
*/
|
||||
import { createContext, useContext, type ReactNode } from 'react'
|
||||
import { useRealtimeSync } from '../hooks/useRealtimeSync'
|
||||
import type { SSEStatus } from '../hooks/useSSE'
|
||||
|
||||
interface SSEContextValue {
|
||||
sseStatus: SSEStatus
|
||||
}
|
||||
|
||||
const SSEContext = createContext<SSEContextValue>({ sseStatus: 'connecting' })
|
||||
|
||||
export function SSEProvider({ children }: { children: ReactNode }) {
|
||||
const { sseStatus } = useRealtimeSync()
|
||||
return <SSEContext.Provider value={{ sseStatus }}>{children}</SSEContext.Provider>
|
||||
}
|
||||
|
||||
/** Access the SSE connection status from any component. */
|
||||
export function useSSEContext(): SSEContextValue {
|
||||
return useContext(SSEContext)
|
||||
}
|
||||
129
frontend/src/hooks/useRealtimeSync.test.tsx
Normal file
129
frontend/src/hooks/useRealtimeSync.test.tsx
Normal file
@@ -0,0 +1,129 @@
|
||||
/**
|
||||
* Tests for useRealtimeSync — event → query invalidation mapping.
|
||||
*
|
||||
* Uses .tsx extension so Vite/OXC can parse JSX in the wrapper component.
|
||||
*/
|
||||
import { renderHook } from '@testing-library/react'
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
|
||||
import * as useSSEModule from './useSSE'
|
||||
import { useRealtimeSync } from './useRealtimeSync'
|
||||
import React from 'react'
|
||||
import type { SSEMessage } from '../services/sse'
|
||||
|
||||
describe('useRealtimeSync', () => {
|
||||
let queryClient: QueryClient
|
||||
let mockSSEOnMessage: ((msg: { type: string; data: unknown }) => void) | null = null
|
||||
|
||||
beforeEach(() => {
|
||||
queryClient = new QueryClient({
|
||||
defaultOptions: { queries: { retry: false } },
|
||||
})
|
||||
mockSSEOnMessage = null
|
||||
|
||||
// Spy on useSSE to capture the onMessage callback
|
||||
vi.spyOn(useSSEModule, 'useSSE').mockImplementation((opts) => {
|
||||
mockSSEOnMessage = opts?.onMessage ?? null
|
||||
return { status: 'connected' }
|
||||
})
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks()
|
||||
})
|
||||
|
||||
function render() {
|
||||
return renderHook(() => useRealtimeSync(), {
|
||||
wrapper: ({ children }: { children: React.ReactNode }) => (
|
||||
React.createElement(QueryClientProvider, { client: queryClient }, children)
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
it('invalidates ["agents"] on agent.status event', async () => {
|
||||
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
||||
render()
|
||||
|
||||
const msg: SSEMessage = {
|
||||
type: 'agent.status',
|
||||
data: { agentId: 'a1', status: 'active' },
|
||||
}
|
||||
mockSSEOnMessage!(msg)
|
||||
|
||||
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
|
||||
expect(invalidateSpy).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it('invalidates ["tasks"] and ["agents"] on agent.task event', async () => {
|
||||
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
||||
render()
|
||||
|
||||
const msg: SSEMessage = {
|
||||
type: 'agent.task',
|
||||
data: { agentId: 'a1', taskId: 't1', title: 'Test', action: 'assigned' },
|
||||
}
|
||||
mockSSEOnMessage!(msg)
|
||||
|
||||
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
|
||||
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
|
||||
expect(invalidateSpy).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
|
||||
it('invalidates ["tasks"] and ["agents"] on agent.progress event', async () => {
|
||||
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
||||
render()
|
||||
|
||||
const msg: SSEMessage = {
|
||||
type: 'agent.progress',
|
||||
data: { agentId: 'a1', taskId: 't1', progress: 50, message: 'working' },
|
||||
}
|
||||
mockSSEOnMessage!(msg)
|
||||
|
||||
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
|
||||
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
|
||||
expect(invalidateSpy).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
|
||||
it('invalidates ["agents"], ["sessions"], ["tasks"] on fleet.update event', async () => {
|
||||
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
||||
render()
|
||||
|
||||
const msg: SSEMessage = {
|
||||
type: 'fleet.update',
|
||||
data: { timestamp: '2026-05-20T12:00:00Z', agentCount: 5 },
|
||||
}
|
||||
mockSSEOnMessage!(msg)
|
||||
|
||||
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
|
||||
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['sessions'] })
|
||||
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
|
||||
expect(invalidateSpy).toHaveBeenCalledTimes(3)
|
||||
})
|
||||
|
||||
it('does nothing on connected event', async () => {
|
||||
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
||||
render()
|
||||
|
||||
const msg: SSEMessage = {
|
||||
type: 'connected',
|
||||
data: { clientCount: 1 },
|
||||
}
|
||||
mockSSEOnMessage!(msg)
|
||||
|
||||
expect(invalidateSpy).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('does nothing on unknown event types', async () => {
|
||||
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
||||
render()
|
||||
|
||||
mockSSEOnMessage!({ type: 'unknown.event', data: {} })
|
||||
|
||||
expect(invalidateSpy).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('returns sseStatus from useSSE', () => {
|
||||
const { result } = render()
|
||||
expect(result.current.sseStatus).toBe('connected')
|
||||
})
|
||||
})
|
||||
64
frontend/src/hooks/useRealtimeSync.ts
Normal file
64
frontend/src/hooks/useRealtimeSync.ts
Normal file
@@ -0,0 +1,64 @@
|
||||
/**
|
||||
* useRealtimeSync — mounts the SSE connection once at the app level and
|
||||
* wires incoming events to React Query cache invalidation.
|
||||
*
|
||||
* Event → query key mapping:
|
||||
* agent.status → ['agents']
|
||||
* agent.task → ['tasks'], ['agents']
|
||||
* agent.progress → ['tasks'], ['agents']
|
||||
* fleet.update → ['agents'], ['sessions'], ['tasks']
|
||||
*/
|
||||
import { useQueryClient } from '@tanstack/react-query'
|
||||
import { useCallback } from 'react'
|
||||
import { useSSE, type SSEStatus } from './useSSE'
|
||||
import type { SSEMessage } from '../services/sse'
|
||||
|
||||
export function useRealtimeSync(): { sseStatus: SSEStatus } {
|
||||
const queryClient = useQueryClient()
|
||||
|
||||
const handleMessage = useCallback(
|
||||
(raw: { type: string; data: unknown }) => {
|
||||
// Cast to discriminated union — the backend contract guarantees these shapes
|
||||
const msg = raw as SSEMessage
|
||||
|
||||
switch (msg.type) {
|
||||
case 'agent.status':
|
||||
// msg.data: AgentStatusEvent { agentId, status, reason? }
|
||||
void msg.data.agentId // retained for type-narrowing — ensures payload matches contract
|
||||
queryClient.invalidateQueries({ queryKey: ['agents'] })
|
||||
break
|
||||
|
||||
case 'agent.task':
|
||||
// msg.data: AgentTaskEvent { agentId, taskId, title, action }
|
||||
void msg.data.agentId
|
||||
queryClient.invalidateQueries({ queryKey: ['tasks'] })
|
||||
queryClient.invalidateQueries({ queryKey: ['agents'] })
|
||||
break
|
||||
|
||||
case 'agent.progress':
|
||||
// msg.data: AgentProgressEvent { agentId, taskId, progress, message? }
|
||||
void msg.data.agentId
|
||||
queryClient.invalidateQueries({ queryKey: ['tasks'] })
|
||||
queryClient.invalidateQueries({ queryKey: ['agents'] })
|
||||
break
|
||||
|
||||
case 'fleet.update':
|
||||
// msg.data: FleetUpdateEvent { timestamp, agentCount }
|
||||
void msg.data.agentCount
|
||||
queryClient.invalidateQueries({ queryKey: ['agents'] })
|
||||
queryClient.invalidateQueries({ queryKey: ['sessions'] })
|
||||
queryClient.invalidateQueries({ queryKey: ['tasks'] })
|
||||
break
|
||||
|
||||
default:
|
||||
// 'connected' and unknown events — no action needed
|
||||
break
|
||||
}
|
||||
},
|
||||
[queryClient],
|
||||
)
|
||||
|
||||
const { status: sseStatus } = useSSE({ onMessage: handleMessage })
|
||||
|
||||
return { sseStatus }
|
||||
}
|
||||
267
frontend/src/hooks/useSSE.test.ts
Normal file
267
frontend/src/hooks/useSSE.test.ts
Normal file
@@ -0,0 +1,267 @@
|
||||
/**
|
||||
* Tests for useSSE — SSE connection lifecycle, back-off, event parsing, and cleanup.
|
||||
*
|
||||
* jsdom does not include EventSource, so we mock it completely.
|
||||
*/
|
||||
import { renderHook, act, waitFor } from '@testing-library/react'
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
import { useSSE } from './useSSE'
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Mock EventSource — defined as a plain class so `new EventSource()` works
|
||||
// ---------------------------------------------------------------------------
|
||||
class MockEventSource {
|
||||
url: string
|
||||
onopen: (() => void) | null = null
|
||||
onerror: ((evt: Event) => void) | null = null
|
||||
onmessage: ((evt: MessageEvent) => void) | null = null
|
||||
private listeners: Map<string, Array<(evt: Event) => void>> = new Map()
|
||||
readyState: number = 0
|
||||
|
||||
constructor(url: string) {
|
||||
this.url = url
|
||||
}
|
||||
|
||||
addEventListener(type: string, handler: (evt: Event) => void) {
|
||||
if (!this.listeners.has(type)) this.listeners.set(type, [])
|
||||
this.listeners.get(type)!.push(handler)
|
||||
}
|
||||
|
||||
removeEventListener() { /* no-op for tests */ }
|
||||
|
||||
close() {
|
||||
this.readyState = 2
|
||||
this.onopen = null
|
||||
this.onerror = null
|
||||
this.onmessage = null
|
||||
this.listeners.clear()
|
||||
}
|
||||
|
||||
// Test helpers
|
||||
_simulateOpen() { this.onopen?.() }
|
||||
_simulateError() { this.onerror?.(new Event('error')) }
|
||||
_simulateNamedEvent(type: string, data: string) {
|
||||
const handlers = this.listeners.get(type)
|
||||
if (handlers) {
|
||||
const evt = new MessageEvent(type, { data }) as Event
|
||||
handlers.forEach((h) => h(evt))
|
||||
}
|
||||
}
|
||||
_simulateMessage(data: string) {
|
||||
this.onmessage?.(new MessageEvent('message', { data }) as MessageEvent)
|
||||
}
|
||||
|
||||
static readonly CONNECTING = 0
|
||||
static readonly OPEN = 1
|
||||
static readonly CLOSED = 2
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
let esInstances: MockEventSource[]
|
||||
|
||||
describe('useSSE', () => {
|
||||
beforeEach(() => {
|
||||
esInstances = []
|
||||
// Replace global EventSource with our mock class
|
||||
Object.defineProperty(globalThis, 'EventSource', {
|
||||
// The mock must use a class for `new EventSource()` to work
|
||||
value: class extends MockEventSource {
|
||||
constructor(url: string) {
|
||||
super(url)
|
||||
esInstances.push(this)
|
||||
}
|
||||
},
|
||||
writable: true,
|
||||
configurable: true,
|
||||
})
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true })
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks()
|
||||
vi.useRealTimers()
|
||||
})
|
||||
|
||||
// ── Initial connection ──────────────────────────────────────────────────
|
||||
it('starts in "connecting" state and creates an EventSource', () => {
|
||||
const { result } = renderHook(() => useSSE({ url: '/api/events' }))
|
||||
|
||||
expect(result.current.status).toBe('connecting')
|
||||
expect(esInstances.length).toBeGreaterThanOrEqual(1)
|
||||
expect(esInstances[0].url).toBe('/api/events')
|
||||
})
|
||||
|
||||
it('transitions to "connected" on open', async () => {
|
||||
const onOpen = vi.fn()
|
||||
const { result } = renderHook(() => useSSE({ url: '/api/events', onOpen }))
|
||||
|
||||
act(() => { esInstances[0]._simulateOpen() })
|
||||
|
||||
await waitFor(() => {
|
||||
expect(result.current.status).toBe('connected')
|
||||
})
|
||||
expect(onOpen).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
// ── Reconnection with exponential back-off ──────────────────────────────
|
||||
it('retries after error with exponential back-off', async () => {
|
||||
const { result } = renderHook(() =>
|
||||
useSSE({ url: '/api/events', reconnectBaseMs: 1000, reconnectMaxMs: 30000 }),
|
||||
)
|
||||
|
||||
// First error → reconnecting, retry at 1s
|
||||
act(() => { esInstances[0]._simulateError() })
|
||||
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
|
||||
expect(esInstances).toHaveLength(1)
|
||||
|
||||
// Advance 1000ms → second EventSource created
|
||||
act(() => { vi.advanceTimersByTime(1000) })
|
||||
expect(esInstances).toHaveLength(2)
|
||||
|
||||
// Second error → reconnecting, retry at 2s
|
||||
act(() => { esInstances[1]._simulateError() })
|
||||
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
|
||||
act(() => { vi.advanceTimersByTime(2000) })
|
||||
expect(esInstances).toHaveLength(3)
|
||||
|
||||
// Third error → reconnecting, retry at 4s
|
||||
act(() => { esInstances[2]._simulateError() })
|
||||
act(() => { vi.advanceTimersByTime(4000) })
|
||||
expect(esInstances).toHaveLength(4)
|
||||
})
|
||||
|
||||
it('caps reconnect delay at reconnectMaxMs', async () => {
|
||||
renderHook(() =>
|
||||
useSSE({ url: '/api/events', reconnectBaseMs: 1000, reconnectMaxMs: 10000 }),
|
||||
)
|
||||
|
||||
// Force 5 errors to push the exponent past the cap
|
||||
for (let i = 0; i < 5; i++) {
|
||||
act(() => { esInstances[i]._simulateError() })
|
||||
const expectedDelay = Math.min(1000 * 2 ** i, 10000)
|
||||
act(() => { vi.advanceTimersByTime(expectedDelay) })
|
||||
}
|
||||
|
||||
// 6 ES instances created (initial + 5 retries)
|
||||
expect(esInstances).toHaveLength(6)
|
||||
})
|
||||
|
||||
// ── Circuit-breaker (max retries) ───────────────────────────────────────
|
||||
it('transitions to "error" after reconnectLimit is exceeded', async () => {
|
||||
const { result } = renderHook(() =>
|
||||
useSSE({ url: '/api/events', reconnectBaseMs: 100, reconnectLimit: 2 }),
|
||||
)
|
||||
|
||||
// First error → reconnecting
|
||||
act(() => { esInstances[0]._simulateError() })
|
||||
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
|
||||
|
||||
// Advance → retry
|
||||
act(() => { vi.advanceTimersByTime(100) })
|
||||
|
||||
// Second error → reconnecting (attempt 2, still ≤ limit)
|
||||
act(() => { esInstances[1]._simulateError() })
|
||||
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
|
||||
act(() => { vi.advanceTimersByTime(200) })
|
||||
|
||||
// Third error → limit exceeded (3 > 2) → error
|
||||
act(() => { esInstances[2]._simulateError() })
|
||||
await waitFor(() => { expect(result.current.status).toBe('error') })
|
||||
})
|
||||
|
||||
it('resets reconnect counter on successful connection', async () => {
|
||||
const { result } = renderHook(() =>
|
||||
useSSE({ url: '/api/events', reconnectBaseMs: 100, reconnectLimit: 3 }),
|
||||
)
|
||||
|
||||
// Two errors then a successful connect
|
||||
act(() => { esInstances[0]._simulateError() })
|
||||
act(() => { vi.advanceTimersByTime(100) })
|
||||
|
||||
act(() => { esInstances[1]._simulateOpen() })
|
||||
await waitFor(() => { expect(result.current.status).toBe('connected') })
|
||||
|
||||
// Now error again — counter should be reset, so we get fresh attempts
|
||||
act(() => { esInstances[1]._simulateError() })
|
||||
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
|
||||
expect(result.current.status).toBe('reconnecting')
|
||||
})
|
||||
|
||||
// ── Cleanup on unmount ───────────────────────────────────────────────────
|
||||
it('closes EventSource on unmount', () => {
|
||||
const closeSpy = vi.spyOn(MockEventSource.prototype, 'close')
|
||||
const { unmount } = renderHook(() => useSSE({ url: '/api/events' }))
|
||||
|
||||
unmount()
|
||||
expect(closeSpy).toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('does not update state after unmount', async () => {
|
||||
const { result, unmount } = renderHook(() => useSSE({ url: '/api/events' }))
|
||||
|
||||
unmount()
|
||||
|
||||
// These should be no-ops after unmount (mountedRef guards)
|
||||
act(() => { esInstances[0]._simulateOpen() })
|
||||
act(() => { esInstances[0]._simulateError() })
|
||||
|
||||
// State should not have changed
|
||||
expect(result.current.status).toBe('connecting')
|
||||
})
|
||||
|
||||
// ── Event parsing ───────────────────────────────────────────────────────
|
||||
it('parses valid JSON data into objects', async () => {
|
||||
const onMessage = vi.fn()
|
||||
renderHook(() => useSSE({ url: '/api/events', onMessage }))
|
||||
|
||||
act(() => {
|
||||
esInstances[0]._simulateNamedEvent('agent.status', JSON.stringify({ agentId: 'a1', status: 'active' }))
|
||||
})
|
||||
|
||||
expect(onMessage).toHaveBeenCalledWith({
|
||||
type: 'agent.status',
|
||||
data: { agentId: 'a1', status: 'active' },
|
||||
})
|
||||
})
|
||||
|
||||
it('passes invalid JSON through as raw string', async () => {
|
||||
const onMessage = vi.fn()
|
||||
renderHook(() => useSSE({ url: '/api/events', onMessage }))
|
||||
|
||||
act(() => {
|
||||
esInstances[0]._simulateNamedEvent('agent.status', 'not valid json {{{')
|
||||
})
|
||||
|
||||
expect(onMessage).toHaveBeenCalledWith({
|
||||
type: 'agent.status',
|
||||
data: 'not valid json {{{',
|
||||
})
|
||||
})
|
||||
|
||||
// ── enabled=false skips connection ──────────────────────────────────────
|
||||
it('does not create EventSource when enabled=false', () => {
|
||||
const { result } = renderHook(() => useSSE({ url: '/api/events', enabled: false }))
|
||||
|
||||
expect(esInstances).toHaveLength(0)
|
||||
expect(result.current.status).toBe('connecting')
|
||||
})
|
||||
|
||||
// ── onError callback ────────────────────────────────────────────────────
|
||||
it('calls onError on connection failure', async () => {
|
||||
const onError = vi.fn()
|
||||
renderHook(() =>
|
||||
useSSE({ url: '/api/events', onError, reconnectBaseMs: 100 }),
|
||||
)
|
||||
|
||||
act(() => { esInstances[0]._simulateError() })
|
||||
expect(onError).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
// ── Default URL ─────────────────────────────────────────────────────────
|
||||
it('uses /api/events as default URL', () => {
|
||||
renderHook(() => useSSE())
|
||||
expect(esInstances[0].url).toBe('/api/events')
|
||||
})
|
||||
})
|
||||
180
frontend/src/hooks/useSSE.ts
Normal file
180
frontend/src/hooks/useSSE.ts
Normal file
@@ -0,0 +1,180 @@
|
||||
import { useEffect, useRef, useCallback, useState } from 'react'
|
||||
|
||||
/** SSE connection state reported to consumers. */
|
||||
export type SSEStatus = 'connecting' | 'connected' | 'reconnecting' | 'error'
|
||||
|
||||
/** Typed SSE event received from the backend. */
|
||||
export interface SSEMessage {
|
||||
/** event: field from the SSE frame */
|
||||
type: string
|
||||
/** parsed JSON from the data: field */
|
||||
data: unknown
|
||||
}
|
||||
|
||||
export interface UseSSEOptions {
|
||||
/** Endpoint URL — defaults to /api/events */
|
||||
url?: string
|
||||
/** Called for every SSE message (all event types) */
|
||||
onMessage?: (msg: SSEMessage) => void
|
||||
/** Called when connection opens or reconnects */
|
||||
onOpen?: () => void
|
||||
/** Called on every connection error (both transient and terminal) */
|
||||
onError?: (err: Event) => void
|
||||
/** Base delay in ms before the first reconnect attempt (default 1 000) */
|
||||
reconnectBaseMs?: number
|
||||
/** Maximum reconnect delay in ms (default 30 000) */
|
||||
reconnectMaxMs?: number
|
||||
/**
|
||||
* Maximum number of consecutive reconnect attempts before giving up.
|
||||
* When the limit is reached, status transitions to 'error'.
|
||||
* Default undefined (unlimited).
|
||||
*/
|
||||
reconnectLimit?: number
|
||||
/** Set false to disable auto-connect (useful in tests) */
|
||||
enabled?: boolean
|
||||
}
|
||||
|
||||
const SSE_EVENTS = ['agent.status', 'agent.task', 'agent.progress', 'fleet.update', 'connected'] as const
|
||||
|
||||
/**
|
||||
* useSSE — mounts a persistent SSE connection to the Control Center backend.
|
||||
*
|
||||
* Handles:
|
||||
* - Initial connection on mount
|
||||
* - Exponential back-off reconnection on drop (1s → 2s → 4s … capped at reconnectMaxMs)
|
||||
* - Circuit-breaker: after reconnectLimit consecutive failures, transitions to 'error'
|
||||
* - Cleanup on unmount
|
||||
* - All five event types: agent.status, agent.task, agent.progress, fleet.update, connected
|
||||
*
|
||||
* The 'connected' SSE event is an application-level handshake sent by the backend
|
||||
* after the transport opens. This is distinct from onOpen, which fires at the
|
||||
* transport level when the EventSource HTTP connection is established.
|
||||
*/
|
||||
export function useSSE({
|
||||
url = '/api/events',
|
||||
onMessage,
|
||||
onOpen,
|
||||
onError,
|
||||
reconnectBaseMs = 1_000,
|
||||
reconnectMaxMs = 30_000,
|
||||
reconnectLimit,
|
||||
enabled = true,
|
||||
}: UseSSEOptions = {}): { status: SSEStatus } {
|
||||
const [status, setStatus] = useState<SSEStatus>('connecting')
|
||||
|
||||
// Stable refs so the effect doesn't need to re-run when callbacks change
|
||||
const onMessageRef = useRef(onMessage)
|
||||
const onOpenRef = useRef(onOpen)
|
||||
const onErrorRef = useRef(onError)
|
||||
onMessageRef.current = onMessage
|
||||
onOpenRef.current = onOpen
|
||||
onErrorRef.current = onError
|
||||
|
||||
const reconnectAttemptRef = useRef(0)
|
||||
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null)
|
||||
const esRef = useRef<EventSource | null>(null)
|
||||
const mountedRef = useRef(true)
|
||||
|
||||
const clearReconnectTimer = useCallback(() => {
|
||||
if (reconnectTimerRef.current !== null) {
|
||||
clearTimeout(reconnectTimerRef.current)
|
||||
reconnectTimerRef.current = null
|
||||
}
|
||||
}, [])
|
||||
|
||||
const connect = useCallback(() => {
|
||||
if (!mountedRef.current || !enabled) return
|
||||
|
||||
// Clean up any existing connection
|
||||
if (esRef.current) {
|
||||
esRef.current.close()
|
||||
esRef.current = null
|
||||
}
|
||||
|
||||
setStatus(reconnectAttemptRef.current === 0 ? 'connecting' : 'reconnecting')
|
||||
|
||||
const es = new EventSource(url)
|
||||
esRef.current = es
|
||||
|
||||
es.onopen = () => {
|
||||
if (!mountedRef.current) return
|
||||
reconnectAttemptRef.current = 0
|
||||
setStatus('connected')
|
||||
onOpenRef.current?.()
|
||||
}
|
||||
|
||||
es.onerror = (evt) => {
|
||||
if (!mountedRef.current) return
|
||||
|
||||
// EventSource auto-retries but we manage our own to get back-off control
|
||||
es.close()
|
||||
esRef.current = null
|
||||
|
||||
onErrorRef.current?.(evt)
|
||||
|
||||
reconnectAttemptRef.current += 1
|
||||
|
||||
// Circuit-breaker: give up after reconnectLimit consecutive failures
|
||||
if (reconnectLimit !== undefined && reconnectAttemptRef.current > reconnectLimit) {
|
||||
setStatus('error')
|
||||
return
|
||||
}
|
||||
|
||||
// Exponential back-off: 1s, 2s, 4s … capped at reconnectMaxMs
|
||||
// Note: attempt is 1-based here (already incremented), so we use attempt-1 for the exponent
|
||||
const delay = Math.min(
|
||||
reconnectBaseMs * 2 ** (reconnectAttemptRef.current - 1),
|
||||
reconnectMaxMs,
|
||||
)
|
||||
setStatus('reconnecting')
|
||||
|
||||
clearReconnectTimer()
|
||||
reconnectTimerRef.current = setTimeout(() => {
|
||||
if (mountedRef.current) connect()
|
||||
}, delay)
|
||||
}
|
||||
|
||||
// Register listeners for all known event types
|
||||
for (const eventType of SSE_EVENTS) {
|
||||
es.addEventListener(eventType, (evt: MessageEvent) => {
|
||||
if (!mountedRef.current) return
|
||||
let data: unknown = evt.data
|
||||
try {
|
||||
data = JSON.parse(evt.data as string)
|
||||
} catch {
|
||||
// leave as raw string
|
||||
}
|
||||
onMessageRef.current?.({ type: eventType, data })
|
||||
})
|
||||
}
|
||||
|
||||
// Catch-all for unnamed events (type == 'message').
|
||||
// Won't fire for the named events registered via addEventListener above.
|
||||
es.onmessage = (evt: MessageEvent) => {
|
||||
if (!mountedRef.current) return
|
||||
let data: unknown = evt.data
|
||||
try {
|
||||
data = JSON.parse(evt.data as string)
|
||||
} catch {
|
||||
// leave as raw string
|
||||
}
|
||||
onMessageRef.current?.({ type: 'message', data })
|
||||
}
|
||||
}, [url, enabled, reconnectBaseMs, reconnectMaxMs, reconnectLimit, clearReconnectTimer])
|
||||
|
||||
useEffect(() => {
|
||||
mountedRef.current = true
|
||||
if (enabled) connect()
|
||||
|
||||
return () => {
|
||||
mountedRef.current = false
|
||||
clearReconnectTimer()
|
||||
if (esRef.current) {
|
||||
esRef.current.close()
|
||||
esRef.current = null
|
||||
}
|
||||
}
|
||||
}, [connect, enabled, clearReconnectTimer])
|
||||
|
||||
return { status }
|
||||
}
|
||||
@@ -4,13 +4,16 @@ import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
|
||||
import { BrowserRouter } from 'react-router-dom'
|
||||
import ErrorBoundary from './components/ErrorBoundary'
|
||||
import { ThemeProvider } from './hooks/useTheme'
|
||||
import { SSEProvider } from './contexts/SSEContext'
|
||||
import './index.css'
|
||||
import App from './App'
|
||||
|
||||
const queryClient = new QueryClient({
|
||||
defaultOptions: {
|
||||
queries: {
|
||||
staleTime: 30_000,
|
||||
// No polling — real-time updates come through SSE.
|
||||
// staleTime is kept high; data is pushed, not pulled.
|
||||
staleTime: 60_000,
|
||||
refetchOnWindowFocus: false,
|
||||
retry: 1,
|
||||
},
|
||||
@@ -22,9 +25,13 @@ createRoot(document.getElementById('root')!).render(
|
||||
<ErrorBoundary>
|
||||
<ThemeProvider>
|
||||
<QueryClientProvider client={queryClient}>
|
||||
<BrowserRouter>
|
||||
<App />
|
||||
</BrowserRouter>
|
||||
{/* SSEProvider must live inside QueryClientProvider so it can call
|
||||
useQueryClient() to invalidate caches on incoming events. */}
|
||||
<SSEProvider>
|
||||
<BrowserRouter>
|
||||
<App />
|
||||
</BrowserRouter>
|
||||
</SSEProvider>
|
||||
</QueryClientProvider>
|
||||
</ThemeProvider>
|
||||
</ErrorBoundary>
|
||||
|
||||
@@ -1,18 +1,36 @@
|
||||
import { useTheme } from '../hooks/useTheme'
|
||||
import { useLocalStorage } from '../hooks/useLocalStorage'
|
||||
import { Sun, Moon, Monitor, Zap, Clock } from 'lucide-react'
|
||||
import { useSSEContext } from '../contexts/SSEContext'
|
||||
import { Sun, Moon, Monitor, Zap, Radio } from 'lucide-react'
|
||||
|
||||
const REFRESH_PRESETS = [
|
||||
{ label: '5s', value: 5_000 },
|
||||
{ label: '10s', value: 10_000 },
|
||||
{ label: '30s', value: 30_000 },
|
||||
{ label: '60s', value: 60_000 },
|
||||
]
|
||||
const SSE_STATUS_COPY: Record<string, { label: string; description: string; color: string }> = {
|
||||
connected: {
|
||||
label: 'Connected',
|
||||
description: 'Real-time updates are active. Agent status, tasks, and progress stream live.',
|
||||
color: 'text-green-500',
|
||||
},
|
||||
connecting: {
|
||||
label: 'Connecting…',
|
||||
description: 'Establishing SSE connection to the backend.',
|
||||
color: 'text-yellow-500',
|
||||
},
|
||||
reconnecting: {
|
||||
label: 'Reconnecting…',
|
||||
description: 'Connection lost. Retrying with exponential back-off.',
|
||||
color: 'text-yellow-500',
|
||||
},
|
||||
error: {
|
||||
label: 'Disconnected',
|
||||
description: 'Could not connect to the SSE endpoint. Check that the backend is reachable.',
|
||||
color: 'text-red-500',
|
||||
},
|
||||
}
|
||||
|
||||
export default function SettingsPage() {
|
||||
const { isDark, toggleTheme } = useTheme()
|
||||
const [gatewayUrl, setGatewayUrl] = useLocalStorage('cc-gateway-url', '')
|
||||
const [refreshInterval, setRefreshInterval] = useLocalStorage('cc-refresh-interval', 30_000)
|
||||
const { sseStatus } = useSSEContext()
|
||||
const sseInfo = SSE_STATUS_COPY[sseStatus] ?? SSE_STATUS_COPY.error
|
||||
|
||||
return (
|
||||
<div className="space-y-8 max-w-2xl">
|
||||
@@ -80,45 +98,31 @@ export default function SettingsPage() {
|
||||
</div>
|
||||
</section>
|
||||
|
||||
{/* Refresh */}
|
||||
{/* Real-time connection status */}
|
||||
<section className="space-y-4">
|
||||
<h2 className="text-lg font-semibold flex items-center gap-2">
|
||||
<Clock size={20} className="text-primary" />
|
||||
Auto Refresh
|
||||
<Radio size={20} className="text-primary" />
|
||||
Real-time Updates
|
||||
</h2>
|
||||
|
||||
<div className="p-5 rounded-xl border border-surface-light bg-surface-dark space-y-3">
|
||||
<p className="text-sm text-on-surface-variant">Data refresh interval for agent status and logs</p>
|
||||
|
||||
<div className="flex flex-col gap-2">
|
||||
<input
|
||||
type="range"
|
||||
min="0"
|
||||
max="3"
|
||||
step="1"
|
||||
value={REFRESH_PRESETS.findIndex((p) => p.value === refreshInterval)}
|
||||
onChange={(e) => {
|
||||
const idx = parseInt(e.target.value)
|
||||
setRefreshInterval(REFRESH_PRESETS[idx].value)
|
||||
}}
|
||||
className="w-full accent-primary"
|
||||
/>
|
||||
<div className="flex justify-between text-xs text-on-surface-muted">
|
||||
{REFRESH_PRESETS.map((p) => (
|
||||
<button
|
||||
key={p.label}
|
||||
onClick={() => setRefreshInterval(p.value)}
|
||||
className={`px-3 py-1 rounded-lg transition-colors ${
|
||||
refreshInterval === p.value
|
||||
? 'bg-primary/10 text-primary'
|
||||
: 'hover:bg-surface-light'
|
||||
}`}
|
||||
>
|
||||
{p.label}
|
||||
</button>
|
||||
))}
|
||||
<div className="flex items-center justify-between">
|
||||
<div>
|
||||
<p className="font-medium">SSE Connection</p>
|
||||
<p className="text-sm text-on-surface-variant mt-0.5">{sseInfo.description}</p>
|
||||
</div>
|
||||
<span className={`text-sm font-semibold whitespace-nowrap ${sseInfo.color}`}>
|
||||
{sseInfo.label}
|
||||
</span>
|
||||
</div>
|
||||
<p className="text-xs text-on-surface-muted">
|
||||
Endpoint: <code className="bg-surface-light px-1.5 py-0.5 rounded text-on-surface-variant">/api/events</code>
|
||||
· Events: agent.status, agent.task, agent.progress, fleet.update
|
||||
</p>
|
||||
<p className="text-xs text-on-surface-muted">
|
||||
Polling is disabled. All status updates are pushed from the server over a persistent SSE connection.
|
||||
The client reconnects automatically with exponential back-off on drop.
|
||||
</p>
|
||||
</div>
|
||||
</section>
|
||||
</div>
|
||||
|
||||
72
frontend/src/services/sse.ts
Normal file
72
frontend/src/services/sse.ts
Normal file
@@ -0,0 +1,72 @@
|
||||
/**
|
||||
* SSE event payload types matching the Go backend (internal/handler/sse.go).
|
||||
*
|
||||
* Event format on the wire:
|
||||
* event: <eventType>
|
||||
* data: <json>
|
||||
*
|
||||
* The types below define the backend contract. The SSEPayloadMap maps
|
||||
* each event type string to its expected payload shape. SSEMessage is a
|
||||
* discriminated union on `type` — when you switch on msg.type, TypeScript
|
||||
* narrows msg.data to the correct payload interface automatically.
|
||||
*/
|
||||
|
||||
import type { AgentStatus } from '../types'
|
||||
|
||||
/** agent.status — agent came online, went offline, changed state */
|
||||
export interface AgentStatusEvent {
|
||||
agentId: string
|
||||
status: AgentStatus
|
||||
/** Optional human-readable reason (e.g. error message) */
|
||||
reason?: string
|
||||
}
|
||||
|
||||
/** agent.task — a task was assigned to or completed by an agent */
|
||||
export interface AgentTaskEvent {
|
||||
agentId: string
|
||||
taskId: string
|
||||
title: string
|
||||
action: 'assigned' | 'completed' | 'failed'
|
||||
}
|
||||
|
||||
/** agent.progress — incremental progress update for a running task */
|
||||
export interface AgentProgressEvent {
|
||||
agentId: string
|
||||
taskId: string
|
||||
progress: number
|
||||
/** Optional description of what is currently happening */
|
||||
message?: string
|
||||
}
|
||||
|
||||
/**
|
||||
* fleet.update — bulk refresh of all agents (e.g. after a deployment).
|
||||
* The backend may send partial or complete agent state.
|
||||
*/
|
||||
export interface FleetUpdateEvent {
|
||||
/** ISO timestamp of when the snapshot was taken */
|
||||
timestamp: string
|
||||
/** Number of agents in the fleet */
|
||||
agentCount: number
|
||||
}
|
||||
|
||||
/** Union of all SSE data payloads keyed by event type. */
|
||||
export type SSEPayloadMap = {
|
||||
'agent.status': AgentStatusEvent
|
||||
'agent.task': AgentTaskEvent
|
||||
'agent.progress': AgentProgressEvent
|
||||
'fleet.update': FleetUpdateEvent
|
||||
connected: { clientCount: number }
|
||||
message: unknown
|
||||
}
|
||||
|
||||
/**
|
||||
* Discriminated SSE message — the `type` field narrows `data` via SSEPayloadMap.
|
||||
*
|
||||
* Usage:
|
||||
* if (msg.type === 'agent.status') {
|
||||
* msg.data.agentId // ✅ TypeScript knows this is AgentStatusEvent
|
||||
* }
|
||||
*/
|
||||
export type SSEMessage = {
|
||||
[K in keyof SSEPayloadMap]: { type: K; data: SSEPayloadMap[K] }
|
||||
}[keyof SSEPayloadMap]
|
||||
1
frontend/src/test-setup.ts
Normal file
1
frontend/src/test-setup.ts
Normal file
@@ -0,0 +1 @@
|
||||
import '@testing-library/jest-dom'
|
||||
@@ -1,4 +1,4 @@
|
||||
export type AgentStatus = 'active' | 'idle' | 'thinking' | 'error'
|
||||
export type AgentStatus = 'active' | 'idle' | 'thinking' | 'error' | 'offline'
|
||||
|
||||
export interface Agent {
|
||||
id: string
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
"target": "es2023",
|
||||
"lib": ["ES2023", "DOM"],
|
||||
"module": "esnext",
|
||||
"types": ["vite/client"],
|
||||
"types": ["vite/client", "vitest/globals"],
|
||||
"skipLibCheck": true,
|
||||
|
||||
/* Bundler mode */
|
||||
|
||||
@@ -20,5 +20,5 @@
|
||||
"erasableSyntaxOnly": true,
|
||||
"noFallthroughCasesInSwitch": true
|
||||
},
|
||||
"include": ["vite.config.ts"]
|
||||
"include": ["vite.config.ts", "vitest.config.ts"]
|
||||
}
|
||||
|
||||
11
frontend/vitest.config.ts
Normal file
11
frontend/vitest.config.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
import { defineConfig } from 'vitest/config'
|
||||
import react from '@vitejs/plugin-react'
|
||||
|
||||
export default defineConfig({
|
||||
plugins: [react()],
|
||||
test: {
|
||||
environment: 'jsdom',
|
||||
globals: true,
|
||||
setupFiles: ['./src/test-setup.ts'],
|
||||
},
|
||||
})
|
||||
@@ -112,7 +112,7 @@ func main() {
|
||||
<-quit
|
||||
slog.Info("shutting down server...")
|
||||
|
||||
cancel() // stop gateway polling
|
||||
cancel() // stop gateway clients
|
||||
|
||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
defer shutdownCancel()
|
||||
@@ -136,4 +136,4 @@ func parseLogLevel(level string) slog.Level {
|
||||
default:
|
||||
return slog.LevelInfo
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -10,30 +10,30 @@ import (
|
||||
|
||||
// Config holds all application configuration.
|
||||
type Config struct {
|
||||
Port int
|
||||
DatabaseURL string
|
||||
CORSOrigin string
|
||||
LogLevel string
|
||||
Environment string
|
||||
GatewayRestURL string
|
||||
GatewayRestPollInterval time.Duration
|
||||
WSGatewayURL string
|
||||
WSGatewayToken string
|
||||
Port int
|
||||
DatabaseURL string
|
||||
CORSOrigin string
|
||||
LogLevel string
|
||||
Environment string
|
||||
GatewayRestURL string
|
||||
GatewayRestPollInterval time.Duration
|
||||
WSGatewayURL string
|
||||
WSGatewayToken string
|
||||
}
|
||||
|
||||
// Load reads configuration from environment variables, applying defaults where
|
||||
// values are not set. All secrets come from the environment — nothing is hardcoded.
|
||||
func Load() *Config {
|
||||
return &Config{
|
||||
Port: getEnvInt("PORT", 8080),
|
||||
DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"),
|
||||
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
|
||||
LogLevel: getEnv("LOG_LEVEL", "info"),
|
||||
Environment: getEnv("ENVIRONMENT", "development"),
|
||||
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
|
||||
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
|
||||
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
|
||||
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
|
||||
Port: getEnvInt("PORT", 8080),
|
||||
DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"),
|
||||
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
|
||||
LogLevel: getEnv("LOG_LEVEL", "info"),
|
||||
Environment: getEnv("ENVIRONMENT", "development"),
|
||||
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
|
||||
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
|
||||
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
|
||||
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,4 +60,4 @@ func getEnvDuration(key string, fallback time.Duration) time.Duration {
|
||||
}
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,10 @@
|
||||
// Package gateway provides an OpenClaw gateway integration client that
|
||||
// polls agent states, persists them via the repository layer, and broadcasts
|
||||
// changes through the SSE broker for real-time frontend updates.
|
||||
//
|
||||
// When a WSClient is wired via SetWSClient, the REST poller becomes a
|
||||
// fallback: it waits for the WS client to signal readiness, and only starts
|
||||
// polling if WS fails to connect within 30 seconds.
|
||||
package gateway
|
||||
|
||||
import (
|
||||
@@ -29,7 +33,7 @@ type Client struct {
|
||||
broker *handler.Broker
|
||||
wsClient *WSClient // optional WS client; when set, REST is fallback only
|
||||
wsReady chan struct{} // closed once WS connection is established
|
||||
wsReadyOnce sync.Once // protects wsReady close from double-close race
|
||||
wsReadyOnce sync.Once // protects wsReady close from double-close race
|
||||
}
|
||||
|
||||
// Config holds gateway client configuration, typically loaded from environment.
|
||||
@@ -140,7 +144,6 @@ func (c *Client) poll(ctx context.Context) {
|
||||
}
|
||||
|
||||
for _, ga := range agents {
|
||||
// Check if agent already exists; if so, update; otherwise create.
|
||||
existing, err := c.agents.Get(ctx, ga.ID)
|
||||
if err != nil {
|
||||
// Not found — create it
|
||||
@@ -185,51 +188,51 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
|
||||
slog.Info("seeding demo agents")
|
||||
demoAgents := []models.AgentCardData{
|
||||
{
|
||||
ID: "otto",
|
||||
DisplayName: "Otto",
|
||||
Role: "Orchestrator",
|
||||
Status: models.AgentStatusActive,
|
||||
ID: "otto",
|
||||
DisplayName: "Otto",
|
||||
Role: "Orchestrator",
|
||||
Status: models.AgentStatusActive,
|
||||
CurrentTask: strPtr("Orchestrating tasks"),
|
||||
SessionKey: "otto-session",
|
||||
Channel: "discord",
|
||||
Channel: "discord",
|
||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||
},
|
||||
{
|
||||
ID: "rex",
|
||||
DisplayName: "Rex",
|
||||
Role: "Frontend Dev",
|
||||
Status: models.AgentStatusIdle,
|
||||
ID: "rex",
|
||||
DisplayName: "Rex",
|
||||
Role: "Frontend Dev",
|
||||
Status: models.AgentStatusIdle,
|
||||
SessionKey: "rex-session",
|
||||
Channel: "discord",
|
||||
Channel: "discord",
|
||||
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
|
||||
},
|
||||
{
|
||||
ID: "dex",
|
||||
DisplayName: "Dex",
|
||||
Role: "Backend Dev",
|
||||
Status: models.AgentStatusThinking,
|
||||
ID: "dex",
|
||||
DisplayName: "Dex",
|
||||
Role: "Backend Dev",
|
||||
Status: models.AgentStatusThinking,
|
||||
CurrentTask: strPtr("Designing API contracts"),
|
||||
SessionKey: "dex-session",
|
||||
Channel: "discord",
|
||||
Channel: "discord",
|
||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||
},
|
||||
{
|
||||
ID: "hex",
|
||||
DisplayName: "Hex",
|
||||
Role: "Database Specialist",
|
||||
Status: models.AgentStatusActive,
|
||||
ID: "hex",
|
||||
DisplayName: "Hex",
|
||||
Role: "Database Specialist",
|
||||
Status: models.AgentStatusActive,
|
||||
CurrentTask: strPtr("Reviewing schema migrations"),
|
||||
SessionKey: "hex-session",
|
||||
Channel: "discord",
|
||||
Channel: "discord",
|
||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||
},
|
||||
{
|
||||
ID: "pip",
|
||||
DisplayName: "Pip",
|
||||
Role: "Edge Device Dev",
|
||||
Status: models.AgentStatusIdle,
|
||||
ID: "pip",
|
||||
DisplayName: "Pip",
|
||||
Role: "Edge Device Dev",
|
||||
Status: models.AgentStatusIdle,
|
||||
SessionKey: "pip-session",
|
||||
Channel: "discord",
|
||||
Channel: "discord",
|
||||
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
|
||||
},
|
||||
}
|
||||
@@ -243,4 +246,4 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func strPtr(s string) *string { return &s }
|
||||
func strPtr(s string) *string { return &s }
|
||||
516
go-backend/internal/gateway/events_test.go
Normal file
516
go-backend/internal/gateway/events_test.go
Normal file
@@ -0,0 +1,516 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
||||
)
|
||||
|
||||
// ── Mock AgentRepo ────────────────────────────────────────────────────────
|
||||
|
||||
type mockAgentRepo struct {
|
||||
mu sync.Mutex
|
||||
agents map[string]models.AgentCardData
|
||||
updateCalls []updateCall
|
||||
}
|
||||
|
||||
type updateCall struct {
|
||||
id string
|
||||
req models.UpdateAgentRequest
|
||||
}
|
||||
|
||||
func (m *mockAgentRepo) Get(_ context.Context, id string) (models.AgentCardData, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
a, ok := m.agents[id]
|
||||
if !ok {
|
||||
return models.AgentCardData{}, errNotFound
|
||||
}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mockAgentRepo) Update(_ context.Context, id string, req models.UpdateAgentRequest) (models.AgentCardData, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
a, ok := m.agents[id]
|
||||
if !ok {
|
||||
return models.AgentCardData{}, errNotFound
|
||||
}
|
||||
|
||||
if req.Status != nil {
|
||||
a.Status = *req.Status
|
||||
}
|
||||
if req.DisplayName != nil {
|
||||
a.DisplayName = *req.DisplayName
|
||||
}
|
||||
if req.Role != nil {
|
||||
a.Role = *req.Role
|
||||
}
|
||||
if req.Channel != nil {
|
||||
a.Channel = *req.Channel
|
||||
}
|
||||
if req.CurrentTask != nil {
|
||||
a.CurrentTask = req.CurrentTask
|
||||
}
|
||||
if req.TaskProgress != nil {
|
||||
a.TaskProgress = req.TaskProgress
|
||||
}
|
||||
if req.TaskElapsed != nil {
|
||||
a.TaskElapsed = req.TaskElapsed
|
||||
}
|
||||
if req.ErrorMessage != nil {
|
||||
a.ErrorMessage = req.ErrorMessage
|
||||
}
|
||||
if req.LastActivityAt != nil {
|
||||
a.LastActivity = *req.LastActivityAt
|
||||
}
|
||||
|
||||
m.agents[id] = a
|
||||
m.updateCalls = append(m.updateCalls, updateCall{id, req})
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mockAgentRepo) Create(_ context.Context, a models.AgentCardData) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.agents[a.ID] = a
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockAgentRepo) List(_ context.Context, statusFilter models.AgentStatus) ([]models.AgentCardData, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
var result []models.AgentCardData
|
||||
for _, a := range m.agents {
|
||||
if statusFilter == "" || a.Status == statusFilter {
|
||||
result = append(result, a)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (m *mockAgentRepo) Delete(_ context.Context, id string) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
delete(m.agents, id)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockAgentRepo) Count(_ context.Context) (int, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return len(m.agents), nil
|
||||
}
|
||||
|
||||
// errNotFound is returned by the mock repo when an agent is not found.
|
||||
var errNotFound = fmt.Errorf("not found")
|
||||
|
||||
// ── Broadcast capture helper ───────────────────────────────────────────────
|
||||
|
||||
// broadcastCapture wraps a real Broker and captures all broadcasts
|
||||
// via a subscribed channel. Use captured() to retrieve events that have
|
||||
// been received so far. Call close() to unsubscribe when done.
|
||||
type broadcastCapture struct {
|
||||
broker *handler.Broker
|
||||
ch chan handler.SSEEvent
|
||||
}
|
||||
|
||||
func newBroadcastCapture(broker *handler.Broker) *broadcastCapture {
|
||||
return &broadcastCapture{
|
||||
broker: broker,
|
||||
ch: broker.Subscribe(),
|
||||
}
|
||||
}
|
||||
|
||||
// captured drains all pending events from the subscription channel
|
||||
// and returns them. This is synchronous — it only returns events that
|
||||
// have already been sent to the channel.
|
||||
func (bc *broadcastCapture) captured() []handler.SSEEvent {
|
||||
var events []handler.SSEEvent
|
||||
for {
|
||||
select {
|
||||
case evt := <-bc.ch:
|
||||
events = append(events, evt)
|
||||
default:
|
||||
return events
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (bc *broadcastCapture) close() {
|
||||
bc.broker.Unsubscribe(bc.ch)
|
||||
}
|
||||
|
||||
// ── Test helpers ──────────────────────────────────────────────────────────
|
||||
|
||||
// newTestWSClient creates a WSClient wired to a mock repo and a real broker.
|
||||
// Returns the client, the mock repo, and a broadcast capture.
|
||||
func newTestWSClient() (*WSClient, *mockAgentRepo, *handler.Broker, *broadcastCapture) {
|
||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||
broker := handler.NewBroker()
|
||||
capture := newBroadcastCapture(broker)
|
||||
client := NewWSClient(WSConfig{}, repo, broker, slog.Default())
|
||||
return client, repo, broker, capture
|
||||
}
|
||||
|
||||
// ── Tests ─────────────────────────────────────────────────────────────────
|
||||
|
||||
func TestHandleSessionsChanged_Active(t *testing.T) {
|
||||
client, repo, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
repo.agents["otto"] = models.AgentCardData{
|
||||
ID: "otto",
|
||||
DisplayName: "Otto",
|
||||
Status: models.AgentStatusIdle,
|
||||
}
|
||||
|
||||
payload := json.RawMessage(`{
|
||||
"sessionKey": "s1",
|
||||
"agentId": "otto",
|
||||
"status": "running",
|
||||
"totalTokens": 500,
|
||||
"currentTask": "Orchestrating tasks"
|
||||
}`)
|
||||
|
||||
client.handleSessionsChanged(payload)
|
||||
|
||||
// Verify: agent status updated to active
|
||||
repo.mu.Lock()
|
||||
agent := repo.agents["otto"]
|
||||
calls := make([]updateCall, len(repo.updateCalls))
|
||||
copy(calls, repo.updateCalls)
|
||||
repo.mu.Unlock()
|
||||
|
||||
if agent.Status != models.AgentStatusActive {
|
||||
t.Errorf("agent status = %q, want %q", agent.Status, models.AgentStatusActive)
|
||||
}
|
||||
|
||||
// Verify: update was called
|
||||
if len(calls) == 0 {
|
||||
t.Fatal("expected at least one update call")
|
||||
}
|
||||
if calls[0].id != "otto" {
|
||||
t.Errorf("update call agentId = %q, want %q", calls[0].id, "otto")
|
||||
}
|
||||
|
||||
// Verify: broker broadcast "agent.status"
|
||||
events := capture.captured()
|
||||
found := false
|
||||
for _, evt := range events {
|
||||
if evt.EventType == "agent.status" {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Error("expected broker broadcast with event type 'agent.status'")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSessionsChanged_Idle(t *testing.T) {
|
||||
client, repo, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
repo.agents["dex"] = models.AgentCardData{
|
||||
ID: "dex",
|
||||
DisplayName: "Dex",
|
||||
Status: models.AgentStatusActive,
|
||||
CurrentTask: strPtr("Writing API"),
|
||||
}
|
||||
|
||||
payload := json.RawMessage(`{
|
||||
"sessionKey": "s2",
|
||||
"agentId": "dex",
|
||||
"status": "done",
|
||||
"totalTokens": 1000
|
||||
}`)
|
||||
|
||||
client.handleSessionsChanged(payload)
|
||||
|
||||
repo.mu.Lock()
|
||||
agent := repo.agents["dex"]
|
||||
repo.mu.Unlock()
|
||||
|
||||
// Verify: agent goes idle
|
||||
if agent.Status != models.AgentStatusIdle {
|
||||
t.Errorf("agent status = %q, want %q", agent.Status, models.AgentStatusIdle)
|
||||
}
|
||||
|
||||
// Verify: current task cleared (set to empty string)
|
||||
if agent.CurrentTask != nil && *agent.CurrentTask != "" {
|
||||
t.Errorf("current task = %q, want empty (cleared on idle)", *agent.CurrentTask)
|
||||
}
|
||||
|
||||
// Verify: broker fires "agent.status"
|
||||
events := capture.captured()
|
||||
found := false
|
||||
for _, evt := range events {
|
||||
if evt.EventType == "agent.status" {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Error("expected broker broadcast with event type 'agent.status'")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSessionsChanged_ArrayPayload(t *testing.T) {
|
||||
client, repo, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusIdle}
|
||||
repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle}
|
||||
|
||||
payload := json.RawMessage(`[
|
||||
{"sessionKey":"s1","agentId":"otto","status":"running","totalTokens":100},
|
||||
{"sessionKey":"s2","agentId":"dex","status":"streaming","totalTokens":200}
|
||||
]`)
|
||||
|
||||
client.handleSessionsChanged(payload)
|
||||
|
||||
repo.mu.Lock()
|
||||
otto := repo.agents["otto"]
|
||||
dex := repo.agents["dex"]
|
||||
repo.mu.Unlock()
|
||||
|
||||
if otto.Status != models.AgentStatusActive {
|
||||
t.Errorf("otto status = %q, want active", otto.Status)
|
||||
}
|
||||
if dex.Status != models.AgentStatusActive {
|
||||
t.Errorf("dex status = %q, want active", dex.Status)
|
||||
}
|
||||
|
||||
// Both should produce broadcasts
|
||||
events := capture.captured()
|
||||
statusCount := 0
|
||||
for _, evt := range events {
|
||||
if evt.EventType == "agent.status" {
|
||||
statusCount++
|
||||
}
|
||||
}
|
||||
if statusCount < 2 {
|
||||
t.Errorf("expected at least 2 agent.status broadcasts, got %d", statusCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSessionsChanged_SkipsEmptyAgentID(t *testing.T) {
|
||||
client, _, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
payload := json.RawMessage(`{"sessionKey":"s1","agentId":"","status":"running"}`)
|
||||
client.handleSessionsChanged(payload)
|
||||
|
||||
events := capture.captured()
|
||||
if len(events) > 0 {
|
||||
t.Errorf("expected no broadcasts for empty agentId, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSessionsChanged_UnparseablePayload(t *testing.T) {
|
||||
client, _, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
payload := json.RawMessage(`not json at all`)
|
||||
client.handleSessionsChanged(payload)
|
||||
|
||||
events := capture.captured()
|
||||
if len(events) > 0 {
|
||||
t.Errorf("expected no broadcasts for unparseable payload, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandlePresence(t *testing.T) {
|
||||
client, repo, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
repo.agents["pip"] = models.AgentCardData{
|
||||
ID: "pip",
|
||||
DisplayName: "Pip",
|
||||
Status: models.AgentStatusActive,
|
||||
}
|
||||
|
||||
payload := json.RawMessage(`{
|
||||
"agentId": "pip",
|
||||
"connected": true,
|
||||
"lastActivityAt": "2025-01-01T00:00:00Z"
|
||||
}`)
|
||||
|
||||
client.handlePresence(payload)
|
||||
|
||||
repo.mu.Lock()
|
||||
agent := repo.agents["pip"]
|
||||
calls := make([]updateCall, len(repo.updateCalls))
|
||||
copy(calls, repo.updateCalls)
|
||||
repo.mu.Unlock()
|
||||
|
||||
// Agent should still be active (connected=true doesn't change status)
|
||||
if agent.Status != models.AgentStatusActive {
|
||||
t.Errorf("agent status = %q, want active", agent.Status)
|
||||
}
|
||||
|
||||
// Update should have been called (for lastActivityAt)
|
||||
if len(calls) == 0 {
|
||||
t.Fatal("expected at least one update call")
|
||||
}
|
||||
|
||||
// Verify broadcast
|
||||
events := capture.captured()
|
||||
found := false
|
||||
for _, evt := range events {
|
||||
if evt.EventType == "agent.status" {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Error("expected broker broadcast with event type 'agent.status'")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandlePresence_Disconnect(t *testing.T) {
|
||||
client, repo, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
repo.agents["pip"] = models.AgentCardData{
|
||||
ID: "pip",
|
||||
DisplayName: "Pip",
|
||||
Status: models.AgentStatusActive,
|
||||
}
|
||||
|
||||
payload := json.RawMessage(`{
|
||||
"agentId": "pip",
|
||||
"connected": false
|
||||
}`)
|
||||
|
||||
client.handlePresence(payload)
|
||||
|
||||
repo.mu.Lock()
|
||||
agent := repo.agents["pip"]
|
||||
repo.mu.Unlock()
|
||||
|
||||
// Agent should go idle on disconnect
|
||||
if agent.Status != models.AgentStatusIdle {
|
||||
t.Errorf("agent status = %q, want idle after disconnect", agent.Status)
|
||||
}
|
||||
|
||||
events := capture.captured()
|
||||
found := false
|
||||
for _, evt := range events {
|
||||
if evt.EventType == "agent.status" {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Error("expected broker broadcast with event type 'agent.status' on disconnect")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandlePresence_EmptyAgentID(t *testing.T) {
|
||||
client, _, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
payload := json.RawMessage(`{"agentId":"","connected":true}`)
|
||||
client.handlePresence(payload)
|
||||
|
||||
events := capture.captured()
|
||||
if len(events) > 0 {
|
||||
t.Errorf("expected no broadcasts for empty agentId, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleAgentConfig(t *testing.T) {
|
||||
client, repo, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
repo.agents["rex"] = models.AgentCardData{
|
||||
ID: "rex",
|
||||
DisplayName: "Rex",
|
||||
Role: "Frontend Dev",
|
||||
Status: models.AgentStatusIdle,
|
||||
Channel: "discord",
|
||||
}
|
||||
|
||||
payload := json.RawMessage(`{
|
||||
"id": "rex",
|
||||
"name": "Rex the Dev",
|
||||
"role": "Senior Frontend",
|
||||
"channel": "telegram"
|
||||
}`)
|
||||
|
||||
client.handleAgentConfig(payload)
|
||||
|
||||
repo.mu.Lock()
|
||||
agent := repo.agents["rex"]
|
||||
calls := make([]updateCall, len(repo.updateCalls))
|
||||
copy(calls, repo.updateCalls)
|
||||
repo.mu.Unlock()
|
||||
|
||||
// Verify DisplayName and Role updated
|
||||
if agent.DisplayName != "Rex the Dev" {
|
||||
t.Errorf("displayName = %q, want %q", agent.DisplayName, "Rex the Dev")
|
||||
}
|
||||
if agent.Role != "Senior Frontend" {
|
||||
t.Errorf("role = %q, want %q", agent.Role, "Senior Frontend")
|
||||
}
|
||||
if agent.Channel != "telegram" {
|
||||
t.Errorf("channel = %q, want %q", agent.Channel, "telegram")
|
||||
}
|
||||
|
||||
// Verify update was called
|
||||
if len(calls) == 0 {
|
||||
t.Fatal("expected at least one update call")
|
||||
}
|
||||
|
||||
// Verify broker fires "fleet.update"
|
||||
events := capture.captured()
|
||||
found := false
|
||||
for _, evt := range events {
|
||||
if evt.EventType == "fleet.update" {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Error("expected broker broadcast with event type 'fleet.update'")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleAgentConfig_EmptyID(t *testing.T) {
|
||||
client, _, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
payload := json.RawMessage(`{"id":"","name":"Ghost"}`)
|
||||
client.handleAgentConfig(payload)
|
||||
|
||||
events := capture.captured()
|
||||
if len(events) > 0 {
|
||||
t.Errorf("expected no broadcasts for empty id, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleAgentConfig_NotFound(t *testing.T) {
|
||||
client, _, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
payload := json.RawMessage(`{"id":"unknown","name":"Ghost","role":"Phantom"}`)
|
||||
client.handleAgentConfig(payload)
|
||||
|
||||
// Agent doesn't exist in repo, so Update will fail → handler logs warning, returns early
|
||||
events := capture.captured()
|
||||
for _, evt := range events {
|
||||
if evt.EventType == "fleet.update" {
|
||||
t.Error("fleet.update should not be broadcast when agent update fails")
|
||||
}
|
||||
}
|
||||
}
|
||||
236
go-backend/internal/gateway/sync_test.go
Normal file
236
go-backend/internal/gateway/sync_test.go
Normal file
@@ -0,0 +1,236 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
||||
)
|
||||
|
||||
func TestInitialSync(t *testing.T) {
|
||||
_ = &mockAgentRepo{agents: make(map[string]models.AgentCardData)} // verify mock compiles
|
||||
broker := handler.NewBroker()
|
||||
capture := newBroadcastCapture(broker)
|
||||
defer capture.close()
|
||||
|
||||
// --- Test agentItemToCard + session merge (the core of initialSync) ---
|
||||
|
||||
agentItems := []agentListItem{
|
||||
{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"},
|
||||
{ID: "dex", Name: "Dex", Role: "Backend Dev", Channel: "telegram"},
|
||||
}
|
||||
|
||||
sessionItems := []sessionListItem{
|
||||
{SessionKey: "s1", AgentID: "otto", Status: "running", TotalTokens: 500, LastActivityAt: "2025-05-20T12:00:00Z"},
|
||||
{SessionKey: "s2", AgentID: "dex", Status: "done", TotalTokens: 1000, LastActivityAt: "2025-05-20T11:00:00Z"},
|
||||
}
|
||||
|
||||
// Build sessionByAgent map (mirrors initialSync logic)
|
||||
sessionByAgent := make(map[string]sessionListItem)
|
||||
for _, s := range sessionItems {
|
||||
if s.AgentID != "" {
|
||||
sessionByAgent[s.AgentID] = s
|
||||
}
|
||||
}
|
||||
|
||||
// Merge and verify
|
||||
merged := make([]models.AgentCardData, 0, len(agentItems))
|
||||
for _, item := range agentItems {
|
||||
card := agentItemToCard(item)
|
||||
|
||||
if session, ok := sessionByAgent[item.ID]; ok {
|
||||
card.SessionKey = session.SessionKey
|
||||
card.Status = mapSessionStatus(session.Status)
|
||||
card.LastActivity = session.LastActivityAt
|
||||
|
||||
if session.TotalTokens > 0 {
|
||||
prog := min(session.TotalTokens/100, 100)
|
||||
card.TaskProgress = &prog
|
||||
}
|
||||
}
|
||||
|
||||
merged = append(merged, card)
|
||||
}
|
||||
|
||||
// Verify otto: running → active
|
||||
if merged[0].ID != "otto" {
|
||||
t.Errorf("merged[0].ID = %q, want %q", merged[0].ID, "otto")
|
||||
}
|
||||
if merged[0].Status != models.AgentStatusActive {
|
||||
t.Errorf("otto status = %q, want %q (running → active)", merged[0].Status, models.AgentStatusActive)
|
||||
}
|
||||
if merged[0].SessionKey != "s1" {
|
||||
t.Errorf("otto sessionKey = %q, want %q", merged[0].SessionKey, "s1")
|
||||
}
|
||||
if merged[0].TaskProgress == nil || *merged[0].TaskProgress != 5 {
|
||||
t.Errorf("otto taskProgress = %v, want 5", merged[0].TaskProgress)
|
||||
}
|
||||
|
||||
// Verify dex: done → idle
|
||||
if merged[1].ID != "dex" {
|
||||
t.Errorf("merged[1].ID = %q, want %q", merged[1].ID, "dex")
|
||||
}
|
||||
if merged[1].Status != models.AgentStatusIdle {
|
||||
t.Errorf("dex status = %q, want %q (done → idle)", merged[1].Status, models.AgentStatusIdle)
|
||||
}
|
||||
if merged[1].SessionKey != "s2" {
|
||||
t.Errorf("dex sessionKey = %q, want %q", merged[1].SessionKey, "s2")
|
||||
}
|
||||
}
|
||||
|
||||
func TestInitialSync_PersistCreatesNew(t *testing.T) {
|
||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||
broker := handler.NewBroker()
|
||||
capture := newBroadcastCapture(broker)
|
||||
defer capture.close()
|
||||
|
||||
// Simulate the persist logic from initialSync:
|
||||
// new agents should be created
|
||||
card := agentItemToCard(agentListItem{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"})
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Agent doesn't exist → create
|
||||
_, err := repo.Get(ctx, card.ID)
|
||||
if err == nil {
|
||||
t.Fatal("expected agent to not exist yet")
|
||||
}
|
||||
|
||||
if err := repo.Create(ctx, card); err != nil {
|
||||
t.Fatalf("Create failed: %v", err)
|
||||
}
|
||||
|
||||
got, err := repo.Get(ctx, card.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("Get after Create failed: %v", err)
|
||||
}
|
||||
|
||||
if got.ID != "otto" {
|
||||
t.Errorf("got.ID = %q, want %q", got.ID, "otto")
|
||||
}
|
||||
if got.DisplayName != "Otto" {
|
||||
t.Errorf("got.DisplayName = %q, want %q", got.DisplayName, "Otto")
|
||||
}
|
||||
if got.Role != "Orchestrator" {
|
||||
t.Errorf("got.Role = %q, want %q", got.Role, "Orchestrator")
|
||||
}
|
||||
}
|
||||
|
||||
func TestInitialSync_PersistUpdatesExisting(t *testing.T) {
|
||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||
broker := handler.NewBroker()
|
||||
capture := newBroadcastCapture(broker)
|
||||
defer capture.close()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Pre-populate with existing agent
|
||||
repo.agents["otto"] = models.AgentCardData{
|
||||
ID: "otto",
|
||||
DisplayName: "Otto",
|
||||
Role: "Old Role",
|
||||
Status: models.AgentStatusIdle,
|
||||
}
|
||||
|
||||
// Simulate initialSync: agent exists, name/role changed → update
|
||||
newName := "Otto Prime"
|
||||
newRole := "Super Orchestrator"
|
||||
_, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{
|
||||
DisplayName: &newName,
|
||||
Role: &newRole,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Update failed: %v", err)
|
||||
}
|
||||
|
||||
got, err := repo.Get(ctx, "otto")
|
||||
if err != nil {
|
||||
t.Fatalf("Get after Update failed: %v", err)
|
||||
}
|
||||
|
||||
if got.DisplayName != "Otto Prime" {
|
||||
t.Errorf("displayName = %q, want %q", got.DisplayName, "Otto Prime")
|
||||
}
|
||||
if got.Role != "Super Orchestrator" {
|
||||
t.Errorf("role = %q, want %q", got.Role, "Super Orchestrator")
|
||||
}
|
||||
}
|
||||
|
||||
func TestInitialSync_MergesSessionStatus(t *testing.T) {
|
||||
// When initialSync merges session state, an agent whose existing status
|
||||
// differs from the session-derived status should be updated.
|
||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||
ctx := context.Background()
|
||||
|
||||
repo.agents["otto"] = models.AgentCardData{
|
||||
ID: "otto",
|
||||
DisplayName: "Otto",
|
||||
Role: "Orchestrator",
|
||||
Status: models.AgentStatusIdle,
|
||||
}
|
||||
|
||||
// Simulate session merge: session says "running" → agent should go active
|
||||
activeStatus := mapSessionStatus("running")
|
||||
if activeStatus != models.AgentStatusActive {
|
||||
t.Fatalf("mapSessionStatus(running) = %q, want active", activeStatus)
|
||||
}
|
||||
|
||||
_, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{
|
||||
Status: &activeStatus,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Update failed: %v", err)
|
||||
}
|
||||
|
||||
got, err := repo.Get(ctx, "otto")
|
||||
if err != nil {
|
||||
t.Fatalf("Get failed: %v", err)
|
||||
}
|
||||
|
||||
if got.Status != models.AgentStatusActive {
|
||||
t.Errorf("status after merge = %q, want %q", got.Status, models.AgentStatusActive)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInitialSync_BroadcastsFleet(t *testing.T) {
|
||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||
broker := handler.NewBroker()
|
||||
capture := newBroadcastCapture(broker)
|
||||
defer capture.close()
|
||||
|
||||
// Create some agents in the repo
|
||||
repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusActive}
|
||||
repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle}
|
||||
|
||||
// Simulate the final broadcast from initialSync
|
||||
mergedAgents := []models.AgentCardData{
|
||||
repo.agents["otto"],
|
||||
repo.agents["dex"],
|
||||
}
|
||||
broker.Broadcast("fleet.update", mergedAgents)
|
||||
|
||||
events := capture.captured()
|
||||
if len(events) == 0 {
|
||||
t.Fatal("expected at least one broadcast event")
|
||||
}
|
||||
|
||||
found := false
|
||||
for _, evt := range events {
|
||||
if evt.EventType == "fleet.update" {
|
||||
found = true
|
||||
// Verify data is the merged agents list
|
||||
agents, ok := evt.Data.([]models.AgentCardData)
|
||||
if !ok {
|
||||
t.Fatalf("fleet.update data type = %T, want []models.AgentCardData", evt.Data)
|
||||
}
|
||||
if len(agents) != 2 {
|
||||
t.Errorf("fleet.update agents count = %d, want 2", len(agents))
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Error("expected fleet.update broadcast event")
|
||||
}
|
||||
}
|
||||
@@ -92,10 +92,10 @@ func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
|
||||
|
||||
// wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol.
|
||||
type wsFrame struct {
|
||||
Type string `json:"type"` // "req", "res", "event"
|
||||
ID string `json:"id,omitempty"` // request/response correlation
|
||||
Method string `json:"method,omitempty"` // method name (req frames)
|
||||
Event string `json:"event,omitempty"` // event name (event frames)
|
||||
Type string `json:"type"` // "req", "res", "event"
|
||||
ID string `json:"id,omitempty"` // request/response correlation
|
||||
Method string `json:"method,omitempty"` // method name (req frames)
|
||||
Event string `json:"event,omitempty"` // event name (event frames)
|
||||
Params json.RawMessage `json:"params,omitempty"`
|
||||
Result json.RawMessage `json:"result,omitempty"`
|
||||
Error *wsError `json:"error,omitempty"`
|
||||
@@ -229,7 +229,34 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
||||
c.connId = helloOK.ConnID
|
||||
c.connMu.Unlock()
|
||||
|
||||
// Notify REST client that WS is live so it stands down
|
||||
// Step 2b: Register live event handlers BEFORE starting the read
|
||||
// loop. This eliminates the race window where readLoop dispatches
|
||||
// live events as "unhandled" because no handlers are registered yet.
|
||||
// The handlers only depend on c.agents and c.broker, which are wired
|
||||
// in the constructor — they do not need initialSync to have completed.
|
||||
c.registerEventHandlers()
|
||||
|
||||
// Step 2c: Start the read loop in a goroutine so that Send() in
|
||||
// initialSync can receive responses. The read loop goroutine will
|
||||
// continue running after initialSync completes, routing live events
|
||||
// and any future RPC responses. Because handlers are already
|
||||
// registered, any events arriving during or after initialSync are
|
||||
// dispatched correctly.
|
||||
readLoopErrCh := make(chan error, 1)
|
||||
go func() {
|
||||
readLoopErrCh <- c.readLoop(ctx, conn)
|
||||
}()
|
||||
|
||||
// Step 2d: Initial sync — fetch agents + sessions from gateway.
|
||||
// This works because the read loop is active and will route
|
||||
// response frames back to Send() via handleResponse.
|
||||
if err := c.initialSync(ctx); err != nil {
|
||||
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
|
||||
}
|
||||
|
||||
// Notify REST client that WS is live so it stands down.
|
||||
// This must happen AFTER initialSync so that the REST poller
|
||||
// doesn't start polling while we're still syncing.
|
||||
if c.restClient != nil {
|
||||
c.restClient.MarkWSReady()
|
||||
c.logger.Info("ws client notified REST fallback to stand down")
|
||||
@@ -238,16 +265,9 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
||||
// Reset wsReadyOnce so MarkWSReady can fire again after a reconnect
|
||||
c.wsReadyOnce = sync.Once{}
|
||||
|
||||
// Step 2b: Initial sync — fetch agents + sessions from gateway
|
||||
if err := c.initialSync(ctx); err != nil {
|
||||
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
|
||||
}
|
||||
|
||||
// Step 2c: Register live event handlers
|
||||
c.registerEventHandlers()
|
||||
|
||||
// Step 3: Read loop
|
||||
return c.readLoop(ctx, conn)
|
||||
// Step 3: Wait for the read loop goroutine to finish (blocks
|
||||
// until the connection drops or context is cancelled).
|
||||
return <-readLoopErrCh
|
||||
}
|
||||
|
||||
// readChallenge reads the first frame from the gateway, which must be a
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
@@ -466,6 +467,236 @@ func TestAgentItemToCard(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
// ── 6. Test: Initial sync ordering (readLoop active before Send) ──────────
|
||||
|
||||
// TestConnectAndRun_InitialSyncOrdering verifies that the WS client
|
||||
// completes initial sync successfully. This test would hang/timeout if
|
||||
// readLoop were NOT started before initialSync, because Send() relies on
|
||||
// readLoop→routeFrame→handleResponse to deliver RPC responses.
|
||||
func TestConnectAndRun_InitialSyncOrdering(t *testing.T) {
|
||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||
broker := handler.NewBroker()
|
||||
capture := newBroadcastCapture(broker)
|
||||
defer capture.close()
|
||||
|
||||
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
||||
// Handshake
|
||||
handleHandshake(t, conn)
|
||||
|
||||
// After handshake, respond to RPCs
|
||||
for {
|
||||
var req map[string]any
|
||||
if err := conn.ReadJSON(&req); err != nil {
|
||||
break
|
||||
}
|
||||
reqID, _ := req["id"].(string)
|
||||
method, _ := req["method"].(string)
|
||||
|
||||
var result any
|
||||
switch method {
|
||||
case "agents.list":
|
||||
result = []map[string]any{
|
||||
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
|
||||
{"id": "dex", "name": "Dex", "role": "Backend Dev", "channel": "telegram"},
|
||||
}
|
||||
case "sessions.list":
|
||||
result = []map[string]any{
|
||||
{"sessionKey": "s1", "agentId": "otto", "status": "running", "totalTokens": 500, "lastActivityAt": "2025-05-20T12:00:00Z"},
|
||||
}
|
||||
default:
|
||||
result = map[string]any{}
|
||||
}
|
||||
|
||||
res := map[string]any{
|
||||
"type": "res",
|
||||
"id": reqID,
|
||||
"ok": true,
|
||||
"result": result,
|
||||
}
|
||||
if err := conn.WriteJSON(res); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
})
|
||||
defer srv.Close()
|
||||
|
||||
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
client.Start(ctx)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Wait for initial sync to complete by checking repo state.
|
||||
// The agents should be persisted from the RPC responses.
|
||||
deadline := time.Now().Add(5 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
repo.mu.Lock()
|
||||
_, ottoOK := repo.agents["otto"]
|
||||
_, dexOK := repo.agents["dex"]
|
||||
repo.mu.Unlock()
|
||||
if ottoOK && dexOK {
|
||||
break
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
|
||||
repo.mu.Lock()
|
||||
_, ottoOK := repo.agents["otto"]
|
||||
_, dexOK := repo.agents["dex"]
|
||||
repo.mu.Unlock()
|
||||
|
||||
if !ottoOK {
|
||||
t.Error("otto not found in repo after initial sync — readLoop may not have been active before Send()")
|
||||
}
|
||||
if !dexOK {
|
||||
t.Error("dex not found in repo after initial sync — readLoop may not have been active before Send()")
|
||||
}
|
||||
|
||||
cancel()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("WSClient did not shut down cleanly")
|
||||
}
|
||||
}
|
||||
|
||||
// ── 7. Test: Event not lost during initial sync (regression) ───────────────
|
||||
|
||||
// TestConnectAndRun_EventNotLostDuringSync verifies that live gateway events
|
||||
// arriving during initial sync are NOT dropped. This is a regression test
|
||||
// for the race where readLoop started before registerEventHandlers(),
|
||||
// causing events read during that window to be logged as "unhandled" and lost.
|
||||
//
|
||||
// The mock server sends a live event (sessions.changed) right after the
|
||||
// handshake, interleaved with the RPC responses for agents.list and
|
||||
// sessions.list. The test asserts the event is received by the handler.
|
||||
func TestConnectAndRun_EventNotLostDuringSync(t *testing.T) {
|
||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||
broker := handler.NewBroker()
|
||||
capture := newBroadcastCapture(broker)
|
||||
defer capture.close()
|
||||
|
||||
// Pre-seed an agent so the event handler can update it.
|
||||
repo.agents["otto"] = models.AgentCardData{
|
||||
ID: "otto",
|
||||
DisplayName: "Otto",
|
||||
Status: models.AgentStatusIdle,
|
||||
}
|
||||
|
||||
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
||||
// Handshake
|
||||
handleHandshake(t, conn)
|
||||
|
||||
// After handshake, process RPCs and inject a live event.
|
||||
for {
|
||||
var req map[string]any
|
||||
if err := conn.ReadJSON(&req); err != nil {
|
||||
break
|
||||
}
|
||||
reqID, _ := req["id"].(string)
|
||||
method, _ := req["method"].(string)
|
||||
|
||||
// Respond to agents.list RPC
|
||||
if method == "agents.list" {
|
||||
// Before responding, inject a live event — simulates
|
||||
// a gateway pushing a presence update during sync.
|
||||
evt := map[string]any{
|
||||
"type": "event",
|
||||
"event": "presence",
|
||||
"params": map[string]any{"agentId": "otto", "connected": true, "lastActivityAt": "2025-05-20T12:30:00Z"},
|
||||
}
|
||||
if err := conn.WriteJSON(evt); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
// Now send the RPC response
|
||||
res := map[string]any{
|
||||
"type": "res",
|
||||
"id": reqID,
|
||||
"ok": true,
|
||||
"result": []map[string]any{
|
||||
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
|
||||
},
|
||||
}
|
||||
if err := conn.WriteJSON(res); err != nil {
|
||||
break
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Respond to sessions.list RPC
|
||||
if method == "sessions.list" {
|
||||
res := map[string]any{
|
||||
"type": "res",
|
||||
"id": reqID,
|
||||
"ok": true,
|
||||
"result": []map[string]any{},
|
||||
}
|
||||
if err := conn.WriteJSON(res); err != nil {
|
||||
break
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Default response for other methods
|
||||
res := map[string]any{
|
||||
"type": "res",
|
||||
"id": reqID,
|
||||
"ok": true,
|
||||
"result": map[string]any{},
|
||||
}
|
||||
if err := conn.WriteJSON(res); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
})
|
||||
defer srv.Close()
|
||||
|
||||
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
client.Start(ctx)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Wait for the presence event to be processed by checking the repo.
|
||||
// The presence handler updates the agent, so we check for the
|
||||
// lastActivityAt change.
|
||||
deadline := time.Now().Add(5 * time.Second)
|
||||
var lastActivity string
|
||||
for time.Now().Before(deadline) {
|
||||
repo.mu.Lock()
|
||||
if a, ok := repo.agents["otto"]; ok {
|
||||
lastActivity = a.LastActivity
|
||||
}
|
||||
repo.mu.Unlock()
|
||||
if lastActivity == "2025-05-20T12:30:00Z" {
|
||||
break
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
|
||||
if lastActivity != "2025-05-20T12:30:00Z" {
|
||||
t.Errorf("presence event during sync was lost: lastActivity = %q, want %q", lastActivity, "2025-05-20T12:30:00Z")
|
||||
}
|
||||
|
||||
cancel()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("WSClient did not shut down cleanly")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStrPtr(t *testing.T) {
|
||||
s := "hello"
|
||||
p := strPtr(s)
|
||||
|
||||
Reference in New Issue
Block a user