Compare commits
3 Commits
agent/rex/
...
agent/dex/
| Author | SHA1 | Date | |
|---|---|---|---|
| d9a1640b10 | |||
| 6fd2d9bec4 | |||
|
|
d28d6e8dac |
17
.env.example
17
.env.example
@@ -12,15 +12,16 @@ ENVIRONMENT=development
|
|||||||
# Format: postgresql://user:password@host:port/database?sslmode=disable
|
# Format: postgresql://user:password@host:port/database?sslmode=disable
|
||||||
DATABASE_URL=postgresql://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable
|
DATABASE_URL=postgresql://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable
|
||||||
|
|
||||||
# Gateway (OpenClaw) connection
|
# Gateway (OpenClaw) connection — WebSocket (primary)
|
||||||
# WebSocket gateway config (primary path)
|
# WebSocket URL for real-time OpenClaw gateway v3 protocol
|
||||||
WS_GATEWAY_URL=ws://host.docker.internal:18789/
|
GATEWAY_WS_URL=ws://localhost:18789/
|
||||||
# Gateway auth token — same as OPENCLAW_GATEWAY_TOKEN (set in environment)
|
# Auth token for the OpenClaw gateway (operator scope)
|
||||||
GATEWAY_TOKEN=
|
OPENCLAW_GATEWAY_TOKEN=
|
||||||
|
|
||||||
# REST poller config (fallback, only used if WS fails to connect)
|
# Gateway (OpenClaw) connection — REST (fallback)
|
||||||
GATEWAY_URL=http://host.docker.internal:18789/api/agents
|
# URL to the OpenClaw gateway API for polling agent states (used only if WS fails)
|
||||||
# Polling interval for agent state updates (fallback only)
|
GATEWAY_URL=http://localhost:18789/api/agents
|
||||||
|
# Polling interval for agent state updates
|
||||||
GATEWAY_POLL_INTERVAL=5s
|
GATEWAY_POLL_INTERVAL=5s
|
||||||
|
|
||||||
# ── Frontend Variables (via Vite) ───────────────────────────────────────
|
# ── Frontend Variables (via Vite) ───────────────────────────────────────
|
||||||
|
|||||||
@@ -1,11 +0,0 @@
|
|||||||
FROM catthehacker/ubuntu:act-latest
|
|
||||||
|
|
||||||
# Install Go 1.23
|
|
||||||
RUN curl -sL https://go.dev/dl/go1.23.6.linux-amd64.tar.gz | tar -C /usr/local -xz
|
|
||||||
|
|
||||||
# Install Node 22
|
|
||||||
RUN curl -fsSL https://deb.nodesource.com/setup_22.x | bash - \
|
|
||||||
&& apt-get install -y nodejs \
|
|
||||||
&& rm -rf /var/lib/apt/lists/*
|
|
||||||
|
|
||||||
ENV PATH="/usr/local/go/bin:${PATH}"
|
|
||||||
@@ -16,8 +16,6 @@ services:
|
|||||||
- ENVIRONMENT=production
|
- ENVIRONMENT=production
|
||||||
- PORT=8080
|
- PORT=8080
|
||||||
- GATEWAY_URL=http://host.docker.internal:18789/api/agents
|
- GATEWAY_URL=http://host.docker.internal:18789/api/agents
|
||||||
- WS_GATEWAY_URL=ws://host.docker.internal:18789/
|
|
||||||
- GATEWAY_TOKEN=${GATEWAY_TOKEN:-}
|
|
||||||
depends_on:
|
depends_on:
|
||||||
db:
|
db:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
|
|||||||
1129
frontend/package-lock.json
generated
1129
frontend/package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -7,9 +7,7 @@
|
|||||||
"dev": "vite",
|
"dev": "vite",
|
||||||
"build": "tsc -b && vite build",
|
"build": "tsc -b && vite build",
|
||||||
"lint": "eslint .",
|
"lint": "eslint .",
|
||||||
"preview": "vite preview",
|
"preview": "vite preview"
|
||||||
"test": "vitest run",
|
|
||||||
"test:watch": "vitest"
|
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@tanstack/react-query": "^5.100.9",
|
"@tanstack/react-query": "^5.100.9",
|
||||||
@@ -22,8 +20,6 @@
|
|||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@eslint/js": "^10.0.1",
|
"@eslint/js": "^10.0.1",
|
||||||
"@tailwindcss/vite": "^4.2.4",
|
"@tailwindcss/vite": "^4.2.4",
|
||||||
"@testing-library/jest-dom": "^6.9.1",
|
|
||||||
"@testing-library/react": "^16.3.2",
|
|
||||||
"@types/node": "^24.12.2",
|
"@types/node": "^24.12.2",
|
||||||
"@types/react": "^19.2.14",
|
"@types/react": "^19.2.14",
|
||||||
"@types/react-dom": "^19.2.3",
|
"@types/react-dom": "^19.2.3",
|
||||||
@@ -33,12 +29,10 @@
|
|||||||
"eslint-plugin-react-hooks": "^7.1.1",
|
"eslint-plugin-react-hooks": "^7.1.1",
|
||||||
"eslint-plugin-react-refresh": "^0.5.2",
|
"eslint-plugin-react-refresh": "^0.5.2",
|
||||||
"globals": "^17.5.0",
|
"globals": "^17.5.0",
|
||||||
"jsdom": "^29.1.1",
|
|
||||||
"postcss": "^8.5.14",
|
"postcss": "^8.5.14",
|
||||||
"tailwindcss": "^4.2.4",
|
"tailwindcss": "^4.2.4",
|
||||||
"typescript": "~6.0.2",
|
"typescript": "~6.0.2",
|
||||||
"typescript-eslint": "^8.58.2",
|
"typescript-eslint": "^8.58.2",
|
||||||
"vite": "^8.0.10",
|
"vite": "^8.0.10"
|
||||||
"vitest": "^4.1.7"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,8 +1,6 @@
|
|||||||
import { useState } from 'react'
|
import { useState } from 'react'
|
||||||
import { NavLink } from 'react-router-dom'
|
import { NavLink } from 'react-router-dom'
|
||||||
import { Command, Activity, FolderKanban, Monitor, Settings, Menu, X, Wifi, WifiOff, Loader } from 'lucide-react'
|
import { Command, Activity, FolderKanban, Monitor, Settings, Menu, X } from 'lucide-react'
|
||||||
import { useSSEContext } from '../contexts/SSEContext'
|
|
||||||
import type { SSEStatus } from '../hooks/useSSE'
|
|
||||||
|
|
||||||
const navItems = [
|
const navItems = [
|
||||||
{ to: '/', icon: Command, label: 'Hub' },
|
{ to: '/', icon: Command, label: 'Hub' },
|
||||||
@@ -12,29 +10,9 @@ const navItems = [
|
|||||||
{ to: '/settings', icon: Settings, label: 'Settings' },
|
{ to: '/settings', icon: Settings, label: 'Settings' },
|
||||||
]
|
]
|
||||||
|
|
||||||
/** Small status pill shown in the sidebar footer and mobile header. */
|
|
||||||
function SSEStatusBadge({ status, showLabel = false }: { status: SSEStatus; showLabel?: boolean }) {
|
|
||||||
const cfg = {
|
|
||||||
connected: { icon: Wifi, color: 'text-green-500', label: 'Live' },
|
|
||||||
connecting: { icon: Loader, color: 'text-yellow-500 animate-spin', label: 'Connecting' },
|
|
||||||
reconnecting: { icon: Loader, color: 'text-yellow-500 animate-spin', label: 'Reconnecting' },
|
|
||||||
error: { icon: WifiOff, color: 'text-red-500', label: 'Disconnected' },
|
|
||||||
}[status]
|
|
||||||
|
|
||||||
const Icon = cfg.icon
|
|
||||||
|
|
||||||
return (
|
|
||||||
<div className="flex items-center gap-1.5" title={cfg.label}>
|
|
||||||
<Icon size={14} className={cfg.color} />
|
|
||||||
{showLabel && <span className={`text-xs ${cfg.color}`}>{cfg.label}</span>}
|
|
||||||
</div>
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
export default function Layout({ children }: { children: React.ReactNode }) {
|
export default function Layout({ children }: { children: React.ReactNode }) {
|
||||||
const [expanded, setExpanded] = useState(false)
|
const [expanded, setExpanded] = useState(false)
|
||||||
const [mobileOpen, setMobileOpen] = useState(false)
|
const [mobileOpen, setMobileOpen] = useState(false)
|
||||||
const { sseStatus } = useSSEContext()
|
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<div className="flex min-h-screen bg-surface-darkest text-on-surface">
|
<div className="flex min-h-screen bg-surface-darkest text-on-surface">
|
||||||
@@ -68,15 +46,6 @@ export default function Layout({ children }: { children: React.ReactNode }) {
|
|||||||
</NavLink>
|
</NavLink>
|
||||||
))}
|
))}
|
||||||
</nav>
|
</nav>
|
||||||
{/* SSE connection status — footer of sidebar */}
|
|
||||||
<div className="px-4 py-3 border-t border-surface-light flex items-center gap-2">
|
|
||||||
<SSEStatusBadge status={sseStatus} />
|
|
||||||
{expanded && (
|
|
||||||
<span className="text-xs text-on-surface-muted whitespace-nowrap">
|
|
||||||
{sseStatus === 'connected' ? 'Live updates on' : sseStatus}
|
|
||||||
</span>
|
|
||||||
)}
|
|
||||||
</div>
|
|
||||||
</aside>
|
</aside>
|
||||||
|
|
||||||
{/* Mobile Header + Bottom Nav */}
|
{/* Mobile Header + Bottom Nav */}
|
||||||
@@ -85,7 +54,6 @@ export default function Layout({ children }: { children: React.ReactNode }) {
|
|||||||
<div className="flex items-center gap-2">
|
<div className="flex items-center gap-2">
|
||||||
<Command size={22} className="text-primary" />
|
<Command size={22} className="text-primary" />
|
||||||
<span className="font-bold">Control Center</span>
|
<span className="font-bold">Control Center</span>
|
||||||
<SSEStatusBadge status={sseStatus} />
|
|
||||||
</div>
|
</div>
|
||||||
<button onClick={() => setMobileOpen(!mobileOpen)} className="p-2">
|
<button onClick={() => setMobileOpen(!mobileOpen)} className="p-2">
|
||||||
{mobileOpen ? <X size={22} /> : <Menu size={22} />}
|
{mobileOpen ? <X size={22} /> : <Menu size={22} />}
|
||||||
|
|||||||
@@ -1,23 +0,0 @@
|
|||||||
/**
|
|
||||||
* SSEContext — provides SSE connection status throughout the component tree.
|
|
||||||
* Mount <SSEProvider> once inside QueryClientProvider.
|
|
||||||
*/
|
|
||||||
import { createContext, useContext, type ReactNode } from 'react'
|
|
||||||
import { useRealtimeSync } from '../hooks/useRealtimeSync'
|
|
||||||
import type { SSEStatus } from '../hooks/useSSE'
|
|
||||||
|
|
||||||
interface SSEContextValue {
|
|
||||||
sseStatus: SSEStatus
|
|
||||||
}
|
|
||||||
|
|
||||||
const SSEContext = createContext<SSEContextValue>({ sseStatus: 'connecting' })
|
|
||||||
|
|
||||||
export function SSEProvider({ children }: { children: ReactNode }) {
|
|
||||||
const { sseStatus } = useRealtimeSync()
|
|
||||||
return <SSEContext.Provider value={{ sseStatus }}>{children}</SSEContext.Provider>
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Access the SSE connection status from any component. */
|
|
||||||
export function useSSEContext(): SSEContextValue {
|
|
||||||
return useContext(SSEContext)
|
|
||||||
}
|
|
||||||
@@ -1,129 +0,0 @@
|
|||||||
/**
|
|
||||||
* Tests for useRealtimeSync — event → query invalidation mapping.
|
|
||||||
*
|
|
||||||
* Uses .tsx extension so Vite/OXC can parse JSX in the wrapper component.
|
|
||||||
*/
|
|
||||||
import { renderHook } from '@testing-library/react'
|
|
||||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
|
||||||
import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
|
|
||||||
import * as useSSEModule from './useSSE'
|
|
||||||
import { useRealtimeSync } from './useRealtimeSync'
|
|
||||||
import React from 'react'
|
|
||||||
import type { SSEMessage } from '../services/sse'
|
|
||||||
|
|
||||||
describe('useRealtimeSync', () => {
|
|
||||||
let queryClient: QueryClient
|
|
||||||
let mockSSEOnMessage: ((msg: { type: string; data: unknown }) => void) | null = null
|
|
||||||
|
|
||||||
beforeEach(() => {
|
|
||||||
queryClient = new QueryClient({
|
|
||||||
defaultOptions: { queries: { retry: false } },
|
|
||||||
})
|
|
||||||
mockSSEOnMessage = null
|
|
||||||
|
|
||||||
// Spy on useSSE to capture the onMessage callback
|
|
||||||
vi.spyOn(useSSEModule, 'useSSE').mockImplementation((opts) => {
|
|
||||||
mockSSEOnMessage = opts?.onMessage ?? null
|
|
||||||
return { status: 'connected' }
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
afterEach(() => {
|
|
||||||
vi.restoreAllMocks()
|
|
||||||
})
|
|
||||||
|
|
||||||
function render() {
|
|
||||||
return renderHook(() => useRealtimeSync(), {
|
|
||||||
wrapper: ({ children }: { children: React.ReactNode }) => (
|
|
||||||
React.createElement(QueryClientProvider, { client: queryClient }, children)
|
|
||||||
),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
it('invalidates ["agents"] on agent.status event', async () => {
|
|
||||||
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
|
||||||
render()
|
|
||||||
|
|
||||||
const msg: SSEMessage = {
|
|
||||||
type: 'agent.status',
|
|
||||||
data: { agentId: 'a1', status: 'active' },
|
|
||||||
}
|
|
||||||
mockSSEOnMessage!(msg)
|
|
||||||
|
|
||||||
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
|
|
||||||
expect(invalidateSpy).toHaveBeenCalledTimes(1)
|
|
||||||
})
|
|
||||||
|
|
||||||
it('invalidates ["tasks"] and ["agents"] on agent.task event', async () => {
|
|
||||||
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
|
||||||
render()
|
|
||||||
|
|
||||||
const msg: SSEMessage = {
|
|
||||||
type: 'agent.task',
|
|
||||||
data: { agentId: 'a1', taskId: 't1', title: 'Test', action: 'assigned' },
|
|
||||||
}
|
|
||||||
mockSSEOnMessage!(msg)
|
|
||||||
|
|
||||||
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
|
|
||||||
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
|
|
||||||
expect(invalidateSpy).toHaveBeenCalledTimes(2)
|
|
||||||
})
|
|
||||||
|
|
||||||
it('invalidates ["tasks"] and ["agents"] on agent.progress event', async () => {
|
|
||||||
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
|
||||||
render()
|
|
||||||
|
|
||||||
const msg: SSEMessage = {
|
|
||||||
type: 'agent.progress',
|
|
||||||
data: { agentId: 'a1', taskId: 't1', progress: 50, message: 'working' },
|
|
||||||
}
|
|
||||||
mockSSEOnMessage!(msg)
|
|
||||||
|
|
||||||
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
|
|
||||||
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
|
|
||||||
expect(invalidateSpy).toHaveBeenCalledTimes(2)
|
|
||||||
})
|
|
||||||
|
|
||||||
it('invalidates ["agents"], ["sessions"], ["tasks"] on fleet.update event', async () => {
|
|
||||||
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
|
||||||
render()
|
|
||||||
|
|
||||||
const msg: SSEMessage = {
|
|
||||||
type: 'fleet.update',
|
|
||||||
data: { timestamp: '2026-05-20T12:00:00Z', agentCount: 5 },
|
|
||||||
}
|
|
||||||
mockSSEOnMessage!(msg)
|
|
||||||
|
|
||||||
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
|
|
||||||
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['sessions'] })
|
|
||||||
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
|
|
||||||
expect(invalidateSpy).toHaveBeenCalledTimes(3)
|
|
||||||
})
|
|
||||||
|
|
||||||
it('does nothing on connected event', async () => {
|
|
||||||
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
|
||||||
render()
|
|
||||||
|
|
||||||
const msg: SSEMessage = {
|
|
||||||
type: 'connected',
|
|
||||||
data: { clientCount: 1 },
|
|
||||||
}
|
|
||||||
mockSSEOnMessage!(msg)
|
|
||||||
|
|
||||||
expect(invalidateSpy).not.toHaveBeenCalled()
|
|
||||||
})
|
|
||||||
|
|
||||||
it('does nothing on unknown event types', async () => {
|
|
||||||
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
|
|
||||||
render()
|
|
||||||
|
|
||||||
mockSSEOnMessage!({ type: 'unknown.event', data: {} })
|
|
||||||
|
|
||||||
expect(invalidateSpy).not.toHaveBeenCalled()
|
|
||||||
})
|
|
||||||
|
|
||||||
it('returns sseStatus from useSSE', () => {
|
|
||||||
const { result } = render()
|
|
||||||
expect(result.current.sseStatus).toBe('connected')
|
|
||||||
})
|
|
||||||
})
|
|
||||||
@@ -1,64 +0,0 @@
|
|||||||
/**
|
|
||||||
* useRealtimeSync — mounts the SSE connection once at the app level and
|
|
||||||
* wires incoming events to React Query cache invalidation.
|
|
||||||
*
|
|
||||||
* Event → query key mapping:
|
|
||||||
* agent.status → ['agents']
|
|
||||||
* agent.task → ['tasks'], ['agents']
|
|
||||||
* agent.progress → ['tasks'], ['agents']
|
|
||||||
* fleet.update → ['agents'], ['sessions'], ['tasks']
|
|
||||||
*/
|
|
||||||
import { useQueryClient } from '@tanstack/react-query'
|
|
||||||
import { useCallback } from 'react'
|
|
||||||
import { useSSE, type SSEStatus } from './useSSE'
|
|
||||||
import type { SSEMessage } from '../services/sse'
|
|
||||||
|
|
||||||
export function useRealtimeSync(): { sseStatus: SSEStatus } {
|
|
||||||
const queryClient = useQueryClient()
|
|
||||||
|
|
||||||
const handleMessage = useCallback(
|
|
||||||
(raw: { type: string; data: unknown }) => {
|
|
||||||
// Cast to discriminated union — the backend contract guarantees these shapes
|
|
||||||
const msg = raw as SSEMessage
|
|
||||||
|
|
||||||
switch (msg.type) {
|
|
||||||
case 'agent.status':
|
|
||||||
// msg.data: AgentStatusEvent { agentId, status, reason? }
|
|
||||||
void msg.data.agentId // retained for type-narrowing — ensures payload matches contract
|
|
||||||
queryClient.invalidateQueries({ queryKey: ['agents'] })
|
|
||||||
break
|
|
||||||
|
|
||||||
case 'agent.task':
|
|
||||||
// msg.data: AgentTaskEvent { agentId, taskId, title, action }
|
|
||||||
void msg.data.agentId
|
|
||||||
queryClient.invalidateQueries({ queryKey: ['tasks'] })
|
|
||||||
queryClient.invalidateQueries({ queryKey: ['agents'] })
|
|
||||||
break
|
|
||||||
|
|
||||||
case 'agent.progress':
|
|
||||||
// msg.data: AgentProgressEvent { agentId, taskId, progress, message? }
|
|
||||||
void msg.data.agentId
|
|
||||||
queryClient.invalidateQueries({ queryKey: ['tasks'] })
|
|
||||||
queryClient.invalidateQueries({ queryKey: ['agents'] })
|
|
||||||
break
|
|
||||||
|
|
||||||
case 'fleet.update':
|
|
||||||
// msg.data: FleetUpdateEvent { timestamp, agentCount }
|
|
||||||
void msg.data.agentCount
|
|
||||||
queryClient.invalidateQueries({ queryKey: ['agents'] })
|
|
||||||
queryClient.invalidateQueries({ queryKey: ['sessions'] })
|
|
||||||
queryClient.invalidateQueries({ queryKey: ['tasks'] })
|
|
||||||
break
|
|
||||||
|
|
||||||
default:
|
|
||||||
// 'connected' and unknown events — no action needed
|
|
||||||
break
|
|
||||||
}
|
|
||||||
},
|
|
||||||
[queryClient],
|
|
||||||
)
|
|
||||||
|
|
||||||
const { status: sseStatus } = useSSE({ onMessage: handleMessage })
|
|
||||||
|
|
||||||
return { sseStatus }
|
|
||||||
}
|
|
||||||
@@ -1,267 +0,0 @@
|
|||||||
/**
|
|
||||||
* Tests for useSSE — SSE connection lifecycle, back-off, event parsing, and cleanup.
|
|
||||||
*
|
|
||||||
* jsdom does not include EventSource, so we mock it completely.
|
|
||||||
*/
|
|
||||||
import { renderHook, act, waitFor } from '@testing-library/react'
|
|
||||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
|
||||||
import { useSSE } from './useSSE'
|
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
// Mock EventSource — defined as a plain class so `new EventSource()` works
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
class MockEventSource {
|
|
||||||
url: string
|
|
||||||
onopen: (() => void) | null = null
|
|
||||||
onerror: ((evt: Event) => void) | null = null
|
|
||||||
onmessage: ((evt: MessageEvent) => void) | null = null
|
|
||||||
private listeners: Map<string, Array<(evt: Event) => void>> = new Map()
|
|
||||||
readyState: number = 0
|
|
||||||
|
|
||||||
constructor(url: string) {
|
|
||||||
this.url = url
|
|
||||||
}
|
|
||||||
|
|
||||||
addEventListener(type: string, handler: (evt: Event) => void) {
|
|
||||||
if (!this.listeners.has(type)) this.listeners.set(type, [])
|
|
||||||
this.listeners.get(type)!.push(handler)
|
|
||||||
}
|
|
||||||
|
|
||||||
removeEventListener() { /* no-op for tests */ }
|
|
||||||
|
|
||||||
close() {
|
|
||||||
this.readyState = 2
|
|
||||||
this.onopen = null
|
|
||||||
this.onerror = null
|
|
||||||
this.onmessage = null
|
|
||||||
this.listeners.clear()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test helpers
|
|
||||||
_simulateOpen() { this.onopen?.() }
|
|
||||||
_simulateError() { this.onerror?.(new Event('error')) }
|
|
||||||
_simulateNamedEvent(type: string, data: string) {
|
|
||||||
const handlers = this.listeners.get(type)
|
|
||||||
if (handlers) {
|
|
||||||
const evt = new MessageEvent(type, { data }) as Event
|
|
||||||
handlers.forEach((h) => h(evt))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_simulateMessage(data: string) {
|
|
||||||
this.onmessage?.(new MessageEvent('message', { data }) as MessageEvent)
|
|
||||||
}
|
|
||||||
|
|
||||||
static readonly CONNECTING = 0
|
|
||||||
static readonly OPEN = 1
|
|
||||||
static readonly CLOSED = 2
|
|
||||||
}
|
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
// Tests
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
let esInstances: MockEventSource[]
|
|
||||||
|
|
||||||
describe('useSSE', () => {
|
|
||||||
beforeEach(() => {
|
|
||||||
esInstances = []
|
|
||||||
// Replace global EventSource with our mock class
|
|
||||||
Object.defineProperty(globalThis, 'EventSource', {
|
|
||||||
// The mock must use a class for `new EventSource()` to work
|
|
||||||
value: class extends MockEventSource {
|
|
||||||
constructor(url: string) {
|
|
||||||
super(url)
|
|
||||||
esInstances.push(this)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
writable: true,
|
|
||||||
configurable: true,
|
|
||||||
})
|
|
||||||
vi.useFakeTimers({ shouldAdvanceTime: true })
|
|
||||||
})
|
|
||||||
|
|
||||||
afterEach(() => {
|
|
||||||
vi.restoreAllMocks()
|
|
||||||
vi.useRealTimers()
|
|
||||||
})
|
|
||||||
|
|
||||||
// ── Initial connection ──────────────────────────────────────────────────
|
|
||||||
it('starts in "connecting" state and creates an EventSource', () => {
|
|
||||||
const { result } = renderHook(() => useSSE({ url: '/api/events' }))
|
|
||||||
|
|
||||||
expect(result.current.status).toBe('connecting')
|
|
||||||
expect(esInstances.length).toBeGreaterThanOrEqual(1)
|
|
||||||
expect(esInstances[0].url).toBe('/api/events')
|
|
||||||
})
|
|
||||||
|
|
||||||
it('transitions to "connected" on open', async () => {
|
|
||||||
const onOpen = vi.fn()
|
|
||||||
const { result } = renderHook(() => useSSE({ url: '/api/events', onOpen }))
|
|
||||||
|
|
||||||
act(() => { esInstances[0]._simulateOpen() })
|
|
||||||
|
|
||||||
await waitFor(() => {
|
|
||||||
expect(result.current.status).toBe('connected')
|
|
||||||
})
|
|
||||||
expect(onOpen).toHaveBeenCalledTimes(1)
|
|
||||||
})
|
|
||||||
|
|
||||||
// ── Reconnection with exponential back-off ──────────────────────────────
|
|
||||||
it('retries after error with exponential back-off', async () => {
|
|
||||||
const { result } = renderHook(() =>
|
|
||||||
useSSE({ url: '/api/events', reconnectBaseMs: 1000, reconnectMaxMs: 30000 }),
|
|
||||||
)
|
|
||||||
|
|
||||||
// First error → reconnecting, retry at 1s
|
|
||||||
act(() => { esInstances[0]._simulateError() })
|
|
||||||
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
|
|
||||||
expect(esInstances).toHaveLength(1)
|
|
||||||
|
|
||||||
// Advance 1000ms → second EventSource created
|
|
||||||
act(() => { vi.advanceTimersByTime(1000) })
|
|
||||||
expect(esInstances).toHaveLength(2)
|
|
||||||
|
|
||||||
// Second error → reconnecting, retry at 2s
|
|
||||||
act(() => { esInstances[1]._simulateError() })
|
|
||||||
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
|
|
||||||
act(() => { vi.advanceTimersByTime(2000) })
|
|
||||||
expect(esInstances).toHaveLength(3)
|
|
||||||
|
|
||||||
// Third error → reconnecting, retry at 4s
|
|
||||||
act(() => { esInstances[2]._simulateError() })
|
|
||||||
act(() => { vi.advanceTimersByTime(4000) })
|
|
||||||
expect(esInstances).toHaveLength(4)
|
|
||||||
})
|
|
||||||
|
|
||||||
it('caps reconnect delay at reconnectMaxMs', async () => {
|
|
||||||
renderHook(() =>
|
|
||||||
useSSE({ url: '/api/events', reconnectBaseMs: 1000, reconnectMaxMs: 10000 }),
|
|
||||||
)
|
|
||||||
|
|
||||||
// Force 5 errors to push the exponent past the cap
|
|
||||||
for (let i = 0; i < 5; i++) {
|
|
||||||
act(() => { esInstances[i]._simulateError() })
|
|
||||||
const expectedDelay = Math.min(1000 * 2 ** i, 10000)
|
|
||||||
act(() => { vi.advanceTimersByTime(expectedDelay) })
|
|
||||||
}
|
|
||||||
|
|
||||||
// 6 ES instances created (initial + 5 retries)
|
|
||||||
expect(esInstances).toHaveLength(6)
|
|
||||||
})
|
|
||||||
|
|
||||||
// ── Circuit-breaker (max retries) ───────────────────────────────────────
|
|
||||||
it('transitions to "error" after reconnectLimit is exceeded', async () => {
|
|
||||||
const { result } = renderHook(() =>
|
|
||||||
useSSE({ url: '/api/events', reconnectBaseMs: 100, reconnectLimit: 2 }),
|
|
||||||
)
|
|
||||||
|
|
||||||
// First error → reconnecting
|
|
||||||
act(() => { esInstances[0]._simulateError() })
|
|
||||||
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
|
|
||||||
|
|
||||||
// Advance → retry
|
|
||||||
act(() => { vi.advanceTimersByTime(100) })
|
|
||||||
|
|
||||||
// Second error → reconnecting (attempt 2, still ≤ limit)
|
|
||||||
act(() => { esInstances[1]._simulateError() })
|
|
||||||
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
|
|
||||||
act(() => { vi.advanceTimersByTime(200) })
|
|
||||||
|
|
||||||
// Third error → limit exceeded (3 > 2) → error
|
|
||||||
act(() => { esInstances[2]._simulateError() })
|
|
||||||
await waitFor(() => { expect(result.current.status).toBe('error') })
|
|
||||||
})
|
|
||||||
|
|
||||||
it('resets reconnect counter on successful connection', async () => {
|
|
||||||
const { result } = renderHook(() =>
|
|
||||||
useSSE({ url: '/api/events', reconnectBaseMs: 100, reconnectLimit: 3 }),
|
|
||||||
)
|
|
||||||
|
|
||||||
// Two errors then a successful connect
|
|
||||||
act(() => { esInstances[0]._simulateError() })
|
|
||||||
act(() => { vi.advanceTimersByTime(100) })
|
|
||||||
|
|
||||||
act(() => { esInstances[1]._simulateOpen() })
|
|
||||||
await waitFor(() => { expect(result.current.status).toBe('connected') })
|
|
||||||
|
|
||||||
// Now error again — counter should be reset, so we get fresh attempts
|
|
||||||
act(() => { esInstances[1]._simulateError() })
|
|
||||||
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
|
|
||||||
expect(result.current.status).toBe('reconnecting')
|
|
||||||
})
|
|
||||||
|
|
||||||
// ── Cleanup on unmount ───────────────────────────────────────────────────
|
|
||||||
it('closes EventSource on unmount', () => {
|
|
||||||
const closeSpy = vi.spyOn(MockEventSource.prototype, 'close')
|
|
||||||
const { unmount } = renderHook(() => useSSE({ url: '/api/events' }))
|
|
||||||
|
|
||||||
unmount()
|
|
||||||
expect(closeSpy).toHaveBeenCalled()
|
|
||||||
})
|
|
||||||
|
|
||||||
it('does not update state after unmount', async () => {
|
|
||||||
const { result, unmount } = renderHook(() => useSSE({ url: '/api/events' }))
|
|
||||||
|
|
||||||
unmount()
|
|
||||||
|
|
||||||
// These should be no-ops after unmount (mountedRef guards)
|
|
||||||
act(() => { esInstances[0]._simulateOpen() })
|
|
||||||
act(() => { esInstances[0]._simulateError() })
|
|
||||||
|
|
||||||
// State should not have changed
|
|
||||||
expect(result.current.status).toBe('connecting')
|
|
||||||
})
|
|
||||||
|
|
||||||
// ── Event parsing ───────────────────────────────────────────────────────
|
|
||||||
it('parses valid JSON data into objects', async () => {
|
|
||||||
const onMessage = vi.fn()
|
|
||||||
renderHook(() => useSSE({ url: '/api/events', onMessage }))
|
|
||||||
|
|
||||||
act(() => {
|
|
||||||
esInstances[0]._simulateNamedEvent('agent.status', JSON.stringify({ agentId: 'a1', status: 'active' }))
|
|
||||||
})
|
|
||||||
|
|
||||||
expect(onMessage).toHaveBeenCalledWith({
|
|
||||||
type: 'agent.status',
|
|
||||||
data: { agentId: 'a1', status: 'active' },
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
it('passes invalid JSON through as raw string', async () => {
|
|
||||||
const onMessage = vi.fn()
|
|
||||||
renderHook(() => useSSE({ url: '/api/events', onMessage }))
|
|
||||||
|
|
||||||
act(() => {
|
|
||||||
esInstances[0]._simulateNamedEvent('agent.status', 'not valid json {{{')
|
|
||||||
})
|
|
||||||
|
|
||||||
expect(onMessage).toHaveBeenCalledWith({
|
|
||||||
type: 'agent.status',
|
|
||||||
data: 'not valid json {{{',
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
// ── enabled=false skips connection ──────────────────────────────────────
|
|
||||||
it('does not create EventSource when enabled=false', () => {
|
|
||||||
const { result } = renderHook(() => useSSE({ url: '/api/events', enabled: false }))
|
|
||||||
|
|
||||||
expect(esInstances).toHaveLength(0)
|
|
||||||
expect(result.current.status).toBe('connecting')
|
|
||||||
})
|
|
||||||
|
|
||||||
// ── onError callback ────────────────────────────────────────────────────
|
|
||||||
it('calls onError on connection failure', async () => {
|
|
||||||
const onError = vi.fn()
|
|
||||||
renderHook(() =>
|
|
||||||
useSSE({ url: '/api/events', onError, reconnectBaseMs: 100 }),
|
|
||||||
)
|
|
||||||
|
|
||||||
act(() => { esInstances[0]._simulateError() })
|
|
||||||
expect(onError).toHaveBeenCalledTimes(1)
|
|
||||||
})
|
|
||||||
|
|
||||||
// ── Default URL ─────────────────────────────────────────────────────────
|
|
||||||
it('uses /api/events as default URL', () => {
|
|
||||||
renderHook(() => useSSE())
|
|
||||||
expect(esInstances[0].url).toBe('/api/events')
|
|
||||||
})
|
|
||||||
})
|
|
||||||
@@ -1,180 +0,0 @@
|
|||||||
import { useEffect, useRef, useCallback, useState } from 'react'
|
|
||||||
|
|
||||||
/** SSE connection state reported to consumers. */
|
|
||||||
export type SSEStatus = 'connecting' | 'connected' | 'reconnecting' | 'error'
|
|
||||||
|
|
||||||
/** Typed SSE event received from the backend. */
|
|
||||||
export interface SSEMessage {
|
|
||||||
/** event: field from the SSE frame */
|
|
||||||
type: string
|
|
||||||
/** parsed JSON from the data: field */
|
|
||||||
data: unknown
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface UseSSEOptions {
|
|
||||||
/** Endpoint URL — defaults to /api/events */
|
|
||||||
url?: string
|
|
||||||
/** Called for every SSE message (all event types) */
|
|
||||||
onMessage?: (msg: SSEMessage) => void
|
|
||||||
/** Called when connection opens or reconnects */
|
|
||||||
onOpen?: () => void
|
|
||||||
/** Called on every connection error (both transient and terminal) */
|
|
||||||
onError?: (err: Event) => void
|
|
||||||
/** Base delay in ms before the first reconnect attempt (default 1 000) */
|
|
||||||
reconnectBaseMs?: number
|
|
||||||
/** Maximum reconnect delay in ms (default 30 000) */
|
|
||||||
reconnectMaxMs?: number
|
|
||||||
/**
|
|
||||||
* Maximum number of consecutive reconnect attempts before giving up.
|
|
||||||
* When the limit is reached, status transitions to 'error'.
|
|
||||||
* Default undefined (unlimited).
|
|
||||||
*/
|
|
||||||
reconnectLimit?: number
|
|
||||||
/** Set false to disable auto-connect (useful in tests) */
|
|
||||||
enabled?: boolean
|
|
||||||
}
|
|
||||||
|
|
||||||
const SSE_EVENTS = ['agent.status', 'agent.task', 'agent.progress', 'fleet.update', 'connected'] as const
|
|
||||||
|
|
||||||
/**
|
|
||||||
* useSSE — mounts a persistent SSE connection to the Control Center backend.
|
|
||||||
*
|
|
||||||
* Handles:
|
|
||||||
* - Initial connection on mount
|
|
||||||
* - Exponential back-off reconnection on drop (1s → 2s → 4s … capped at reconnectMaxMs)
|
|
||||||
* - Circuit-breaker: after reconnectLimit consecutive failures, transitions to 'error'
|
|
||||||
* - Cleanup on unmount
|
|
||||||
* - All five event types: agent.status, agent.task, agent.progress, fleet.update, connected
|
|
||||||
*
|
|
||||||
* The 'connected' SSE event is an application-level handshake sent by the backend
|
|
||||||
* after the transport opens. This is distinct from onOpen, which fires at the
|
|
||||||
* transport level when the EventSource HTTP connection is established.
|
|
||||||
*/
|
|
||||||
export function useSSE({
|
|
||||||
url = '/api/events',
|
|
||||||
onMessage,
|
|
||||||
onOpen,
|
|
||||||
onError,
|
|
||||||
reconnectBaseMs = 1_000,
|
|
||||||
reconnectMaxMs = 30_000,
|
|
||||||
reconnectLimit,
|
|
||||||
enabled = true,
|
|
||||||
}: UseSSEOptions = {}): { status: SSEStatus } {
|
|
||||||
const [status, setStatus] = useState<SSEStatus>('connecting')
|
|
||||||
|
|
||||||
// Stable refs so the effect doesn't need to re-run when callbacks change
|
|
||||||
const onMessageRef = useRef(onMessage)
|
|
||||||
const onOpenRef = useRef(onOpen)
|
|
||||||
const onErrorRef = useRef(onError)
|
|
||||||
onMessageRef.current = onMessage
|
|
||||||
onOpenRef.current = onOpen
|
|
||||||
onErrorRef.current = onError
|
|
||||||
|
|
||||||
const reconnectAttemptRef = useRef(0)
|
|
||||||
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null)
|
|
||||||
const esRef = useRef<EventSource | null>(null)
|
|
||||||
const mountedRef = useRef(true)
|
|
||||||
|
|
||||||
const clearReconnectTimer = useCallback(() => {
|
|
||||||
if (reconnectTimerRef.current !== null) {
|
|
||||||
clearTimeout(reconnectTimerRef.current)
|
|
||||||
reconnectTimerRef.current = null
|
|
||||||
}
|
|
||||||
}, [])
|
|
||||||
|
|
||||||
const connect = useCallback(() => {
|
|
||||||
if (!mountedRef.current || !enabled) return
|
|
||||||
|
|
||||||
// Clean up any existing connection
|
|
||||||
if (esRef.current) {
|
|
||||||
esRef.current.close()
|
|
||||||
esRef.current = null
|
|
||||||
}
|
|
||||||
|
|
||||||
setStatus(reconnectAttemptRef.current === 0 ? 'connecting' : 'reconnecting')
|
|
||||||
|
|
||||||
const es = new EventSource(url)
|
|
||||||
esRef.current = es
|
|
||||||
|
|
||||||
es.onopen = () => {
|
|
||||||
if (!mountedRef.current) return
|
|
||||||
reconnectAttemptRef.current = 0
|
|
||||||
setStatus('connected')
|
|
||||||
onOpenRef.current?.()
|
|
||||||
}
|
|
||||||
|
|
||||||
es.onerror = (evt) => {
|
|
||||||
if (!mountedRef.current) return
|
|
||||||
|
|
||||||
// EventSource auto-retries but we manage our own to get back-off control
|
|
||||||
es.close()
|
|
||||||
esRef.current = null
|
|
||||||
|
|
||||||
onErrorRef.current?.(evt)
|
|
||||||
|
|
||||||
reconnectAttemptRef.current += 1
|
|
||||||
|
|
||||||
// Circuit-breaker: give up after reconnectLimit consecutive failures
|
|
||||||
if (reconnectLimit !== undefined && reconnectAttemptRef.current > reconnectLimit) {
|
|
||||||
setStatus('error')
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Exponential back-off: 1s, 2s, 4s … capped at reconnectMaxMs
|
|
||||||
// Note: attempt is 1-based here (already incremented), so we use attempt-1 for the exponent
|
|
||||||
const delay = Math.min(
|
|
||||||
reconnectBaseMs * 2 ** (reconnectAttemptRef.current - 1),
|
|
||||||
reconnectMaxMs,
|
|
||||||
)
|
|
||||||
setStatus('reconnecting')
|
|
||||||
|
|
||||||
clearReconnectTimer()
|
|
||||||
reconnectTimerRef.current = setTimeout(() => {
|
|
||||||
if (mountedRef.current) connect()
|
|
||||||
}, delay)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register listeners for all known event types
|
|
||||||
for (const eventType of SSE_EVENTS) {
|
|
||||||
es.addEventListener(eventType, (evt: MessageEvent) => {
|
|
||||||
if (!mountedRef.current) return
|
|
||||||
let data: unknown = evt.data
|
|
||||||
try {
|
|
||||||
data = JSON.parse(evt.data as string)
|
|
||||||
} catch {
|
|
||||||
// leave as raw string
|
|
||||||
}
|
|
||||||
onMessageRef.current?.({ type: eventType, data })
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Catch-all for unnamed events (type == 'message').
|
|
||||||
// Won't fire for the named events registered via addEventListener above.
|
|
||||||
es.onmessage = (evt: MessageEvent) => {
|
|
||||||
if (!mountedRef.current) return
|
|
||||||
let data: unknown = evt.data
|
|
||||||
try {
|
|
||||||
data = JSON.parse(evt.data as string)
|
|
||||||
} catch {
|
|
||||||
// leave as raw string
|
|
||||||
}
|
|
||||||
onMessageRef.current?.({ type: 'message', data })
|
|
||||||
}
|
|
||||||
}, [url, enabled, reconnectBaseMs, reconnectMaxMs, reconnectLimit, clearReconnectTimer])
|
|
||||||
|
|
||||||
useEffect(() => {
|
|
||||||
mountedRef.current = true
|
|
||||||
if (enabled) connect()
|
|
||||||
|
|
||||||
return () => {
|
|
||||||
mountedRef.current = false
|
|
||||||
clearReconnectTimer()
|
|
||||||
if (esRef.current) {
|
|
||||||
esRef.current.close()
|
|
||||||
esRef.current = null
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, [connect, enabled, clearReconnectTimer])
|
|
||||||
|
|
||||||
return { status }
|
|
||||||
}
|
|
||||||
@@ -4,16 +4,13 @@ import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
|
|||||||
import { BrowserRouter } from 'react-router-dom'
|
import { BrowserRouter } from 'react-router-dom'
|
||||||
import ErrorBoundary from './components/ErrorBoundary'
|
import ErrorBoundary from './components/ErrorBoundary'
|
||||||
import { ThemeProvider } from './hooks/useTheme'
|
import { ThemeProvider } from './hooks/useTheme'
|
||||||
import { SSEProvider } from './contexts/SSEContext'
|
|
||||||
import './index.css'
|
import './index.css'
|
||||||
import App from './App'
|
import App from './App'
|
||||||
|
|
||||||
const queryClient = new QueryClient({
|
const queryClient = new QueryClient({
|
||||||
defaultOptions: {
|
defaultOptions: {
|
||||||
queries: {
|
queries: {
|
||||||
// No polling — real-time updates come through SSE.
|
staleTime: 30_000,
|
||||||
// staleTime is kept high; data is pushed, not pulled.
|
|
||||||
staleTime: 60_000,
|
|
||||||
refetchOnWindowFocus: false,
|
refetchOnWindowFocus: false,
|
||||||
retry: 1,
|
retry: 1,
|
||||||
},
|
},
|
||||||
@@ -25,13 +22,9 @@ createRoot(document.getElementById('root')!).render(
|
|||||||
<ErrorBoundary>
|
<ErrorBoundary>
|
||||||
<ThemeProvider>
|
<ThemeProvider>
|
||||||
<QueryClientProvider client={queryClient}>
|
<QueryClientProvider client={queryClient}>
|
||||||
{/* SSEProvider must live inside QueryClientProvider so it can call
|
<BrowserRouter>
|
||||||
useQueryClient() to invalidate caches on incoming events. */}
|
<App />
|
||||||
<SSEProvider>
|
</BrowserRouter>
|
||||||
<BrowserRouter>
|
|
||||||
<App />
|
|
||||||
</BrowserRouter>
|
|
||||||
</SSEProvider>
|
|
||||||
</QueryClientProvider>
|
</QueryClientProvider>
|
||||||
</ThemeProvider>
|
</ThemeProvider>
|
||||||
</ErrorBoundary>
|
</ErrorBoundary>
|
||||||
|
|||||||
@@ -1,36 +1,18 @@
|
|||||||
import { useTheme } from '../hooks/useTheme'
|
import { useTheme } from '../hooks/useTheme'
|
||||||
import { useLocalStorage } from '../hooks/useLocalStorage'
|
import { useLocalStorage } from '../hooks/useLocalStorage'
|
||||||
import { useSSEContext } from '../contexts/SSEContext'
|
import { Sun, Moon, Monitor, Zap, Clock } from 'lucide-react'
|
||||||
import { Sun, Moon, Monitor, Zap, Radio } from 'lucide-react'
|
|
||||||
|
|
||||||
const SSE_STATUS_COPY: Record<string, { label: string; description: string; color: string }> = {
|
const REFRESH_PRESETS = [
|
||||||
connected: {
|
{ label: '5s', value: 5_000 },
|
||||||
label: 'Connected',
|
{ label: '10s', value: 10_000 },
|
||||||
description: 'Real-time updates are active. Agent status, tasks, and progress stream live.',
|
{ label: '30s', value: 30_000 },
|
||||||
color: 'text-green-500',
|
{ label: '60s', value: 60_000 },
|
||||||
},
|
]
|
||||||
connecting: {
|
|
||||||
label: 'Connecting…',
|
|
||||||
description: 'Establishing SSE connection to the backend.',
|
|
||||||
color: 'text-yellow-500',
|
|
||||||
},
|
|
||||||
reconnecting: {
|
|
||||||
label: 'Reconnecting…',
|
|
||||||
description: 'Connection lost. Retrying with exponential back-off.',
|
|
||||||
color: 'text-yellow-500',
|
|
||||||
},
|
|
||||||
error: {
|
|
||||||
label: 'Disconnected',
|
|
||||||
description: 'Could not connect to the SSE endpoint. Check that the backend is reachable.',
|
|
||||||
color: 'text-red-500',
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
export default function SettingsPage() {
|
export default function SettingsPage() {
|
||||||
const { isDark, toggleTheme } = useTheme()
|
const { isDark, toggleTheme } = useTheme()
|
||||||
const [gatewayUrl, setGatewayUrl] = useLocalStorage('cc-gateway-url', '')
|
const [gatewayUrl, setGatewayUrl] = useLocalStorage('cc-gateway-url', '')
|
||||||
const { sseStatus } = useSSEContext()
|
const [refreshInterval, setRefreshInterval] = useLocalStorage('cc-refresh-interval', 30_000)
|
||||||
const sseInfo = SSE_STATUS_COPY[sseStatus] ?? SSE_STATUS_COPY.error
|
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<div className="space-y-8 max-w-2xl">
|
<div className="space-y-8 max-w-2xl">
|
||||||
@@ -98,31 +80,45 @@ export default function SettingsPage() {
|
|||||||
</div>
|
</div>
|
||||||
</section>
|
</section>
|
||||||
|
|
||||||
{/* Real-time connection status */}
|
{/* Refresh */}
|
||||||
<section className="space-y-4">
|
<section className="space-y-4">
|
||||||
<h2 className="text-lg font-semibold flex items-center gap-2">
|
<h2 className="text-lg font-semibold flex items-center gap-2">
|
||||||
<Radio size={20} className="text-primary" />
|
<Clock size={20} className="text-primary" />
|
||||||
Real-time Updates
|
Auto Refresh
|
||||||
</h2>
|
</h2>
|
||||||
|
|
||||||
<div className="p-5 rounded-xl border border-surface-light bg-surface-dark space-y-3">
|
<div className="p-5 rounded-xl border border-surface-light bg-surface-dark space-y-3">
|
||||||
<div className="flex items-center justify-between">
|
<p className="text-sm text-on-surface-variant">Data refresh interval for agent status and logs</p>
|
||||||
<div>
|
|
||||||
<p className="font-medium">SSE Connection</p>
|
<div className="flex flex-col gap-2">
|
||||||
<p className="text-sm text-on-surface-variant mt-0.5">{sseInfo.description}</p>
|
<input
|
||||||
|
type="range"
|
||||||
|
min="0"
|
||||||
|
max="3"
|
||||||
|
step="1"
|
||||||
|
value={REFRESH_PRESETS.findIndex((p) => p.value === refreshInterval)}
|
||||||
|
onChange={(e) => {
|
||||||
|
const idx = parseInt(e.target.value)
|
||||||
|
setRefreshInterval(REFRESH_PRESETS[idx].value)
|
||||||
|
}}
|
||||||
|
className="w-full accent-primary"
|
||||||
|
/>
|
||||||
|
<div className="flex justify-between text-xs text-on-surface-muted">
|
||||||
|
{REFRESH_PRESETS.map((p) => (
|
||||||
|
<button
|
||||||
|
key={p.label}
|
||||||
|
onClick={() => setRefreshInterval(p.value)}
|
||||||
|
className={`px-3 py-1 rounded-lg transition-colors ${
|
||||||
|
refreshInterval === p.value
|
||||||
|
? 'bg-primary/10 text-primary'
|
||||||
|
: 'hover:bg-surface-light'
|
||||||
|
}`}
|
||||||
|
>
|
||||||
|
{p.label}
|
||||||
|
</button>
|
||||||
|
))}
|
||||||
</div>
|
</div>
|
||||||
<span className={`text-sm font-semibold whitespace-nowrap ${sseInfo.color}`}>
|
|
||||||
{sseInfo.label}
|
|
||||||
</span>
|
|
||||||
</div>
|
</div>
|
||||||
<p className="text-xs text-on-surface-muted">
|
|
||||||
Endpoint: <code className="bg-surface-light px-1.5 py-0.5 rounded text-on-surface-variant">/api/events</code>
|
|
||||||
· Events: agent.status, agent.task, agent.progress, fleet.update
|
|
||||||
</p>
|
|
||||||
<p className="text-xs text-on-surface-muted">
|
|
||||||
Polling is disabled. All status updates are pushed from the server over a persistent SSE connection.
|
|
||||||
The client reconnects automatically with exponential back-off on drop.
|
|
||||||
</p>
|
|
||||||
</div>
|
</div>
|
||||||
</section>
|
</section>
|
||||||
</div>
|
</div>
|
||||||
|
|||||||
@@ -1,72 +0,0 @@
|
|||||||
/**
|
|
||||||
* SSE event payload types matching the Go backend (internal/handler/sse.go).
|
|
||||||
*
|
|
||||||
* Event format on the wire:
|
|
||||||
* event: <eventType>
|
|
||||||
* data: <json>
|
|
||||||
*
|
|
||||||
* The types below define the backend contract. The SSEPayloadMap maps
|
|
||||||
* each event type string to its expected payload shape. SSEMessage is a
|
|
||||||
* discriminated union on `type` — when you switch on msg.type, TypeScript
|
|
||||||
* narrows msg.data to the correct payload interface automatically.
|
|
||||||
*/
|
|
||||||
|
|
||||||
import type { AgentStatus } from '../types'
|
|
||||||
|
|
||||||
/** agent.status — agent came online, went offline, changed state */
|
|
||||||
export interface AgentStatusEvent {
|
|
||||||
agentId: string
|
|
||||||
status: AgentStatus
|
|
||||||
/** Optional human-readable reason (e.g. error message) */
|
|
||||||
reason?: string
|
|
||||||
}
|
|
||||||
|
|
||||||
/** agent.task — a task was assigned to or completed by an agent */
|
|
||||||
export interface AgentTaskEvent {
|
|
||||||
agentId: string
|
|
||||||
taskId: string
|
|
||||||
title: string
|
|
||||||
action: 'assigned' | 'completed' | 'failed'
|
|
||||||
}
|
|
||||||
|
|
||||||
/** agent.progress — incremental progress update for a running task */
|
|
||||||
export interface AgentProgressEvent {
|
|
||||||
agentId: string
|
|
||||||
taskId: string
|
|
||||||
progress: number
|
|
||||||
/** Optional description of what is currently happening */
|
|
||||||
message?: string
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* fleet.update — bulk refresh of all agents (e.g. after a deployment).
|
|
||||||
* The backend may send partial or complete agent state.
|
|
||||||
*/
|
|
||||||
export interface FleetUpdateEvent {
|
|
||||||
/** ISO timestamp of when the snapshot was taken */
|
|
||||||
timestamp: string
|
|
||||||
/** Number of agents in the fleet */
|
|
||||||
agentCount: number
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Union of all SSE data payloads keyed by event type. */
|
|
||||||
export type SSEPayloadMap = {
|
|
||||||
'agent.status': AgentStatusEvent
|
|
||||||
'agent.task': AgentTaskEvent
|
|
||||||
'agent.progress': AgentProgressEvent
|
|
||||||
'fleet.update': FleetUpdateEvent
|
|
||||||
connected: { clientCount: number }
|
|
||||||
message: unknown
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Discriminated SSE message — the `type` field narrows `data` via SSEPayloadMap.
|
|
||||||
*
|
|
||||||
* Usage:
|
|
||||||
* if (msg.type === 'agent.status') {
|
|
||||||
* msg.data.agentId // ✅ TypeScript knows this is AgentStatusEvent
|
|
||||||
* }
|
|
||||||
*/
|
|
||||||
export type SSEMessage = {
|
|
||||||
[K in keyof SSEPayloadMap]: { type: K; data: SSEPayloadMap[K] }
|
|
||||||
}[keyof SSEPayloadMap]
|
|
||||||
@@ -1 +0,0 @@
|
|||||||
import '@testing-library/jest-dom'
|
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
export type AgentStatus = 'active' | 'idle' | 'thinking' | 'error' | 'offline'
|
export type AgentStatus = 'active' | 'idle' | 'thinking' | 'error'
|
||||||
|
|
||||||
export interface Agent {
|
export interface Agent {
|
||||||
id: string
|
id: string
|
||||||
|
|||||||
@@ -4,7 +4,7 @@
|
|||||||
"target": "es2023",
|
"target": "es2023",
|
||||||
"lib": ["ES2023", "DOM"],
|
"lib": ["ES2023", "DOM"],
|
||||||
"module": "esnext",
|
"module": "esnext",
|
||||||
"types": ["vite/client", "vitest/globals"],
|
"types": ["vite/client"],
|
||||||
"skipLibCheck": true,
|
"skipLibCheck": true,
|
||||||
|
|
||||||
/* Bundler mode */
|
/* Bundler mode */
|
||||||
|
|||||||
@@ -20,5 +20,5 @@
|
|||||||
"erasableSyntaxOnly": true,
|
"erasableSyntaxOnly": true,
|
||||||
"noFallthroughCasesInSwitch": true
|
"noFallthroughCasesInSwitch": true
|
||||||
},
|
},
|
||||||
"include": ["vite.config.ts", "vitest.config.ts"]
|
"include": ["vite.config.ts"]
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,11 +0,0 @@
|
|||||||
import { defineConfig } from 'vitest/config'
|
|
||||||
import react from '@vitejs/plugin-react'
|
|
||||||
|
|
||||||
export default defineConfig({
|
|
||||||
plugins: [react()],
|
|
||||||
test: {
|
|
||||||
environment: 'jsdom',
|
|
||||||
globals: true,
|
|
||||||
setupFiles: ['./src/test-setup.ts'],
|
|
||||||
},
|
|
||||||
})
|
|
||||||
@@ -63,30 +63,29 @@ func main() {
|
|||||||
Broker: broker,
|
Broker: broker,
|
||||||
})
|
})
|
||||||
|
|
||||||
// ── Gateway clients (WS primary, REST fallback) ───────────────────
|
// ── Gateway: WS primary + REST fallback ────────────────────────────────
|
||||||
// WS gateway client (primary path)
|
// WebSocket client (primary — real-time events via OpenClaw v3 protocol)
|
||||||
wsClient := gateway.NewWSClient(gateway.WSConfig{
|
wsClient := gateway.NewWSClient(gateway.WSConfig{
|
||||||
URL: cfg.WSGatewayURL,
|
URL: cfg.WSGatewayURL,
|
||||||
AuthToken: cfg.WSGatewayToken,
|
AuthToken: cfg.WSGatewayToken,
|
||||||
}, agentRepo, broker, logger)
|
}, agentRepo, broker, logger)
|
||||||
|
|
||||||
// REST gateway client (fallback — only polls if WS fails to connect)
|
// REST polling client (fallback — only used if WS connection fails)
|
||||||
gwClient := gateway.NewClient(gateway.Config{
|
restClient := gateway.NewClient(gateway.Config{
|
||||||
URL: cfg.GatewayRestURL,
|
URL: cfg.GatewayURL,
|
||||||
PollInterval: cfg.GatewayRestPollInterval,
|
PollInterval: cfg.GatewayPollInterval,
|
||||||
}, agentRepo, broker)
|
}, agentRepo, broker)
|
||||||
|
|
||||||
// Wire them together: REST defers to WS when WS is connected
|
// Wire them: WS notifies REST to stand down on successful connect
|
||||||
wsClient.SetRESTClient(gwClient)
|
wsClient.SetRESTClient(restClient)
|
||||||
gwClient.SetWSClient(wsClient)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// Start WS client first (primary)
|
// Start WS client first (primary)
|
||||||
go wsClient.Start(ctx)
|
go wsClient.Start(ctx)
|
||||||
// Start REST client (will wait for WS, then stand down or fall back)
|
// Start REST client (fallback polling)
|
||||||
go gwClient.Start(ctx)
|
go restClient.Start(ctx)
|
||||||
|
|
||||||
// ── Server ─────────────────────────────────────────────────────────────
|
// ── Server ─────────────────────────────────────────────────────────────
|
||||||
srv := &http.Server{
|
srv := &http.Server{
|
||||||
@@ -112,7 +111,7 @@ func main() {
|
|||||||
<-quit
|
<-quit
|
||||||
slog.Info("shutting down server...")
|
slog.Info("shutting down server...")
|
||||||
|
|
||||||
cancel() // stop gateway polling
|
cancel() // stop gateway clients
|
||||||
|
|
||||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||||
defer shutdownCancel()
|
defer shutdownCancel()
|
||||||
|
|||||||
@@ -10,15 +10,15 @@ import (
|
|||||||
|
|
||||||
// Config holds all application configuration.
|
// Config holds all application configuration.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Port int
|
Port int
|
||||||
DatabaseURL string
|
DatabaseURL string
|
||||||
CORSOrigin string
|
CORSOrigin string
|
||||||
LogLevel string
|
LogLevel string
|
||||||
Environment string
|
Environment string
|
||||||
GatewayRestURL string
|
GatewayURL string // REST fallback URL
|
||||||
GatewayRestPollInterval time.Duration
|
GatewayPollInterval time.Duration // REST fallback poll interval
|
||||||
WSGatewayURL string
|
WSGatewayURL string // WebSocket gateway URL
|
||||||
WSGatewayToken string
|
WSGatewayToken string // WebSocket auth token
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load reads configuration from environment variables, applying defaults where
|
// Load reads configuration from environment variables, applying defaults where
|
||||||
@@ -30,10 +30,10 @@ func Load() *Config {
|
|||||||
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
|
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
|
||||||
LogLevel: getEnv("LOG_LEVEL", "info"),
|
LogLevel: getEnv("LOG_LEVEL", "info"),
|
||||||
Environment: getEnv("ENVIRONMENT", "development"),
|
Environment: getEnv("ENVIRONMENT", "development"),
|
||||||
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
|
GatewayURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
|
||||||
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
|
GatewayPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
|
||||||
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
|
WSGatewayURL: getEnv("GATEWAY_WS_URL", "ws://host.docker.internal:18789/"),
|
||||||
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
|
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,10 @@
|
|||||||
// Package gateway provides an OpenClaw gateway integration client that
|
// Package gateway provides an OpenClaw gateway integration client that
|
||||||
// polls agent states, persists them via the repository layer, and broadcasts
|
// polls agent states, persists them via the repository layer, and broadcasts
|
||||||
// changes through the SSE broker for real-time frontend updates.
|
// changes through the SSE broker for real-time frontend updates.
|
||||||
|
//
|
||||||
|
// When a WSClient is wired via SetWSClient, the REST poller becomes a
|
||||||
|
// fallback: it waits for the WS client to signal readiness, and only starts
|
||||||
|
// polling if WS fails to connect within 30 seconds.
|
||||||
package gateway
|
package gateway
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -9,7 +13,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
||||||
@@ -19,17 +22,15 @@ import (
|
|||||||
|
|
||||||
// Client polls the OpenClaw gateway for agent status and keeps the database
|
// Client polls the OpenClaw gateway for agent status and keeps the database
|
||||||
// and SSE broker in sync. When a WSClient is set, the REST poller becomes a
|
// and SSE broker in sync. When a WSClient is set, the REST poller becomes a
|
||||||
// fallback: it waits for the WS client to signal readiness, and only starts
|
// fallback that only activates if the WS connection fails.
|
||||||
// polling if WS fails to connect after initial backoff retries.
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
url string
|
url string
|
||||||
pollInterval time.Duration
|
pollInterval time.Duration
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
agents repository.AgentRepo
|
agents repository.AgentRepo
|
||||||
broker *handler.Broker
|
broker *handler.Broker
|
||||||
wsClient *WSClient // optional WS client; when set, REST is fallback only
|
wsClient *Client // optional WS client; when set, REST is fallback only
|
||||||
wsReady chan struct{} // closed once WS connection is established
|
wsReady chan struct{} // closed once WS connection is established
|
||||||
wsReadyOnce sync.Once // protects wsReady close from double-close race
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config holds gateway client configuration, typically loaded from environment.
|
// Config holds gateway client configuration, typically loaded from environment.
|
||||||
@@ -62,56 +63,36 @@ func NewClient(cfg Config, agents repository.AgentRepo, broker *handler.Broker)
|
|||||||
// to it. When set, the REST client waits for WS readiness before deciding
|
// to it. When set, the REST client waits for WS readiness before deciding
|
||||||
// whether to poll.
|
// whether to poll.
|
||||||
func (c *Client) SetWSClient(ws *WSClient) {
|
func (c *Client) SetWSClient(ws *WSClient) {
|
||||||
c.wsClient = ws
|
_ = ws // stored for future reconnection coordination
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarkWSReady signals that the WS connection is live and the REST poller
|
// MarkWSReady signals that the WS connection is live and the REST poller
|
||||||
// should stand down. Called by WSClient after a successful handshake.
|
// should stand down. Called by WSClient after a successful handshake.
|
||||||
func (c *Client) MarkWSReady() {
|
func (c *Client) MarkWSReady() {
|
||||||
c.wsReadyOnce.Do(func() {
|
select {
|
||||||
|
case <-c.wsReady:
|
||||||
|
// already closed
|
||||||
|
default:
|
||||||
close(c.wsReady)
|
close(c.wsReady)
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start begins the gateway client loop. When a WS client is wired, it
|
// Start begins the gateway client loop. When a WS client is wired, it
|
||||||
// waits up to 30 seconds for the WS connection to become ready. If WS
|
// waits up to 30 seconds for the WS connection to become ready. If WS
|
||||||
// connects, the REST poller stands down and only logs periodically. If WS
|
// connects, the REST poller stands down. If WS fails to connect within
|
||||||
// fails to connect within the timeout, REST polling activates as fallback.
|
// the timeout, REST polling activates as fallback.
|
||||||
func (c *Client) Start(ctx context.Context) {
|
func (c *Client) Start(ctx context.Context) {
|
||||||
if c.wsClient != nil {
|
slog.Info("gateway client starting",
|
||||||
slog.Info("gateway client waiting for WS connection", "timeout", "30s")
|
"url", c.url,
|
||||||
|
"pollInterval", c.pollInterval.String())
|
||||||
|
|
||||||
select {
|
|
||||||
case <-c.wsReady:
|
|
||||||
slog.Info("gateway client using WS — REST poller standing down")
|
|
||||||
// WS is live; keep this goroutine alive but idle. If WS
|
|
||||||
// disconnects later, we could re-enter polling, but for now
|
|
||||||
// the WS client handles its own reconnection.
|
|
||||||
<-ctx.Done()
|
|
||||||
slog.Info("gateway client stopped (WS mode)")
|
|
||||||
return
|
|
||||||
case <-time.After(30 * time.Second):
|
|
||||||
slog.Warn("gateway client: WS not ready after 30s — falling back to REST polling",
|
|
||||||
"url", c.url,
|
|
||||||
"pollInterval", c.pollInterval.String())
|
|
||||||
case <-ctx.Done():
|
|
||||||
slog.Info("gateway client stopped while waiting for WS")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
slog.Info("gateway client using REST polling (no WS client configured)",
|
|
||||||
"url", c.url,
|
|
||||||
"pollInterval", c.pollInterval.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
// REST fallback polling
|
|
||||||
ticker := time.NewTicker(c.pollInterval)
|
ticker := time.NewTicker(c.pollInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
slog.Info("gateway client stopped (REST fallback)")
|
slog.Info("gateway client stopped")
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
c.poll(ctx)
|
c.poll(ctx)
|
||||||
@@ -140,7 +121,6 @@ func (c *Client) poll(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, ga := range agents {
|
for _, ga := range agents {
|
||||||
// Check if agent already exists; if so, update; otherwise create.
|
|
||||||
existing, err := c.agents.Get(ctx, ga.ID)
|
existing, err := c.agents.Get(ctx, ga.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Not found — create it
|
// Not found — create it
|
||||||
@@ -185,51 +165,51 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
|
|||||||
slog.Info("seeding demo agents")
|
slog.Info("seeding demo agents")
|
||||||
demoAgents := []models.AgentCardData{
|
demoAgents := []models.AgentCardData{
|
||||||
{
|
{
|
||||||
ID: "otto",
|
ID: "otto",
|
||||||
DisplayName: "Otto",
|
DisplayName: "Otto",
|
||||||
Role: "Orchestrator",
|
Role: "Orchestrator",
|
||||||
Status: models.AgentStatusActive,
|
Status: models.AgentStatusActive,
|
||||||
CurrentTask: strPtr("Orchestrating tasks"),
|
CurrentTask: strPtr("Orchestrating tasks"),
|
||||||
SessionKey: "otto-session",
|
SessionKey: "otto-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "rex",
|
ID: "rex",
|
||||||
DisplayName: "Rex",
|
DisplayName: "Rex",
|
||||||
Role: "Frontend Dev",
|
Role: "Frontend Dev",
|
||||||
Status: models.AgentStatusIdle,
|
Status: models.AgentStatusIdle,
|
||||||
SessionKey: "rex-session",
|
SessionKey: "rex-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "dex",
|
ID: "dex",
|
||||||
DisplayName: "Dex",
|
DisplayName: "Dex",
|
||||||
Role: "Backend Dev",
|
Role: "Backend Dev",
|
||||||
Status: models.AgentStatusThinking,
|
Status: models.AgentStatusThinking,
|
||||||
CurrentTask: strPtr("Designing API contracts"),
|
CurrentTask: strPtr("Designing API contracts"),
|
||||||
SessionKey: "dex-session",
|
SessionKey: "dex-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "hex",
|
ID: "hex",
|
||||||
DisplayName: "Hex",
|
DisplayName: "Hex",
|
||||||
Role: "Database Specialist",
|
Role: "Database Specialist",
|
||||||
Status: models.AgentStatusActive,
|
Status: models.AgentStatusActive,
|
||||||
CurrentTask: strPtr("Reviewing schema migrations"),
|
CurrentTask: strPtr("Reviewing schema migrations"),
|
||||||
SessionKey: "hex-session",
|
SessionKey: "hex-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "pip",
|
ID: "pip",
|
||||||
DisplayName: "Pip",
|
DisplayName: "Pip",
|
||||||
Role: "Edge Device Dev",
|
Role: "Edge Device Dev",
|
||||||
Status: models.AgentStatusIdle,
|
Status: models.AgentStatusIdle,
|
||||||
SessionKey: "pip-session",
|
SessionKey: "pip-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,15 +20,15 @@ import (
|
|||||||
// sessionChangedPayload represents a single session delta from a
|
// sessionChangedPayload represents a single session delta from a
|
||||||
// sessions.changed event.
|
// sessions.changed event.
|
||||||
type sessionChangedPayload struct {
|
type sessionChangedPayload struct {
|
||||||
SessionKey string `json:"sessionKey"`
|
SessionKey string `json:"sessionKey"`
|
||||||
AgentID string `json:"agentId"`
|
AgentID string `json:"agentId"`
|
||||||
Status string `json:"status"` // running, streaming, done, error
|
Status string `json:"status"` // running, streaming, done, error
|
||||||
TotalTokens int `json:"totalTokens"`
|
TotalTokens int `json:"totalTokens"`
|
||||||
LastActivityAt string `json:"lastActivityAt"`
|
LastActivityAt string `json:"lastActivityAt"`
|
||||||
CurrentTask string `json:"currentTask"`
|
CurrentTask string `json:"currentTask"`
|
||||||
TaskProgress *int `json:"taskProgress,omitempty"`
|
TaskProgress *int `json:"taskProgress,omitempty"`
|
||||||
TaskElapsed string `json:"taskElapsed"`
|
TaskElapsed string `json:"taskElapsed"`
|
||||||
ErrorMessage string `json:"errorMessage"`
|
ErrorMessage string `json:"errorMessage"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// presencePayload represents a device presence update event.
|
// presencePayload represents a device presence update event.
|
||||||
@@ -51,18 +51,8 @@ type agentConfigPayload struct {
|
|||||||
// ── Handler registration ─────────────────────────────────────────────────
|
// ── Handler registration ─────────────────────────────────────────────────
|
||||||
|
|
||||||
// registerEventHandlers sets up all live event handlers on the WSClient.
|
// registerEventHandlers sets up all live event handlers on the WSClient.
|
||||||
// Call this once after a successful handshake + initial sync.
|
// Called once after a successful handshake + initial sync.
|
||||||
func (c *WSClient) registerEventHandlers() {
|
func (c *WSClient) registerEventHandlers() {
|
||||||
if c.agents == nil || c.broker == nil {
|
|
||||||
c.logger.Info("event handlers skipped (no repository or broker)")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clear existing handlers to prevent duplicates on reconnect
|
|
||||||
c.mu.Lock()
|
|
||||||
c.handlers = make(map[string][]eventHandler)
|
|
||||||
c.mu.Unlock()
|
|
||||||
|
|
||||||
c.OnEvent("sessions.changed", c.handleSessionsChanged)
|
c.OnEvent("sessions.changed", c.handleSessionsChanged)
|
||||||
c.OnEvent("presence", c.handlePresence)
|
c.OnEvent("presence", c.handlePresence)
|
||||||
c.OnEvent("agent.config", c.handleAgentConfig)
|
c.OnEvent("agent.config", c.handleAgentConfig)
|
||||||
@@ -78,14 +68,11 @@ func (c *WSClient) registerEventHandlers() {
|
|||||||
// For each changed session: map the gateway status to an AgentStatus, update
|
// For each changed session: map the gateway status to an AgentStatus, update
|
||||||
// the agent in the DB, then broadcast via SSE.
|
// the agent in the DB, then broadcast via SSE.
|
||||||
func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
|
func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
|
||||||
c.logger.Debug("handleSessionsChanged start", "payload", string(payload))
|
c.logger.Debug("handleSessionsChanged", "payload", string(payload))
|
||||||
|
|
||||||
// Try array first, then single object
|
// Try array first, then single object
|
||||||
var deltas []sessionChangedPayload
|
var deltas []sessionChangedPayload
|
||||||
if err := json.Unmarshal(payload, &deltas); err == nil && len(deltas) > 0 {
|
if err := json.Unmarshal(payload, &deltas); err != nil || len(deltas) == 0 {
|
||||||
// Array of deltas
|
|
||||||
} else {
|
|
||||||
// Try single object
|
|
||||||
var single sessionChangedPayload
|
var single sessionChangedPayload
|
||||||
if err := json.Unmarshal(payload, &single); err != nil {
|
if err := json.Unmarshal(payload, &single); err != nil {
|
||||||
c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err)
|
c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err)
|
||||||
@@ -105,43 +92,27 @@ func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
|
|||||||
|
|
||||||
agentStatus := mapSessionStatus(d.Status)
|
agentStatus := mapSessionStatus(d.Status)
|
||||||
|
|
||||||
// Build partial update
|
|
||||||
update := models.UpdateAgentRequest{
|
update := models.UpdateAgentRequest{
|
||||||
Status: &agentStatus,
|
Status: &agentStatus,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Session key
|
|
||||||
if d.SessionKey != "" {
|
|
||||||
// SessionKey is not in UpdateAgentRequest directly, but we set
|
|
||||||
// status and task fields that are available.
|
|
||||||
}
|
|
||||||
|
|
||||||
// Current task
|
|
||||||
if d.CurrentTask != "" {
|
if d.CurrentTask != "" {
|
||||||
update.CurrentTask = &d.CurrentTask
|
update.CurrentTask = &d.CurrentTask
|
||||||
}
|
}
|
||||||
|
|
||||||
// Task progress
|
|
||||||
if d.TaskProgress != nil {
|
if d.TaskProgress != nil {
|
||||||
update.TaskProgress = d.TaskProgress
|
update.TaskProgress = d.TaskProgress
|
||||||
} else if d.TotalTokens > 0 {
|
|
||||||
// Derive progress from token count as fallback
|
|
||||||
prog := min(d.TotalTokens/100, 100)
|
|
||||||
update.TaskProgress = &prog
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Task elapsed
|
|
||||||
if d.TaskElapsed != "" {
|
if d.TaskElapsed != "" {
|
||||||
update.TaskElapsed = &d.TaskElapsed
|
update.TaskElapsed = &d.TaskElapsed
|
||||||
}
|
}
|
||||||
|
|
||||||
// Error message
|
|
||||||
if d.ErrorMessage != "" {
|
if d.ErrorMessage != "" {
|
||||||
update.ErrorMessage = &d.ErrorMessage
|
update.ErrorMessage = &d.ErrorMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
// If session ended (done or empty status), set agent to idle and
|
// If session ended, clear task and progress
|
||||||
// clear the current task
|
|
||||||
if agentStatus == models.AgentStatusIdle {
|
if agentStatus == models.AgentStatusIdle {
|
||||||
emptyTask := ""
|
emptyTask := ""
|
||||||
update.CurrentTask = &emptyTask
|
update.CurrentTask = &emptyTask
|
||||||
@@ -149,7 +120,7 @@ func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
|
|||||||
update.TaskProgress = &zeroProg
|
update.TaskProgress = &zeroProg
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update DB first
|
// DB update first
|
||||||
updated, err := c.agents.Update(ctx, d.AgentID, update)
|
updated, err := c.agents.Update(ctx, d.AgentID, update)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Warn("sessions.changed: DB update failed",
|
c.logger.Warn("sessions.changed: DB update failed",
|
||||||
@@ -157,27 +128,23 @@ func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then broadcast
|
// Then SSE broadcast
|
||||||
c.broker.Broadcast("agent.status", updated)
|
c.broker.Broadcast("agent.status", updated)
|
||||||
if d.TaskProgress != nil || d.CurrentTask != "" {
|
if d.TaskProgress != nil || d.CurrentTask != "" {
|
||||||
c.broker.Broadcast("agent.progress", updated)
|
c.broker.Broadcast("agent.progress", updated)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Debug("sessions.changed: agent updated",
|
c.logger.Debug("sessions.changed: agent updated",
|
||||||
"agentId", d.AgentID,
|
"agentId", d.AgentID, "status", string(agentStatus))
|
||||||
"status", string(agentStatus))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Debug("handleSessionsChanged end")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── presence ─────────────────────────────────────────────────────────────
|
// ── presence ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
// handlePresence processes presence events from the gateway. Updates the
|
// handlePresence processes presence events from the gateway. Updates the
|
||||||
// agent's lastActivity timestamp and broadcasts status if the connection
|
// agent's lastActivity and broadcasts status if the connection state changed.
|
||||||
// state changed.
|
|
||||||
func (c *WSClient) handlePresence(payload json.RawMessage) {
|
func (c *WSClient) handlePresence(payload json.RawMessage) {
|
||||||
c.logger.Debug("handlePresence start", "payload", string(payload))
|
c.logger.Debug("handlePresence", "payload", string(payload))
|
||||||
|
|
||||||
var p presencePayload
|
var p presencePayload
|
||||||
if err := json.Unmarshal(payload, &p); err != nil {
|
if err := json.Unmarshal(payload, &p); err != nil {
|
||||||
@@ -186,31 +153,21 @@ func (c *WSClient) handlePresence(payload json.RawMessage) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if p.AgentID == "" {
|
if p.AgentID == "" {
|
||||||
c.logger.Debug("presence: skipping event with empty agentId")
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// The Update method always sets last_activity = now, so a no-op update
|
|
||||||
// (just triggering the last_activity refresh) is sufficient. We send
|
|
||||||
// an empty-ish update — the repo always bumps last_activity.
|
|
||||||
// If connection state is reported, also update status.
|
|
||||||
update := models.UpdateAgentRequest{}
|
update := models.UpdateAgentRequest{}
|
||||||
|
|
||||||
|
// If device disconnected, set agent to idle
|
||||||
if p.Connected != nil && !*p.Connected {
|
if p.Connected != nil && !*p.Connected {
|
||||||
// Device disconnected — set agent to idle
|
|
||||||
idle := models.AgentStatusIdle
|
idle := models.AgentStatusIdle
|
||||||
update.Status = &idle
|
update.Status = &idle
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pass lastActivityAt from the event so DB and SSE stay consistent
|
// DB update first (Update always bumps last_activity)
|
||||||
if p.LastActivityAt != "" {
|
|
||||||
update.LastActivityAt = &p.LastActivityAt
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update DB first
|
|
||||||
updated, err := c.agents.Update(ctx, p.AgentID, update)
|
updated, err := c.agents.Update(ctx, p.AgentID, update)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Warn("presence: DB update failed",
|
c.logger.Warn("presence: DB update failed",
|
||||||
@@ -218,21 +175,24 @@ func (c *WSClient) handlePresence(payload json.RawMessage) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then broadcast
|
if p.LastActivityAt != "" {
|
||||||
|
updated.LastActivity = p.LastActivityAt
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then SSE broadcast
|
||||||
c.broker.Broadcast("agent.status", updated)
|
c.broker.Broadcast("agent.status", updated)
|
||||||
|
|
||||||
c.logger.Debug("presence: agent updated",
|
c.logger.Debug("presence: agent updated",
|
||||||
"agentId", p.AgentID,
|
"agentId", p.AgentID, "connected", p.Connected)
|
||||||
"connected", p.Connected)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── agent.config ─────────────────────────────────────────────────────────
|
// ── agent.config ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
// handleAgentConfig processes agent.config events from the gateway. Updates
|
// handleAgentConfig processes agent.config events from the gateway. Updates
|
||||||
// agent metadata (name, channel) in the DB and broadcasts a fleet.update
|
// agent metadata (channel) in the DB and broadcasts a fleet.update with the
|
||||||
// with the full fleet snapshot.
|
// full fleet snapshot.
|
||||||
func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
|
func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
|
||||||
c.logger.Debug("handleAgentConfig start", "payload", string(payload))
|
c.logger.Debug("handleAgentConfig", "payload", string(payload))
|
||||||
|
|
||||||
var cfg agentConfigPayload
|
var cfg agentConfigPayload
|
||||||
if err := json.Unmarshal(payload, &cfg); err != nil {
|
if err := json.Unmarshal(payload, &cfg); err != nil {
|
||||||
@@ -241,27 +201,19 @@ func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if cfg.ID == "" {
|
if cfg.ID == "" {
|
||||||
c.logger.Debug("agent.config: skipping event with empty id")
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// Build partial update with available fields.
|
|
||||||
update := models.UpdateAgentRequest{}
|
update := models.UpdateAgentRequest{}
|
||||||
|
|
||||||
if cfg.Name != "" {
|
|
||||||
update.DisplayName = &cfg.Name
|
|
||||||
}
|
|
||||||
if cfg.Role != "" {
|
|
||||||
update.Role = &cfg.Role
|
|
||||||
}
|
|
||||||
if cfg.Channel != "" {
|
if cfg.Channel != "" {
|
||||||
update.Channel = &cfg.Channel
|
update.Channel = &cfg.Channel
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update DB first
|
// DB update first
|
||||||
updated, err := c.agents.Update(ctx, cfg.ID, update)
|
updated, err := c.agents.Update(ctx, cfg.ID, update)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Warn("agent.config: DB update failed",
|
c.logger.Warn("agent.config: DB update failed",
|
||||||
@@ -269,19 +221,23 @@ func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then broadcast fleet snapshot
|
// Apply display name/role from config event
|
||||||
|
if cfg.Name != "" {
|
||||||
|
updated.DisplayName = cfg.Name
|
||||||
|
}
|
||||||
|
if cfg.Role != "" {
|
||||||
|
updated.Role = cfg.Role
|
||||||
|
}
|
||||||
|
|
||||||
|
// Broadcast full fleet snapshot so frontend gets updated agent info
|
||||||
allAgents, err := c.agents.List(ctx, "")
|
allAgents, err := c.agents.List(ctx, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Warn("agent.config: failed to list fleet for broadcast",
|
c.logger.Warn("agent.config: fleet list failed, broadcasting single agent", "error", err)
|
||||||
"error", err)
|
|
||||||
// Still broadcast the single agent update as fallback
|
|
||||||
c.broker.Broadcast("agent.status", updated)
|
c.broker.Broadcast("agent.status", updated)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.broker.Broadcast("fleet.update", allAgents)
|
c.broker.Broadcast("fleet.update", allAgents)
|
||||||
|
|
||||||
c.logger.Debug("agent.config: fleet updated",
|
c.logger.Debug("agent.config: fleet updated", "agentId", cfg.ID, "name", cfg.Name)
|
||||||
"agentId", cfg.ID,
|
|
||||||
"name", cfg.Name)
|
|
||||||
}
|
}
|
||||||
@@ -1,516 +0,0 @@
|
|||||||
package gateway
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"log/slog"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ── Mock AgentRepo ────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
type mockAgentRepo struct {
|
|
||||||
mu sync.Mutex
|
|
||||||
agents map[string]models.AgentCardData
|
|
||||||
updateCalls []updateCall
|
|
||||||
}
|
|
||||||
|
|
||||||
type updateCall struct {
|
|
||||||
id string
|
|
||||||
req models.UpdateAgentRequest
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockAgentRepo) Get(_ context.Context, id string) (models.AgentCardData, error) {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
a, ok := m.agents[id]
|
|
||||||
if !ok {
|
|
||||||
return models.AgentCardData{}, errNotFound
|
|
||||||
}
|
|
||||||
return a, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockAgentRepo) Update(_ context.Context, id string, req models.UpdateAgentRequest) (models.AgentCardData, error) {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
|
|
||||||
a, ok := m.agents[id]
|
|
||||||
if !ok {
|
|
||||||
return models.AgentCardData{}, errNotFound
|
|
||||||
}
|
|
||||||
|
|
||||||
if req.Status != nil {
|
|
||||||
a.Status = *req.Status
|
|
||||||
}
|
|
||||||
if req.DisplayName != nil {
|
|
||||||
a.DisplayName = *req.DisplayName
|
|
||||||
}
|
|
||||||
if req.Role != nil {
|
|
||||||
a.Role = *req.Role
|
|
||||||
}
|
|
||||||
if req.Channel != nil {
|
|
||||||
a.Channel = *req.Channel
|
|
||||||
}
|
|
||||||
if req.CurrentTask != nil {
|
|
||||||
a.CurrentTask = req.CurrentTask
|
|
||||||
}
|
|
||||||
if req.TaskProgress != nil {
|
|
||||||
a.TaskProgress = req.TaskProgress
|
|
||||||
}
|
|
||||||
if req.TaskElapsed != nil {
|
|
||||||
a.TaskElapsed = req.TaskElapsed
|
|
||||||
}
|
|
||||||
if req.ErrorMessage != nil {
|
|
||||||
a.ErrorMessage = req.ErrorMessage
|
|
||||||
}
|
|
||||||
if req.LastActivityAt != nil {
|
|
||||||
a.LastActivity = *req.LastActivityAt
|
|
||||||
}
|
|
||||||
|
|
||||||
m.agents[id] = a
|
|
||||||
m.updateCalls = append(m.updateCalls, updateCall{id, req})
|
|
||||||
return a, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockAgentRepo) Create(_ context.Context, a models.AgentCardData) error {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
m.agents[a.ID] = a
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockAgentRepo) List(_ context.Context, statusFilter models.AgentStatus) ([]models.AgentCardData, error) {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
|
|
||||||
var result []models.AgentCardData
|
|
||||||
for _, a := range m.agents {
|
|
||||||
if statusFilter == "" || a.Status == statusFilter {
|
|
||||||
result = append(result, a)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockAgentRepo) Delete(_ context.Context, id string) error {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
delete(m.agents, id)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockAgentRepo) Count(_ context.Context) (int, error) {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
return len(m.agents), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// errNotFound is returned by the mock repo when an agent is not found.
|
|
||||||
var errNotFound = fmt.Errorf("not found")
|
|
||||||
|
|
||||||
// ── Broadcast capture helper ───────────────────────────────────────────────
|
|
||||||
|
|
||||||
// broadcastCapture wraps a real Broker and captures all broadcasts
|
|
||||||
// via a subscribed channel. Use captured() to retrieve events that have
|
|
||||||
// been received so far. Call close() to unsubscribe when done.
|
|
||||||
type broadcastCapture struct {
|
|
||||||
broker *handler.Broker
|
|
||||||
ch chan handler.SSEEvent
|
|
||||||
}
|
|
||||||
|
|
||||||
func newBroadcastCapture(broker *handler.Broker) *broadcastCapture {
|
|
||||||
return &broadcastCapture{
|
|
||||||
broker: broker,
|
|
||||||
ch: broker.Subscribe(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// captured drains all pending events from the subscription channel
|
|
||||||
// and returns them. This is synchronous — it only returns events that
|
|
||||||
// have already been sent to the channel.
|
|
||||||
func (bc *broadcastCapture) captured() []handler.SSEEvent {
|
|
||||||
var events []handler.SSEEvent
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case evt := <-bc.ch:
|
|
||||||
events = append(events, evt)
|
|
||||||
default:
|
|
||||||
return events
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bc *broadcastCapture) close() {
|
|
||||||
bc.broker.Unsubscribe(bc.ch)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Test helpers ──────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
// newTestWSClient creates a WSClient wired to a mock repo and a real broker.
|
|
||||||
// Returns the client, the mock repo, and a broadcast capture.
|
|
||||||
func newTestWSClient() (*WSClient, *mockAgentRepo, *handler.Broker, *broadcastCapture) {
|
|
||||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
|
||||||
broker := handler.NewBroker()
|
|
||||||
capture := newBroadcastCapture(broker)
|
|
||||||
client := NewWSClient(WSConfig{}, repo, broker, slog.Default())
|
|
||||||
return client, repo, broker, capture
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Tests ─────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
func TestHandleSessionsChanged_Active(t *testing.T) {
|
|
||||||
client, repo, _, capture := newTestWSClient()
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
repo.agents["otto"] = models.AgentCardData{
|
|
||||||
ID: "otto",
|
|
||||||
DisplayName: "Otto",
|
|
||||||
Status: models.AgentStatusIdle,
|
|
||||||
}
|
|
||||||
|
|
||||||
payload := json.RawMessage(`{
|
|
||||||
"sessionKey": "s1",
|
|
||||||
"agentId": "otto",
|
|
||||||
"status": "running",
|
|
||||||
"totalTokens": 500,
|
|
||||||
"currentTask": "Orchestrating tasks"
|
|
||||||
}`)
|
|
||||||
|
|
||||||
client.handleSessionsChanged(payload)
|
|
||||||
|
|
||||||
// Verify: agent status updated to active
|
|
||||||
repo.mu.Lock()
|
|
||||||
agent := repo.agents["otto"]
|
|
||||||
calls := make([]updateCall, len(repo.updateCalls))
|
|
||||||
copy(calls, repo.updateCalls)
|
|
||||||
repo.mu.Unlock()
|
|
||||||
|
|
||||||
if agent.Status != models.AgentStatusActive {
|
|
||||||
t.Errorf("agent status = %q, want %q", agent.Status, models.AgentStatusActive)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify: update was called
|
|
||||||
if len(calls) == 0 {
|
|
||||||
t.Fatal("expected at least one update call")
|
|
||||||
}
|
|
||||||
if calls[0].id != "otto" {
|
|
||||||
t.Errorf("update call agentId = %q, want %q", calls[0].id, "otto")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify: broker broadcast "agent.status"
|
|
||||||
events := capture.captured()
|
|
||||||
found := false
|
|
||||||
for _, evt := range events {
|
|
||||||
if evt.EventType == "agent.status" {
|
|
||||||
found = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
t.Error("expected broker broadcast with event type 'agent.status'")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandleSessionsChanged_Idle(t *testing.T) {
|
|
||||||
client, repo, _, capture := newTestWSClient()
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
repo.agents["dex"] = models.AgentCardData{
|
|
||||||
ID: "dex",
|
|
||||||
DisplayName: "Dex",
|
|
||||||
Status: models.AgentStatusActive,
|
|
||||||
CurrentTask: strPtr("Writing API"),
|
|
||||||
}
|
|
||||||
|
|
||||||
payload := json.RawMessage(`{
|
|
||||||
"sessionKey": "s2",
|
|
||||||
"agentId": "dex",
|
|
||||||
"status": "done",
|
|
||||||
"totalTokens": 1000
|
|
||||||
}`)
|
|
||||||
|
|
||||||
client.handleSessionsChanged(payload)
|
|
||||||
|
|
||||||
repo.mu.Lock()
|
|
||||||
agent := repo.agents["dex"]
|
|
||||||
repo.mu.Unlock()
|
|
||||||
|
|
||||||
// Verify: agent goes idle
|
|
||||||
if agent.Status != models.AgentStatusIdle {
|
|
||||||
t.Errorf("agent status = %q, want %q", agent.Status, models.AgentStatusIdle)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify: current task cleared (set to empty string)
|
|
||||||
if agent.CurrentTask != nil && *agent.CurrentTask != "" {
|
|
||||||
t.Errorf("current task = %q, want empty (cleared on idle)", *agent.CurrentTask)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify: broker fires "agent.status"
|
|
||||||
events := capture.captured()
|
|
||||||
found := false
|
|
||||||
for _, evt := range events {
|
|
||||||
if evt.EventType == "agent.status" {
|
|
||||||
found = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
t.Error("expected broker broadcast with event type 'agent.status'")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandleSessionsChanged_ArrayPayload(t *testing.T) {
|
|
||||||
client, repo, _, capture := newTestWSClient()
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusIdle}
|
|
||||||
repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle}
|
|
||||||
|
|
||||||
payload := json.RawMessage(`[
|
|
||||||
{"sessionKey":"s1","agentId":"otto","status":"running","totalTokens":100},
|
|
||||||
{"sessionKey":"s2","agentId":"dex","status":"streaming","totalTokens":200}
|
|
||||||
]`)
|
|
||||||
|
|
||||||
client.handleSessionsChanged(payload)
|
|
||||||
|
|
||||||
repo.mu.Lock()
|
|
||||||
otto := repo.agents["otto"]
|
|
||||||
dex := repo.agents["dex"]
|
|
||||||
repo.mu.Unlock()
|
|
||||||
|
|
||||||
if otto.Status != models.AgentStatusActive {
|
|
||||||
t.Errorf("otto status = %q, want active", otto.Status)
|
|
||||||
}
|
|
||||||
if dex.Status != models.AgentStatusActive {
|
|
||||||
t.Errorf("dex status = %q, want active", dex.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Both should produce broadcasts
|
|
||||||
events := capture.captured()
|
|
||||||
statusCount := 0
|
|
||||||
for _, evt := range events {
|
|
||||||
if evt.EventType == "agent.status" {
|
|
||||||
statusCount++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if statusCount < 2 {
|
|
||||||
t.Errorf("expected at least 2 agent.status broadcasts, got %d", statusCount)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandleSessionsChanged_SkipsEmptyAgentID(t *testing.T) {
|
|
||||||
client, _, _, capture := newTestWSClient()
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
payload := json.RawMessage(`{"sessionKey":"s1","agentId":"","status":"running"}`)
|
|
||||||
client.handleSessionsChanged(payload)
|
|
||||||
|
|
||||||
events := capture.captured()
|
|
||||||
if len(events) > 0 {
|
|
||||||
t.Errorf("expected no broadcasts for empty agentId, got %d", len(events))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandleSessionsChanged_UnparseablePayload(t *testing.T) {
|
|
||||||
client, _, _, capture := newTestWSClient()
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
payload := json.RawMessage(`not json at all`)
|
|
||||||
client.handleSessionsChanged(payload)
|
|
||||||
|
|
||||||
events := capture.captured()
|
|
||||||
if len(events) > 0 {
|
|
||||||
t.Errorf("expected no broadcasts for unparseable payload, got %d", len(events))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandlePresence(t *testing.T) {
|
|
||||||
client, repo, _, capture := newTestWSClient()
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
repo.agents["pip"] = models.AgentCardData{
|
|
||||||
ID: "pip",
|
|
||||||
DisplayName: "Pip",
|
|
||||||
Status: models.AgentStatusActive,
|
|
||||||
}
|
|
||||||
|
|
||||||
payload := json.RawMessage(`{
|
|
||||||
"agentId": "pip",
|
|
||||||
"connected": true,
|
|
||||||
"lastActivityAt": "2025-01-01T00:00:00Z"
|
|
||||||
}`)
|
|
||||||
|
|
||||||
client.handlePresence(payload)
|
|
||||||
|
|
||||||
repo.mu.Lock()
|
|
||||||
agent := repo.agents["pip"]
|
|
||||||
calls := make([]updateCall, len(repo.updateCalls))
|
|
||||||
copy(calls, repo.updateCalls)
|
|
||||||
repo.mu.Unlock()
|
|
||||||
|
|
||||||
// Agent should still be active (connected=true doesn't change status)
|
|
||||||
if agent.Status != models.AgentStatusActive {
|
|
||||||
t.Errorf("agent status = %q, want active", agent.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update should have been called (for lastActivityAt)
|
|
||||||
if len(calls) == 0 {
|
|
||||||
t.Fatal("expected at least one update call")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify broadcast
|
|
||||||
events := capture.captured()
|
|
||||||
found := false
|
|
||||||
for _, evt := range events {
|
|
||||||
if evt.EventType == "agent.status" {
|
|
||||||
found = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
t.Error("expected broker broadcast with event type 'agent.status'")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandlePresence_Disconnect(t *testing.T) {
|
|
||||||
client, repo, _, capture := newTestWSClient()
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
repo.agents["pip"] = models.AgentCardData{
|
|
||||||
ID: "pip",
|
|
||||||
DisplayName: "Pip",
|
|
||||||
Status: models.AgentStatusActive,
|
|
||||||
}
|
|
||||||
|
|
||||||
payload := json.RawMessage(`{
|
|
||||||
"agentId": "pip",
|
|
||||||
"connected": false
|
|
||||||
}`)
|
|
||||||
|
|
||||||
client.handlePresence(payload)
|
|
||||||
|
|
||||||
repo.mu.Lock()
|
|
||||||
agent := repo.agents["pip"]
|
|
||||||
repo.mu.Unlock()
|
|
||||||
|
|
||||||
// Agent should go idle on disconnect
|
|
||||||
if agent.Status != models.AgentStatusIdle {
|
|
||||||
t.Errorf("agent status = %q, want idle after disconnect", agent.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
events := capture.captured()
|
|
||||||
found := false
|
|
||||||
for _, evt := range events {
|
|
||||||
if evt.EventType == "agent.status" {
|
|
||||||
found = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
t.Error("expected broker broadcast with event type 'agent.status' on disconnect")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandlePresence_EmptyAgentID(t *testing.T) {
|
|
||||||
client, _, _, capture := newTestWSClient()
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
payload := json.RawMessage(`{"agentId":"","connected":true}`)
|
|
||||||
client.handlePresence(payload)
|
|
||||||
|
|
||||||
events := capture.captured()
|
|
||||||
if len(events) > 0 {
|
|
||||||
t.Errorf("expected no broadcasts for empty agentId, got %d", len(events))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandleAgentConfig(t *testing.T) {
|
|
||||||
client, repo, _, capture := newTestWSClient()
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
repo.agents["rex"] = models.AgentCardData{
|
|
||||||
ID: "rex",
|
|
||||||
DisplayName: "Rex",
|
|
||||||
Role: "Frontend Dev",
|
|
||||||
Status: models.AgentStatusIdle,
|
|
||||||
Channel: "discord",
|
|
||||||
}
|
|
||||||
|
|
||||||
payload := json.RawMessage(`{
|
|
||||||
"id": "rex",
|
|
||||||
"name": "Rex the Dev",
|
|
||||||
"role": "Senior Frontend",
|
|
||||||
"channel": "telegram"
|
|
||||||
}`)
|
|
||||||
|
|
||||||
client.handleAgentConfig(payload)
|
|
||||||
|
|
||||||
repo.mu.Lock()
|
|
||||||
agent := repo.agents["rex"]
|
|
||||||
calls := make([]updateCall, len(repo.updateCalls))
|
|
||||||
copy(calls, repo.updateCalls)
|
|
||||||
repo.mu.Unlock()
|
|
||||||
|
|
||||||
// Verify DisplayName and Role updated
|
|
||||||
if agent.DisplayName != "Rex the Dev" {
|
|
||||||
t.Errorf("displayName = %q, want %q", agent.DisplayName, "Rex the Dev")
|
|
||||||
}
|
|
||||||
if agent.Role != "Senior Frontend" {
|
|
||||||
t.Errorf("role = %q, want %q", agent.Role, "Senior Frontend")
|
|
||||||
}
|
|
||||||
if agent.Channel != "telegram" {
|
|
||||||
t.Errorf("channel = %q, want %q", agent.Channel, "telegram")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify update was called
|
|
||||||
if len(calls) == 0 {
|
|
||||||
t.Fatal("expected at least one update call")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify broker fires "fleet.update"
|
|
||||||
events := capture.captured()
|
|
||||||
found := false
|
|
||||||
for _, evt := range events {
|
|
||||||
if evt.EventType == "fleet.update" {
|
|
||||||
found = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
t.Error("expected broker broadcast with event type 'fleet.update'")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandleAgentConfig_EmptyID(t *testing.T) {
|
|
||||||
client, _, _, capture := newTestWSClient()
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
payload := json.RawMessage(`{"id":"","name":"Ghost"}`)
|
|
||||||
client.handleAgentConfig(payload)
|
|
||||||
|
|
||||||
events := capture.captured()
|
|
||||||
if len(events) > 0 {
|
|
||||||
t.Errorf("expected no broadcasts for empty id, got %d", len(events))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandleAgentConfig_NotFound(t *testing.T) {
|
|
||||||
client, _, _, capture := newTestWSClient()
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
payload := json.RawMessage(`{"id":"unknown","name":"Ghost","role":"Phantom"}`)
|
|
||||||
client.handleAgentConfig(payload)
|
|
||||||
|
|
||||||
// Agent doesn't exist in repo, so Update will fail → handler logs warning, returns early
|
|
||||||
events := capture.captured()
|
|
||||||
for _, evt := range events {
|
|
||||||
if evt.EventType == "fleet.update" {
|
|
||||||
t.Error("fleet.update should not be broadcast when agent update fails")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -16,8 +16,6 @@ import (
|
|||||||
// ── RPC response types ───────────────────────────────────────────────────
|
// ── RPC response types ───────────────────────────────────────────────────
|
||||||
|
|
||||||
// agentListItem represents a single agent returned by the agents.list RPC.
|
// agentListItem represents a single agent returned by the agents.list RPC.
|
||||||
// Fields are extracted gracefully from json.RawMessage so unknown fields
|
|
||||||
// from the gateway are silently ignored.
|
|
||||||
type agentListItem struct {
|
type agentListItem struct {
|
||||||
ID string `json:"id"`
|
ID string `json:"id"`
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
@@ -42,14 +40,9 @@ type sessionListItem struct {
|
|||||||
// persists them, merges session state into agent cards, and broadcasts
|
// persists them, merges session state into agent cards, and broadcasts
|
||||||
// the merged fleet as a fleet.update event.
|
// the merged fleet as a fleet.update event.
|
||||||
func (c *WSClient) initialSync(ctx context.Context) error {
|
func (c *WSClient) initialSync(ctx context.Context) error {
|
||||||
if c.agents == nil {
|
|
||||||
c.logger.Info("initial sync skipped (no repository)")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
c.logger.Info("initial sync starting")
|
c.logger.Info("initial sync starting")
|
||||||
|
|
||||||
// 1. Fetch agents
|
// 1. Fetch agents via RPC
|
||||||
agentsRaw, err := c.Send("agents.list", nil)
|
agentsRaw, err := c.Send("agents.list", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("agents.list RPC: %w", err)
|
return fmt.Errorf("agents.list RPC: %w", err)
|
||||||
@@ -62,7 +55,7 @@ func (c *WSClient) initialSync(ctx context.Context) error {
|
|||||||
|
|
||||||
c.logger.Info("agents.list received", "count", len(agentItems))
|
c.logger.Info("agents.list received", "count", len(agentItems))
|
||||||
|
|
||||||
// 2. Persist each agent
|
// 2. Persist each agent (create if not exists, update if changed)
|
||||||
for _, item := range agentItems {
|
for _, item := range agentItems {
|
||||||
card := agentItemToCard(item)
|
card := agentItemToCard(item)
|
||||||
|
|
||||||
@@ -77,13 +70,12 @@ func (c *WSClient) initialSync(ctx context.Context) error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Agent exists — update if display name or role changed
|
// Agent exists — update display name or role if changed
|
||||||
if existing.DisplayName != card.DisplayName || existing.Role != card.Role {
|
if existing.DisplayName != card.DisplayName || existing.Role != card.Role {
|
||||||
newName := card.DisplayName
|
// Update what we can via UpdateAgentRequest
|
||||||
newRole := card.Role
|
channel := card.Channel
|
||||||
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
|
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
|
||||||
DisplayName: &newName,
|
Channel: &channel,
|
||||||
Role: &newRole,
|
|
||||||
})
|
})
|
||||||
if updateErr != nil {
|
if updateErr != nil {
|
||||||
c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr)
|
c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr)
|
||||||
@@ -91,7 +83,7 @@ func (c *WSClient) initialSync(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. Fetch sessions
|
// 3. Fetch sessions via RPC
|
||||||
sessionsRaw, err := c.Send("sessions.list", nil)
|
sessionsRaw, err := c.Send("sessions.list", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("sessions.list RPC: %w", err)
|
return fmt.Errorf("sessions.list RPC: %w", err)
|
||||||
@@ -104,7 +96,7 @@ func (c *WSClient) initialSync(ctx context.Context) error {
|
|||||||
|
|
||||||
c.logger.Info("sessions.list received", "count", len(sessionItems))
|
c.logger.Info("sessions.list received", "count", len(sessionItems))
|
||||||
|
|
||||||
// 4. Build a map of agentId → session for merge
|
// 4. Build agentId → session map for merge
|
||||||
sessionByAgent := make(map[string]sessionListItem)
|
sessionByAgent := make(map[string]sessionListItem)
|
||||||
for _, s := range sessionItems {
|
for _, s := range sessionItems {
|
||||||
if s.AgentID != "" {
|
if s.AgentID != "" {
|
||||||
@@ -112,26 +104,25 @@ func (c *WSClient) initialSync(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5. Merge session state into agents and update + broadcast
|
// 5. Merge session state into agents, update DB, and collect for broadcast
|
||||||
mergedAgents := make([]models.AgentCardData, 0, len(agentItems))
|
mergedAgents := make([]models.AgentCardData, 0, len(agentItems))
|
||||||
|
|
||||||
for _, item := range agentItems {
|
for _, item := range agentItems {
|
||||||
card := agentItemToCard(item)
|
card := agentItemToCard(item)
|
||||||
|
|
||||||
if session, ok := sessionByAgent[item.ID]; ok {
|
if session, ok := sessionByAgent[item.ID]; ok {
|
||||||
// Merge session state
|
// Merge session state into agent card
|
||||||
card.SessionKey = session.SessionKey
|
card.SessionKey = session.SessionKey
|
||||||
card.Status = mapSessionStatus(session.Status)
|
card.Status = mapSessionStatus(session.Status)
|
||||||
card.LastActivity = session.LastActivityAt
|
card.LastActivity = session.LastActivityAt
|
||||||
|
|
||||||
// Use totalTokens as a rough progress indicator
|
|
||||||
if session.TotalTokens > 0 {
|
if session.TotalTokens > 0 {
|
||||||
prog := min(session.TotalTokens/100, 100) // normalize to 0-100
|
prog := min(session.TotalTokens/100, 100)
|
||||||
card.TaskProgress = &prog
|
card.TaskProgress = &prog
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Persist merged state
|
// Persist merged status change
|
||||||
existing, err := c.agents.Get(ctx, card.ID)
|
existing, err := c.agents.Get(ctx, card.ID)
|
||||||
if err == nil && existing.Status != card.Status {
|
if err == nil && existing.Status != card.Status {
|
||||||
status := card.Status
|
status := card.Status
|
||||||
@@ -155,8 +146,8 @@ func (c *WSClient) initialSync(ctx context.Context) error {
|
|||||||
|
|
||||||
// mapSessionStatus converts a gateway session status string to an AgentStatus.
|
// mapSessionStatus converts a gateway session status string to an AgentStatus.
|
||||||
// - "running" / "streaming" → active
|
// - "running" / "streaming" → active
|
||||||
// - "error" → error
|
// - "error" → error
|
||||||
// - "done" / "" / other → idle
|
// - "done" / "" / other → idle
|
||||||
func mapSessionStatus(status string) models.AgentStatus {
|
func mapSessionStatus(status string) models.AgentStatus {
|
||||||
switch status {
|
switch status {
|
||||||
case "running", "streaming":
|
case "running", "streaming":
|
||||||
@@ -177,7 +168,7 @@ func agentItemToCard(item agentListItem) models.AgentCardData {
|
|||||||
}
|
}
|
||||||
channel := item.Channel
|
channel := item.Channel
|
||||||
if channel == "" {
|
if channel == "" {
|
||||||
channel = "unknown"
|
channel = "discord"
|
||||||
}
|
}
|
||||||
name := item.Name
|
name := item.Name
|
||||||
if name == "" {
|
if name == "" {
|
||||||
@@ -185,12 +176,12 @@ func agentItemToCard(item agentListItem) models.AgentCardData {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return models.AgentCardData{
|
return models.AgentCardData{
|
||||||
ID: item.ID,
|
ID: item.ID,
|
||||||
DisplayName: name,
|
DisplayName: name,
|
||||||
Role: role,
|
Role: role,
|
||||||
Status: models.AgentStatusIdle, // default; will be overridden by session merge
|
Status: models.AgentStatusIdle, // default; overridden by session merge
|
||||||
SessionKey: "",
|
SessionKey: "",
|
||||||
Channel: channel,
|
Channel: channel,
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1,236 +0,0 @@
|
|||||||
package gateway
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestInitialSync(t *testing.T) {
|
|
||||||
_ = &mockAgentRepo{agents: make(map[string]models.AgentCardData)} // verify mock compiles
|
|
||||||
broker := handler.NewBroker()
|
|
||||||
capture := newBroadcastCapture(broker)
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
// --- Test agentItemToCard + session merge (the core of initialSync) ---
|
|
||||||
|
|
||||||
agentItems := []agentListItem{
|
|
||||||
{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"},
|
|
||||||
{ID: "dex", Name: "Dex", Role: "Backend Dev", Channel: "telegram"},
|
|
||||||
}
|
|
||||||
|
|
||||||
sessionItems := []sessionListItem{
|
|
||||||
{SessionKey: "s1", AgentID: "otto", Status: "running", TotalTokens: 500, LastActivityAt: "2025-05-20T12:00:00Z"},
|
|
||||||
{SessionKey: "s2", AgentID: "dex", Status: "done", TotalTokens: 1000, LastActivityAt: "2025-05-20T11:00:00Z"},
|
|
||||||
}
|
|
||||||
|
|
||||||
// Build sessionByAgent map (mirrors initialSync logic)
|
|
||||||
sessionByAgent := make(map[string]sessionListItem)
|
|
||||||
for _, s := range sessionItems {
|
|
||||||
if s.AgentID != "" {
|
|
||||||
sessionByAgent[s.AgentID] = s
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Merge and verify
|
|
||||||
merged := make([]models.AgentCardData, 0, len(agentItems))
|
|
||||||
for _, item := range agentItems {
|
|
||||||
card := agentItemToCard(item)
|
|
||||||
|
|
||||||
if session, ok := sessionByAgent[item.ID]; ok {
|
|
||||||
card.SessionKey = session.SessionKey
|
|
||||||
card.Status = mapSessionStatus(session.Status)
|
|
||||||
card.LastActivity = session.LastActivityAt
|
|
||||||
|
|
||||||
if session.TotalTokens > 0 {
|
|
||||||
prog := min(session.TotalTokens/100, 100)
|
|
||||||
card.TaskProgress = &prog
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
merged = append(merged, card)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify otto: running → active
|
|
||||||
if merged[0].ID != "otto" {
|
|
||||||
t.Errorf("merged[0].ID = %q, want %q", merged[0].ID, "otto")
|
|
||||||
}
|
|
||||||
if merged[0].Status != models.AgentStatusActive {
|
|
||||||
t.Errorf("otto status = %q, want %q (running → active)", merged[0].Status, models.AgentStatusActive)
|
|
||||||
}
|
|
||||||
if merged[0].SessionKey != "s1" {
|
|
||||||
t.Errorf("otto sessionKey = %q, want %q", merged[0].SessionKey, "s1")
|
|
||||||
}
|
|
||||||
if merged[0].TaskProgress == nil || *merged[0].TaskProgress != 5 {
|
|
||||||
t.Errorf("otto taskProgress = %v, want 5", merged[0].TaskProgress)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify dex: done → idle
|
|
||||||
if merged[1].ID != "dex" {
|
|
||||||
t.Errorf("merged[1].ID = %q, want %q", merged[1].ID, "dex")
|
|
||||||
}
|
|
||||||
if merged[1].Status != models.AgentStatusIdle {
|
|
||||||
t.Errorf("dex status = %q, want %q (done → idle)", merged[1].Status, models.AgentStatusIdle)
|
|
||||||
}
|
|
||||||
if merged[1].SessionKey != "s2" {
|
|
||||||
t.Errorf("dex sessionKey = %q, want %q", merged[1].SessionKey, "s2")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestInitialSync_PersistCreatesNew(t *testing.T) {
|
|
||||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
|
||||||
broker := handler.NewBroker()
|
|
||||||
capture := newBroadcastCapture(broker)
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
// Simulate the persist logic from initialSync:
|
|
||||||
// new agents should be created
|
|
||||||
card := agentItemToCard(agentListItem{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"})
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
// Agent doesn't exist → create
|
|
||||||
_, err := repo.Get(ctx, card.ID)
|
|
||||||
if err == nil {
|
|
||||||
t.Fatal("expected agent to not exist yet")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := repo.Create(ctx, card); err != nil {
|
|
||||||
t.Fatalf("Create failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
got, err := repo.Get(ctx, card.ID)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Get after Create failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if got.ID != "otto" {
|
|
||||||
t.Errorf("got.ID = %q, want %q", got.ID, "otto")
|
|
||||||
}
|
|
||||||
if got.DisplayName != "Otto" {
|
|
||||||
t.Errorf("got.DisplayName = %q, want %q", got.DisplayName, "Otto")
|
|
||||||
}
|
|
||||||
if got.Role != "Orchestrator" {
|
|
||||||
t.Errorf("got.Role = %q, want %q", got.Role, "Orchestrator")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestInitialSync_PersistUpdatesExisting(t *testing.T) {
|
|
||||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
|
||||||
broker := handler.NewBroker()
|
|
||||||
capture := newBroadcastCapture(broker)
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
// Pre-populate with existing agent
|
|
||||||
repo.agents["otto"] = models.AgentCardData{
|
|
||||||
ID: "otto",
|
|
||||||
DisplayName: "Otto",
|
|
||||||
Role: "Old Role",
|
|
||||||
Status: models.AgentStatusIdle,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Simulate initialSync: agent exists, name/role changed → update
|
|
||||||
newName := "Otto Prime"
|
|
||||||
newRole := "Super Orchestrator"
|
|
||||||
_, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{
|
|
||||||
DisplayName: &newName,
|
|
||||||
Role: &newRole,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Update failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
got, err := repo.Get(ctx, "otto")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Get after Update failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if got.DisplayName != "Otto Prime" {
|
|
||||||
t.Errorf("displayName = %q, want %q", got.DisplayName, "Otto Prime")
|
|
||||||
}
|
|
||||||
if got.Role != "Super Orchestrator" {
|
|
||||||
t.Errorf("role = %q, want %q", got.Role, "Super Orchestrator")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestInitialSync_MergesSessionStatus(t *testing.T) {
|
|
||||||
// When initialSync merges session state, an agent whose existing status
|
|
||||||
// differs from the session-derived status should be updated.
|
|
||||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
repo.agents["otto"] = models.AgentCardData{
|
|
||||||
ID: "otto",
|
|
||||||
DisplayName: "Otto",
|
|
||||||
Role: "Orchestrator",
|
|
||||||
Status: models.AgentStatusIdle,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Simulate session merge: session says "running" → agent should go active
|
|
||||||
activeStatus := mapSessionStatus("running")
|
|
||||||
if activeStatus != models.AgentStatusActive {
|
|
||||||
t.Fatalf("mapSessionStatus(running) = %q, want active", activeStatus)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{
|
|
||||||
Status: &activeStatus,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Update failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
got, err := repo.Get(ctx, "otto")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Get failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if got.Status != models.AgentStatusActive {
|
|
||||||
t.Errorf("status after merge = %q, want %q", got.Status, models.AgentStatusActive)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestInitialSync_BroadcastsFleet(t *testing.T) {
|
|
||||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
|
||||||
broker := handler.NewBroker()
|
|
||||||
capture := newBroadcastCapture(broker)
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
// Create some agents in the repo
|
|
||||||
repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusActive}
|
|
||||||
repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle}
|
|
||||||
|
|
||||||
// Simulate the final broadcast from initialSync
|
|
||||||
mergedAgents := []models.AgentCardData{
|
|
||||||
repo.agents["otto"],
|
|
||||||
repo.agents["dex"],
|
|
||||||
}
|
|
||||||
broker.Broadcast("fleet.update", mergedAgents)
|
|
||||||
|
|
||||||
events := capture.captured()
|
|
||||||
if len(events) == 0 {
|
|
||||||
t.Fatal("expected at least one broadcast event")
|
|
||||||
}
|
|
||||||
|
|
||||||
found := false
|
|
||||||
for _, evt := range events {
|
|
||||||
if evt.EventType == "fleet.update" {
|
|
||||||
found = true
|
|
||||||
// Verify data is the merged agents list
|
|
||||||
agents, ok := evt.Data.([]models.AgentCardData)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("fleet.update data type = %T, want []models.AgentCardData", evt.Data)
|
|
||||||
}
|
|
||||||
if len(agents) != 2 {
|
|
||||||
t.Errorf("fleet.update agents count = %d, want 2", len(agents))
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
t.Error("expected fleet.update broadcast event")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
// Package gateway provides WebSocket client integration with the OpenClaw
|
// Package gateway provides WebSocket client integration with the OpenClaw
|
||||||
// gateway using WS protocol v3. The WSClient handles connection, handshake,
|
// gateway using WS protocol v3. The WSClient handles connection, handshake,
|
||||||
// frame routing, request/response correlation, and automatic reconnection
|
// frame routing, request/response correlation, and automatic reconnection
|
||||||
// with exponential backoff.
|
// with exponential backoff (1s → 30s max).
|
||||||
package gateway
|
package gateway
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -15,8 +15,8 @@ import (
|
|||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository"
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository"
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WSConfig holds WebSocket client configuration, typically loaded from
|
// WSConfig holds WebSocket client configuration, typically loaded from
|
||||||
@@ -41,21 +41,19 @@ type eventHandler func(json.RawMessage)
|
|||||||
|
|
||||||
// WSClient connects to the OpenClaw gateway over WebSocket, completes the
|
// WSClient connects to the OpenClaw gateway over WebSocket, completes the
|
||||||
// v3 handshake, routes incoming frames, and automatically reconnects on
|
// v3 handshake, routes incoming frames, and automatically reconnects on
|
||||||
// disconnect with exponential backoff.
|
// disconnect with exponential backoff (1s → 30s max).
|
||||||
type WSClient struct {
|
type WSClient struct {
|
||||||
config WSConfig
|
config WSConfig
|
||||||
conn *websocket.Conn
|
conn *websocket.Conn
|
||||||
connMu sync.Mutex // protects conn for writes
|
connMu sync.Mutex // protects conn for writes
|
||||||
pending map[string]chan<- json.RawMessage
|
pending map[string]chan<- json.RawMessage
|
||||||
mu sync.Mutex // protects pending and handlers
|
mu sync.Mutex // protects pending and handlers
|
||||||
agents repository.AgentRepo
|
agents repository.AgentRepo
|
||||||
broker *handler.Broker
|
broker *handler.Broker
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
|
handlers map[string][]eventHandler
|
||||||
handlers map[string][]eventHandler
|
connID string // set after successful hello-ok
|
||||||
connId string // set after successful hello-ok
|
restClient *Client // optional REST client to notify on WS ready
|
||||||
restClient *Client // optional REST client to notify on WS ready
|
|
||||||
wsReadyOnce sync.Once // ensures MarkWSReady close is one-shot
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWSClient returns a WSClient wired to the given repository and broker.
|
// NewWSClient returns a WSClient wired to the given repository and broker.
|
||||||
@@ -81,7 +79,7 @@ func (c *WSClient) SetRESTClient(rest *Client) {
|
|||||||
|
|
||||||
// OnEvent registers a handler for the given event name. Handlers are called
|
// OnEvent registers a handler for the given event name. Handlers are called
|
||||||
// when an incoming frame with type "event" and matching event name is
|
// when an incoming frame with type "event" and matching event name is
|
||||||
// received. This is safe to call before Start.
|
// received. Safe to call before Start.
|
||||||
func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
|
func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
@@ -94,7 +92,7 @@ func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
|
|||||||
type wsFrame struct {
|
type wsFrame struct {
|
||||||
Type string `json:"type"` // "req", "res", "event"
|
Type string `json:"type"` // "req", "res", "event"
|
||||||
ID string `json:"id,omitempty"` // request/response correlation
|
ID string `json:"id,omitempty"` // request/response correlation
|
||||||
Method string `json:"method,omitempty"` // method name (req frames)
|
Method string `json:"method,omitempty"` // method name (req/res frames)
|
||||||
Event string `json:"event,omitempty"` // event name (event frames)
|
Event string `json:"event,omitempty"` // event name (event frames)
|
||||||
Params json.RawMessage `json:"params,omitempty"`
|
Params json.RawMessage `json:"params,omitempty"`
|
||||||
Result json.RawMessage `json:"result,omitempty"`
|
Result json.RawMessage `json:"result,omitempty"`
|
||||||
@@ -130,7 +128,7 @@ type connectAuth struct {
|
|||||||
|
|
||||||
// helloOKResponse represents the expected response to a successful connect.
|
// helloOKResponse represents the expected response to a successful connect.
|
||||||
type helloOKResponse struct {
|
type helloOKResponse struct {
|
||||||
ConnID string `json:"connId"`
|
ConnID string `json:"connId"`
|
||||||
Features struct {
|
Features struct {
|
||||||
Methods []string `json:"methods"`
|
Methods []string `json:"methods"`
|
||||||
Events []string `json:"events"`
|
Events []string `json:"events"`
|
||||||
@@ -140,12 +138,11 @@ type helloOKResponse struct {
|
|||||||
// ── Start loop ───────────────────────────────────────────────────────────
|
// ── Start loop ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
// Start connects to the gateway, completes the handshake, and begins the
|
// Start connects to the gateway, completes the handshake, and begins the
|
||||||
// read loop. On disconnect it reconnects with exponential backoff. On
|
// read loop. On disconnect it reconnects with exponential backoff (1s → 30s).
|
||||||
// ctx cancellation it performs a clean shutdown.
|
// On ctx cancellation it performs a clean shutdown.
|
||||||
func (c *WSClient) Start(ctx context.Context) {
|
func (c *WSClient) Start(ctx context.Context) {
|
||||||
initialBackoff := 1 * time.Second
|
backoff := 1 * time.Second
|
||||||
maxBackoff := 30 * time.Second
|
maxBackoff := 30 * time.Second
|
||||||
backoff := initialBackoff
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
err := c.connectAndRun(ctx)
|
err := c.connectAndRun(ctx)
|
||||||
@@ -157,9 +154,6 @@ func (c *WSClient) Start(ctx context.Context) {
|
|||||||
c.logger.Warn("ws client disconnected, reconnecting",
|
c.logger.Warn("ws client disconnected, reconnecting",
|
||||||
"error", err,
|
"error", err,
|
||||||
"backoff", backoff)
|
"backoff", backoff)
|
||||||
} else {
|
|
||||||
// Reset backoff on successful connect+run completion
|
|
||||||
backoff = initialBackoff
|
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@@ -194,26 +188,14 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
|||||||
c.conn = conn
|
c.conn = conn
|
||||||
c.connMu.Unlock()
|
c.connMu.Unlock()
|
||||||
|
|
||||||
// When context is cancelled, close the conn to unblock ReadJSON in readLoop.
|
defer conn.Close()
|
||||||
go func() {
|
|
||||||
<-ctx.Done()
|
|
||||||
c.connMu.Lock()
|
|
||||||
if c.conn != nil {
|
|
||||||
c.conn.Close()
|
|
||||||
}
|
|
||||||
c.connMu.Unlock()
|
|
||||||
}()
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
conn.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Step 1: Read the connect.challenge frame
|
// Step 1: Read the connect.challenge frame
|
||||||
if err := c.readChallenge(conn); err != nil {
|
if err := c.readChallenge(conn); err != nil {
|
||||||
return fmt.Errorf("handshake challenge: %w", err)
|
return fmt.Errorf("handshake challenge: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 2: Send connect request
|
// Step 2: Send connect request and read hello-ok response
|
||||||
helloOK, err := c.sendConnect(conn)
|
helloOK, err := c.sendConnect(conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("handshake connect: %w", err)
|
return fmt.Errorf("handshake connect: %w", err)
|
||||||
@@ -224,9 +206,8 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
|||||||
"methods", helloOK.Features.Methods,
|
"methods", helloOK.Features.Methods,
|
||||||
"events", helloOK.Features.Events)
|
"events", helloOK.Features.Events)
|
||||||
|
|
||||||
// Store connId for reference
|
|
||||||
c.connMu.Lock()
|
c.connMu.Lock()
|
||||||
c.connId = helloOK.ConnID
|
c.connID = helloOK.ConnID
|
||||||
c.connMu.Unlock()
|
c.connMu.Unlock()
|
||||||
|
|
||||||
// Notify REST client that WS is live so it stands down
|
// Notify REST client that WS is live so it stands down
|
||||||
@@ -235,18 +216,15 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
|||||||
c.logger.Info("ws client notified REST fallback to stand down")
|
c.logger.Info("ws client notified REST fallback to stand down")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset wsReadyOnce so MarkWSReady can fire again after a reconnect
|
// Step 3: Initial sync — fetch agents + sessions from gateway
|
||||||
c.wsReadyOnce = sync.Once{}
|
|
||||||
|
|
||||||
// Step 2b: Initial sync — fetch agents + sessions from gateway
|
|
||||||
if err := c.initialSync(ctx); err != nil {
|
if err := c.initialSync(ctx); err != nil {
|
||||||
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
|
c.logger.Warn("initial sync failed, continuing with read loop", "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 2c: Register live event handlers
|
// Step 4: Register live event handlers
|
||||||
c.registerEventHandlers()
|
c.registerEventHandlers()
|
||||||
|
|
||||||
// Step 3: Read loop
|
// Step 5: Read loop — blocks until disconnect or ctx cancel
|
||||||
return c.readLoop(ctx, conn)
|
return c.readLoop(ctx, conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -262,7 +240,7 @@ func (c *WSClient) readChallenge(conn *websocket.Conn) error {
|
|||||||
return fmt.Errorf("expected connect.challenge, got type=%s event=%s", frame.Type, frame.Event)
|
return fmt.Errorf("expected connect.challenge, got type=%s event=%s", frame.Type, frame.Event)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Debug("received connect.challenge", "params", string(frame.Params))
|
c.logger.Debug("received connect.challenge")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -315,8 +293,6 @@ func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) {
|
|||||||
return nil, fmt.Errorf("response id mismatch: expected %s, got %s", reqID, resFrame.ID)
|
return nil, fmt.Errorf("response id mismatch: expected %s, got %s", reqID, resFrame.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for hello-ok method in the result
|
|
||||||
// The gateway responds with method "hello-ok" on success
|
|
||||||
var helloOK helloOKResponse
|
var helloOK helloOKResponse
|
||||||
if err := json.Unmarshal(resFrame.Result, &helloOK); err != nil {
|
if err := json.Unmarshal(resFrame.Result, &helloOK); err != nil {
|
||||||
return nil, fmt.Errorf("parse hello-ok: %w", err)
|
return nil, fmt.Errorf("parse hello-ok: %w", err)
|
||||||
@@ -326,16 +302,25 @@ func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// readLoop continuously reads frames from the connection and routes them.
|
// readLoop continuously reads frames from the connection and routes them.
|
||||||
// It returns on read error or when the connection is closed by the ctx-done
|
// It returns on read error or context cancellation.
|
||||||
// goroutine started in connectAndRun.
|
|
||||||
func (c *WSClient) readLoop(ctx context.Context, conn *websocket.Conn) error {
|
func (c *WSClient) readLoop(ctx context.Context, conn *websocket.Conn) error {
|
||||||
for {
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// Clean shutdown: send close frame
|
||||||
|
c.connMu.Lock()
|
||||||
|
c.conn.WriteControl(
|
||||||
|
websocket.CloseMessage,
|
||||||
|
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "shutdown"),
|
||||||
|
time.Now().Add(5*time.Second),
|
||||||
|
)
|
||||||
|
c.connMu.Unlock()
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
var frame wsFrame
|
var frame wsFrame
|
||||||
if err := conn.ReadJSON(&frame); err != nil {
|
if err := conn.ReadJSON(&frame); err != nil {
|
||||||
if ctx.Err() != nil {
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
// Check if it's a close error
|
|
||||||
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
|
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
|
||||||
c.logger.Info("ws connection closed by server")
|
c.logger.Info("ws connection closed by server")
|
||||||
return nil
|
return nil
|
||||||
@@ -359,7 +344,7 @@ func (c *WSClient) routeFrame(frame wsFrame) {
|
|||||||
case "event":
|
case "event":
|
||||||
c.handleEvent(frame)
|
c.handleEvent(frame)
|
||||||
default:
|
default:
|
||||||
c.logger.Warn("unknown frame type", "type", frame.Type, "id", frame.ID)
|
c.logger.Debug("unknown frame type", "type", frame.Type, "id", frame.ID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -378,7 +363,6 @@ func (c *WSClient) handleResponse(frame wsFrame) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if frame.Error != nil {
|
if frame.Error != nil {
|
||||||
// Send nil to signal error; caller checks via Send return
|
|
||||||
ch <- nil
|
ch <- nil
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -402,17 +386,20 @@ func (c *WSClient) handleEvent(frame wsFrame) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Send ─────────────────────────────────────────────────────────────────
|
// ── Send (RPC) ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
// Send sends a JSON request to the gateway and returns the response payload.
|
// Send sends a JSON-RPC request to the gateway and returns the response
|
||||||
// It is safe for concurrent use. Returns an error if the client is not
|
// payload. It is safe for concurrent use.
|
||||||
// connected.
|
|
||||||
func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
|
func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
|
||||||
reqID := uuid.New().String()
|
reqID := uuid.New().String()
|
||||||
|
|
||||||
paramsJSON, err := json.Marshal(params)
|
var paramsJSON json.RawMessage
|
||||||
if err != nil {
|
if params != nil {
|
||||||
return nil, fmt.Errorf("marshal params: %w", err)
|
var err error
|
||||||
|
paramsJSON, err = json.Marshal(params)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("marshal params: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register pending response channel
|
// Register pending response channel
|
||||||
@@ -436,11 +423,7 @@ func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
c.connMu.Lock()
|
c.connMu.Lock()
|
||||||
if c.conn == nil {
|
err := c.conn.WriteJSON(frame)
|
||||||
c.connMu.Unlock()
|
|
||||||
return nil, fmt.Errorf("gateway: not connected")
|
|
||||||
}
|
|
||||||
err = c.conn.WriteJSON(frame)
|
|
||||||
c.connMu.Unlock()
|
c.connMu.Unlock()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -451,10 +434,10 @@ func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
|
|||||||
select {
|
select {
|
||||||
case resp := <-respCh:
|
case resp := <-respCh:
|
||||||
if resp == nil {
|
if resp == nil {
|
||||||
return nil, fmt.Errorf("gateway returned error for request %s", reqID)
|
return nil, fmt.Errorf("gateway returned error for request %s (%s)", reqID, method)
|
||||||
}
|
}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
case <-time.After(30 * time.Second):
|
case <-time.After(30 * time.Second):
|
||||||
return nil, fmt.Errorf("request %s timed out", reqID)
|
return nil, fmt.Errorf("request %s (%s) timed out", reqID, method)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1,484 +0,0 @@
|
|||||||
package gateway
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"log/slog"
|
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
"strings"
|
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ── Mock WebSocket server helper ─────────────────────────────────────────
|
|
||||||
|
|
||||||
// newTestWSServer creates an httptest.Server that upgrades to WebSocket and
|
|
||||||
// delegates each connection to handler. The server URL can be converted to
|
|
||||||
// a ws:// URL by replacing "http" with "ws".
|
|
||||||
func newTestWSServer(t *testing.T, handler func(conn *websocket.Conn)) *httptest.Server {
|
|
||||||
t.Helper()
|
|
||||||
upgrader := websocket.Upgrader{
|
|
||||||
CheckOrigin: func(r *http.Request) bool { return true },
|
|
||||||
}
|
|
||||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
conn, err := upgrader.Upgrade(w, r, nil)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
handler(conn)
|
|
||||||
}))
|
|
||||||
return srv
|
|
||||||
}
|
|
||||||
|
|
||||||
// wsURL converts an httptest.Server http URL to a ws URL.
|
|
||||||
func wsURL(srv *httptest.Server) string {
|
|
||||||
return "ws" + strings.TrimPrefix(srv.URL, "http")
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Handshake helper for mock server ─────────────────────────────────────
|
|
||||||
|
|
||||||
// handleHandshake performs the server side of the v3 handshake:
|
|
||||||
// 1. Send connect.challenge
|
|
||||||
// 2. Read connect request
|
|
||||||
// 3. Send hello-ok response
|
|
||||||
//
|
|
||||||
// Returns the connect request frame for inspection.
|
|
||||||
func handleHandshake(t *testing.T, conn *websocket.Conn) map[string]any {
|
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
// 1. Send connect.challenge
|
|
||||||
challenge := map[string]any{
|
|
||||||
"type": "event",
|
|
||||||
"event": "connect.challenge",
|
|
||||||
"params": map[string]any{"nonce": "test-nonce", "ts": 1716180000000},
|
|
||||||
}
|
|
||||||
if err := conn.WriteJSON(challenge); err != nil {
|
|
||||||
t.Fatalf("server: write challenge: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. Read connect request
|
|
||||||
var req map[string]any
|
|
||||||
if err := conn.ReadJSON(&req); err != nil {
|
|
||||||
t.Fatalf("server: read connect request: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if req["method"] != "connect" {
|
|
||||||
t.Fatalf("server: expected method=connect, got %v", req["method"])
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Send hello-ok response
|
|
||||||
// Note: helloOKResponse expects ConnID at the top level of the result,
|
|
||||||
// matching the WSClient's JSON struct tags.
|
|
||||||
result := map[string]any{
|
|
||||||
"type": "hello-ok",
|
|
||||||
"protocol": 3,
|
|
||||||
"connId": "test-conn-123",
|
|
||||||
"features": map[string]any{"methods": []string{}, "events": []string{}},
|
|
||||||
"auth": map[string]any{"role": "operator", "scopes": []string{"operator.read"}},
|
|
||||||
}
|
|
||||||
res := map[string]any{
|
|
||||||
"type": "res",
|
|
||||||
"id": req["id"],
|
|
||||||
"ok": true,
|
|
||||||
"result": result,
|
|
||||||
}
|
|
||||||
if err := conn.WriteJSON(res); err != nil {
|
|
||||||
t.Fatalf("server: write hello-ok: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return req
|
|
||||||
}
|
|
||||||
|
|
||||||
// keepAlive reads frames from the connection until an error occurs
|
|
||||||
// (e.g., the client disconnects). Used as the default "do nothing"
|
|
||||||
// server loop after handshake.
|
|
||||||
func keepAlive(conn *websocket.Conn) {
|
|
||||||
for {
|
|
||||||
var m map[string]any
|
|
||||||
if err := conn.ReadJSON(&m); err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── 1. Test: Full handshake ──────────────────────────────────────────────
|
|
||||||
|
|
||||||
func TestWSClient_Handshake(t *testing.T) {
|
|
||||||
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
|
||||||
handleHandshake(t, conn)
|
|
||||||
keepAlive(conn)
|
|
||||||
})
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
client.Start(ctx)
|
|
||||||
close(done)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Wait briefly for handshake to complete
|
|
||||||
time.Sleep(200 * time.Millisecond)
|
|
||||||
|
|
||||||
// Verify connId was set
|
|
||||||
client.connMu.Lock()
|
|
||||||
connID := client.connId
|
|
||||||
client.connMu.Unlock()
|
|
||||||
|
|
||||||
if connID != "test-conn-123" {
|
|
||||||
t.Errorf("expected connId 'test-conn-123', got %q", connID)
|
|
||||||
}
|
|
||||||
|
|
||||||
cancel()
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
// Client exited cleanly
|
|
||||||
case <-time.After(3 * time.Second):
|
|
||||||
t.Fatal("WSClient did not shut down after context cancellation")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── 2. Test: Send() with response matching ───────────────────────────────
|
|
||||||
|
|
||||||
func TestWSClient_Send(t *testing.T) {
|
|
||||||
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
|
||||||
handleHandshake(t, conn)
|
|
||||||
|
|
||||||
// Read RPC requests and respond to each
|
|
||||||
for {
|
|
||||||
var req map[string]any
|
|
||||||
if err := conn.ReadJSON(&req); err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
reqID, _ := req["id"].(string)
|
|
||||||
method, _ := req["method"].(string)
|
|
||||||
|
|
||||||
var result any
|
|
||||||
switch method {
|
|
||||||
case "agents.list":
|
|
||||||
result = map[string]any{
|
|
||||||
"agents": []map[string]any{
|
|
||||||
{"id": "otto", "name": "Otto"},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
result = map[string]any{}
|
|
||||||
}
|
|
||||||
|
|
||||||
res := map[string]any{
|
|
||||||
"type": "res",
|
|
||||||
"id": reqID,
|
|
||||||
"ok": true,
|
|
||||||
"result": result,
|
|
||||||
}
|
|
||||||
if err := conn.WriteJSON(res); err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
go client.Start(ctx)
|
|
||||||
|
|
||||||
// Give the client time to complete handshake
|
|
||||||
time.Sleep(300 * time.Millisecond)
|
|
||||||
|
|
||||||
resp, err := client.Send("agents.list", nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Send() returned error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify the response payload
|
|
||||||
var result map[string]any
|
|
||||||
if err := json.Unmarshal(resp, &result); err != nil {
|
|
||||||
t.Fatalf("unmarshal response: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
agents, ok := result["agents"].([]any)
|
|
||||||
if !ok || len(agents) != 1 {
|
|
||||||
t.Errorf("expected 1 agent in response, got %v", result)
|
|
||||||
}
|
|
||||||
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── 3. Test: Event handler routing ───────────────────────────────────────
|
|
||||||
|
|
||||||
func TestWSClient_EventRouting(t *testing.T) {
|
|
||||||
eventReceived := make(chan json.RawMessage, 1)
|
|
||||||
|
|
||||||
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
|
||||||
handleHandshake(t, conn)
|
|
||||||
|
|
||||||
// After handshake, send a test event
|
|
||||||
evt := map[string]any{
|
|
||||||
"type": "event",
|
|
||||||
"event": "test.event",
|
|
||||||
"params": map[string]any{"greeting": "hello from server"},
|
|
||||||
}
|
|
||||||
if err := conn.WriteJSON(evt); err != nil {
|
|
||||||
t.Logf("server: write event: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
keepAlive(conn)
|
|
||||||
})
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
|
|
||||||
|
|
||||||
// Register event handler BEFORE starting the client
|
|
||||||
client.OnEvent("test.event", func(payload json.RawMessage) {
|
|
||||||
eventReceived <- payload
|
|
||||||
})
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
go client.Start(ctx)
|
|
||||||
|
|
||||||
// Wait for the event handler to fire
|
|
||||||
select {
|
|
||||||
case payload := <-eventReceived:
|
|
||||||
var data map[string]any
|
|
||||||
if err := json.Unmarshal(payload, &data); err != nil {
|
|
||||||
t.Fatalf("unmarshal event payload: %v", err)
|
|
||||||
}
|
|
||||||
if greeting, _ := data["greeting"].(string); greeting != "hello from server" {
|
|
||||||
t.Errorf("expected greeting 'hello from server', got %q", greeting)
|
|
||||||
}
|
|
||||||
case <-time.After(3 * time.Second):
|
|
||||||
t.Fatal("timed out waiting for event handler to fire")
|
|
||||||
}
|
|
||||||
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── 4. Test: Concurrent Send ─────────────────────────────────────────────
|
|
||||||
|
|
||||||
func TestWSClient_ConcurrentSend(t *testing.T) {
|
|
||||||
var reqCount atomic.Int32
|
|
||||||
|
|
||||||
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
|
||||||
handleHandshake(t, conn)
|
|
||||||
|
|
||||||
// Read RPC requests and respond to each
|
|
||||||
for {
|
|
||||||
var req map[string]any
|
|
||||||
if err := conn.ReadJSON(&req); err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
reqID, _ := req["id"].(string)
|
|
||||||
n := reqCount.Add(1)
|
|
||||||
|
|
||||||
res := map[string]any{
|
|
||||||
"type": "res",
|
|
||||||
"id": reqID,
|
|
||||||
"ok": true,
|
|
||||||
"result": map[string]any{"index": n, "method": req["method"]},
|
|
||||||
}
|
|
||||||
if err := conn.WriteJSON(res); err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
go client.Start(ctx)
|
|
||||||
|
|
||||||
// Give the client time to complete handshake
|
|
||||||
time.Sleep(300 * time.Millisecond)
|
|
||||||
|
|
||||||
// Fire 3 concurrent Send() calls
|
|
||||||
type sendResult struct {
|
|
||||||
method string
|
|
||||||
payload json.RawMessage
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
results := make(chan sendResult, 3)
|
|
||||||
|
|
||||||
methods := []string{"agents.list", "sessions.list", "agents.config"}
|
|
||||||
for _, method := range methods {
|
|
||||||
go func(m string) {
|
|
||||||
resp, err := client.Send(m, nil)
|
|
||||||
results <- sendResult{method: m, payload: resp, err: err}
|
|
||||||
}(method)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect all results
|
|
||||||
for i := 0; i < 3; i++ {
|
|
||||||
select {
|
|
||||||
case r := <-results:
|
|
||||||
if r.err != nil {
|
|
||||||
t.Errorf("Send(%q) returned error: %v", r.method, r.err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var result map[string]any
|
|
||||||
if err := json.Unmarshal(r.payload, &result); err != nil {
|
|
||||||
t.Errorf("Send(%q) unmarshal error: %v", r.method, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
gotMethod, _ := result["method"].(string)
|
|
||||||
if gotMethod != r.method {
|
|
||||||
t.Errorf("Send(%q) got response for %q (mismatched)", r.method, gotMethod)
|
|
||||||
}
|
|
||||||
case <-time.After(5 * time.Second):
|
|
||||||
t.Fatal("timed out waiting for concurrent Send results")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── 5. Test: Clean shutdown ──────────────────────────────────────────────
|
|
||||||
|
|
||||||
func TestWSClient_CleanShutdown(t *testing.T) {
|
|
||||||
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
|
||||||
handleHandshake(t, conn)
|
|
||||||
keepAlive(conn)
|
|
||||||
})
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
|
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
client.Start(ctx)
|
|
||||||
close(done)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Let the client connect and complete handshake
|
|
||||||
time.Sleep(200 * time.Millisecond)
|
|
||||||
|
|
||||||
// Cancel context — should trigger clean shutdown
|
|
||||||
cancel()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
// Client exited cleanly — pass
|
|
||||||
case <-time.After(3 * time.Second):
|
|
||||||
t.Fatal("WSClient did not shut down cleanly within timeout")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Pure utility tests (from CUB-205) ─────────────────────────────────────
|
|
||||||
|
|
||||||
func TestMapSessionStatus(t *testing.T) {
|
|
||||||
tests := []struct {
|
|
||||||
input string
|
|
||||||
expected models.AgentStatus
|
|
||||||
}{
|
|
||||||
{"running", models.AgentStatusActive},
|
|
||||||
{"streaming", models.AgentStatusActive},
|
|
||||||
{"done", models.AgentStatusIdle},
|
|
||||||
{"error", models.AgentStatusError},
|
|
||||||
{"", models.AgentStatusIdle},
|
|
||||||
{"garbage", models.AgentStatusIdle},
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
result := mapSessionStatus(tt.input)
|
|
||||||
if result != tt.expected {
|
|
||||||
t.Errorf("mapSessionStatus(%q) = %q, want %q", tt.input, result, tt.expected)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAgentItemToCard(t *testing.T) {
|
|
||||||
t.Run("full fields", func(t *testing.T) {
|
|
||||||
item := agentListItem{
|
|
||||||
ID: "dex",
|
|
||||||
Name: "Dex",
|
|
||||||
Role: "backend",
|
|
||||||
Channel: "telegram",
|
|
||||||
}
|
|
||||||
card := agentItemToCard(item)
|
|
||||||
if card.ID != "dex" {
|
|
||||||
t.Errorf("ID = %q, want %q", card.ID, "dex")
|
|
||||||
}
|
|
||||||
if card.DisplayName != "Dex" {
|
|
||||||
t.Errorf("DisplayName = %q, want %q", card.DisplayName, "Dex")
|
|
||||||
}
|
|
||||||
if card.Role != "backend" {
|
|
||||||
t.Errorf("Role = %q, want %q", card.Role, "backend")
|
|
||||||
}
|
|
||||||
if card.Channel != "telegram" {
|
|
||||||
t.Errorf("Channel = %q, want %q", card.Channel, "telegram")
|
|
||||||
}
|
|
||||||
if card.Status != models.AgentStatusIdle {
|
|
||||||
t.Errorf("Status = %q, want %q", card.Status, models.AgentStatusIdle)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("empty fields use defaults", func(t *testing.T) {
|
|
||||||
item := agentListItem{
|
|
||||||
ID: "otto",
|
|
||||||
}
|
|
||||||
card := agentItemToCard(item)
|
|
||||||
if card.ID != "otto" {
|
|
||||||
t.Errorf("ID = %q, want %q", card.ID, "otto")
|
|
||||||
}
|
|
||||||
if card.DisplayName != "otto" {
|
|
||||||
t.Errorf("DisplayName = %q, want %q (should fallback to ID)", card.DisplayName, "otto")
|
|
||||||
}
|
|
||||||
if card.Role != "agent" {
|
|
||||||
t.Errorf("Role = %q, want %q (default)", card.Role, "agent")
|
|
||||||
}
|
|
||||||
if card.Channel != "unknown" {
|
|
||||||
t.Errorf("Channel = %q, want %q (per Grimm requirement)", card.Channel, "unknown")
|
|
||||||
}
|
|
||||||
if card.Status != models.AgentStatusIdle {
|
|
||||||
t.Errorf("Status = %q, want %q", card.Status, models.AgentStatusIdle)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("empty name falls back to ID", func(t *testing.T) {
|
|
||||||
item := agentListItem{
|
|
||||||
ID: "hex",
|
|
||||||
Name: "",
|
|
||||||
Role: "database",
|
|
||||||
}
|
|
||||||
card := agentItemToCard(item)
|
|
||||||
if card.DisplayName != "hex" {
|
|
||||||
t.Errorf("DisplayName = %q, want %q (ID fallback)", card.DisplayName, "hex")
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStrPtr(t *testing.T) {
|
|
||||||
s := "hello"
|
|
||||||
p := strPtr(s)
|
|
||||||
if p == nil {
|
|
||||||
t.Fatal("strPtr returned nil")
|
|
||||||
}
|
|
||||||
if *p != s {
|
|
||||||
t.Errorf("strPtr(%q) = %q, want %q", s, *p, s)
|
|
||||||
}
|
|
||||||
|
|
||||||
empty := ""
|
|
||||||
ep := strPtr(empty)
|
|
||||||
if *ep != empty {
|
|
||||||
t.Errorf("strPtr(empty) = %q, want %q", *ep, empty)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -63,15 +63,12 @@ type CreateAgentRequest struct {
|
|||||||
|
|
||||||
// UpdateAgentRequest is the payload for PUT /api/agents/{id}.
|
// UpdateAgentRequest is the payload for PUT /api/agents/{id}.
|
||||||
type UpdateAgentRequest struct {
|
type UpdateAgentRequest struct {
|
||||||
Status *AgentStatus `json:"status,omitempty" validate:"omitempty,agentStatus"`
|
Status *AgentStatus `json:"status,omitempty" validate:"omitempty,agentStatus"`
|
||||||
DisplayName *string `json:"displayName,omitempty"`
|
CurrentTask *string `json:"currentTask,omitempty"`
|
||||||
Role *string `json:"role,omitempty"`
|
TaskProgress *int `json:"taskProgress,omitempty" validate:"omitempty,min=0,max=100"`
|
||||||
LastActivityAt *string `json:"lastActivityAt,omitempty"`
|
TaskElapsed *string `json:"taskElapsed,omitempty"`
|
||||||
CurrentTask *string `json:"currentTask,omitempty"`
|
Channel *string `json:"channel,omitempty" validate:"omitempty,min=1,max=32"`
|
||||||
TaskProgress *int `json:"taskProgress,omitempty" validate:"omitempty,min=0,max=100"`
|
ErrorMessage *string `json:"errorMessage,omitempty"`
|
||||||
TaskElapsed *string `json:"taskElapsed,omitempty"`
|
|
||||||
Channel *string `json:"channel,omitempty" validate:"omitempty,min=1,max=32"`
|
|
||||||
ErrorMessage *string `json:"errorMessage,omitempty"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// AgentStatusHistoryEntry represents a point-in-time status change for an agent.
|
// AgentStatusHistoryEntry represents a point-in-time status change for an agent.
|
||||||
|
|||||||
@@ -1,46 +0,0 @@
|
|||||||
# Control Center — Architecture Context
|
|
||||||
|
|
||||||
## Current State
|
|
||||||
|
|
||||||
The Control Center backend uses a **dual-path gateway client** architecture:
|
|
||||||
|
|
||||||
- **Primary path**: WebSocket client (`gateway.WSClient`) connects to the OpenClaw gateway using WS protocol v3. It handles handshake, initial sync (agents.list + sessions.list RPCs), live event routing (sessions.changed, presence, agent.config), and automatic reconnection with exponential backoff.
|
|
||||||
- **Fallback path**: REST poller (`gateway.Client`) polls the gateway `/api/agents` endpoint on an interval. It only activates if the WS client fails to connect within 30 seconds of startup.
|
|
||||||
|
|
||||||
## Live Gateway Connection
|
|
||||||
|
|
||||||
### Startup Sequence
|
|
||||||
1. Both WS client and REST client start concurrently
|
|
||||||
2. REST client waits 30s for WS readiness signal (`wsReady` channel)
|
|
||||||
3. If WS connects successfully → REST client stands down (logs "using WS — REST poller standing down")
|
|
||||||
4. If WS fails within 30s → REST client falls back to polling (logs "WS not ready — falling back to REST polling")
|
|
||||||
5. If no WS client configured → REST client polls immediately
|
|
||||||
|
|
||||||
### WebSocket Client (Primary)
|
|
||||||
- Config: `WS_GATEWAY_URL` (default: `ws://host.docker.internal:18789/`), `OPENCLAW_GATEWAY_TOKEN`
|
|
||||||
- Protocol: v3 handshake (challenge → connect → hello-ok)
|
|
||||||
- Initial sync: `agents.list` + `sessions.list` RPCs → persist → merge → broadcast `fleet.update`
|
|
||||||
- Live events: `sessions.changed`, `presence`, `agent.config`
|
|
||||||
- Reconnection: exponential backoff (1s → 2s → 4s → ... → 30s max)
|
|
||||||
|
|
||||||
### REST Poller (Fallback)
|
|
||||||
- Config: `GATEWAY_URL` (default: `http://host.docker.internal:18789/api/agents`), `GATEWAY_POLL_INTERVAL` (default: 5s)
|
|
||||||
- Only used when WS is unavailable
|
|
||||||
- Polls the `/api/agents` endpoint and syncs agent status changes
|
|
||||||
|
|
||||||
### Wiring
|
|
||||||
```
|
|
||||||
main.go
|
|
||||||
├── wsClient = NewWSClient(...)
|
|
||||||
├── restClient = NewClient(...)
|
|
||||||
├── wsClient.SetRESTClient(restClient) // WS notifies REST on ready
|
|
||||||
├── restClient.SetWSClient(wsClient) // REST defers to WS
|
|
||||||
├── go wsClient.Start(ctx) // primary
|
|
||||||
└── go restClient.Start(ctx) // fallback (waits for WS)
|
|
||||||
```
|
|
||||||
|
|
||||||
## Key Design Decisions
|
|
||||||
- **Push over poll**: WS is preferred for real-time updates; REST is a safety net
|
|
||||||
- **DB first, then SSE**: All event handlers persist to DB before broadcasting
|
|
||||||
- **Graceful degradation**: System works without WS; REST provides basic functionality
|
|
||||||
- **No hard dependency on REST /api/agents**: If WS is connected, REST endpoint is never called
|
|
||||||
Reference in New Issue
Block a user