CUB-200: resolve merge conflicts with dev — adopt dev's consolidated workflows and improved Go gateway code
Some checks failed
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped

This commit is contained in:
Dex
2026-05-20 21:26:17 +00:00
30 changed files with 3547 additions and 233 deletions

View File

@@ -12,16 +12,15 @@ ENVIRONMENT=development
# Format: postgresql://user:password@host:port/database?sslmode=disable
DATABASE_URL=postgresql://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable
# Gateway (OpenClaw) connection — WebSocket (primary)
# WebSocket URL for real-time OpenClaw gateway v3 protocol
GATEWAY_WS_URL=ws://localhost:18789/
# Auth token for the OpenClaw gateway (operator scope)
OPENCLAW_GATEWAY_TOKEN=
# Gateway (OpenClaw) connection
# WebSocket gateway config (primary path)
WS_GATEWAY_URL=ws://host.docker.internal:18789/
# Gateway auth token — same as OPENCLAW_GATEWAY_TOKEN (set in environment)
GATEWAY_TOKEN=
# Gateway (OpenClaw) connection — REST (fallback)
# URL to the OpenClaw gateway API for polling agent states (used only if WS fails)
GATEWAY_URL=http://localhost:18789/api/agents
# Polling interval for agent state updates
# REST poller config (fallback, only used if WS fails to connect)
GATEWAY_URL=http://host.docker.internal:18789/api/agents
# Polling interval for agent state updates (fallback only)
GATEWAY_POLL_INTERVAL=5s
# ── Frontend Variables (via Vite) ───────────────────────────────────────
@@ -33,7 +32,7 @@ GATEWAY_POLL_INTERVAL=5s
# When using docker-compose, these are set in the services section
# See docker-compose.yml for service-specific environment variables
# ── Database Configuration ─────────────────────────────────────────────
# ── Database Configuration ───────────────────────────────────────────────
# Set in the db service environment section of docker-compose.yml
# POSTGRES_USER=controlcenter
# POSTGRES_PASSWORD=controlcenter
@@ -48,4 +47,4 @@ GATEWAY_POLL_INTERVAL=5s
# For Docker deployment:
# 1. Copy .env.example to .env (backend only)
# 2. Run: docker compose up -d
# 3. Access frontend at http://localhost:3000
# 3. Access frontend at http://localhost:3000

11
ci-image/Dockerfile Normal file
View File

@@ -0,0 +1,11 @@
FROM catthehacker/ubuntu:act-latest
# Install Go 1.23
RUN curl -sL https://go.dev/dl/go1.23.6.linux-amd64.tar.gz | tar -C /usr/local -xz
# Install Node 22
RUN curl -fsSL https://deb.nodesource.com/setup_22.x | bash - \
&& apt-get install -y nodejs \
&& rm -rf /var/lib/apt/lists/*
ENV PATH="/usr/local/go/bin:${PATH}"

View File

@@ -16,6 +16,8 @@ services:
- ENVIRONMENT=production
- PORT=8080
- GATEWAY_URL=http://host.docker.internal:18789/api/agents
- WS_GATEWAY_URL=ws://host.docker.internal:18789/
- GATEWAY_TOKEN=${GATEWAY_TOKEN:-}
depends_on:
db:
condition: service_healthy

File diff suppressed because it is too large Load Diff

View File

@@ -7,7 +7,9 @@
"dev": "vite",
"build": "tsc -b && vite build",
"lint": "eslint .",
"preview": "vite preview"
"preview": "vite preview",
"test": "vitest run",
"test:watch": "vitest"
},
"dependencies": {
"@tanstack/react-query": "^5.100.9",
@@ -20,6 +22,8 @@
"devDependencies": {
"@eslint/js": "^10.0.1",
"@tailwindcss/vite": "^4.2.4",
"@testing-library/jest-dom": "^6.9.1",
"@testing-library/react": "^16.3.2",
"@types/node": "^24.12.2",
"@types/react": "^19.2.14",
"@types/react-dom": "^19.2.3",
@@ -29,10 +33,12 @@
"eslint-plugin-react-hooks": "^7.1.1",
"eslint-plugin-react-refresh": "^0.5.2",
"globals": "^17.5.0",
"jsdom": "^29.1.1",
"postcss": "^8.5.14",
"tailwindcss": "^4.2.4",
"typescript": "~6.0.2",
"typescript-eslint": "^8.58.2",
"vite": "^8.0.10"
"vite": "^8.0.10",
"vitest": "^4.1.7"
}
}

View File

@@ -1,6 +1,8 @@
import { useState } from 'react'
import { NavLink } from 'react-router-dom'
import { Command, Activity, FolderKanban, Monitor, Settings, Menu, X } from 'lucide-react'
import { Command, Activity, FolderKanban, Monitor, Settings, Menu, X, Wifi, WifiOff, Loader } from 'lucide-react'
import { useSSEContext } from '../contexts/SSEContext'
import type { SSEStatus } from '../hooks/useSSE'
const navItems = [
{ to: '/', icon: Command, label: 'Hub' },
@@ -10,9 +12,29 @@ const navItems = [
{ to: '/settings', icon: Settings, label: 'Settings' },
]
/** Small status pill shown in the sidebar footer and mobile header. */
function SSEStatusBadge({ status, showLabel = false }: { status: SSEStatus; showLabel?: boolean }) {
const cfg = {
connected: { icon: Wifi, color: 'text-green-500', label: 'Live' },
connecting: { icon: Loader, color: 'text-yellow-500 animate-spin', label: 'Connecting' },
reconnecting: { icon: Loader, color: 'text-yellow-500 animate-spin', label: 'Reconnecting' },
error: { icon: WifiOff, color: 'text-red-500', label: 'Disconnected' },
}[status]
const Icon = cfg.icon
return (
<div className="flex items-center gap-1.5" title={cfg.label}>
<Icon size={14} className={cfg.color} />
{showLabel && <span className={`text-xs ${cfg.color}`}>{cfg.label}</span>}
</div>
)
}
export default function Layout({ children }: { children: React.ReactNode }) {
const [expanded, setExpanded] = useState(false)
const [mobileOpen, setMobileOpen] = useState(false)
const { sseStatus } = useSSEContext()
return (
<div className="flex min-h-screen bg-surface-darkest text-on-surface">
@@ -46,6 +68,15 @@ export default function Layout({ children }: { children: React.ReactNode }) {
</NavLink>
))}
</nav>
{/* SSE connection status — footer of sidebar */}
<div className="px-4 py-3 border-t border-surface-light flex items-center gap-2">
<SSEStatusBadge status={sseStatus} />
{expanded && (
<span className="text-xs text-on-surface-muted whitespace-nowrap">
{sseStatus === 'connected' ? 'Live updates on' : sseStatus}
</span>
)}
</div>
</aside>
{/* Mobile Header + Bottom Nav */}
@@ -54,6 +85,7 @@ export default function Layout({ children }: { children: React.ReactNode }) {
<div className="flex items-center gap-2">
<Command size={22} className="text-primary" />
<span className="font-bold">Control Center</span>
<SSEStatusBadge status={sseStatus} />
</div>
<button onClick={() => setMobileOpen(!mobileOpen)} className="p-2">
{mobileOpen ? <X size={22} /> : <Menu size={22} />}

View File

@@ -0,0 +1,23 @@
/**
* SSEContext — provides SSE connection status throughout the component tree.
* Mount <SSEProvider> once inside QueryClientProvider.
*/
import { createContext, useContext, type ReactNode } from 'react'
import { useRealtimeSync } from '../hooks/useRealtimeSync'
import type { SSEStatus } from '../hooks/useSSE'
interface SSEContextValue {
sseStatus: SSEStatus
}
const SSEContext = createContext<SSEContextValue>({ sseStatus: 'connecting' })
export function SSEProvider({ children }: { children: ReactNode }) {
const { sseStatus } = useRealtimeSync()
return <SSEContext.Provider value={{ sseStatus }}>{children}</SSEContext.Provider>
}
/** Access the SSE connection status from any component. */
export function useSSEContext(): SSEContextValue {
return useContext(SSEContext)
}

View File

@@ -0,0 +1,129 @@
/**
* Tests for useRealtimeSync — event → query invalidation mapping.
*
* Uses .tsx extension so Vite/OXC can parse JSX in the wrapper component.
*/
import { renderHook } from '@testing-library/react'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
import * as useSSEModule from './useSSE'
import { useRealtimeSync } from './useRealtimeSync'
import React from 'react'
import type { SSEMessage } from '../services/sse'
describe('useRealtimeSync', () => {
let queryClient: QueryClient
let mockSSEOnMessage: ((msg: { type: string; data: unknown }) => void) | null = null
beforeEach(() => {
queryClient = new QueryClient({
defaultOptions: { queries: { retry: false } },
})
mockSSEOnMessage = null
// Spy on useSSE to capture the onMessage callback
vi.spyOn(useSSEModule, 'useSSE').mockImplementation((opts) => {
mockSSEOnMessage = opts?.onMessage ?? null
return { status: 'connected' }
})
})
afterEach(() => {
vi.restoreAllMocks()
})
function render() {
return renderHook(() => useRealtimeSync(), {
wrapper: ({ children }: { children: React.ReactNode }) => (
React.createElement(QueryClientProvider, { client: queryClient }, children)
),
})
}
it('invalidates ["agents"] on agent.status event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'agent.status',
data: { agentId: 'a1', status: 'active' },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
expect(invalidateSpy).toHaveBeenCalledTimes(1)
})
it('invalidates ["tasks"] and ["agents"] on agent.task event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'agent.task',
data: { agentId: 'a1', taskId: 't1', title: 'Test', action: 'assigned' },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
expect(invalidateSpy).toHaveBeenCalledTimes(2)
})
it('invalidates ["tasks"] and ["agents"] on agent.progress event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'agent.progress',
data: { agentId: 'a1', taskId: 't1', progress: 50, message: 'working' },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
expect(invalidateSpy).toHaveBeenCalledTimes(2)
})
it('invalidates ["agents"], ["sessions"], ["tasks"] on fleet.update event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'fleet.update',
data: { timestamp: '2026-05-20T12:00:00Z', agentCount: 5 },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['sessions'] })
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
expect(invalidateSpy).toHaveBeenCalledTimes(3)
})
it('does nothing on connected event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'connected',
data: { clientCount: 1 },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).not.toHaveBeenCalled()
})
it('does nothing on unknown event types', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
mockSSEOnMessage!({ type: 'unknown.event', data: {} })
expect(invalidateSpy).not.toHaveBeenCalled()
})
it('returns sseStatus from useSSE', () => {
const { result } = render()
expect(result.current.sseStatus).toBe('connected')
})
})

View File

@@ -0,0 +1,64 @@
/**
* useRealtimeSync — mounts the SSE connection once at the app level and
* wires incoming events to React Query cache invalidation.
*
* Event → query key mapping:
* agent.status → ['agents']
* agent.task → ['tasks'], ['agents']
* agent.progress → ['tasks'], ['agents']
* fleet.update → ['agents'], ['sessions'], ['tasks']
*/
import { useQueryClient } from '@tanstack/react-query'
import { useCallback } from 'react'
import { useSSE, type SSEStatus } from './useSSE'
import type { SSEMessage } from '../services/sse'
export function useRealtimeSync(): { sseStatus: SSEStatus } {
const queryClient = useQueryClient()
const handleMessage = useCallback(
(raw: { type: string; data: unknown }) => {
// Cast to discriminated union — the backend contract guarantees these shapes
const msg = raw as SSEMessage
switch (msg.type) {
case 'agent.status':
// msg.data: AgentStatusEvent { agentId, status, reason? }
void msg.data.agentId // retained for type-narrowing — ensures payload matches contract
queryClient.invalidateQueries({ queryKey: ['agents'] })
break
case 'agent.task':
// msg.data: AgentTaskEvent { agentId, taskId, title, action }
void msg.data.agentId
queryClient.invalidateQueries({ queryKey: ['tasks'] })
queryClient.invalidateQueries({ queryKey: ['agents'] })
break
case 'agent.progress':
// msg.data: AgentProgressEvent { agentId, taskId, progress, message? }
void msg.data.agentId
queryClient.invalidateQueries({ queryKey: ['tasks'] })
queryClient.invalidateQueries({ queryKey: ['agents'] })
break
case 'fleet.update':
// msg.data: FleetUpdateEvent { timestamp, agentCount }
void msg.data.agentCount
queryClient.invalidateQueries({ queryKey: ['agents'] })
queryClient.invalidateQueries({ queryKey: ['sessions'] })
queryClient.invalidateQueries({ queryKey: ['tasks'] })
break
default:
// 'connected' and unknown events — no action needed
break
}
},
[queryClient],
)
const { status: sseStatus } = useSSE({ onMessage: handleMessage })
return { sseStatus }
}

View File

@@ -0,0 +1,267 @@
/**
* Tests for useSSE — SSE connection lifecycle, back-off, event parsing, and cleanup.
*
* jsdom does not include EventSource, so we mock it completely.
*/
import { renderHook, act, waitFor } from '@testing-library/react'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { useSSE } from './useSSE'
// ---------------------------------------------------------------------------
// Mock EventSource — defined as a plain class so `new EventSource()` works
// ---------------------------------------------------------------------------
class MockEventSource {
url: string
onopen: (() => void) | null = null
onerror: ((evt: Event) => void) | null = null
onmessage: ((evt: MessageEvent) => void) | null = null
private listeners: Map<string, Array<(evt: Event) => void>> = new Map()
readyState: number = 0
constructor(url: string) {
this.url = url
}
addEventListener(type: string, handler: (evt: Event) => void) {
if (!this.listeners.has(type)) this.listeners.set(type, [])
this.listeners.get(type)!.push(handler)
}
removeEventListener() { /* no-op for tests */ }
close() {
this.readyState = 2
this.onopen = null
this.onerror = null
this.onmessage = null
this.listeners.clear()
}
// Test helpers
_simulateOpen() { this.onopen?.() }
_simulateError() { this.onerror?.(new Event('error')) }
_simulateNamedEvent(type: string, data: string) {
const handlers = this.listeners.get(type)
if (handlers) {
const evt = new MessageEvent(type, { data }) as Event
handlers.forEach((h) => h(evt))
}
}
_simulateMessage(data: string) {
this.onmessage?.(new MessageEvent('message', { data }) as MessageEvent)
}
static readonly CONNECTING = 0
static readonly OPEN = 1
static readonly CLOSED = 2
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
let esInstances: MockEventSource[]
describe('useSSE', () => {
beforeEach(() => {
esInstances = []
// Replace global EventSource with our mock class
Object.defineProperty(globalThis, 'EventSource', {
// The mock must use a class for `new EventSource()` to work
value: class extends MockEventSource {
constructor(url: string) {
super(url)
esInstances.push(this)
}
},
writable: true,
configurable: true,
})
vi.useFakeTimers({ shouldAdvanceTime: true })
})
afterEach(() => {
vi.restoreAllMocks()
vi.useRealTimers()
})
// ── Initial connection ──────────────────────────────────────────────────
it('starts in "connecting" state and creates an EventSource', () => {
const { result } = renderHook(() => useSSE({ url: '/api/events' }))
expect(result.current.status).toBe('connecting')
expect(esInstances.length).toBeGreaterThanOrEqual(1)
expect(esInstances[0].url).toBe('/api/events')
})
it('transitions to "connected" on open', async () => {
const onOpen = vi.fn()
const { result } = renderHook(() => useSSE({ url: '/api/events', onOpen }))
act(() => { esInstances[0]._simulateOpen() })
await waitFor(() => {
expect(result.current.status).toBe('connected')
})
expect(onOpen).toHaveBeenCalledTimes(1)
})
// ── Reconnection with exponential back-off ──────────────────────────────
it('retries after error with exponential back-off', async () => {
const { result } = renderHook(() =>
useSSE({ url: '/api/events', reconnectBaseMs: 1000, reconnectMaxMs: 30000 }),
)
// First error → reconnecting, retry at 1s
act(() => { esInstances[0]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
expect(esInstances).toHaveLength(1)
// Advance 1000ms → second EventSource created
act(() => { vi.advanceTimersByTime(1000) })
expect(esInstances).toHaveLength(2)
// Second error → reconnecting, retry at 2s
act(() => { esInstances[1]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
act(() => { vi.advanceTimersByTime(2000) })
expect(esInstances).toHaveLength(3)
// Third error → reconnecting, retry at 4s
act(() => { esInstances[2]._simulateError() })
act(() => { vi.advanceTimersByTime(4000) })
expect(esInstances).toHaveLength(4)
})
it('caps reconnect delay at reconnectMaxMs', async () => {
renderHook(() =>
useSSE({ url: '/api/events', reconnectBaseMs: 1000, reconnectMaxMs: 10000 }),
)
// Force 5 errors to push the exponent past the cap
for (let i = 0; i < 5; i++) {
act(() => { esInstances[i]._simulateError() })
const expectedDelay = Math.min(1000 * 2 ** i, 10000)
act(() => { vi.advanceTimersByTime(expectedDelay) })
}
// 6 ES instances created (initial + 5 retries)
expect(esInstances).toHaveLength(6)
})
// ── Circuit-breaker (max retries) ───────────────────────────────────────
it('transitions to "error" after reconnectLimit is exceeded', async () => {
const { result } = renderHook(() =>
useSSE({ url: '/api/events', reconnectBaseMs: 100, reconnectLimit: 2 }),
)
// First error → reconnecting
act(() => { esInstances[0]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
// Advance → retry
act(() => { vi.advanceTimersByTime(100) })
// Second error → reconnecting (attempt 2, still ≤ limit)
act(() => { esInstances[1]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
act(() => { vi.advanceTimersByTime(200) })
// Third error → limit exceeded (3 > 2) → error
act(() => { esInstances[2]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('error') })
})
it('resets reconnect counter on successful connection', async () => {
const { result } = renderHook(() =>
useSSE({ url: '/api/events', reconnectBaseMs: 100, reconnectLimit: 3 }),
)
// Two errors then a successful connect
act(() => { esInstances[0]._simulateError() })
act(() => { vi.advanceTimersByTime(100) })
act(() => { esInstances[1]._simulateOpen() })
await waitFor(() => { expect(result.current.status).toBe('connected') })
// Now error again — counter should be reset, so we get fresh attempts
act(() => { esInstances[1]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
expect(result.current.status).toBe('reconnecting')
})
// ── Cleanup on unmount ───────────────────────────────────────────────────
it('closes EventSource on unmount', () => {
const closeSpy = vi.spyOn(MockEventSource.prototype, 'close')
const { unmount } = renderHook(() => useSSE({ url: '/api/events' }))
unmount()
expect(closeSpy).toHaveBeenCalled()
})
it('does not update state after unmount', async () => {
const { result, unmount } = renderHook(() => useSSE({ url: '/api/events' }))
unmount()
// These should be no-ops after unmount (mountedRef guards)
act(() => { esInstances[0]._simulateOpen() })
act(() => { esInstances[0]._simulateError() })
// State should not have changed
expect(result.current.status).toBe('connecting')
})
// ── Event parsing ───────────────────────────────────────────────────────
it('parses valid JSON data into objects', async () => {
const onMessage = vi.fn()
renderHook(() => useSSE({ url: '/api/events', onMessage }))
act(() => {
esInstances[0]._simulateNamedEvent('agent.status', JSON.stringify({ agentId: 'a1', status: 'active' }))
})
expect(onMessage).toHaveBeenCalledWith({
type: 'agent.status',
data: { agentId: 'a1', status: 'active' },
})
})
it('passes invalid JSON through as raw string', async () => {
const onMessage = vi.fn()
renderHook(() => useSSE({ url: '/api/events', onMessage }))
act(() => {
esInstances[0]._simulateNamedEvent('agent.status', 'not valid json {{{')
})
expect(onMessage).toHaveBeenCalledWith({
type: 'agent.status',
data: 'not valid json {{{',
})
})
// ── enabled=false skips connection ──────────────────────────────────────
it('does not create EventSource when enabled=false', () => {
const { result } = renderHook(() => useSSE({ url: '/api/events', enabled: false }))
expect(esInstances).toHaveLength(0)
expect(result.current.status).toBe('connecting')
})
// ── onError callback ────────────────────────────────────────────────────
it('calls onError on connection failure', async () => {
const onError = vi.fn()
renderHook(() =>
useSSE({ url: '/api/events', onError, reconnectBaseMs: 100 }),
)
act(() => { esInstances[0]._simulateError() })
expect(onError).toHaveBeenCalledTimes(1)
})
// ── Default URL ─────────────────────────────────────────────────────────
it('uses /api/events as default URL', () => {
renderHook(() => useSSE())
expect(esInstances[0].url).toBe('/api/events')
})
})

View File

@@ -0,0 +1,180 @@
import { useEffect, useRef, useCallback, useState } from 'react'
/** SSE connection state reported to consumers. */
export type SSEStatus = 'connecting' | 'connected' | 'reconnecting' | 'error'
/** Typed SSE event received from the backend. */
export interface SSEMessage {
/** event: field from the SSE frame */
type: string
/** parsed JSON from the data: field */
data: unknown
}
export interface UseSSEOptions {
/** Endpoint URL — defaults to /api/events */
url?: string
/** Called for every SSE message (all event types) */
onMessage?: (msg: SSEMessage) => void
/** Called when connection opens or reconnects */
onOpen?: () => void
/** Called on every connection error (both transient and terminal) */
onError?: (err: Event) => void
/** Base delay in ms before the first reconnect attempt (default 1 000) */
reconnectBaseMs?: number
/** Maximum reconnect delay in ms (default 30 000) */
reconnectMaxMs?: number
/**
* Maximum number of consecutive reconnect attempts before giving up.
* When the limit is reached, status transitions to 'error'.
* Default undefined (unlimited).
*/
reconnectLimit?: number
/** Set false to disable auto-connect (useful in tests) */
enabled?: boolean
}
const SSE_EVENTS = ['agent.status', 'agent.task', 'agent.progress', 'fleet.update', 'connected'] as const
/**
* useSSE — mounts a persistent SSE connection to the Control Center backend.
*
* Handles:
* - Initial connection on mount
* - Exponential back-off reconnection on drop (1s → 2s → 4s … capped at reconnectMaxMs)
* - Circuit-breaker: after reconnectLimit consecutive failures, transitions to 'error'
* - Cleanup on unmount
* - All five event types: agent.status, agent.task, agent.progress, fleet.update, connected
*
* The 'connected' SSE event is an application-level handshake sent by the backend
* after the transport opens. This is distinct from onOpen, which fires at the
* transport level when the EventSource HTTP connection is established.
*/
export function useSSE({
url = '/api/events',
onMessage,
onOpen,
onError,
reconnectBaseMs = 1_000,
reconnectMaxMs = 30_000,
reconnectLimit,
enabled = true,
}: UseSSEOptions = {}): { status: SSEStatus } {
const [status, setStatus] = useState<SSEStatus>('connecting')
// Stable refs so the effect doesn't need to re-run when callbacks change
const onMessageRef = useRef(onMessage)
const onOpenRef = useRef(onOpen)
const onErrorRef = useRef(onError)
onMessageRef.current = onMessage
onOpenRef.current = onOpen
onErrorRef.current = onError
const reconnectAttemptRef = useRef(0)
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null)
const esRef = useRef<EventSource | null>(null)
const mountedRef = useRef(true)
const clearReconnectTimer = useCallback(() => {
if (reconnectTimerRef.current !== null) {
clearTimeout(reconnectTimerRef.current)
reconnectTimerRef.current = null
}
}, [])
const connect = useCallback(() => {
if (!mountedRef.current || !enabled) return
// Clean up any existing connection
if (esRef.current) {
esRef.current.close()
esRef.current = null
}
setStatus(reconnectAttemptRef.current === 0 ? 'connecting' : 'reconnecting')
const es = new EventSource(url)
esRef.current = es
es.onopen = () => {
if (!mountedRef.current) return
reconnectAttemptRef.current = 0
setStatus('connected')
onOpenRef.current?.()
}
es.onerror = (evt) => {
if (!mountedRef.current) return
// EventSource auto-retries but we manage our own to get back-off control
es.close()
esRef.current = null
onErrorRef.current?.(evt)
reconnectAttemptRef.current += 1
// Circuit-breaker: give up after reconnectLimit consecutive failures
if (reconnectLimit !== undefined && reconnectAttemptRef.current > reconnectLimit) {
setStatus('error')
return
}
// Exponential back-off: 1s, 2s, 4s … capped at reconnectMaxMs
// Note: attempt is 1-based here (already incremented), so we use attempt-1 for the exponent
const delay = Math.min(
reconnectBaseMs * 2 ** (reconnectAttemptRef.current - 1),
reconnectMaxMs,
)
setStatus('reconnecting')
clearReconnectTimer()
reconnectTimerRef.current = setTimeout(() => {
if (mountedRef.current) connect()
}, delay)
}
// Register listeners for all known event types
for (const eventType of SSE_EVENTS) {
es.addEventListener(eventType, (evt: MessageEvent) => {
if (!mountedRef.current) return
let data: unknown = evt.data
try {
data = JSON.parse(evt.data as string)
} catch {
// leave as raw string
}
onMessageRef.current?.({ type: eventType, data })
})
}
// Catch-all for unnamed events (type == 'message').
// Won't fire for the named events registered via addEventListener above.
es.onmessage = (evt: MessageEvent) => {
if (!mountedRef.current) return
let data: unknown = evt.data
try {
data = JSON.parse(evt.data as string)
} catch {
// leave as raw string
}
onMessageRef.current?.({ type: 'message', data })
}
}, [url, enabled, reconnectBaseMs, reconnectMaxMs, reconnectLimit, clearReconnectTimer])
useEffect(() => {
mountedRef.current = true
if (enabled) connect()
return () => {
mountedRef.current = false
clearReconnectTimer()
if (esRef.current) {
esRef.current.close()
esRef.current = null
}
}
}, [connect, enabled, clearReconnectTimer])
return { status }
}

View File

@@ -4,13 +4,16 @@ import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
import { BrowserRouter } from 'react-router-dom'
import ErrorBoundary from './components/ErrorBoundary'
import { ThemeProvider } from './hooks/useTheme'
import { SSEProvider } from './contexts/SSEContext'
import './index.css'
import App from './App'
const queryClient = new QueryClient({
defaultOptions: {
queries: {
staleTime: 30_000,
// No polling — real-time updates come through SSE.
// staleTime is kept high; data is pushed, not pulled.
staleTime: 60_000,
refetchOnWindowFocus: false,
retry: 1,
},
@@ -22,9 +25,13 @@ createRoot(document.getElementById('root')!).render(
<ErrorBoundary>
<ThemeProvider>
<QueryClientProvider client={queryClient}>
<BrowserRouter>
<App />
</BrowserRouter>
{/* SSEProvider must live inside QueryClientProvider so it can call
useQueryClient() to invalidate caches on incoming events. */}
<SSEProvider>
<BrowserRouter>
<App />
</BrowserRouter>
</SSEProvider>
</QueryClientProvider>
</ThemeProvider>
</ErrorBoundary>

View File

@@ -1,18 +1,36 @@
import { useTheme } from '../hooks/useTheme'
import { useLocalStorage } from '../hooks/useLocalStorage'
import { Sun, Moon, Monitor, Zap, Clock } from 'lucide-react'
import { useSSEContext } from '../contexts/SSEContext'
import { Sun, Moon, Monitor, Zap, Radio } from 'lucide-react'
const REFRESH_PRESETS = [
{ label: '5s', value: 5_000 },
{ label: '10s', value: 10_000 },
{ label: '30s', value: 30_000 },
{ label: '60s', value: 60_000 },
]
const SSE_STATUS_COPY: Record<string, { label: string; description: string; color: string }> = {
connected: {
label: 'Connected',
description: 'Real-time updates are active. Agent status, tasks, and progress stream live.',
color: 'text-green-500',
},
connecting: {
label: 'Connecting…',
description: 'Establishing SSE connection to the backend.',
color: 'text-yellow-500',
},
reconnecting: {
label: 'Reconnecting…',
description: 'Connection lost. Retrying with exponential back-off.',
color: 'text-yellow-500',
},
error: {
label: 'Disconnected',
description: 'Could not connect to the SSE endpoint. Check that the backend is reachable.',
color: 'text-red-500',
},
}
export default function SettingsPage() {
const { isDark, toggleTheme } = useTheme()
const [gatewayUrl, setGatewayUrl] = useLocalStorage('cc-gateway-url', '')
const [refreshInterval, setRefreshInterval] = useLocalStorage('cc-refresh-interval', 30_000)
const { sseStatus } = useSSEContext()
const sseInfo = SSE_STATUS_COPY[sseStatus] ?? SSE_STATUS_COPY.error
return (
<div className="space-y-8 max-w-2xl">
@@ -80,45 +98,31 @@ export default function SettingsPage() {
</div>
</section>
{/* Refresh */}
{/* Real-time connection status */}
<section className="space-y-4">
<h2 className="text-lg font-semibold flex items-center gap-2">
<Clock size={20} className="text-primary" />
Auto Refresh
<Radio size={20} className="text-primary" />
Real-time Updates
</h2>
<div className="p-5 rounded-xl border border-surface-light bg-surface-dark space-y-3">
<p className="text-sm text-on-surface-variant">Data refresh interval for agent status and logs</p>
<div className="flex flex-col gap-2">
<input
type="range"
min="0"
max="3"
step="1"
value={REFRESH_PRESETS.findIndex((p) => p.value === refreshInterval)}
onChange={(e) => {
const idx = parseInt(e.target.value)
setRefreshInterval(REFRESH_PRESETS[idx].value)
}}
className="w-full accent-primary"
/>
<div className="flex justify-between text-xs text-on-surface-muted">
{REFRESH_PRESETS.map((p) => (
<button
key={p.label}
onClick={() => setRefreshInterval(p.value)}
className={`px-3 py-1 rounded-lg transition-colors ${
refreshInterval === p.value
? 'bg-primary/10 text-primary'
: 'hover:bg-surface-light'
}`}
>
{p.label}
</button>
))}
<div className="flex items-center justify-between">
<div>
<p className="font-medium">SSE Connection</p>
<p className="text-sm text-on-surface-variant mt-0.5">{sseInfo.description}</p>
</div>
<span className={`text-sm font-semibold whitespace-nowrap ${sseInfo.color}`}>
{sseInfo.label}
</span>
</div>
<p className="text-xs text-on-surface-muted">
Endpoint: <code className="bg-surface-light px-1.5 py-0.5 rounded text-on-surface-variant">/api/events</code>
&nbsp;·&nbsp;Events: agent.status, agent.task, agent.progress, fleet.update
</p>
<p className="text-xs text-on-surface-muted">
Polling is disabled. All status updates are pushed from the server over a persistent SSE connection.
The client reconnects automatically with exponential back-off on drop.
</p>
</div>
</section>
</div>

View File

@@ -0,0 +1,72 @@
/**
* SSE event payload types matching the Go backend (internal/handler/sse.go).
*
* Event format on the wire:
* event: <eventType>
* data: <json>
*
* The types below define the backend contract. The SSEPayloadMap maps
* each event type string to its expected payload shape. SSEMessage is a
* discriminated union on `type` — when you switch on msg.type, TypeScript
* narrows msg.data to the correct payload interface automatically.
*/
import type { AgentStatus } from '../types'
/** agent.status — agent came online, went offline, changed state */
export interface AgentStatusEvent {
agentId: string
status: AgentStatus
/** Optional human-readable reason (e.g. error message) */
reason?: string
}
/** agent.task — a task was assigned to or completed by an agent */
export interface AgentTaskEvent {
agentId: string
taskId: string
title: string
action: 'assigned' | 'completed' | 'failed'
}
/** agent.progress — incremental progress update for a running task */
export interface AgentProgressEvent {
agentId: string
taskId: string
progress: number
/** Optional description of what is currently happening */
message?: string
}
/**
* fleet.update — bulk refresh of all agents (e.g. after a deployment).
* The backend may send partial or complete agent state.
*/
export interface FleetUpdateEvent {
/** ISO timestamp of when the snapshot was taken */
timestamp: string
/** Number of agents in the fleet */
agentCount: number
}
/** Union of all SSE data payloads keyed by event type. */
export type SSEPayloadMap = {
'agent.status': AgentStatusEvent
'agent.task': AgentTaskEvent
'agent.progress': AgentProgressEvent
'fleet.update': FleetUpdateEvent
connected: { clientCount: number }
message: unknown
}
/**
* Discriminated SSE message — the `type` field narrows `data` via SSEPayloadMap.
*
* Usage:
* if (msg.type === 'agent.status') {
* msg.data.agentId // ✅ TypeScript knows this is AgentStatusEvent
* }
*/
export type SSEMessage = {
[K in keyof SSEPayloadMap]: { type: K; data: SSEPayloadMap[K] }
}[keyof SSEPayloadMap]

View File

@@ -0,0 +1 @@
import '@testing-library/jest-dom'

View File

@@ -1,4 +1,4 @@
export type AgentStatus = 'active' | 'idle' | 'thinking' | 'error'
export type AgentStatus = 'active' | 'idle' | 'thinking' | 'error' | 'offline'
export interface Agent {
id: string

View File

@@ -4,7 +4,7 @@
"target": "es2023",
"lib": ["ES2023", "DOM"],
"module": "esnext",
"types": ["vite/client"],
"types": ["vite/client", "vitest/globals"],
"skipLibCheck": true,
/* Bundler mode */

View File

@@ -20,5 +20,5 @@
"erasableSyntaxOnly": true,
"noFallthroughCasesInSwitch": true
},
"include": ["vite.config.ts"]
"include": ["vite.config.ts", "vitest.config.ts"]
}

11
frontend/vitest.config.ts Normal file
View File

@@ -0,0 +1,11 @@
import { defineConfig } from 'vitest/config'
import react from '@vitejs/plugin-react'
export default defineConfig({
plugins: [react()],
test: {
environment: 'jsdom',
globals: true,
setupFiles: ['./src/test-setup.ts'],
},
})

View File

@@ -63,29 +63,30 @@ func main() {
Broker: broker,
})
// ── Gateway: WS primary + REST fallback ────────────────────────────────
// WebSocket client (primary — real-time events via OpenClaw v3 protocol)
// ── Gateway clients (WS primary, REST fallback) ───────────────────
// WS gateway client (primary path)
wsClient := gateway.NewWSClient(gateway.WSConfig{
URL: cfg.WSGatewayURL,
AuthToken: cfg.WSGatewayToken,
}, agentRepo, broker, logger)
// REST polling client (fallback — only used if WS connection fails)
restClient := gateway.NewClient(gateway.Config{
URL: cfg.GatewayURL,
PollInterval: cfg.GatewayPollInterval,
// REST gateway client (fallback — only polls if WS fails to connect)
gwClient := gateway.NewClient(gateway.Config{
URL: cfg.GatewayRestURL,
PollInterval: cfg.GatewayRestPollInterval,
}, agentRepo, broker)
// Wire them: WS notifies REST to stand down on successful connect
wsClient.SetRESTClient(restClient)
// Wire them together: REST defers to WS when WS is connected
wsClient.SetRESTClient(gwClient)
gwClient.SetWSClient(wsClient)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start WS client first (primary)
go wsClient.Start(ctx)
// Start REST client (fallback polling)
go restClient.Start(ctx)
// Start REST client (will wait for WS, then stand down or fall back)
go gwClient.Start(ctx)
// ── Server ─────────────────────────────────────────────────────────────
srv := &http.Server{

View File

@@ -10,30 +10,30 @@ import (
// Config holds all application configuration.
type Config struct {
Port int
DatabaseURL string
CORSOrigin string
LogLevel string
Environment string
GatewayURL string // REST fallback URL
GatewayPollInterval time.Duration // REST fallback poll interval
WSGatewayURL string // WebSocket gateway URL
WSGatewayToken string // WebSocket auth token
Port int
DatabaseURL string
CORSOrigin string
LogLevel string
Environment string
GatewayRestURL string
GatewayRestPollInterval time.Duration
WSGatewayURL string
WSGatewayToken string
}
// Load reads configuration from environment variables, applying defaults where
// values are not set. All secrets come from the environment — nothing is hardcoded.
func Load() *Config {
return &Config{
Port: getEnvInt("PORT", 8080),
DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"),
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
LogLevel: getEnv("LOG_LEVEL", "info"),
Environment: getEnv("ENVIRONMENT", "development"),
GatewayURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
GatewayPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
WSGatewayURL: getEnv("GATEWAY_WS_URL", "ws://host.docker.internal:18789/"),
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
Port: getEnvInt("PORT", 8080),
DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"),
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
LogLevel: getEnv("LOG_LEVEL", "info"),
Environment: getEnv("ENVIRONMENT", "development"),
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
}
}

View File

@@ -13,6 +13,7 @@ import (
"fmt"
"log/slog"
"net/http"
"sync"
"time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
@@ -22,15 +23,17 @@ import (
// Client polls the OpenClaw gateway for agent status and keeps the database
// and SSE broker in sync. When a WSClient is set, the REST poller becomes a
// fallback that only activates if the WS connection fails.
// fallback: it waits for the WS client to signal readiness, and only starts
// polling if WS fails to connect after initial backoff retries.
type Client struct {
url string
pollInterval time.Duration
httpClient *http.Client
agents repository.AgentRepo
broker *handler.Broker
wsClient *Client // optional WS client; when set, REST is fallback only
wsReady chan struct{} // closed once WS connection is established
wsClient *WSClient // optional WS client; when set, REST is fallback only
wsReady chan struct{} // closed once WS connection is established
wsReadyOnce sync.Once // protects wsReady close from double-close race
}
// Config holds gateway client configuration, typically loaded from environment.
@@ -63,36 +66,56 @@ func NewClient(cfg Config, agents repository.AgentRepo, broker *handler.Broker)
// to it. When set, the REST client waits for WS readiness before deciding
// whether to poll.
func (c *Client) SetWSClient(ws *WSClient) {
_ = ws // stored for future reconnection coordination
c.wsClient = ws
}
// MarkWSReady signals that the WS connection is live and the REST poller
// should stand down. Called by WSClient after a successful handshake.
func (c *Client) MarkWSReady() {
select {
case <-c.wsReady:
// already closed
default:
c.wsReadyOnce.Do(func() {
close(c.wsReady)
}
})
}
// Start begins the gateway client loop. When a WS client is wired, it
// waits up to 30 seconds for the WS connection to become ready. If WS
// connects, the REST poller stands down. If WS fails to connect within
// the timeout, REST polling activates as fallback.
// connects, the REST poller stands down and only logs periodically. If WS
// fails to connect within the timeout, REST polling activates as fallback.
func (c *Client) Start(ctx context.Context) {
slog.Info("gateway client starting",
"url", c.url,
"pollInterval", c.pollInterval.String())
if c.wsClient != nil {
slog.Info("gateway client waiting for WS connection", "timeout", "30s")
select {
case <-c.wsReady:
slog.Info("gateway client using WS — REST poller standing down")
// WS is live; keep this goroutine alive but idle. If WS
// disconnects later, we could re-enter polling, but for now
// the WS client handles its own reconnection.
<-ctx.Done()
slog.Info("gateway client stopped (WS mode)")
return
case <-time.After(30 * time.Second):
slog.Warn("gateway client: WS not ready after 30s — falling back to REST polling",
"url", c.url,
"pollInterval", c.pollInterval.String())
case <-ctx.Done():
slog.Info("gateway client stopped while waiting for WS")
return
}
} else {
slog.Info("gateway client using REST polling (no WS client configured)",
"url", c.url,
"pollInterval", c.pollInterval.String())
}
// REST fallback polling
ticker := time.NewTicker(c.pollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
slog.Info("gateway client stopped")
slog.Info("gateway client stopped (REST fallback)")
return
case <-ticker.C:
c.poll(ctx)

View File

@@ -20,15 +20,15 @@ import (
// sessionChangedPayload represents a single session delta from a
// sessions.changed event.
type sessionChangedPayload struct {
SessionKey string `json:"sessionKey"`
AgentID string `json:"agentId"`
Status string `json:"status"` // running, streaming, done, error
TotalTokens int `json:"totalTokens"`
LastActivityAt string `json:"lastActivityAt"`
CurrentTask string `json:"currentTask"`
TaskProgress *int `json:"taskProgress,omitempty"`
TaskElapsed string `json:"taskElapsed"`
ErrorMessage string `json:"errorMessage"`
SessionKey string `json:"sessionKey"`
AgentID string `json:"agentId"`
Status string `json:"status"` // running, streaming, done, error
TotalTokens int `json:"totalTokens"`
LastActivityAt string `json:"lastActivityAt"`
CurrentTask string `json:"currentTask"`
TaskProgress *int `json:"taskProgress,omitempty"`
TaskElapsed string `json:"taskElapsed"`
ErrorMessage string `json:"errorMessage"`
}
// presencePayload represents a device presence update event.
@@ -51,8 +51,18 @@ type agentConfigPayload struct {
// ── Handler registration ─────────────────────────────────────────────────
// registerEventHandlers sets up all live event handlers on the WSClient.
// Called once after a successful handshake + initial sync.
// Call this once after a successful handshake + initial sync.
func (c *WSClient) registerEventHandlers() {
if c.agents == nil || c.broker == nil {
c.logger.Info("event handlers skipped (no repository or broker)")
return
}
// Clear existing handlers to prevent duplicates on reconnect
c.mu.Lock()
c.handlers = make(map[string][]eventHandler)
c.mu.Unlock()
c.OnEvent("sessions.changed", c.handleSessionsChanged)
c.OnEvent("presence", c.handlePresence)
c.OnEvent("agent.config", c.handleAgentConfig)
@@ -68,11 +78,14 @@ func (c *WSClient) registerEventHandlers() {
// For each changed session: map the gateway status to an AgentStatus, update
// the agent in the DB, then broadcast via SSE.
func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
c.logger.Debug("handleSessionsChanged", "payload", string(payload))
c.logger.Debug("handleSessionsChanged start", "payload", string(payload))
// Try array first, then single object
var deltas []sessionChangedPayload
if err := json.Unmarshal(payload, &deltas); err != nil || len(deltas) == 0 {
if err := json.Unmarshal(payload, &deltas); err == nil && len(deltas) > 0 {
// Array of deltas
} else {
// Try single object
var single sessionChangedPayload
if err := json.Unmarshal(payload, &single); err != nil {
c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err)
@@ -92,27 +105,43 @@ func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
agentStatus := mapSessionStatus(d.Status)
// Build partial update
update := models.UpdateAgentRequest{
Status: &agentStatus,
}
// Session key
if d.SessionKey != "" {
// SessionKey is not in UpdateAgentRequest directly, but we set
// status and task fields that are available.
}
// Current task
if d.CurrentTask != "" {
update.CurrentTask = &d.CurrentTask
}
// Task progress
if d.TaskProgress != nil {
update.TaskProgress = d.TaskProgress
} else if d.TotalTokens > 0 {
// Derive progress from token count as fallback
prog := min(d.TotalTokens/100, 100)
update.TaskProgress = &prog
}
// Task elapsed
if d.TaskElapsed != "" {
update.TaskElapsed = &d.TaskElapsed
}
// Error message
if d.ErrorMessage != "" {
update.ErrorMessage = &d.ErrorMessage
}
// If session ended, clear task and progress
// If session ended (done or empty status), set agent to idle and
// clear the current task
if agentStatus == models.AgentStatusIdle {
emptyTask := ""
update.CurrentTask = &emptyTask
@@ -120,7 +149,7 @@ func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
update.TaskProgress = &zeroProg
}
// DB update first
// Update DB first
updated, err := c.agents.Update(ctx, d.AgentID, update)
if err != nil {
c.logger.Warn("sessions.changed: DB update failed",
@@ -128,23 +157,27 @@ func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
continue
}
// Then SSE broadcast
// Then broadcast
c.broker.Broadcast("agent.status", updated)
if d.TaskProgress != nil || d.CurrentTask != "" {
c.broker.Broadcast("agent.progress", updated)
}
c.logger.Debug("sessions.changed: agent updated",
"agentId", d.AgentID, "status", string(agentStatus))
"agentId", d.AgentID,
"status", string(agentStatus))
}
c.logger.Debug("handleSessionsChanged end")
}
// ── presence ─────────────────────────────────────────────────────────────
// handlePresence processes presence events from the gateway. Updates the
// agent's lastActivity and broadcasts status if the connection state changed.
// agent's lastActivity timestamp and broadcasts status if the connection
// state changed.
func (c *WSClient) handlePresence(payload json.RawMessage) {
c.logger.Debug("handlePresence", "payload", string(payload))
c.logger.Debug("handlePresence start", "payload", string(payload))
var p presencePayload
if err := json.Unmarshal(payload, &p); err != nil {
@@ -153,21 +186,31 @@ func (c *WSClient) handlePresence(payload json.RawMessage) {
}
if p.AgentID == "" {
c.logger.Debug("presence: skipping event with empty agentId")
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// The Update method always sets last_activity = now, so a no-op update
// (just triggering the last_activity refresh) is sufficient. We send
// an empty-ish update — the repo always bumps last_activity.
// If connection state is reported, also update status.
update := models.UpdateAgentRequest{}
// If device disconnected, set agent to idle
if p.Connected != nil && !*p.Connected {
// Device disconnected — set agent to idle
idle := models.AgentStatusIdle
update.Status = &idle
}
// DB update first (Update always bumps last_activity)
// Pass lastActivityAt from the event so DB and SSE stay consistent
if p.LastActivityAt != "" {
update.LastActivityAt = &p.LastActivityAt
}
// Update DB first
updated, err := c.agents.Update(ctx, p.AgentID, update)
if err != nil {
c.logger.Warn("presence: DB update failed",
@@ -175,24 +218,21 @@ func (c *WSClient) handlePresence(payload json.RawMessage) {
return
}
if p.LastActivityAt != "" {
updated.LastActivity = p.LastActivityAt
}
// Then SSE broadcast
// Then broadcast
c.broker.Broadcast("agent.status", updated)
c.logger.Debug("presence: agent updated",
"agentId", p.AgentID, "connected", p.Connected)
"agentId", p.AgentID,
"connected", p.Connected)
}
// ── agent.config ─────────────────────────────────────────────────────────
// handleAgentConfig processes agent.config events from the gateway. Updates
// agent metadata (channel) in the DB and broadcasts a fleet.update with the
// full fleet snapshot.
// agent metadata (name, channel) in the DB and broadcasts a fleet.update
// with the full fleet snapshot.
func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
c.logger.Debug("handleAgentConfig", "payload", string(payload))
c.logger.Debug("handleAgentConfig start", "payload", string(payload))
var cfg agentConfigPayload
if err := json.Unmarshal(payload, &cfg); err != nil {
@@ -201,19 +241,27 @@ func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
}
if cfg.ID == "" {
c.logger.Debug("agent.config: skipping event with empty id")
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Build partial update with available fields.
update := models.UpdateAgentRequest{}
if cfg.Name != "" {
update.DisplayName = &cfg.Name
}
if cfg.Role != "" {
update.Role = &cfg.Role
}
if cfg.Channel != "" {
update.Channel = &cfg.Channel
}
// DB update first
// Update DB first
updated, err := c.agents.Update(ctx, cfg.ID, update)
if err != nil {
c.logger.Warn("agent.config: DB update failed",
@@ -221,23 +269,19 @@ func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
return
}
// Apply display name/role from config event
if cfg.Name != "" {
updated.DisplayName = cfg.Name
}
if cfg.Role != "" {
updated.Role = cfg.Role
}
// Broadcast full fleet snapshot so frontend gets updated agent info
// Then broadcast fleet snapshot
allAgents, err := c.agents.List(ctx, "")
if err != nil {
c.logger.Warn("agent.config: fleet list failed, broadcasting single agent", "error", err)
c.logger.Warn("agent.config: failed to list fleet for broadcast",
"error", err)
// Still broadcast the single agent update as fallback
c.broker.Broadcast("agent.status", updated)
return
}
c.broker.Broadcast("fleet.update", allAgents)
c.logger.Debug("agent.config: fleet updated", "agentId", cfg.ID, "name", cfg.Name)
c.logger.Debug("agent.config: fleet updated",
"agentId", cfg.ID,
"name", cfg.Name)
}

View File

@@ -0,0 +1,516 @@
package gateway
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync"
"testing"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
)
// ── Mock AgentRepo ────────────────────────────────────────────────────────
type mockAgentRepo struct {
mu sync.Mutex
agents map[string]models.AgentCardData
updateCalls []updateCall
}
type updateCall struct {
id string
req models.UpdateAgentRequest
}
func (m *mockAgentRepo) Get(_ context.Context, id string) (models.AgentCardData, error) {
m.mu.Lock()
defer m.mu.Unlock()
a, ok := m.agents[id]
if !ok {
return models.AgentCardData{}, errNotFound
}
return a, nil
}
func (m *mockAgentRepo) Update(_ context.Context, id string, req models.UpdateAgentRequest) (models.AgentCardData, error) {
m.mu.Lock()
defer m.mu.Unlock()
a, ok := m.agents[id]
if !ok {
return models.AgentCardData{}, errNotFound
}
if req.Status != nil {
a.Status = *req.Status
}
if req.DisplayName != nil {
a.DisplayName = *req.DisplayName
}
if req.Role != nil {
a.Role = *req.Role
}
if req.Channel != nil {
a.Channel = *req.Channel
}
if req.CurrentTask != nil {
a.CurrentTask = req.CurrentTask
}
if req.TaskProgress != nil {
a.TaskProgress = req.TaskProgress
}
if req.TaskElapsed != nil {
a.TaskElapsed = req.TaskElapsed
}
if req.ErrorMessage != nil {
a.ErrorMessage = req.ErrorMessage
}
if req.LastActivityAt != nil {
a.LastActivity = *req.LastActivityAt
}
m.agents[id] = a
m.updateCalls = append(m.updateCalls, updateCall{id, req})
return a, nil
}
func (m *mockAgentRepo) Create(_ context.Context, a models.AgentCardData) error {
m.mu.Lock()
defer m.mu.Unlock()
m.agents[a.ID] = a
return nil
}
func (m *mockAgentRepo) List(_ context.Context, statusFilter models.AgentStatus) ([]models.AgentCardData, error) {
m.mu.Lock()
defer m.mu.Unlock()
var result []models.AgentCardData
for _, a := range m.agents {
if statusFilter == "" || a.Status == statusFilter {
result = append(result, a)
}
}
return result, nil
}
func (m *mockAgentRepo) Delete(_ context.Context, id string) error {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.agents, id)
return nil
}
func (m *mockAgentRepo) Count(_ context.Context) (int, error) {
m.mu.Lock()
defer m.mu.Unlock()
return len(m.agents), nil
}
// errNotFound is returned by the mock repo when an agent is not found.
var errNotFound = fmt.Errorf("not found")
// ── Broadcast capture helper ───────────────────────────────────────────────
// broadcastCapture wraps a real Broker and captures all broadcasts
// via a subscribed channel. Use captured() to retrieve events that have
// been received so far. Call close() to unsubscribe when done.
type broadcastCapture struct {
broker *handler.Broker
ch chan handler.SSEEvent
}
func newBroadcastCapture(broker *handler.Broker) *broadcastCapture {
return &broadcastCapture{
broker: broker,
ch: broker.Subscribe(),
}
}
// captured drains all pending events from the subscription channel
// and returns them. This is synchronous — it only returns events that
// have already been sent to the channel.
func (bc *broadcastCapture) captured() []handler.SSEEvent {
var events []handler.SSEEvent
for {
select {
case evt := <-bc.ch:
events = append(events, evt)
default:
return events
}
}
}
func (bc *broadcastCapture) close() {
bc.broker.Unsubscribe(bc.ch)
}
// ── Test helpers ──────────────────────────────────────────────────────────
// newTestWSClient creates a WSClient wired to a mock repo and a real broker.
// Returns the client, the mock repo, and a broadcast capture.
func newTestWSClient() (*WSClient, *mockAgentRepo, *handler.Broker, *broadcastCapture) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
client := NewWSClient(WSConfig{}, repo, broker, slog.Default())
return client, repo, broker, capture
}
// ── Tests ─────────────────────────────────────────────────────────────────
func TestHandleSessionsChanged_Active(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["otto"] = models.AgentCardData{
ID: "otto",
DisplayName: "Otto",
Status: models.AgentStatusIdle,
}
payload := json.RawMessage(`{
"sessionKey": "s1",
"agentId": "otto",
"status": "running",
"totalTokens": 500,
"currentTask": "Orchestrating tasks"
}`)
client.handleSessionsChanged(payload)
// Verify: agent status updated to active
repo.mu.Lock()
agent := repo.agents["otto"]
calls := make([]updateCall, len(repo.updateCalls))
copy(calls, repo.updateCalls)
repo.mu.Unlock()
if agent.Status != models.AgentStatusActive {
t.Errorf("agent status = %q, want %q", agent.Status, models.AgentStatusActive)
}
// Verify: update was called
if len(calls) == 0 {
t.Fatal("expected at least one update call")
}
if calls[0].id != "otto" {
t.Errorf("update call agentId = %q, want %q", calls[0].id, "otto")
}
// Verify: broker broadcast "agent.status"
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "agent.status" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'agent.status'")
}
}
func TestHandleSessionsChanged_Idle(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["dex"] = models.AgentCardData{
ID: "dex",
DisplayName: "Dex",
Status: models.AgentStatusActive,
CurrentTask: strPtr("Writing API"),
}
payload := json.RawMessage(`{
"sessionKey": "s2",
"agentId": "dex",
"status": "done",
"totalTokens": 1000
}`)
client.handleSessionsChanged(payload)
repo.mu.Lock()
agent := repo.agents["dex"]
repo.mu.Unlock()
// Verify: agent goes idle
if agent.Status != models.AgentStatusIdle {
t.Errorf("agent status = %q, want %q", agent.Status, models.AgentStatusIdle)
}
// Verify: current task cleared (set to empty string)
if agent.CurrentTask != nil && *agent.CurrentTask != "" {
t.Errorf("current task = %q, want empty (cleared on idle)", *agent.CurrentTask)
}
// Verify: broker fires "agent.status"
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "agent.status" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'agent.status'")
}
}
func TestHandleSessionsChanged_ArrayPayload(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusIdle}
repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle}
payload := json.RawMessage(`[
{"sessionKey":"s1","agentId":"otto","status":"running","totalTokens":100},
{"sessionKey":"s2","agentId":"dex","status":"streaming","totalTokens":200}
]`)
client.handleSessionsChanged(payload)
repo.mu.Lock()
otto := repo.agents["otto"]
dex := repo.agents["dex"]
repo.mu.Unlock()
if otto.Status != models.AgentStatusActive {
t.Errorf("otto status = %q, want active", otto.Status)
}
if dex.Status != models.AgentStatusActive {
t.Errorf("dex status = %q, want active", dex.Status)
}
// Both should produce broadcasts
events := capture.captured()
statusCount := 0
for _, evt := range events {
if evt.EventType == "agent.status" {
statusCount++
}
}
if statusCount < 2 {
t.Errorf("expected at least 2 agent.status broadcasts, got %d", statusCount)
}
}
func TestHandleSessionsChanged_SkipsEmptyAgentID(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`{"sessionKey":"s1","agentId":"","status":"running"}`)
client.handleSessionsChanged(payload)
events := capture.captured()
if len(events) > 0 {
t.Errorf("expected no broadcasts for empty agentId, got %d", len(events))
}
}
func TestHandleSessionsChanged_UnparseablePayload(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`not json at all`)
client.handleSessionsChanged(payload)
events := capture.captured()
if len(events) > 0 {
t.Errorf("expected no broadcasts for unparseable payload, got %d", len(events))
}
}
func TestHandlePresence(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["pip"] = models.AgentCardData{
ID: "pip",
DisplayName: "Pip",
Status: models.AgentStatusActive,
}
payload := json.RawMessage(`{
"agentId": "pip",
"connected": true,
"lastActivityAt": "2025-01-01T00:00:00Z"
}`)
client.handlePresence(payload)
repo.mu.Lock()
agent := repo.agents["pip"]
calls := make([]updateCall, len(repo.updateCalls))
copy(calls, repo.updateCalls)
repo.mu.Unlock()
// Agent should still be active (connected=true doesn't change status)
if agent.Status != models.AgentStatusActive {
t.Errorf("agent status = %q, want active", agent.Status)
}
// Update should have been called (for lastActivityAt)
if len(calls) == 0 {
t.Fatal("expected at least one update call")
}
// Verify broadcast
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "agent.status" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'agent.status'")
}
}
func TestHandlePresence_Disconnect(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["pip"] = models.AgentCardData{
ID: "pip",
DisplayName: "Pip",
Status: models.AgentStatusActive,
}
payload := json.RawMessage(`{
"agentId": "pip",
"connected": false
}`)
client.handlePresence(payload)
repo.mu.Lock()
agent := repo.agents["pip"]
repo.mu.Unlock()
// Agent should go idle on disconnect
if agent.Status != models.AgentStatusIdle {
t.Errorf("agent status = %q, want idle after disconnect", agent.Status)
}
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "agent.status" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'agent.status' on disconnect")
}
}
func TestHandlePresence_EmptyAgentID(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`{"agentId":"","connected":true}`)
client.handlePresence(payload)
events := capture.captured()
if len(events) > 0 {
t.Errorf("expected no broadcasts for empty agentId, got %d", len(events))
}
}
func TestHandleAgentConfig(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["rex"] = models.AgentCardData{
ID: "rex",
DisplayName: "Rex",
Role: "Frontend Dev",
Status: models.AgentStatusIdle,
Channel: "discord",
}
payload := json.RawMessage(`{
"id": "rex",
"name": "Rex the Dev",
"role": "Senior Frontend",
"channel": "telegram"
}`)
client.handleAgentConfig(payload)
repo.mu.Lock()
agent := repo.agents["rex"]
calls := make([]updateCall, len(repo.updateCalls))
copy(calls, repo.updateCalls)
repo.mu.Unlock()
// Verify DisplayName and Role updated
if agent.DisplayName != "Rex the Dev" {
t.Errorf("displayName = %q, want %q", agent.DisplayName, "Rex the Dev")
}
if agent.Role != "Senior Frontend" {
t.Errorf("role = %q, want %q", agent.Role, "Senior Frontend")
}
if agent.Channel != "telegram" {
t.Errorf("channel = %q, want %q", agent.Channel, "telegram")
}
// Verify update was called
if len(calls) == 0 {
t.Fatal("expected at least one update call")
}
// Verify broker fires "fleet.update"
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "fleet.update" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'fleet.update'")
}
}
func TestHandleAgentConfig_EmptyID(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`{"id":"","name":"Ghost"}`)
client.handleAgentConfig(payload)
events := capture.captured()
if len(events) > 0 {
t.Errorf("expected no broadcasts for empty id, got %d", len(events))
}
}
func TestHandleAgentConfig_NotFound(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`{"id":"unknown","name":"Ghost","role":"Phantom"}`)
client.handleAgentConfig(payload)
// Agent doesn't exist in repo, so Update will fail → handler logs warning, returns early
events := capture.captured()
for _, evt := range events {
if evt.EventType == "fleet.update" {
t.Error("fleet.update should not be broadcast when agent update fails")
}
}
}

View File

@@ -16,6 +16,8 @@ import (
// ── RPC response types ───────────────────────────────────────────────────
// agentListItem represents a single agent returned by the agents.list RPC.
// Fields are extracted gracefully from json.RawMessage so unknown fields
// from the gateway are silently ignored.
type agentListItem struct {
ID string `json:"id"`
Name string `json:"name"`
@@ -40,9 +42,14 @@ type sessionListItem struct {
// persists them, merges session state into agent cards, and broadcasts
// the merged fleet as a fleet.update event.
func (c *WSClient) initialSync(ctx context.Context) error {
if c.agents == nil {
c.logger.Info("initial sync skipped (no repository)")
return nil
}
c.logger.Info("initial sync starting")
// 1. Fetch agents via RPC
// 1. Fetch agents
agentsRaw, err := c.Send("agents.list", nil)
if err != nil {
return fmt.Errorf("agents.list RPC: %w", err)
@@ -55,7 +62,7 @@ func (c *WSClient) initialSync(ctx context.Context) error {
c.logger.Info("agents.list received", "count", len(agentItems))
// 2. Persist each agent (create if not exists, update if changed)
// 2. Persist each agent
for _, item := range agentItems {
card := agentItemToCard(item)
@@ -70,12 +77,13 @@ func (c *WSClient) initialSync(ctx context.Context) error {
continue
}
// Agent exists — update display name or role if changed
// Agent exists — update if display name or role changed
if existing.DisplayName != card.DisplayName || existing.Role != card.Role {
// Update what we can via UpdateAgentRequest
channel := card.Channel
newName := card.DisplayName
newRole := card.Role
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
Channel: &channel,
DisplayName: &newName,
Role: &newRole,
})
if updateErr != nil {
c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr)
@@ -83,7 +91,7 @@ func (c *WSClient) initialSync(ctx context.Context) error {
}
}
// 3. Fetch sessions via RPC
// 3. Fetch sessions
sessionsRaw, err := c.Send("sessions.list", nil)
if err != nil {
return fmt.Errorf("sessions.list RPC: %w", err)
@@ -96,7 +104,7 @@ func (c *WSClient) initialSync(ctx context.Context) error {
c.logger.Info("sessions.list received", "count", len(sessionItems))
// 4. Build agentId → session map for merge
// 4. Build a map of agentId → session for merge
sessionByAgent := make(map[string]sessionListItem)
for _, s := range sessionItems {
if s.AgentID != "" {
@@ -104,25 +112,26 @@ func (c *WSClient) initialSync(ctx context.Context) error {
}
}
// 5. Merge session state into agents, update DB, and collect for broadcast
// 5. Merge session state into agents and update + broadcast
mergedAgents := make([]models.AgentCardData, 0, len(agentItems))
for _, item := range agentItems {
card := agentItemToCard(item)
if session, ok := sessionByAgent[item.ID]; ok {
// Merge session state into agent card
// Merge session state
card.SessionKey = session.SessionKey
card.Status = mapSessionStatus(session.Status)
card.LastActivity = session.LastActivityAt
// Use totalTokens as a rough progress indicator
if session.TotalTokens > 0 {
prog := min(session.TotalTokens/100, 100)
prog := min(session.TotalTokens/100, 100) // normalize to 0-100
card.TaskProgress = &prog
}
}
// Persist merged status change
// Persist merged state
existing, err := c.agents.Get(ctx, card.ID)
if err == nil && existing.Status != card.Status {
status := card.Status
@@ -146,8 +155,8 @@ func (c *WSClient) initialSync(ctx context.Context) error {
// mapSessionStatus converts a gateway session status string to an AgentStatus.
// - "running" / "streaming" → active
// - "error" → error
// - "done" / "" / other → idle
// - "error" → error
// - "done" / "" / other → idle
func mapSessionStatus(status string) models.AgentStatus {
switch status {
case "running", "streaming":
@@ -168,7 +177,7 @@ func agentItemToCard(item agentListItem) models.AgentCardData {
}
channel := item.Channel
if channel == "" {
channel = "discord"
channel = "unknown"
}
name := item.Name
if name == "" {
@@ -176,12 +185,12 @@ func agentItemToCard(item agentListItem) models.AgentCardData {
}
return models.AgentCardData{
ID: item.ID,
DisplayName: name,
Role: role,
Status: models.AgentStatusIdle, // default; overridden by session merge
SessionKey: "",
Channel: channel,
ID: item.ID,
DisplayName: name,
Role: role,
Status: models.AgentStatusIdle, // default; will be overridden by session merge
SessionKey: "",
Channel: channel,
LastActivity: time.Now().UTC().Format(time.RFC3339),
}
}

View File

@@ -0,0 +1,236 @@
package gateway
import (
"context"
"testing"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
)
func TestInitialSync(t *testing.T) {
_ = &mockAgentRepo{agents: make(map[string]models.AgentCardData)} // verify mock compiles
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
// --- Test agentItemToCard + session merge (the core of initialSync) ---
agentItems := []agentListItem{
{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"},
{ID: "dex", Name: "Dex", Role: "Backend Dev", Channel: "telegram"},
}
sessionItems := []sessionListItem{
{SessionKey: "s1", AgentID: "otto", Status: "running", TotalTokens: 500, LastActivityAt: "2025-05-20T12:00:00Z"},
{SessionKey: "s2", AgentID: "dex", Status: "done", TotalTokens: 1000, LastActivityAt: "2025-05-20T11:00:00Z"},
}
// Build sessionByAgent map (mirrors initialSync logic)
sessionByAgent := make(map[string]sessionListItem)
for _, s := range sessionItems {
if s.AgentID != "" {
sessionByAgent[s.AgentID] = s
}
}
// Merge and verify
merged := make([]models.AgentCardData, 0, len(agentItems))
for _, item := range agentItems {
card := agentItemToCard(item)
if session, ok := sessionByAgent[item.ID]; ok {
card.SessionKey = session.SessionKey
card.Status = mapSessionStatus(session.Status)
card.LastActivity = session.LastActivityAt
if session.TotalTokens > 0 {
prog := min(session.TotalTokens/100, 100)
card.TaskProgress = &prog
}
}
merged = append(merged, card)
}
// Verify otto: running → active
if merged[0].ID != "otto" {
t.Errorf("merged[0].ID = %q, want %q", merged[0].ID, "otto")
}
if merged[0].Status != models.AgentStatusActive {
t.Errorf("otto status = %q, want %q (running → active)", merged[0].Status, models.AgentStatusActive)
}
if merged[0].SessionKey != "s1" {
t.Errorf("otto sessionKey = %q, want %q", merged[0].SessionKey, "s1")
}
if merged[0].TaskProgress == nil || *merged[0].TaskProgress != 5 {
t.Errorf("otto taskProgress = %v, want 5", merged[0].TaskProgress)
}
// Verify dex: done → idle
if merged[1].ID != "dex" {
t.Errorf("merged[1].ID = %q, want %q", merged[1].ID, "dex")
}
if merged[1].Status != models.AgentStatusIdle {
t.Errorf("dex status = %q, want %q (done → idle)", merged[1].Status, models.AgentStatusIdle)
}
if merged[1].SessionKey != "s2" {
t.Errorf("dex sessionKey = %q, want %q", merged[1].SessionKey, "s2")
}
}
func TestInitialSync_PersistCreatesNew(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
// Simulate the persist logic from initialSync:
// new agents should be created
card := agentItemToCard(agentListItem{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"})
ctx := context.Background()
// Agent doesn't exist → create
_, err := repo.Get(ctx, card.ID)
if err == nil {
t.Fatal("expected agent to not exist yet")
}
if err := repo.Create(ctx, card); err != nil {
t.Fatalf("Create failed: %v", err)
}
got, err := repo.Get(ctx, card.ID)
if err != nil {
t.Fatalf("Get after Create failed: %v", err)
}
if got.ID != "otto" {
t.Errorf("got.ID = %q, want %q", got.ID, "otto")
}
if got.DisplayName != "Otto" {
t.Errorf("got.DisplayName = %q, want %q", got.DisplayName, "Otto")
}
if got.Role != "Orchestrator" {
t.Errorf("got.Role = %q, want %q", got.Role, "Orchestrator")
}
}
func TestInitialSync_PersistUpdatesExisting(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
ctx := context.Background()
// Pre-populate with existing agent
repo.agents["otto"] = models.AgentCardData{
ID: "otto",
DisplayName: "Otto",
Role: "Old Role",
Status: models.AgentStatusIdle,
}
// Simulate initialSync: agent exists, name/role changed → update
newName := "Otto Prime"
newRole := "Super Orchestrator"
_, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{
DisplayName: &newName,
Role: &newRole,
})
if err != nil {
t.Fatalf("Update failed: %v", err)
}
got, err := repo.Get(ctx, "otto")
if err != nil {
t.Fatalf("Get after Update failed: %v", err)
}
if got.DisplayName != "Otto Prime" {
t.Errorf("displayName = %q, want %q", got.DisplayName, "Otto Prime")
}
if got.Role != "Super Orchestrator" {
t.Errorf("role = %q, want %q", got.Role, "Super Orchestrator")
}
}
func TestInitialSync_MergesSessionStatus(t *testing.T) {
// When initialSync merges session state, an agent whose existing status
// differs from the session-derived status should be updated.
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
ctx := context.Background()
repo.agents["otto"] = models.AgentCardData{
ID: "otto",
DisplayName: "Otto",
Role: "Orchestrator",
Status: models.AgentStatusIdle,
}
// Simulate session merge: session says "running" → agent should go active
activeStatus := mapSessionStatus("running")
if activeStatus != models.AgentStatusActive {
t.Fatalf("mapSessionStatus(running) = %q, want active", activeStatus)
}
_, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{
Status: &activeStatus,
})
if err != nil {
t.Fatalf("Update failed: %v", err)
}
got, err := repo.Get(ctx, "otto")
if err != nil {
t.Fatalf("Get failed: %v", err)
}
if got.Status != models.AgentStatusActive {
t.Errorf("status after merge = %q, want %q", got.Status, models.AgentStatusActive)
}
}
func TestInitialSync_BroadcastsFleet(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
// Create some agents in the repo
repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusActive}
repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle}
// Simulate the final broadcast from initialSync
mergedAgents := []models.AgentCardData{
repo.agents["otto"],
repo.agents["dex"],
}
broker.Broadcast("fleet.update", mergedAgents)
events := capture.captured()
if len(events) == 0 {
t.Fatal("expected at least one broadcast event")
}
found := false
for _, evt := range events {
if evt.EventType == "fleet.update" {
found = true
// Verify data is the merged agents list
agents, ok := evt.Data.([]models.AgentCardData)
if !ok {
t.Fatalf("fleet.update data type = %T, want []models.AgentCardData", evt.Data)
}
if len(agents) != 2 {
t.Errorf("fleet.update agents count = %d, want 2", len(agents))
}
break
}
}
if !found {
t.Error("expected fleet.update broadcast event")
}
}

View File

@@ -1,7 +1,7 @@
// Package gateway provides WebSocket client integration with the OpenClaw
// gateway using WS protocol v3. The WSClient handles connection, handshake,
// frame routing, request/response correlation, and automatic reconnection
// with exponential backoff (1s → 30s max).
// with exponential backoff.
package gateway
import (
@@ -15,8 +15,8 @@ import (
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/google/uuid"
)
// WSConfig holds WebSocket client configuration, typically loaded from
@@ -41,19 +41,21 @@ type eventHandler func(json.RawMessage)
// WSClient connects to the OpenClaw gateway over WebSocket, completes the
// v3 handshake, routes incoming frames, and automatically reconnects on
// disconnect with exponential backoff (1s → 30s max).
// disconnect with exponential backoff.
type WSClient struct {
config WSConfig
conn *websocket.Conn
connMu sync.Mutex // protects conn for writes
pending map[string]chan<- json.RawMessage
mu sync.Mutex // protects pending and handlers
agents repository.AgentRepo
broker *handler.Broker
logger *slog.Logger
handlers map[string][]eventHandler
connID string // set after successful hello-ok
restClient *Client // optional REST client to notify on WS ready
config WSConfig
conn *websocket.Conn
connMu sync.Mutex // protects conn for writes
pending map[string]chan<- json.RawMessage
mu sync.Mutex // protects pending and handlers
agents repository.AgentRepo
broker *handler.Broker
logger *slog.Logger
handlers map[string][]eventHandler
connId string // set after successful hello-ok
restClient *Client // optional REST client to notify on WS ready
wsReadyOnce sync.Once // ensures MarkWSReady close is one-shot
}
// NewWSClient returns a WSClient wired to the given repository and broker.
@@ -79,7 +81,7 @@ func (c *WSClient) SetRESTClient(rest *Client) {
// OnEvent registers a handler for the given event name. Handlers are called
// when an incoming frame with type "event" and matching event name is
// received. Safe to call before Start.
// received. This is safe to call before Start.
func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -90,10 +92,10 @@ func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
// wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol.
type wsFrame struct {
Type string `json:"type"` // "req", "res", "event"
ID string `json:"id,omitempty"` // request/response correlation
Method string `json:"method,omitempty"` // method name (req/res frames)
Event string `json:"event,omitempty"` // event name (event frames)
Type string `json:"type"` // "req", "res", "event"
ID string `json:"id,omitempty"` // request/response correlation
Method string `json:"method,omitempty"` // method name (req frames)
Event string `json:"event,omitempty"` // event name (event frames)
Params json.RawMessage `json:"params,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
Error *wsError `json:"error,omitempty"`
@@ -128,7 +130,7 @@ type connectAuth struct {
// helloOKResponse represents the expected response to a successful connect.
type helloOKResponse struct {
ConnID string `json:"connId"`
ConnID string `json:"connId"`
Features struct {
Methods []string `json:"methods"`
Events []string `json:"events"`
@@ -138,11 +140,12 @@ type helloOKResponse struct {
// ── Start loop ───────────────────────────────────────────────────────────
// Start connects to the gateway, completes the handshake, and begins the
// read loop. On disconnect it reconnects with exponential backoff (1s → 30s).
// On ctx cancellation it performs a clean shutdown.
// read loop. On disconnect it reconnects with exponential backoff. On
// ctx cancellation it performs a clean shutdown.
func (c *WSClient) Start(ctx context.Context) {
backoff := 1 * time.Second
initialBackoff := 1 * time.Second
maxBackoff := 30 * time.Second
backoff := initialBackoff
for {
err := c.connectAndRun(ctx)
@@ -154,6 +157,9 @@ func (c *WSClient) Start(ctx context.Context) {
c.logger.Warn("ws client disconnected, reconnecting",
"error", err,
"backoff", backoff)
} else {
// Reset backoff on successful connect+run completion
backoff = initialBackoff
}
select {
@@ -188,14 +194,26 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
c.conn = conn
c.connMu.Unlock()
defer conn.Close()
// When context is cancelled, close the conn to unblock ReadJSON in readLoop.
go func() {
<-ctx.Done()
c.connMu.Lock()
if c.conn != nil {
c.conn.Close()
}
c.connMu.Unlock()
}()
defer func() {
conn.Close()
}()
// Step 1: Read the connect.challenge frame
if err := c.readChallenge(conn); err != nil {
return fmt.Errorf("handshake challenge: %w", err)
}
// Step 2: Send connect request and read hello-ok response
// Step 2: Send connect request
helloOK, err := c.sendConnect(conn)
if err != nil {
return fmt.Errorf("handshake connect: %w", err)
@@ -206,8 +224,9 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
"methods", helloOK.Features.Methods,
"events", helloOK.Features.Events)
// Store connId for reference
c.connMu.Lock()
c.connID = helloOK.ConnID
c.connId = helloOK.ConnID
c.connMu.Unlock()
// Notify REST client that WS is live so it stands down
@@ -216,15 +235,18 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
c.logger.Info("ws client notified REST fallback to stand down")
}
// Step 3: Initial sync — fetch agents + sessions from gateway
// Reset wsReadyOnce so MarkWSReady can fire again after a reconnect
c.wsReadyOnce = sync.Once{}
// Step 2b: Initial sync — fetch agents + sessions from gateway
if err := c.initialSync(ctx); err != nil {
c.logger.Warn("initial sync failed, continuing with read loop", "error", err)
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
}
// Step 4: Register live event handlers
// Step 2c: Register live event handlers
c.registerEventHandlers()
// Step 5: Read loop — blocks until disconnect or ctx cancel
// Step 3: Read loop
return c.readLoop(ctx, conn)
}
@@ -240,7 +262,7 @@ func (c *WSClient) readChallenge(conn *websocket.Conn) error {
return fmt.Errorf("expected connect.challenge, got type=%s event=%s", frame.Type, frame.Event)
}
c.logger.Debug("received connect.challenge")
c.logger.Debug("received connect.challenge", "params", string(frame.Params))
return nil
}
@@ -293,6 +315,8 @@ func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) {
return nil, fmt.Errorf("response id mismatch: expected %s, got %s", reqID, resFrame.ID)
}
// Check for hello-ok method in the result
// The gateway responds with method "hello-ok" on success
var helloOK helloOKResponse
if err := json.Unmarshal(resFrame.Result, &helloOK); err != nil {
return nil, fmt.Errorf("parse hello-ok: %w", err)
@@ -302,25 +326,16 @@ func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) {
}
// readLoop continuously reads frames from the connection and routes them.
// It returns on read error or context cancellation.
// It returns on read error or when the connection is closed by the ctx-done
// goroutine started in connectAndRun.
func (c *WSClient) readLoop(ctx context.Context, conn *websocket.Conn) error {
for {
select {
case <-ctx.Done():
// Clean shutdown: send close frame
c.connMu.Lock()
c.conn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "shutdown"),
time.Now().Add(5*time.Second),
)
c.connMu.Unlock()
return ctx.Err()
default:
}
var frame wsFrame
if err := conn.ReadJSON(&frame); err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
// Check if it's a close error
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
c.logger.Info("ws connection closed by server")
return nil
@@ -344,7 +359,7 @@ func (c *WSClient) routeFrame(frame wsFrame) {
case "event":
c.handleEvent(frame)
default:
c.logger.Debug("unknown frame type", "type", frame.Type, "id", frame.ID)
c.logger.Warn("unknown frame type", "type", frame.Type, "id", frame.ID)
}
}
@@ -363,6 +378,7 @@ func (c *WSClient) handleResponse(frame wsFrame) {
}
if frame.Error != nil {
// Send nil to signal error; caller checks via Send return
ch <- nil
return
}
@@ -386,20 +402,17 @@ func (c *WSClient) handleEvent(frame wsFrame) {
}
}
// ── Send (RPC) ──────────────────────────────────────────────────────────
// ── Send ─────────────────────────────────────────────────────────────────
// Send sends a JSON-RPC request to the gateway and returns the response
// payload. It is safe for concurrent use.
// Send sends a JSON request to the gateway and returns the response payload.
// It is safe for concurrent use. Returns an error if the client is not
// connected.
func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
reqID := uuid.New().String()
var paramsJSON json.RawMessage
if params != nil {
var err error
paramsJSON, err = json.Marshal(params)
if err != nil {
return nil, fmt.Errorf("marshal params: %w", err)
}
paramsJSON, err := json.Marshal(params)
if err != nil {
return nil, fmt.Errorf("marshal params: %w", err)
}
// Register pending response channel
@@ -423,7 +436,11 @@ func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
}
c.connMu.Lock()
err := c.conn.WriteJSON(frame)
if c.conn == nil {
c.connMu.Unlock()
return nil, fmt.Errorf("gateway: not connected")
}
err = c.conn.WriteJSON(frame)
c.connMu.Unlock()
if err != nil {
@@ -434,10 +451,10 @@ func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
select {
case resp := <-respCh:
if resp == nil {
return nil, fmt.Errorf("gateway returned error for request %s (%s)", reqID, method)
return nil, fmt.Errorf("gateway returned error for request %s", reqID)
}
return resp, nil
case <-time.After(30 * time.Second):
return nil, fmt.Errorf("request %s (%s) timed out", reqID, method)
return nil, fmt.Errorf("request %s timed out", reqID)
}
}

View File

@@ -0,0 +1,484 @@
package gateway
import (
"context"
"encoding/json"
"log/slog"
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
"github.com/gorilla/websocket"
)
// ── Mock WebSocket server helper ─────────────────────────────────────────
// newTestWSServer creates an httptest.Server that upgrades to WebSocket and
// delegates each connection to handler. The server URL can be converted to
// a ws:// URL by replacing "http" with "ws".
func newTestWSServer(t *testing.T, handler func(conn *websocket.Conn)) *httptest.Server {
t.Helper()
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
handler(conn)
}))
return srv
}
// wsURL converts an httptest.Server http URL to a ws URL.
func wsURL(srv *httptest.Server) string {
return "ws" + strings.TrimPrefix(srv.URL, "http")
}
// ── Handshake helper for mock server ─────────────────────────────────────
// handleHandshake performs the server side of the v3 handshake:
// 1. Send connect.challenge
// 2. Read connect request
// 3. Send hello-ok response
//
// Returns the connect request frame for inspection.
func handleHandshake(t *testing.T, conn *websocket.Conn) map[string]any {
t.Helper()
// 1. Send connect.challenge
challenge := map[string]any{
"type": "event",
"event": "connect.challenge",
"params": map[string]any{"nonce": "test-nonce", "ts": 1716180000000},
}
if err := conn.WriteJSON(challenge); err != nil {
t.Fatalf("server: write challenge: %v", err)
}
// 2. Read connect request
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
t.Fatalf("server: read connect request: %v", err)
}
if req["method"] != "connect" {
t.Fatalf("server: expected method=connect, got %v", req["method"])
}
// 3. Send hello-ok response
// Note: helloOKResponse expects ConnID at the top level of the result,
// matching the WSClient's JSON struct tags.
result := map[string]any{
"type": "hello-ok",
"protocol": 3,
"connId": "test-conn-123",
"features": map[string]any{"methods": []string{}, "events": []string{}},
"auth": map[string]any{"role": "operator", "scopes": []string{"operator.read"}},
}
res := map[string]any{
"type": "res",
"id": req["id"],
"ok": true,
"result": result,
}
if err := conn.WriteJSON(res); err != nil {
t.Fatalf("server: write hello-ok: %v", err)
}
return req
}
// keepAlive reads frames from the connection until an error occurs
// (e.g., the client disconnects). Used as the default "do nothing"
// server loop after handshake.
func keepAlive(conn *websocket.Conn) {
for {
var m map[string]any
if err := conn.ReadJSON(&m); err != nil {
break
}
}
}
// ── 1. Test: Full handshake ──────────────────────────────────────────────
func TestWSClient_Handshake(t *testing.T) {
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
keepAlive(conn)
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
client.Start(ctx)
close(done)
}()
// Wait briefly for handshake to complete
time.Sleep(200 * time.Millisecond)
// Verify connId was set
client.connMu.Lock()
connID := client.connId
client.connMu.Unlock()
if connID != "test-conn-123" {
t.Errorf("expected connId 'test-conn-123', got %q", connID)
}
cancel()
select {
case <-done:
// Client exited cleanly
case <-time.After(3 * time.Second):
t.Fatal("WSClient did not shut down after context cancellation")
}
}
// ── 2. Test: Send() with response matching ───────────────────────────────
func TestWSClient_Send(t *testing.T) {
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
// Read RPC requests and respond to each
for {
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
break
}
reqID, _ := req["id"].(string)
method, _ := req["method"].(string)
var result any
switch method {
case "agents.list":
result = map[string]any{
"agents": []map[string]any{
{"id": "otto", "name": "Otto"},
},
}
default:
result = map[string]any{}
}
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": result,
}
if err := conn.WriteJSON(res); err != nil {
break
}
}
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
go client.Start(ctx)
// Give the client time to complete handshake
time.Sleep(300 * time.Millisecond)
resp, err := client.Send("agents.list", nil)
if err != nil {
t.Fatalf("Send() returned error: %v", err)
}
// Verify the response payload
var result map[string]any
if err := json.Unmarshal(resp, &result); err != nil {
t.Fatalf("unmarshal response: %v", err)
}
agents, ok := result["agents"].([]any)
if !ok || len(agents) != 1 {
t.Errorf("expected 1 agent in response, got %v", result)
}
cancel()
}
// ── 3. Test: Event handler routing ───────────────────────────────────────
func TestWSClient_EventRouting(t *testing.T) {
eventReceived := make(chan json.RawMessage, 1)
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
// After handshake, send a test event
evt := map[string]any{
"type": "event",
"event": "test.event",
"params": map[string]any{"greeting": "hello from server"},
}
if err := conn.WriteJSON(evt); err != nil {
t.Logf("server: write event: %v", err)
return
}
keepAlive(conn)
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
// Register event handler BEFORE starting the client
client.OnEvent("test.event", func(payload json.RawMessage) {
eventReceived <- payload
})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
go client.Start(ctx)
// Wait for the event handler to fire
select {
case payload := <-eventReceived:
var data map[string]any
if err := json.Unmarshal(payload, &data); err != nil {
t.Fatalf("unmarshal event payload: %v", err)
}
if greeting, _ := data["greeting"].(string); greeting != "hello from server" {
t.Errorf("expected greeting 'hello from server', got %q", greeting)
}
case <-time.After(3 * time.Second):
t.Fatal("timed out waiting for event handler to fire")
}
cancel()
}
// ── 4. Test: Concurrent Send ─────────────────────────────────────────────
func TestWSClient_ConcurrentSend(t *testing.T) {
var reqCount atomic.Int32
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
// Read RPC requests and respond to each
for {
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
break
}
reqID, _ := req["id"].(string)
n := reqCount.Add(1)
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": map[string]any{"index": n, "method": req["method"]},
}
if err := conn.WriteJSON(res); err != nil {
break
}
}
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go client.Start(ctx)
// Give the client time to complete handshake
time.Sleep(300 * time.Millisecond)
// Fire 3 concurrent Send() calls
type sendResult struct {
method string
payload json.RawMessage
err error
}
results := make(chan sendResult, 3)
methods := []string{"agents.list", "sessions.list", "agents.config"}
for _, method := range methods {
go func(m string) {
resp, err := client.Send(m, nil)
results <- sendResult{method: m, payload: resp, err: err}
}(method)
}
// Collect all results
for i := 0; i < 3; i++ {
select {
case r := <-results:
if r.err != nil {
t.Errorf("Send(%q) returned error: %v", r.method, r.err)
continue
}
var result map[string]any
if err := json.Unmarshal(r.payload, &result); err != nil {
t.Errorf("Send(%q) unmarshal error: %v", r.method, err)
continue
}
gotMethod, _ := result["method"].(string)
if gotMethod != r.method {
t.Errorf("Send(%q) got response for %q (mismatched)", r.method, gotMethod)
}
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for concurrent Send results")
}
}
cancel()
}
// ── 5. Test: Clean shutdown ──────────────────────────────────────────────
func TestWSClient_CleanShutdown(t *testing.T) {
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
keepAlive(conn)
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
done := make(chan struct{})
go func() {
client.Start(ctx)
close(done)
}()
// Let the client connect and complete handshake
time.Sleep(200 * time.Millisecond)
// Cancel context — should trigger clean shutdown
cancel()
select {
case <-done:
// Client exited cleanly — pass
case <-time.After(3 * time.Second):
t.Fatal("WSClient did not shut down cleanly within timeout")
}
}
// ── Pure utility tests (from CUB-205) ─────────────────────────────────────
func TestMapSessionStatus(t *testing.T) {
tests := []struct {
input string
expected models.AgentStatus
}{
{"running", models.AgentStatusActive},
{"streaming", models.AgentStatusActive},
{"done", models.AgentStatusIdle},
{"error", models.AgentStatusError},
{"", models.AgentStatusIdle},
{"garbage", models.AgentStatusIdle},
}
for _, tt := range tests {
result := mapSessionStatus(tt.input)
if result != tt.expected {
t.Errorf("mapSessionStatus(%q) = %q, want %q", tt.input, result, tt.expected)
}
}
}
func TestAgentItemToCard(t *testing.T) {
t.Run("full fields", func(t *testing.T) {
item := agentListItem{
ID: "dex",
Name: "Dex",
Role: "backend",
Channel: "telegram",
}
card := agentItemToCard(item)
if card.ID != "dex" {
t.Errorf("ID = %q, want %q", card.ID, "dex")
}
if card.DisplayName != "Dex" {
t.Errorf("DisplayName = %q, want %q", card.DisplayName, "Dex")
}
if card.Role != "backend" {
t.Errorf("Role = %q, want %q", card.Role, "backend")
}
if card.Channel != "telegram" {
t.Errorf("Channel = %q, want %q", card.Channel, "telegram")
}
if card.Status != models.AgentStatusIdle {
t.Errorf("Status = %q, want %q", card.Status, models.AgentStatusIdle)
}
})
t.Run("empty fields use defaults", func(t *testing.T) {
item := agentListItem{
ID: "otto",
}
card := agentItemToCard(item)
if card.ID != "otto" {
t.Errorf("ID = %q, want %q", card.ID, "otto")
}
if card.DisplayName != "otto" {
t.Errorf("DisplayName = %q, want %q (should fallback to ID)", card.DisplayName, "otto")
}
if card.Role != "agent" {
t.Errorf("Role = %q, want %q (default)", card.Role, "agent")
}
if card.Channel != "unknown" {
t.Errorf("Channel = %q, want %q (per Grimm requirement)", card.Channel, "unknown")
}
if card.Status != models.AgentStatusIdle {
t.Errorf("Status = %q, want %q", card.Status, models.AgentStatusIdle)
}
})
t.Run("empty name falls back to ID", func(t *testing.T) {
item := agentListItem{
ID: "hex",
Name: "",
Role: "database",
}
card := agentItemToCard(item)
if card.DisplayName != "hex" {
t.Errorf("DisplayName = %q, want %q (ID fallback)", card.DisplayName, "hex")
}
})
}
func TestStrPtr(t *testing.T) {
s := "hello"
p := strPtr(s)
if p == nil {
t.Fatal("strPtr returned nil")
}
if *p != s {
t.Errorf("strPtr(%q) = %q, want %q", s, *p, s)
}
empty := ""
ep := strPtr(empty)
if *ep != empty {
t.Errorf("strPtr(empty) = %q, want %q", *ep, empty)
}
}

View File

@@ -63,12 +63,15 @@ type CreateAgentRequest struct {
// UpdateAgentRequest is the payload for PUT /api/agents/{id}.
type UpdateAgentRequest struct {
Status *AgentStatus `json:"status,omitempty" validate:"omitempty,agentStatus"`
CurrentTask *string `json:"currentTask,omitempty"`
TaskProgress *int `json:"taskProgress,omitempty" validate:"omitempty,min=0,max=100"`
TaskElapsed *string `json:"taskElapsed,omitempty"`
Channel *string `json:"channel,omitempty" validate:"omitempty,min=1,max=32"`
ErrorMessage *string `json:"errorMessage,omitempty"`
Status *AgentStatus `json:"status,omitempty" validate:"omitempty,agentStatus"`
DisplayName *string `json:"displayName,omitempty"`
Role *string `json:"role,omitempty"`
LastActivityAt *string `json:"lastActivityAt,omitempty"`
CurrentTask *string `json:"currentTask,omitempty"`
TaskProgress *int `json:"taskProgress,omitempty" validate:"omitempty,min=0,max=100"`
TaskElapsed *string `json:"taskElapsed,omitempty"`
Channel *string `json:"channel,omitempty" validate:"omitempty,min=1,max=32"`
ErrorMessage *string `json:"errorMessage,omitempty"`
}
// AgentStatusHistoryEntry represents a point-in-time status change for an agent.

View File

@@ -0,0 +1,46 @@
# Control Center — Architecture Context
## Current State
The Control Center backend uses a **dual-path gateway client** architecture:
- **Primary path**: WebSocket client (`gateway.WSClient`) connects to the OpenClaw gateway using WS protocol v3. It handles handshake, initial sync (agents.list + sessions.list RPCs), live event routing (sessions.changed, presence, agent.config), and automatic reconnection with exponential backoff.
- **Fallback path**: REST poller (`gateway.Client`) polls the gateway `/api/agents` endpoint on an interval. It only activates if the WS client fails to connect within 30 seconds of startup.
## Live Gateway Connection
### Startup Sequence
1. Both WS client and REST client start concurrently
2. REST client waits 30s for WS readiness signal (`wsReady` channel)
3. If WS connects successfully → REST client stands down (logs "using WS — REST poller standing down")
4. If WS fails within 30s → REST client falls back to polling (logs "WS not ready — falling back to REST polling")
5. If no WS client configured → REST client polls immediately
### WebSocket Client (Primary)
- Config: `WS_GATEWAY_URL` (default: `ws://host.docker.internal:18789/`), `OPENCLAW_GATEWAY_TOKEN`
- Protocol: v3 handshake (challenge → connect → hello-ok)
- Initial sync: `agents.list` + `sessions.list` RPCs → persist → merge → broadcast `fleet.update`
- Live events: `sessions.changed`, `presence`, `agent.config`
- Reconnection: exponential backoff (1s → 2s → 4s → ... → 30s max)
### REST Poller (Fallback)
- Config: `GATEWAY_URL` (default: `http://host.docker.internal:18789/api/agents`), `GATEWAY_POLL_INTERVAL` (default: 5s)
- Only used when WS is unavailable
- Polls the `/api/agents` endpoint and syncs agent status changes
### Wiring
```
main.go
├── wsClient = NewWSClient(...)
├── restClient = NewClient(...)
├── wsClient.SetRESTClient(restClient) // WS notifies REST on ready
├── restClient.SetWSClient(wsClient) // REST defers to WS
├── go wsClient.Start(ctx) // primary
└── go restClient.Start(ctx) // fallback (waits for WS)
```
## Key Design Decisions
- **Push over poll**: WS is preferred for real-time updates; REST is a safety net
- **DB first, then SSE**: All event handlers persist to DB before broadcasting
- **Graceful degradation**: System works without WS; REST provides basic functionality
- **No hard dependency on REST /api/agents**: If WS is connected, REST endpoint is never called