Compare commits
15 Commits
agent/dex/
...
agent/otto
| Author | SHA1 | Date | |
|---|---|---|---|
| a0eb393c6c | |||
| d294818581 | |||
| 9e0366e780 | |||
| b7a54c8461 | |||
| b6e44cb4f8 | |||
|
|
49b959aee5 | ||
|
|
ae37d79aa8 | ||
|
|
8fb4183abe | ||
|
|
4569fef11d | ||
|
|
7a93d43b7e | ||
| efcedde649 | |||
|
|
e131798f3b | ||
|
|
9062f8fa8d | ||
|
|
60ba3e5b4f | ||
|
|
70d39b87d1 |
17
.env.example
17
.env.example
@@ -12,16 +12,15 @@ ENVIRONMENT=development
|
|||||||
# Format: postgresql://user:password@host:port/database?sslmode=disable
|
# Format: postgresql://user:password@host:port/database?sslmode=disable
|
||||||
DATABASE_URL=postgresql://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable
|
DATABASE_URL=postgresql://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable
|
||||||
|
|
||||||
# Gateway (OpenClaw) connection — WebSocket (primary)
|
# Gateway (OpenClaw) connection
|
||||||
# WebSocket URL for real-time OpenClaw gateway v3 protocol
|
# WebSocket gateway config (primary path)
|
||||||
GATEWAY_WS_URL=ws://localhost:18789/
|
WS_GATEWAY_URL=ws://host.docker.internal:18789/
|
||||||
# Auth token for the OpenClaw gateway (operator scope)
|
# Gateway auth token — same as OPENCLAW_GATEWAY_TOKEN (set in environment)
|
||||||
OPENCLAW_GATEWAY_TOKEN=
|
GATEWAY_TOKEN=
|
||||||
|
|
||||||
# Gateway (OpenClaw) connection — REST (fallback)
|
# REST poller config (fallback, only used if WS fails to connect)
|
||||||
# URL to the OpenClaw gateway API for polling agent states (used only if WS fails)
|
GATEWAY_URL=http://host.docker.internal:18789/api/agents
|
||||||
GATEWAY_URL=http://localhost:18789/api/agents
|
# Polling interval for agent state updates (fallback only)
|
||||||
# Polling interval for agent state updates
|
|
||||||
GATEWAY_POLL_INTERVAL=5s
|
GATEWAY_POLL_INTERVAL=5s
|
||||||
|
|
||||||
# ── Frontend Variables (via Vite) ───────────────────────────────────────
|
# ── Frontend Variables (via Vite) ───────────────────────────────────────
|
||||||
|
|||||||
11
ci-image/Dockerfile
Normal file
11
ci-image/Dockerfile
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
FROM catthehacker/ubuntu:act-latest
|
||||||
|
|
||||||
|
# Install Go 1.23
|
||||||
|
RUN curl -sL https://go.dev/dl/go1.23.6.linux-amd64.tar.gz | tar -C /usr/local -xz
|
||||||
|
|
||||||
|
# Install Node 22
|
||||||
|
RUN curl -fsSL https://deb.nodesource.com/setup_22.x | bash - \
|
||||||
|
&& apt-get install -y nodejs \
|
||||||
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
ENV PATH="/usr/local/go/bin:${PATH}"
|
||||||
@@ -16,6 +16,8 @@ services:
|
|||||||
- ENVIRONMENT=production
|
- ENVIRONMENT=production
|
||||||
- PORT=8080
|
- PORT=8080
|
||||||
- GATEWAY_URL=http://host.docker.internal:18789/api/agents
|
- GATEWAY_URL=http://host.docker.internal:18789/api/agents
|
||||||
|
- WS_GATEWAY_URL=ws://host.docker.internal:18789/
|
||||||
|
- GATEWAY_TOKEN=${GATEWAY_TOKEN:-}
|
||||||
depends_on:
|
depends_on:
|
||||||
db:
|
db:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
|
|||||||
@@ -63,29 +63,30 @@ func main() {
|
|||||||
Broker: broker,
|
Broker: broker,
|
||||||
})
|
})
|
||||||
|
|
||||||
// ── Gateway: WS primary + REST fallback ────────────────────────────────
|
// ── Gateway clients (WS primary, REST fallback) ───────────────────
|
||||||
// WebSocket client (primary — real-time events via OpenClaw v3 protocol)
|
// WS gateway client (primary path)
|
||||||
wsClient := gateway.NewWSClient(gateway.WSConfig{
|
wsClient := gateway.NewWSClient(gateway.WSConfig{
|
||||||
URL: cfg.WSGatewayURL,
|
URL: cfg.WSGatewayURL,
|
||||||
AuthToken: cfg.WSGatewayToken,
|
AuthToken: cfg.WSGatewayToken,
|
||||||
}, agentRepo, broker, logger)
|
}, agentRepo, broker, logger)
|
||||||
|
|
||||||
// REST polling client (fallback — only used if WS connection fails)
|
// REST gateway client (fallback — only polls if WS fails to connect)
|
||||||
restClient := gateway.NewClient(gateway.Config{
|
gwClient := gateway.NewClient(gateway.Config{
|
||||||
URL: cfg.GatewayURL,
|
URL: cfg.GatewayRestURL,
|
||||||
PollInterval: cfg.GatewayPollInterval,
|
PollInterval: cfg.GatewayRestPollInterval,
|
||||||
}, agentRepo, broker)
|
}, agentRepo, broker)
|
||||||
|
|
||||||
// Wire them: WS notifies REST to stand down on successful connect
|
// Wire them together: REST defers to WS when WS is connected
|
||||||
wsClient.SetRESTClient(restClient)
|
wsClient.SetRESTClient(gwClient)
|
||||||
|
gwClient.SetWSClient(wsClient)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// Start WS client first (primary)
|
// Start WS client first (primary)
|
||||||
go wsClient.Start(ctx)
|
go wsClient.Start(ctx)
|
||||||
// Start REST client (fallback polling)
|
// Start REST client (will wait for WS, then stand down or fall back)
|
||||||
go restClient.Start(ctx)
|
go gwClient.Start(ctx)
|
||||||
|
|
||||||
// ── Server ─────────────────────────────────────────────────────────────
|
// ── Server ─────────────────────────────────────────────────────────────
|
||||||
srv := &http.Server{
|
srv := &http.Server{
|
||||||
@@ -111,7 +112,7 @@ func main() {
|
|||||||
<-quit
|
<-quit
|
||||||
slog.Info("shutting down server...")
|
slog.Info("shutting down server...")
|
||||||
|
|
||||||
cancel() // stop gateway clients
|
cancel() // stop gateway polling
|
||||||
|
|
||||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||||
defer shutdownCancel()
|
defer shutdownCancel()
|
||||||
|
|||||||
@@ -10,15 +10,15 @@ import (
|
|||||||
|
|
||||||
// Config holds all application configuration.
|
// Config holds all application configuration.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Port int
|
Port int
|
||||||
DatabaseURL string
|
DatabaseURL string
|
||||||
CORSOrigin string
|
CORSOrigin string
|
||||||
LogLevel string
|
LogLevel string
|
||||||
Environment string
|
Environment string
|
||||||
GatewayURL string // REST fallback URL
|
GatewayRestURL string
|
||||||
GatewayPollInterval time.Duration // REST fallback poll interval
|
GatewayRestPollInterval time.Duration
|
||||||
WSGatewayURL string // WebSocket gateway URL
|
WSGatewayURL string
|
||||||
WSGatewayToken string // WebSocket auth token
|
WSGatewayToken string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load reads configuration from environment variables, applying defaults where
|
// Load reads configuration from environment variables, applying defaults where
|
||||||
@@ -30,10 +30,10 @@ func Load() *Config {
|
|||||||
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
|
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
|
||||||
LogLevel: getEnv("LOG_LEVEL", "info"),
|
LogLevel: getEnv("LOG_LEVEL", "info"),
|
||||||
Environment: getEnv("ENVIRONMENT", "development"),
|
Environment: getEnv("ENVIRONMENT", "development"),
|
||||||
GatewayURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
|
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
|
||||||
GatewayPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
|
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
|
||||||
WSGatewayURL: getEnv("GATEWAY_WS_URL", "ws://host.docker.internal:18789/"),
|
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
|
||||||
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
|
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,10 +1,6 @@
|
|||||||
// Package gateway provides an OpenClaw gateway integration client that
|
// Package gateway provides an OpenClaw gateway integration client that
|
||||||
// polls agent states, persists them via the repository layer, and broadcasts
|
// polls agent states, persists them via the repository layer, and broadcasts
|
||||||
// changes through the SSE broker for real-time frontend updates.
|
// changes through the SSE broker for real-time frontend updates.
|
||||||
//
|
|
||||||
// When a WSClient is wired via SetWSClient, the REST poller becomes a
|
|
||||||
// fallback: it waits for the WS client to signal readiness, and only starts
|
|
||||||
// polling if WS fails to connect within 30 seconds.
|
|
||||||
package gateway
|
package gateway
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -13,6 +9,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
||||||
@@ -22,15 +19,17 @@ import (
|
|||||||
|
|
||||||
// Client polls the OpenClaw gateway for agent status and keeps the database
|
// Client polls the OpenClaw gateway for agent status and keeps the database
|
||||||
// and SSE broker in sync. When a WSClient is set, the REST poller becomes a
|
// and SSE broker in sync. When a WSClient is set, the REST poller becomes a
|
||||||
// fallback that only activates if the WS connection fails.
|
// fallback: it waits for the WS client to signal readiness, and only starts
|
||||||
|
// polling if WS fails to connect after initial backoff retries.
|
||||||
type Client struct {
|
type Client struct {
|
||||||
url string
|
url string
|
||||||
pollInterval time.Duration
|
pollInterval time.Duration
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
agents repository.AgentRepo
|
agents repository.AgentRepo
|
||||||
broker *handler.Broker
|
broker *handler.Broker
|
||||||
wsClient *Client // optional WS client; when set, REST is fallback only
|
wsClient *WSClient // optional WS client; when set, REST is fallback only
|
||||||
wsReady chan struct{} // closed once WS connection is established
|
wsReady chan struct{} // closed once WS connection is established
|
||||||
|
wsReadyOnce sync.Once // protects wsReady close from double-close race
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config holds gateway client configuration, typically loaded from environment.
|
// Config holds gateway client configuration, typically loaded from environment.
|
||||||
@@ -63,36 +62,56 @@ func NewClient(cfg Config, agents repository.AgentRepo, broker *handler.Broker)
|
|||||||
// to it. When set, the REST client waits for WS readiness before deciding
|
// to it. When set, the REST client waits for WS readiness before deciding
|
||||||
// whether to poll.
|
// whether to poll.
|
||||||
func (c *Client) SetWSClient(ws *WSClient) {
|
func (c *Client) SetWSClient(ws *WSClient) {
|
||||||
_ = ws // stored for future reconnection coordination
|
c.wsClient = ws
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarkWSReady signals that the WS connection is live and the REST poller
|
// MarkWSReady signals that the WS connection is live and the REST poller
|
||||||
// should stand down. Called by WSClient after a successful handshake.
|
// should stand down. Called by WSClient after a successful handshake.
|
||||||
func (c *Client) MarkWSReady() {
|
func (c *Client) MarkWSReady() {
|
||||||
select {
|
c.wsReadyOnce.Do(func() {
|
||||||
case <-c.wsReady:
|
|
||||||
// already closed
|
|
||||||
default:
|
|
||||||
close(c.wsReady)
|
close(c.wsReady)
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start begins the gateway client loop. When a WS client is wired, it
|
// Start begins the gateway client loop. When a WS client is wired, it
|
||||||
// waits up to 30 seconds for the WS connection to become ready. If WS
|
// waits up to 30 seconds for the WS connection to become ready. If WS
|
||||||
// connects, the REST poller stands down. If WS fails to connect within
|
// connects, the REST poller stands down and only logs periodically. If WS
|
||||||
// the timeout, REST polling activates as fallback.
|
// fails to connect within the timeout, REST polling activates as fallback.
|
||||||
func (c *Client) Start(ctx context.Context) {
|
func (c *Client) Start(ctx context.Context) {
|
||||||
slog.Info("gateway client starting",
|
if c.wsClient != nil {
|
||||||
"url", c.url,
|
slog.Info("gateway client waiting for WS connection", "timeout", "30s")
|
||||||
"pollInterval", c.pollInterval.String())
|
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-c.wsReady:
|
||||||
|
slog.Info("gateway client using WS — REST poller standing down")
|
||||||
|
// WS is live; keep this goroutine alive but idle. If WS
|
||||||
|
// disconnects later, we could re-enter polling, but for now
|
||||||
|
// the WS client handles its own reconnection.
|
||||||
|
<-ctx.Done()
|
||||||
|
slog.Info("gateway client stopped (WS mode)")
|
||||||
|
return
|
||||||
|
case <-time.After(30 * time.Second):
|
||||||
|
slog.Warn("gateway client: WS not ready after 30s — falling back to REST polling",
|
||||||
|
"url", c.url,
|
||||||
|
"pollInterval", c.pollInterval.String())
|
||||||
|
case <-ctx.Done():
|
||||||
|
slog.Info("gateway client stopped while waiting for WS")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
slog.Info("gateway client using REST polling (no WS client configured)",
|
||||||
|
"url", c.url,
|
||||||
|
"pollInterval", c.pollInterval.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// REST fallback polling
|
||||||
ticker := time.NewTicker(c.pollInterval)
|
ticker := time.NewTicker(c.pollInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
slog.Info("gateway client stopped")
|
slog.Info("gateway client stopped (REST fallback)")
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
c.poll(ctx)
|
c.poll(ctx)
|
||||||
@@ -121,6 +140,7 @@ func (c *Client) poll(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, ga := range agents {
|
for _, ga := range agents {
|
||||||
|
// Check if agent already exists; if so, update; otherwise create.
|
||||||
existing, err := c.agents.Get(ctx, ga.ID)
|
existing, err := c.agents.Get(ctx, ga.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Not found — create it
|
// Not found — create it
|
||||||
@@ -165,51 +185,51 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
|
|||||||
slog.Info("seeding demo agents")
|
slog.Info("seeding demo agents")
|
||||||
demoAgents := []models.AgentCardData{
|
demoAgents := []models.AgentCardData{
|
||||||
{
|
{
|
||||||
ID: "otto",
|
ID: "otto",
|
||||||
DisplayName: "Otto",
|
DisplayName: "Otto",
|
||||||
Role: "Orchestrator",
|
Role: "Orchestrator",
|
||||||
Status: models.AgentStatusActive,
|
Status: models.AgentStatusActive,
|
||||||
CurrentTask: strPtr("Orchestrating tasks"),
|
CurrentTask: strPtr("Orchestrating tasks"),
|
||||||
SessionKey: "otto-session",
|
SessionKey: "otto-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "rex",
|
ID: "rex",
|
||||||
DisplayName: "Rex",
|
DisplayName: "Rex",
|
||||||
Role: "Frontend Dev",
|
Role: "Frontend Dev",
|
||||||
Status: models.AgentStatusIdle,
|
Status: models.AgentStatusIdle,
|
||||||
SessionKey: "rex-session",
|
SessionKey: "rex-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "dex",
|
ID: "dex",
|
||||||
DisplayName: "Dex",
|
DisplayName: "Dex",
|
||||||
Role: "Backend Dev",
|
Role: "Backend Dev",
|
||||||
Status: models.AgentStatusThinking,
|
Status: models.AgentStatusThinking,
|
||||||
CurrentTask: strPtr("Designing API contracts"),
|
CurrentTask: strPtr("Designing API contracts"),
|
||||||
SessionKey: "dex-session",
|
SessionKey: "dex-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "hex",
|
ID: "hex",
|
||||||
DisplayName: "Hex",
|
DisplayName: "Hex",
|
||||||
Role: "Database Specialist",
|
Role: "Database Specialist",
|
||||||
Status: models.AgentStatusActive,
|
Status: models.AgentStatusActive,
|
||||||
CurrentTask: strPtr("Reviewing schema migrations"),
|
CurrentTask: strPtr("Reviewing schema migrations"),
|
||||||
SessionKey: "hex-session",
|
SessionKey: "hex-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "pip",
|
ID: "pip",
|
||||||
DisplayName: "Pip",
|
DisplayName: "Pip",
|
||||||
Role: "Edge Device Dev",
|
Role: "Edge Device Dev",
|
||||||
Status: models.AgentStatusIdle,
|
Status: models.AgentStatusIdle,
|
||||||
SessionKey: "pip-session",
|
SessionKey: "pip-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,15 +20,15 @@ import (
|
|||||||
// sessionChangedPayload represents a single session delta from a
|
// sessionChangedPayload represents a single session delta from a
|
||||||
// sessions.changed event.
|
// sessions.changed event.
|
||||||
type sessionChangedPayload struct {
|
type sessionChangedPayload struct {
|
||||||
SessionKey string `json:"sessionKey"`
|
SessionKey string `json:"sessionKey"`
|
||||||
AgentID string `json:"agentId"`
|
AgentID string `json:"agentId"`
|
||||||
Status string `json:"status"` // running, streaming, done, error
|
Status string `json:"status"` // running, streaming, done, error
|
||||||
TotalTokens int `json:"totalTokens"`
|
TotalTokens int `json:"totalTokens"`
|
||||||
LastActivityAt string `json:"lastActivityAt"`
|
LastActivityAt string `json:"lastActivityAt"`
|
||||||
CurrentTask string `json:"currentTask"`
|
CurrentTask string `json:"currentTask"`
|
||||||
TaskProgress *int `json:"taskProgress,omitempty"`
|
TaskProgress *int `json:"taskProgress,omitempty"`
|
||||||
TaskElapsed string `json:"taskElapsed"`
|
TaskElapsed string `json:"taskElapsed"`
|
||||||
ErrorMessage string `json:"errorMessage"`
|
ErrorMessage string `json:"errorMessage"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// presencePayload represents a device presence update event.
|
// presencePayload represents a device presence update event.
|
||||||
@@ -51,8 +51,18 @@ type agentConfigPayload struct {
|
|||||||
// ── Handler registration ─────────────────────────────────────────────────
|
// ── Handler registration ─────────────────────────────────────────────────
|
||||||
|
|
||||||
// registerEventHandlers sets up all live event handlers on the WSClient.
|
// registerEventHandlers sets up all live event handlers on the WSClient.
|
||||||
// Called once after a successful handshake + initial sync.
|
// Call this once after a successful handshake + initial sync.
|
||||||
func (c *WSClient) registerEventHandlers() {
|
func (c *WSClient) registerEventHandlers() {
|
||||||
|
if c.agents == nil || c.broker == nil {
|
||||||
|
c.logger.Info("event handlers skipped (no repository or broker)")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear existing handlers to prevent duplicates on reconnect
|
||||||
|
c.mu.Lock()
|
||||||
|
c.handlers = make(map[string][]eventHandler)
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
c.OnEvent("sessions.changed", c.handleSessionsChanged)
|
c.OnEvent("sessions.changed", c.handleSessionsChanged)
|
||||||
c.OnEvent("presence", c.handlePresence)
|
c.OnEvent("presence", c.handlePresence)
|
||||||
c.OnEvent("agent.config", c.handleAgentConfig)
|
c.OnEvent("agent.config", c.handleAgentConfig)
|
||||||
@@ -68,11 +78,14 @@ func (c *WSClient) registerEventHandlers() {
|
|||||||
// For each changed session: map the gateway status to an AgentStatus, update
|
// For each changed session: map the gateway status to an AgentStatus, update
|
||||||
// the agent in the DB, then broadcast via SSE.
|
// the agent in the DB, then broadcast via SSE.
|
||||||
func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
|
func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
|
||||||
c.logger.Debug("handleSessionsChanged", "payload", string(payload))
|
c.logger.Debug("handleSessionsChanged start", "payload", string(payload))
|
||||||
|
|
||||||
// Try array first, then single object
|
// Try array first, then single object
|
||||||
var deltas []sessionChangedPayload
|
var deltas []sessionChangedPayload
|
||||||
if err := json.Unmarshal(payload, &deltas); err != nil || len(deltas) == 0 {
|
if err := json.Unmarshal(payload, &deltas); err == nil && len(deltas) > 0 {
|
||||||
|
// Array of deltas
|
||||||
|
} else {
|
||||||
|
// Try single object
|
||||||
var single sessionChangedPayload
|
var single sessionChangedPayload
|
||||||
if err := json.Unmarshal(payload, &single); err != nil {
|
if err := json.Unmarshal(payload, &single); err != nil {
|
||||||
c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err)
|
c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err)
|
||||||
@@ -92,27 +105,43 @@ func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
|
|||||||
|
|
||||||
agentStatus := mapSessionStatus(d.Status)
|
agentStatus := mapSessionStatus(d.Status)
|
||||||
|
|
||||||
|
// Build partial update
|
||||||
update := models.UpdateAgentRequest{
|
update := models.UpdateAgentRequest{
|
||||||
Status: &agentStatus,
|
Status: &agentStatus,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Session key
|
||||||
|
if d.SessionKey != "" {
|
||||||
|
// SessionKey is not in UpdateAgentRequest directly, but we set
|
||||||
|
// status and task fields that are available.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Current task
|
||||||
if d.CurrentTask != "" {
|
if d.CurrentTask != "" {
|
||||||
update.CurrentTask = &d.CurrentTask
|
update.CurrentTask = &d.CurrentTask
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Task progress
|
||||||
if d.TaskProgress != nil {
|
if d.TaskProgress != nil {
|
||||||
update.TaskProgress = d.TaskProgress
|
update.TaskProgress = d.TaskProgress
|
||||||
|
} else if d.TotalTokens > 0 {
|
||||||
|
// Derive progress from token count as fallback
|
||||||
|
prog := min(d.TotalTokens/100, 100)
|
||||||
|
update.TaskProgress = &prog
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Task elapsed
|
||||||
if d.TaskElapsed != "" {
|
if d.TaskElapsed != "" {
|
||||||
update.TaskElapsed = &d.TaskElapsed
|
update.TaskElapsed = &d.TaskElapsed
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Error message
|
||||||
if d.ErrorMessage != "" {
|
if d.ErrorMessage != "" {
|
||||||
update.ErrorMessage = &d.ErrorMessage
|
update.ErrorMessage = &d.ErrorMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
// If session ended, clear task and progress
|
// If session ended (done or empty status), set agent to idle and
|
||||||
|
// clear the current task
|
||||||
if agentStatus == models.AgentStatusIdle {
|
if agentStatus == models.AgentStatusIdle {
|
||||||
emptyTask := ""
|
emptyTask := ""
|
||||||
update.CurrentTask = &emptyTask
|
update.CurrentTask = &emptyTask
|
||||||
@@ -120,7 +149,7 @@ func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
|
|||||||
update.TaskProgress = &zeroProg
|
update.TaskProgress = &zeroProg
|
||||||
}
|
}
|
||||||
|
|
||||||
// DB update first
|
// Update DB first
|
||||||
updated, err := c.agents.Update(ctx, d.AgentID, update)
|
updated, err := c.agents.Update(ctx, d.AgentID, update)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Warn("sessions.changed: DB update failed",
|
c.logger.Warn("sessions.changed: DB update failed",
|
||||||
@@ -128,23 +157,27 @@ func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then SSE broadcast
|
// Then broadcast
|
||||||
c.broker.Broadcast("agent.status", updated)
|
c.broker.Broadcast("agent.status", updated)
|
||||||
if d.TaskProgress != nil || d.CurrentTask != "" {
|
if d.TaskProgress != nil || d.CurrentTask != "" {
|
||||||
c.broker.Broadcast("agent.progress", updated)
|
c.broker.Broadcast("agent.progress", updated)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Debug("sessions.changed: agent updated",
|
c.logger.Debug("sessions.changed: agent updated",
|
||||||
"agentId", d.AgentID, "status", string(agentStatus))
|
"agentId", d.AgentID,
|
||||||
|
"status", string(agentStatus))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.logger.Debug("handleSessionsChanged end")
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── presence ─────────────────────────────────────────────────────────────
|
// ── presence ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
// handlePresence processes presence events from the gateway. Updates the
|
// handlePresence processes presence events from the gateway. Updates the
|
||||||
// agent's lastActivity and broadcasts status if the connection state changed.
|
// agent's lastActivity timestamp and broadcasts status if the connection
|
||||||
|
// state changed.
|
||||||
func (c *WSClient) handlePresence(payload json.RawMessage) {
|
func (c *WSClient) handlePresence(payload json.RawMessage) {
|
||||||
c.logger.Debug("handlePresence", "payload", string(payload))
|
c.logger.Debug("handlePresence start", "payload", string(payload))
|
||||||
|
|
||||||
var p presencePayload
|
var p presencePayload
|
||||||
if err := json.Unmarshal(payload, &p); err != nil {
|
if err := json.Unmarshal(payload, &p); err != nil {
|
||||||
@@ -153,21 +186,31 @@ func (c *WSClient) handlePresence(payload json.RawMessage) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if p.AgentID == "" {
|
if p.AgentID == "" {
|
||||||
|
c.logger.Debug("presence: skipping event with empty agentId")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
// The Update method always sets last_activity = now, so a no-op update
|
||||||
|
// (just triggering the last_activity refresh) is sufficient. We send
|
||||||
|
// an empty-ish update — the repo always bumps last_activity.
|
||||||
|
// If connection state is reported, also update status.
|
||||||
update := models.UpdateAgentRequest{}
|
update := models.UpdateAgentRequest{}
|
||||||
|
|
||||||
// If device disconnected, set agent to idle
|
|
||||||
if p.Connected != nil && !*p.Connected {
|
if p.Connected != nil && !*p.Connected {
|
||||||
|
// Device disconnected — set agent to idle
|
||||||
idle := models.AgentStatusIdle
|
idle := models.AgentStatusIdle
|
||||||
update.Status = &idle
|
update.Status = &idle
|
||||||
}
|
}
|
||||||
|
|
||||||
// DB update first (Update always bumps last_activity)
|
// Pass lastActivityAt from the event so DB and SSE stay consistent
|
||||||
|
if p.LastActivityAt != "" {
|
||||||
|
update.LastActivityAt = &p.LastActivityAt
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update DB first
|
||||||
updated, err := c.agents.Update(ctx, p.AgentID, update)
|
updated, err := c.agents.Update(ctx, p.AgentID, update)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Warn("presence: DB update failed",
|
c.logger.Warn("presence: DB update failed",
|
||||||
@@ -175,24 +218,21 @@ func (c *WSClient) handlePresence(payload json.RawMessage) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.LastActivityAt != "" {
|
// Then broadcast
|
||||||
updated.LastActivity = p.LastActivityAt
|
|
||||||
}
|
|
||||||
|
|
||||||
// Then SSE broadcast
|
|
||||||
c.broker.Broadcast("agent.status", updated)
|
c.broker.Broadcast("agent.status", updated)
|
||||||
|
|
||||||
c.logger.Debug("presence: agent updated",
|
c.logger.Debug("presence: agent updated",
|
||||||
"agentId", p.AgentID, "connected", p.Connected)
|
"agentId", p.AgentID,
|
||||||
|
"connected", p.Connected)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── agent.config ─────────────────────────────────────────────────────────
|
// ── agent.config ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
// handleAgentConfig processes agent.config events from the gateway. Updates
|
// handleAgentConfig processes agent.config events from the gateway. Updates
|
||||||
// agent metadata (channel) in the DB and broadcasts a fleet.update with the
|
// agent metadata (name, channel) in the DB and broadcasts a fleet.update
|
||||||
// full fleet snapshot.
|
// with the full fleet snapshot.
|
||||||
func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
|
func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
|
||||||
c.logger.Debug("handleAgentConfig", "payload", string(payload))
|
c.logger.Debug("handleAgentConfig start", "payload", string(payload))
|
||||||
|
|
||||||
var cfg agentConfigPayload
|
var cfg agentConfigPayload
|
||||||
if err := json.Unmarshal(payload, &cfg); err != nil {
|
if err := json.Unmarshal(payload, &cfg); err != nil {
|
||||||
@@ -201,19 +241,27 @@ func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if cfg.ID == "" {
|
if cfg.ID == "" {
|
||||||
|
c.logger.Debug("agent.config: skipping event with empty id")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
// Build partial update with available fields.
|
||||||
update := models.UpdateAgentRequest{}
|
update := models.UpdateAgentRequest{}
|
||||||
|
|
||||||
|
if cfg.Name != "" {
|
||||||
|
update.DisplayName = &cfg.Name
|
||||||
|
}
|
||||||
|
if cfg.Role != "" {
|
||||||
|
update.Role = &cfg.Role
|
||||||
|
}
|
||||||
if cfg.Channel != "" {
|
if cfg.Channel != "" {
|
||||||
update.Channel = &cfg.Channel
|
update.Channel = &cfg.Channel
|
||||||
}
|
}
|
||||||
|
|
||||||
// DB update first
|
// Update DB first
|
||||||
updated, err := c.agents.Update(ctx, cfg.ID, update)
|
updated, err := c.agents.Update(ctx, cfg.ID, update)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Warn("agent.config: DB update failed",
|
c.logger.Warn("agent.config: DB update failed",
|
||||||
@@ -221,23 +269,19 @@ func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply display name/role from config event
|
// Then broadcast fleet snapshot
|
||||||
if cfg.Name != "" {
|
|
||||||
updated.DisplayName = cfg.Name
|
|
||||||
}
|
|
||||||
if cfg.Role != "" {
|
|
||||||
updated.Role = cfg.Role
|
|
||||||
}
|
|
||||||
|
|
||||||
// Broadcast full fleet snapshot so frontend gets updated agent info
|
|
||||||
allAgents, err := c.agents.List(ctx, "")
|
allAgents, err := c.agents.List(ctx, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Warn("agent.config: fleet list failed, broadcasting single agent", "error", err)
|
c.logger.Warn("agent.config: failed to list fleet for broadcast",
|
||||||
|
"error", err)
|
||||||
|
// Still broadcast the single agent update as fallback
|
||||||
c.broker.Broadcast("agent.status", updated)
|
c.broker.Broadcast("agent.status", updated)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.broker.Broadcast("fleet.update", allAgents)
|
c.broker.Broadcast("fleet.update", allAgents)
|
||||||
|
|
||||||
c.logger.Debug("agent.config: fleet updated", "agentId", cfg.ID, "name", cfg.Name)
|
c.logger.Debug("agent.config: fleet updated",
|
||||||
|
"agentId", cfg.ID,
|
||||||
|
"name", cfg.Name)
|
||||||
}
|
}
|
||||||
@@ -16,6 +16,8 @@ import (
|
|||||||
// ── RPC response types ───────────────────────────────────────────────────
|
// ── RPC response types ───────────────────────────────────────────────────
|
||||||
|
|
||||||
// agentListItem represents a single agent returned by the agents.list RPC.
|
// agentListItem represents a single agent returned by the agents.list RPC.
|
||||||
|
// Fields are extracted gracefully from json.RawMessage so unknown fields
|
||||||
|
// from the gateway are silently ignored.
|
||||||
type agentListItem struct {
|
type agentListItem struct {
|
||||||
ID string `json:"id"`
|
ID string `json:"id"`
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
@@ -40,9 +42,14 @@ type sessionListItem struct {
|
|||||||
// persists them, merges session state into agent cards, and broadcasts
|
// persists them, merges session state into agent cards, and broadcasts
|
||||||
// the merged fleet as a fleet.update event.
|
// the merged fleet as a fleet.update event.
|
||||||
func (c *WSClient) initialSync(ctx context.Context) error {
|
func (c *WSClient) initialSync(ctx context.Context) error {
|
||||||
|
if c.agents == nil {
|
||||||
|
c.logger.Info("initial sync skipped (no repository)")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
c.logger.Info("initial sync starting")
|
c.logger.Info("initial sync starting")
|
||||||
|
|
||||||
// 1. Fetch agents via RPC
|
// 1. Fetch agents
|
||||||
agentsRaw, err := c.Send("agents.list", nil)
|
agentsRaw, err := c.Send("agents.list", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("agents.list RPC: %w", err)
|
return fmt.Errorf("agents.list RPC: %w", err)
|
||||||
@@ -55,7 +62,7 @@ func (c *WSClient) initialSync(ctx context.Context) error {
|
|||||||
|
|
||||||
c.logger.Info("agents.list received", "count", len(agentItems))
|
c.logger.Info("agents.list received", "count", len(agentItems))
|
||||||
|
|
||||||
// 2. Persist each agent (create if not exists, update if changed)
|
// 2. Persist each agent
|
||||||
for _, item := range agentItems {
|
for _, item := range agentItems {
|
||||||
card := agentItemToCard(item)
|
card := agentItemToCard(item)
|
||||||
|
|
||||||
@@ -70,12 +77,13 @@ func (c *WSClient) initialSync(ctx context.Context) error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Agent exists — update display name or role if changed
|
// Agent exists — update if display name or role changed
|
||||||
if existing.DisplayName != card.DisplayName || existing.Role != card.Role {
|
if existing.DisplayName != card.DisplayName || existing.Role != card.Role {
|
||||||
// Update what we can via UpdateAgentRequest
|
newName := card.DisplayName
|
||||||
channel := card.Channel
|
newRole := card.Role
|
||||||
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
|
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
|
||||||
Channel: &channel,
|
DisplayName: &newName,
|
||||||
|
Role: &newRole,
|
||||||
})
|
})
|
||||||
if updateErr != nil {
|
if updateErr != nil {
|
||||||
c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr)
|
c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr)
|
||||||
@@ -83,7 +91,7 @@ func (c *WSClient) initialSync(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. Fetch sessions via RPC
|
// 3. Fetch sessions
|
||||||
sessionsRaw, err := c.Send("sessions.list", nil)
|
sessionsRaw, err := c.Send("sessions.list", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("sessions.list RPC: %w", err)
|
return fmt.Errorf("sessions.list RPC: %w", err)
|
||||||
@@ -96,7 +104,7 @@ func (c *WSClient) initialSync(ctx context.Context) error {
|
|||||||
|
|
||||||
c.logger.Info("sessions.list received", "count", len(sessionItems))
|
c.logger.Info("sessions.list received", "count", len(sessionItems))
|
||||||
|
|
||||||
// 4. Build agentId → session map for merge
|
// 4. Build a map of agentId → session for merge
|
||||||
sessionByAgent := make(map[string]sessionListItem)
|
sessionByAgent := make(map[string]sessionListItem)
|
||||||
for _, s := range sessionItems {
|
for _, s := range sessionItems {
|
||||||
if s.AgentID != "" {
|
if s.AgentID != "" {
|
||||||
@@ -104,25 +112,26 @@ func (c *WSClient) initialSync(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5. Merge session state into agents, update DB, and collect for broadcast
|
// 5. Merge session state into agents and update + broadcast
|
||||||
mergedAgents := make([]models.AgentCardData, 0, len(agentItems))
|
mergedAgents := make([]models.AgentCardData, 0, len(agentItems))
|
||||||
|
|
||||||
for _, item := range agentItems {
|
for _, item := range agentItems {
|
||||||
card := agentItemToCard(item)
|
card := agentItemToCard(item)
|
||||||
|
|
||||||
if session, ok := sessionByAgent[item.ID]; ok {
|
if session, ok := sessionByAgent[item.ID]; ok {
|
||||||
// Merge session state into agent card
|
// Merge session state
|
||||||
card.SessionKey = session.SessionKey
|
card.SessionKey = session.SessionKey
|
||||||
card.Status = mapSessionStatus(session.Status)
|
card.Status = mapSessionStatus(session.Status)
|
||||||
card.LastActivity = session.LastActivityAt
|
card.LastActivity = session.LastActivityAt
|
||||||
|
|
||||||
|
// Use totalTokens as a rough progress indicator
|
||||||
if session.TotalTokens > 0 {
|
if session.TotalTokens > 0 {
|
||||||
prog := min(session.TotalTokens/100, 100)
|
prog := min(session.TotalTokens/100, 100) // normalize to 0-100
|
||||||
card.TaskProgress = &prog
|
card.TaskProgress = &prog
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Persist merged status change
|
// Persist merged state
|
||||||
existing, err := c.agents.Get(ctx, card.ID)
|
existing, err := c.agents.Get(ctx, card.ID)
|
||||||
if err == nil && existing.Status != card.Status {
|
if err == nil && existing.Status != card.Status {
|
||||||
status := card.Status
|
status := card.Status
|
||||||
@@ -146,8 +155,8 @@ func (c *WSClient) initialSync(ctx context.Context) error {
|
|||||||
|
|
||||||
// mapSessionStatus converts a gateway session status string to an AgentStatus.
|
// mapSessionStatus converts a gateway session status string to an AgentStatus.
|
||||||
// - "running" / "streaming" → active
|
// - "running" / "streaming" → active
|
||||||
// - "error" → error
|
// - "error" → error
|
||||||
// - "done" / "" / other → idle
|
// - "done" / "" / other → idle
|
||||||
func mapSessionStatus(status string) models.AgentStatus {
|
func mapSessionStatus(status string) models.AgentStatus {
|
||||||
switch status {
|
switch status {
|
||||||
case "running", "streaming":
|
case "running", "streaming":
|
||||||
@@ -168,7 +177,7 @@ func agentItemToCard(item agentListItem) models.AgentCardData {
|
|||||||
}
|
}
|
||||||
channel := item.Channel
|
channel := item.Channel
|
||||||
if channel == "" {
|
if channel == "" {
|
||||||
channel = "discord"
|
channel = "unknown"
|
||||||
}
|
}
|
||||||
name := item.Name
|
name := item.Name
|
||||||
if name == "" {
|
if name == "" {
|
||||||
@@ -176,12 +185,12 @@ func agentItemToCard(item agentListItem) models.AgentCardData {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return models.AgentCardData{
|
return models.AgentCardData{
|
||||||
ID: item.ID,
|
ID: item.ID,
|
||||||
DisplayName: name,
|
DisplayName: name,
|
||||||
Role: role,
|
Role: role,
|
||||||
Status: models.AgentStatusIdle, // default; overridden by session merge
|
Status: models.AgentStatusIdle, // default; will be overridden by session merge
|
||||||
SessionKey: "",
|
SessionKey: "",
|
||||||
Channel: channel,
|
Channel: channel,
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
// Package gateway provides WebSocket client integration with the OpenClaw
|
// Package gateway provides WebSocket client integration with the OpenClaw
|
||||||
// gateway using WS protocol v3. The WSClient handles connection, handshake,
|
// gateway using WS protocol v3. The WSClient handles connection, handshake,
|
||||||
// frame routing, request/response correlation, and automatic reconnection
|
// frame routing, request/response correlation, and automatic reconnection
|
||||||
// with exponential backoff (1s → 30s max).
|
// with exponential backoff.
|
||||||
package gateway
|
package gateway
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -15,8 +15,8 @@ import (
|
|||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository"
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WSConfig holds WebSocket client configuration, typically loaded from
|
// WSConfig holds WebSocket client configuration, typically loaded from
|
||||||
@@ -41,19 +41,21 @@ type eventHandler func(json.RawMessage)
|
|||||||
|
|
||||||
// WSClient connects to the OpenClaw gateway over WebSocket, completes the
|
// WSClient connects to the OpenClaw gateway over WebSocket, completes the
|
||||||
// v3 handshake, routes incoming frames, and automatically reconnects on
|
// v3 handshake, routes incoming frames, and automatically reconnects on
|
||||||
// disconnect with exponential backoff (1s → 30s max).
|
// disconnect with exponential backoff.
|
||||||
type WSClient struct {
|
type WSClient struct {
|
||||||
config WSConfig
|
config WSConfig
|
||||||
conn *websocket.Conn
|
conn *websocket.Conn
|
||||||
connMu sync.Mutex // protects conn for writes
|
connMu sync.Mutex // protects conn for writes
|
||||||
pending map[string]chan<- json.RawMessage
|
pending map[string]chan<- json.RawMessage
|
||||||
mu sync.Mutex // protects pending and handlers
|
mu sync.Mutex // protects pending and handlers
|
||||||
agents repository.AgentRepo
|
agents repository.AgentRepo
|
||||||
broker *handler.Broker
|
broker *handler.Broker
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
handlers map[string][]eventHandler
|
|
||||||
connID string // set after successful hello-ok
|
handlers map[string][]eventHandler
|
||||||
restClient *Client // optional REST client to notify on WS ready
|
connId string // set after successful hello-ok
|
||||||
|
restClient *Client // optional REST client to notify on WS ready
|
||||||
|
wsReadyOnce sync.Once // ensures MarkWSReady close is one-shot
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWSClient returns a WSClient wired to the given repository and broker.
|
// NewWSClient returns a WSClient wired to the given repository and broker.
|
||||||
@@ -79,7 +81,7 @@ func (c *WSClient) SetRESTClient(rest *Client) {
|
|||||||
|
|
||||||
// OnEvent registers a handler for the given event name. Handlers are called
|
// OnEvent registers a handler for the given event name. Handlers are called
|
||||||
// when an incoming frame with type "event" and matching event name is
|
// when an incoming frame with type "event" and matching event name is
|
||||||
// received. Safe to call before Start.
|
// received. This is safe to call before Start.
|
||||||
func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
|
func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
@@ -92,7 +94,7 @@ func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
|
|||||||
type wsFrame struct {
|
type wsFrame struct {
|
||||||
Type string `json:"type"` // "req", "res", "event"
|
Type string `json:"type"` // "req", "res", "event"
|
||||||
ID string `json:"id,omitempty"` // request/response correlation
|
ID string `json:"id,omitempty"` // request/response correlation
|
||||||
Method string `json:"method,omitempty"` // method name (req/res frames)
|
Method string `json:"method,omitempty"` // method name (req frames)
|
||||||
Event string `json:"event,omitempty"` // event name (event frames)
|
Event string `json:"event,omitempty"` // event name (event frames)
|
||||||
Params json.RawMessage `json:"params,omitempty"`
|
Params json.RawMessage `json:"params,omitempty"`
|
||||||
Result json.RawMessage `json:"result,omitempty"`
|
Result json.RawMessage `json:"result,omitempty"`
|
||||||
@@ -128,7 +130,7 @@ type connectAuth struct {
|
|||||||
|
|
||||||
// helloOKResponse represents the expected response to a successful connect.
|
// helloOKResponse represents the expected response to a successful connect.
|
||||||
type helloOKResponse struct {
|
type helloOKResponse struct {
|
||||||
ConnID string `json:"connId"`
|
ConnID string `json:"connId"`
|
||||||
Features struct {
|
Features struct {
|
||||||
Methods []string `json:"methods"`
|
Methods []string `json:"methods"`
|
||||||
Events []string `json:"events"`
|
Events []string `json:"events"`
|
||||||
@@ -138,11 +140,12 @@ type helloOKResponse struct {
|
|||||||
// ── Start loop ───────────────────────────────────────────────────────────
|
// ── Start loop ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
// Start connects to the gateway, completes the handshake, and begins the
|
// Start connects to the gateway, completes the handshake, and begins the
|
||||||
// read loop. On disconnect it reconnects with exponential backoff (1s → 30s).
|
// read loop. On disconnect it reconnects with exponential backoff. On
|
||||||
// On ctx cancellation it performs a clean shutdown.
|
// ctx cancellation it performs a clean shutdown.
|
||||||
func (c *WSClient) Start(ctx context.Context) {
|
func (c *WSClient) Start(ctx context.Context) {
|
||||||
backoff := 1 * time.Second
|
initialBackoff := 1 * time.Second
|
||||||
maxBackoff := 30 * time.Second
|
maxBackoff := 30 * time.Second
|
||||||
|
backoff := initialBackoff
|
||||||
|
|
||||||
for {
|
for {
|
||||||
err := c.connectAndRun(ctx)
|
err := c.connectAndRun(ctx)
|
||||||
@@ -154,6 +157,9 @@ func (c *WSClient) Start(ctx context.Context) {
|
|||||||
c.logger.Warn("ws client disconnected, reconnecting",
|
c.logger.Warn("ws client disconnected, reconnecting",
|
||||||
"error", err,
|
"error", err,
|
||||||
"backoff", backoff)
|
"backoff", backoff)
|
||||||
|
} else {
|
||||||
|
// Reset backoff on successful connect+run completion
|
||||||
|
backoff = initialBackoff
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@@ -188,14 +194,26 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
|||||||
c.conn = conn
|
c.conn = conn
|
||||||
c.connMu.Unlock()
|
c.connMu.Unlock()
|
||||||
|
|
||||||
defer conn.Close()
|
// When context is cancelled, close the conn to unblock ReadJSON in readLoop.
|
||||||
|
go func() {
|
||||||
|
<-ctx.Done()
|
||||||
|
c.connMu.Lock()
|
||||||
|
if c.conn != nil {
|
||||||
|
c.conn.Close()
|
||||||
|
}
|
||||||
|
c.connMu.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
// Step 1: Read the connect.challenge frame
|
// Step 1: Read the connect.challenge frame
|
||||||
if err := c.readChallenge(conn); err != nil {
|
if err := c.readChallenge(conn); err != nil {
|
||||||
return fmt.Errorf("handshake challenge: %w", err)
|
return fmt.Errorf("handshake challenge: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 2: Send connect request and read hello-ok response
|
// Step 2: Send connect request
|
||||||
helloOK, err := c.sendConnect(conn)
|
helloOK, err := c.sendConnect(conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("handshake connect: %w", err)
|
return fmt.Errorf("handshake connect: %w", err)
|
||||||
@@ -206,8 +224,9 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
|||||||
"methods", helloOK.Features.Methods,
|
"methods", helloOK.Features.Methods,
|
||||||
"events", helloOK.Features.Events)
|
"events", helloOK.Features.Events)
|
||||||
|
|
||||||
|
// Store connId for reference
|
||||||
c.connMu.Lock()
|
c.connMu.Lock()
|
||||||
c.connID = helloOK.ConnID
|
c.connId = helloOK.ConnID
|
||||||
c.connMu.Unlock()
|
c.connMu.Unlock()
|
||||||
|
|
||||||
// Notify REST client that WS is live so it stands down
|
// Notify REST client that WS is live so it stands down
|
||||||
@@ -216,15 +235,18 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
|||||||
c.logger.Info("ws client notified REST fallback to stand down")
|
c.logger.Info("ws client notified REST fallback to stand down")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 3: Initial sync — fetch agents + sessions from gateway
|
// Reset wsReadyOnce so MarkWSReady can fire again after a reconnect
|
||||||
|
c.wsReadyOnce = sync.Once{}
|
||||||
|
|
||||||
|
// Step 2b: Initial sync — fetch agents + sessions from gateway
|
||||||
if err := c.initialSync(ctx); err != nil {
|
if err := c.initialSync(ctx); err != nil {
|
||||||
c.logger.Warn("initial sync failed, continuing with read loop", "error", err)
|
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 4: Register live event handlers
|
// Step 2c: Register live event handlers
|
||||||
c.registerEventHandlers()
|
c.registerEventHandlers()
|
||||||
|
|
||||||
// Step 5: Read loop — blocks until disconnect or ctx cancel
|
// Step 3: Read loop
|
||||||
return c.readLoop(ctx, conn)
|
return c.readLoop(ctx, conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -240,7 +262,7 @@ func (c *WSClient) readChallenge(conn *websocket.Conn) error {
|
|||||||
return fmt.Errorf("expected connect.challenge, got type=%s event=%s", frame.Type, frame.Event)
|
return fmt.Errorf("expected connect.challenge, got type=%s event=%s", frame.Type, frame.Event)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Debug("received connect.challenge")
|
c.logger.Debug("received connect.challenge", "params", string(frame.Params))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -293,6 +315,8 @@ func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) {
|
|||||||
return nil, fmt.Errorf("response id mismatch: expected %s, got %s", reqID, resFrame.ID)
|
return nil, fmt.Errorf("response id mismatch: expected %s, got %s", reqID, resFrame.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check for hello-ok method in the result
|
||||||
|
// The gateway responds with method "hello-ok" on success
|
||||||
var helloOK helloOKResponse
|
var helloOK helloOKResponse
|
||||||
if err := json.Unmarshal(resFrame.Result, &helloOK); err != nil {
|
if err := json.Unmarshal(resFrame.Result, &helloOK); err != nil {
|
||||||
return nil, fmt.Errorf("parse hello-ok: %w", err)
|
return nil, fmt.Errorf("parse hello-ok: %w", err)
|
||||||
@@ -302,25 +326,16 @@ func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// readLoop continuously reads frames from the connection and routes them.
|
// readLoop continuously reads frames from the connection and routes them.
|
||||||
// It returns on read error or context cancellation.
|
// It returns on read error or when the connection is closed by the ctx-done
|
||||||
|
// goroutine started in connectAndRun.
|
||||||
func (c *WSClient) readLoop(ctx context.Context, conn *websocket.Conn) error {
|
func (c *WSClient) readLoop(ctx context.Context, conn *websocket.Conn) error {
|
||||||
for {
|
for {
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
// Clean shutdown: send close frame
|
|
||||||
c.connMu.Lock()
|
|
||||||
c.conn.WriteControl(
|
|
||||||
websocket.CloseMessage,
|
|
||||||
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "shutdown"),
|
|
||||||
time.Now().Add(5*time.Second),
|
|
||||||
)
|
|
||||||
c.connMu.Unlock()
|
|
||||||
return ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
var frame wsFrame
|
var frame wsFrame
|
||||||
if err := conn.ReadJSON(&frame); err != nil {
|
if err := conn.ReadJSON(&frame); err != nil {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
// Check if it's a close error
|
||||||
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
|
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
|
||||||
c.logger.Info("ws connection closed by server")
|
c.logger.Info("ws connection closed by server")
|
||||||
return nil
|
return nil
|
||||||
@@ -344,7 +359,7 @@ func (c *WSClient) routeFrame(frame wsFrame) {
|
|||||||
case "event":
|
case "event":
|
||||||
c.handleEvent(frame)
|
c.handleEvent(frame)
|
||||||
default:
|
default:
|
||||||
c.logger.Debug("unknown frame type", "type", frame.Type, "id", frame.ID)
|
c.logger.Warn("unknown frame type", "type", frame.Type, "id", frame.ID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -363,6 +378,7 @@ func (c *WSClient) handleResponse(frame wsFrame) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if frame.Error != nil {
|
if frame.Error != nil {
|
||||||
|
// Send nil to signal error; caller checks via Send return
|
||||||
ch <- nil
|
ch <- nil
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -386,20 +402,17 @@ func (c *WSClient) handleEvent(frame wsFrame) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Send (RPC) ──────────────────────────────────────────────────────────
|
// ── Send ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
// Send sends a JSON-RPC request to the gateway and returns the response
|
// Send sends a JSON request to the gateway and returns the response payload.
|
||||||
// payload. It is safe for concurrent use.
|
// It is safe for concurrent use. Returns an error if the client is not
|
||||||
|
// connected.
|
||||||
func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
|
func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
|
||||||
reqID := uuid.New().String()
|
reqID := uuid.New().String()
|
||||||
|
|
||||||
var paramsJSON json.RawMessage
|
paramsJSON, err := json.Marshal(params)
|
||||||
if params != nil {
|
if err != nil {
|
||||||
var err error
|
return nil, fmt.Errorf("marshal params: %w", err)
|
||||||
paramsJSON, err = json.Marshal(params)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("marshal params: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register pending response channel
|
// Register pending response channel
|
||||||
@@ -423,7 +436,11 @@ func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
c.connMu.Lock()
|
c.connMu.Lock()
|
||||||
err := c.conn.WriteJSON(frame)
|
if c.conn == nil {
|
||||||
|
c.connMu.Unlock()
|
||||||
|
return nil, fmt.Errorf("gateway: not connected")
|
||||||
|
}
|
||||||
|
err = c.conn.WriteJSON(frame)
|
||||||
c.connMu.Unlock()
|
c.connMu.Unlock()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -434,10 +451,10 @@ func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
|
|||||||
select {
|
select {
|
||||||
case resp := <-respCh:
|
case resp := <-respCh:
|
||||||
if resp == nil {
|
if resp == nil {
|
||||||
return nil, fmt.Errorf("gateway returned error for request %s (%s)", reqID, method)
|
return nil, fmt.Errorf("gateway returned error for request %s", reqID)
|
||||||
}
|
}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
case <-time.After(30 * time.Second):
|
case <-time.After(30 * time.Second):
|
||||||
return nil, fmt.Errorf("request %s (%s) timed out", reqID, method)
|
return nil, fmt.Errorf("request %s timed out", reqID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
484
go-backend/internal/gateway/wsclient_test.go
Normal file
484
go-backend/internal/gateway/wsclient_test.go
Normal file
@@ -0,0 +1,484 @@
|
|||||||
|
package gateway
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"log/slog"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ── Mock WebSocket server helper ─────────────────────────────────────────
|
||||||
|
|
||||||
|
// newTestWSServer creates an httptest.Server that upgrades to WebSocket and
|
||||||
|
// delegates each connection to handler. The server URL can be converted to
|
||||||
|
// a ws:// URL by replacing "http" with "ws".
|
||||||
|
func newTestWSServer(t *testing.T, handler func(conn *websocket.Conn)) *httptest.Server {
|
||||||
|
t.Helper()
|
||||||
|
upgrader := websocket.Upgrader{
|
||||||
|
CheckOrigin: func(r *http.Request) bool { return true },
|
||||||
|
}
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
conn, err := upgrader.Upgrade(w, r, nil)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
handler(conn)
|
||||||
|
}))
|
||||||
|
return srv
|
||||||
|
}
|
||||||
|
|
||||||
|
// wsURL converts an httptest.Server http URL to a ws URL.
|
||||||
|
func wsURL(srv *httptest.Server) string {
|
||||||
|
return "ws" + strings.TrimPrefix(srv.URL, "http")
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Handshake helper for mock server ─────────────────────────────────────
|
||||||
|
|
||||||
|
// handleHandshake performs the server side of the v3 handshake:
|
||||||
|
// 1. Send connect.challenge
|
||||||
|
// 2. Read connect request
|
||||||
|
// 3. Send hello-ok response
|
||||||
|
//
|
||||||
|
// Returns the connect request frame for inspection.
|
||||||
|
func handleHandshake(t *testing.T, conn *websocket.Conn) map[string]any {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
// 1. Send connect.challenge
|
||||||
|
challenge := map[string]any{
|
||||||
|
"type": "event",
|
||||||
|
"event": "connect.challenge",
|
||||||
|
"params": map[string]any{"nonce": "test-nonce", "ts": 1716180000000},
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(challenge); err != nil {
|
||||||
|
t.Fatalf("server: write challenge: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Read connect request
|
||||||
|
var req map[string]any
|
||||||
|
if err := conn.ReadJSON(&req); err != nil {
|
||||||
|
t.Fatalf("server: read connect request: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if req["method"] != "connect" {
|
||||||
|
t.Fatalf("server: expected method=connect, got %v", req["method"])
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Send hello-ok response
|
||||||
|
// Note: helloOKResponse expects ConnID at the top level of the result,
|
||||||
|
// matching the WSClient's JSON struct tags.
|
||||||
|
result := map[string]any{
|
||||||
|
"type": "hello-ok",
|
||||||
|
"protocol": 3,
|
||||||
|
"connId": "test-conn-123",
|
||||||
|
"features": map[string]any{"methods": []string{}, "events": []string{}},
|
||||||
|
"auth": map[string]any{"role": "operator", "scopes": []string{"operator.read"}},
|
||||||
|
}
|
||||||
|
res := map[string]any{
|
||||||
|
"type": "res",
|
||||||
|
"id": req["id"],
|
||||||
|
"ok": true,
|
||||||
|
"result": result,
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(res); err != nil {
|
||||||
|
t.Fatalf("server: write hello-ok: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return req
|
||||||
|
}
|
||||||
|
|
||||||
|
// keepAlive reads frames from the connection until an error occurs
|
||||||
|
// (e.g., the client disconnects). Used as the default "do nothing"
|
||||||
|
// server loop after handshake.
|
||||||
|
func keepAlive(conn *websocket.Conn) {
|
||||||
|
for {
|
||||||
|
var m map[string]any
|
||||||
|
if err := conn.ReadJSON(&m); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 1. Test: Full handshake ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
func TestWSClient_Handshake(t *testing.T) {
|
||||||
|
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
||||||
|
handleHandshake(t, conn)
|
||||||
|
keepAlive(conn)
|
||||||
|
})
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
client.Start(ctx)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait briefly for handshake to complete
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
|
// Verify connId was set
|
||||||
|
client.connMu.Lock()
|
||||||
|
connID := client.connId
|
||||||
|
client.connMu.Unlock()
|
||||||
|
|
||||||
|
if connID != "test-conn-123" {
|
||||||
|
t.Errorf("expected connId 'test-conn-123', got %q", connID)
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
// Client exited cleanly
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Fatal("WSClient did not shut down after context cancellation")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 2. Test: Send() with response matching ───────────────────────────────
|
||||||
|
|
||||||
|
func TestWSClient_Send(t *testing.T) {
|
||||||
|
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
||||||
|
handleHandshake(t, conn)
|
||||||
|
|
||||||
|
// Read RPC requests and respond to each
|
||||||
|
for {
|
||||||
|
var req map[string]any
|
||||||
|
if err := conn.ReadJSON(&req); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
reqID, _ := req["id"].(string)
|
||||||
|
method, _ := req["method"].(string)
|
||||||
|
|
||||||
|
var result any
|
||||||
|
switch method {
|
||||||
|
case "agents.list":
|
||||||
|
result = map[string]any{
|
||||||
|
"agents": []map[string]any{
|
||||||
|
{"id": "otto", "name": "Otto"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
result = map[string]any{}
|
||||||
|
}
|
||||||
|
|
||||||
|
res := map[string]any{
|
||||||
|
"type": "res",
|
||||||
|
"id": reqID,
|
||||||
|
"ok": true,
|
||||||
|
"result": result,
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(res); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
go client.Start(ctx)
|
||||||
|
|
||||||
|
// Give the client time to complete handshake
|
||||||
|
time.Sleep(300 * time.Millisecond)
|
||||||
|
|
||||||
|
resp, err := client.Send("agents.list", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Send() returned error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the response payload
|
||||||
|
var result map[string]any
|
||||||
|
if err := json.Unmarshal(resp, &result); err != nil {
|
||||||
|
t.Fatalf("unmarshal response: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
agents, ok := result["agents"].([]any)
|
||||||
|
if !ok || len(agents) != 1 {
|
||||||
|
t.Errorf("expected 1 agent in response, got %v", result)
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 3. Test: Event handler routing ───────────────────────────────────────
|
||||||
|
|
||||||
|
func TestWSClient_EventRouting(t *testing.T) {
|
||||||
|
eventReceived := make(chan json.RawMessage, 1)
|
||||||
|
|
||||||
|
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
||||||
|
handleHandshake(t, conn)
|
||||||
|
|
||||||
|
// After handshake, send a test event
|
||||||
|
evt := map[string]any{
|
||||||
|
"type": "event",
|
||||||
|
"event": "test.event",
|
||||||
|
"params": map[string]any{"greeting": "hello from server"},
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(evt); err != nil {
|
||||||
|
t.Logf("server: write event: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
keepAlive(conn)
|
||||||
|
})
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
|
||||||
|
|
||||||
|
// Register event handler BEFORE starting the client
|
||||||
|
client.OnEvent("test.event", func(payload json.RawMessage) {
|
||||||
|
eventReceived <- payload
|
||||||
|
})
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
go client.Start(ctx)
|
||||||
|
|
||||||
|
// Wait for the event handler to fire
|
||||||
|
select {
|
||||||
|
case payload := <-eventReceived:
|
||||||
|
var data map[string]any
|
||||||
|
if err := json.Unmarshal(payload, &data); err != nil {
|
||||||
|
t.Fatalf("unmarshal event payload: %v", err)
|
||||||
|
}
|
||||||
|
if greeting, _ := data["greeting"].(string); greeting != "hello from server" {
|
||||||
|
t.Errorf("expected greeting 'hello from server', got %q", greeting)
|
||||||
|
}
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Fatal("timed out waiting for event handler to fire")
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 4. Test: Concurrent Send ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
func TestWSClient_ConcurrentSend(t *testing.T) {
|
||||||
|
var reqCount atomic.Int32
|
||||||
|
|
||||||
|
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
||||||
|
handleHandshake(t, conn)
|
||||||
|
|
||||||
|
// Read RPC requests and respond to each
|
||||||
|
for {
|
||||||
|
var req map[string]any
|
||||||
|
if err := conn.ReadJSON(&req); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
reqID, _ := req["id"].(string)
|
||||||
|
n := reqCount.Add(1)
|
||||||
|
|
||||||
|
res := map[string]any{
|
||||||
|
"type": "res",
|
||||||
|
"id": reqID,
|
||||||
|
"ok": true,
|
||||||
|
"result": map[string]any{"index": n, "method": req["method"]},
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(res); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
go client.Start(ctx)
|
||||||
|
|
||||||
|
// Give the client time to complete handshake
|
||||||
|
time.Sleep(300 * time.Millisecond)
|
||||||
|
|
||||||
|
// Fire 3 concurrent Send() calls
|
||||||
|
type sendResult struct {
|
||||||
|
method string
|
||||||
|
payload json.RawMessage
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
results := make(chan sendResult, 3)
|
||||||
|
|
||||||
|
methods := []string{"agents.list", "sessions.list", "agents.config"}
|
||||||
|
for _, method := range methods {
|
||||||
|
go func(m string) {
|
||||||
|
resp, err := client.Send(m, nil)
|
||||||
|
results <- sendResult{method: m, payload: resp, err: err}
|
||||||
|
}(method)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect all results
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
select {
|
||||||
|
case r := <-results:
|
||||||
|
if r.err != nil {
|
||||||
|
t.Errorf("Send(%q) returned error: %v", r.method, r.err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var result map[string]any
|
||||||
|
if err := json.Unmarshal(r.payload, &result); err != nil {
|
||||||
|
t.Errorf("Send(%q) unmarshal error: %v", r.method, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
gotMethod, _ := result["method"].(string)
|
||||||
|
if gotMethod != r.method {
|
||||||
|
t.Errorf("Send(%q) got response for %q (mismatched)", r.method, gotMethod)
|
||||||
|
}
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("timed out waiting for concurrent Send results")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 5. Test: Clean shutdown ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
func TestWSClient_CleanShutdown(t *testing.T) {
|
||||||
|
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
||||||
|
handleHandshake(t, conn)
|
||||||
|
keepAlive(conn)
|
||||||
|
})
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
client.Start(ctx)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Let the client connect and complete handshake
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
|
// Cancel context — should trigger clean shutdown
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
// Client exited cleanly — pass
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Fatal("WSClient did not shut down cleanly within timeout")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Pure utility tests (from CUB-205) ─────────────────────────────────────
|
||||||
|
|
||||||
|
func TestMapSessionStatus(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
input string
|
||||||
|
expected models.AgentStatus
|
||||||
|
}{
|
||||||
|
{"running", models.AgentStatusActive},
|
||||||
|
{"streaming", models.AgentStatusActive},
|
||||||
|
{"done", models.AgentStatusIdle},
|
||||||
|
{"error", models.AgentStatusError},
|
||||||
|
{"", models.AgentStatusIdle},
|
||||||
|
{"garbage", models.AgentStatusIdle},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
result := mapSessionStatus(tt.input)
|
||||||
|
if result != tt.expected {
|
||||||
|
t.Errorf("mapSessionStatus(%q) = %q, want %q", tt.input, result, tt.expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAgentItemToCard(t *testing.T) {
|
||||||
|
t.Run("full fields", func(t *testing.T) {
|
||||||
|
item := agentListItem{
|
||||||
|
ID: "dex",
|
||||||
|
Name: "Dex",
|
||||||
|
Role: "backend",
|
||||||
|
Channel: "telegram",
|
||||||
|
}
|
||||||
|
card := agentItemToCard(item)
|
||||||
|
if card.ID != "dex" {
|
||||||
|
t.Errorf("ID = %q, want %q", card.ID, "dex")
|
||||||
|
}
|
||||||
|
if card.DisplayName != "Dex" {
|
||||||
|
t.Errorf("DisplayName = %q, want %q", card.DisplayName, "Dex")
|
||||||
|
}
|
||||||
|
if card.Role != "backend" {
|
||||||
|
t.Errorf("Role = %q, want %q", card.Role, "backend")
|
||||||
|
}
|
||||||
|
if card.Channel != "telegram" {
|
||||||
|
t.Errorf("Channel = %q, want %q", card.Channel, "telegram")
|
||||||
|
}
|
||||||
|
if card.Status != models.AgentStatusIdle {
|
||||||
|
t.Errorf("Status = %q, want %q", card.Status, models.AgentStatusIdle)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("empty fields use defaults", func(t *testing.T) {
|
||||||
|
item := agentListItem{
|
||||||
|
ID: "otto",
|
||||||
|
}
|
||||||
|
card := agentItemToCard(item)
|
||||||
|
if card.ID != "otto" {
|
||||||
|
t.Errorf("ID = %q, want %q", card.ID, "otto")
|
||||||
|
}
|
||||||
|
if card.DisplayName != "otto" {
|
||||||
|
t.Errorf("DisplayName = %q, want %q (should fallback to ID)", card.DisplayName, "otto")
|
||||||
|
}
|
||||||
|
if card.Role != "agent" {
|
||||||
|
t.Errorf("Role = %q, want %q (default)", card.Role, "agent")
|
||||||
|
}
|
||||||
|
if card.Channel != "unknown" {
|
||||||
|
t.Errorf("Channel = %q, want %q (per Grimm requirement)", card.Channel, "unknown")
|
||||||
|
}
|
||||||
|
if card.Status != models.AgentStatusIdle {
|
||||||
|
t.Errorf("Status = %q, want %q", card.Status, models.AgentStatusIdle)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("empty name falls back to ID", func(t *testing.T) {
|
||||||
|
item := agentListItem{
|
||||||
|
ID: "hex",
|
||||||
|
Name: "",
|
||||||
|
Role: "database",
|
||||||
|
}
|
||||||
|
card := agentItemToCard(item)
|
||||||
|
if card.DisplayName != "hex" {
|
||||||
|
t.Errorf("DisplayName = %q, want %q (ID fallback)", card.DisplayName, "hex")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStrPtr(t *testing.T) {
|
||||||
|
s := "hello"
|
||||||
|
p := strPtr(s)
|
||||||
|
if p == nil {
|
||||||
|
t.Fatal("strPtr returned nil")
|
||||||
|
}
|
||||||
|
if *p != s {
|
||||||
|
t.Errorf("strPtr(%q) = %q, want %q", s, *p, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
empty := ""
|
||||||
|
ep := strPtr(empty)
|
||||||
|
if *ep != empty {
|
||||||
|
t.Errorf("strPtr(empty) = %q, want %q", *ep, empty)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -63,12 +63,15 @@ type CreateAgentRequest struct {
|
|||||||
|
|
||||||
// UpdateAgentRequest is the payload for PUT /api/agents/{id}.
|
// UpdateAgentRequest is the payload for PUT /api/agents/{id}.
|
||||||
type UpdateAgentRequest struct {
|
type UpdateAgentRequest struct {
|
||||||
Status *AgentStatus `json:"status,omitempty" validate:"omitempty,agentStatus"`
|
Status *AgentStatus `json:"status,omitempty" validate:"omitempty,agentStatus"`
|
||||||
CurrentTask *string `json:"currentTask,omitempty"`
|
DisplayName *string `json:"displayName,omitempty"`
|
||||||
TaskProgress *int `json:"taskProgress,omitempty" validate:"omitempty,min=0,max=100"`
|
Role *string `json:"role,omitempty"`
|
||||||
TaskElapsed *string `json:"taskElapsed,omitempty"`
|
LastActivityAt *string `json:"lastActivityAt,omitempty"`
|
||||||
Channel *string `json:"channel,omitempty" validate:"omitempty,min=1,max=32"`
|
CurrentTask *string `json:"currentTask,omitempty"`
|
||||||
ErrorMessage *string `json:"errorMessage,omitempty"`
|
TaskProgress *int `json:"taskProgress,omitempty" validate:"omitempty,min=0,max=100"`
|
||||||
|
TaskElapsed *string `json:"taskElapsed,omitempty"`
|
||||||
|
Channel *string `json:"channel,omitempty" validate:"omitempty,min=1,max=32"`
|
||||||
|
ErrorMessage *string `json:"errorMessage,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// AgentStatusHistoryEntry represents a point-in-time status change for an agent.
|
// AgentStatusHistoryEntry represents a point-in-time status change for an agent.
|
||||||
|
|||||||
46
reference/CONTROL_CENTER_CONTEXT.md
Normal file
46
reference/CONTROL_CENTER_CONTEXT.md
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
# Control Center — Architecture Context
|
||||||
|
|
||||||
|
## Current State
|
||||||
|
|
||||||
|
The Control Center backend uses a **dual-path gateway client** architecture:
|
||||||
|
|
||||||
|
- **Primary path**: WebSocket client (`gateway.WSClient`) connects to the OpenClaw gateway using WS protocol v3. It handles handshake, initial sync (agents.list + sessions.list RPCs), live event routing (sessions.changed, presence, agent.config), and automatic reconnection with exponential backoff.
|
||||||
|
- **Fallback path**: REST poller (`gateway.Client`) polls the gateway `/api/agents` endpoint on an interval. It only activates if the WS client fails to connect within 30 seconds of startup.
|
||||||
|
|
||||||
|
## Live Gateway Connection
|
||||||
|
|
||||||
|
### Startup Sequence
|
||||||
|
1. Both WS client and REST client start concurrently
|
||||||
|
2. REST client waits 30s for WS readiness signal (`wsReady` channel)
|
||||||
|
3. If WS connects successfully → REST client stands down (logs "using WS — REST poller standing down")
|
||||||
|
4. If WS fails within 30s → REST client falls back to polling (logs "WS not ready — falling back to REST polling")
|
||||||
|
5. If no WS client configured → REST client polls immediately
|
||||||
|
|
||||||
|
### WebSocket Client (Primary)
|
||||||
|
- Config: `WS_GATEWAY_URL` (default: `ws://host.docker.internal:18789/`), `OPENCLAW_GATEWAY_TOKEN`
|
||||||
|
- Protocol: v3 handshake (challenge → connect → hello-ok)
|
||||||
|
- Initial sync: `agents.list` + `sessions.list` RPCs → persist → merge → broadcast `fleet.update`
|
||||||
|
- Live events: `sessions.changed`, `presence`, `agent.config`
|
||||||
|
- Reconnection: exponential backoff (1s → 2s → 4s → ... → 30s max)
|
||||||
|
|
||||||
|
### REST Poller (Fallback)
|
||||||
|
- Config: `GATEWAY_URL` (default: `http://host.docker.internal:18789/api/agents`), `GATEWAY_POLL_INTERVAL` (default: 5s)
|
||||||
|
- Only used when WS is unavailable
|
||||||
|
- Polls the `/api/agents` endpoint and syncs agent status changes
|
||||||
|
|
||||||
|
### Wiring
|
||||||
|
```
|
||||||
|
main.go
|
||||||
|
├── wsClient = NewWSClient(...)
|
||||||
|
├── restClient = NewClient(...)
|
||||||
|
├── wsClient.SetRESTClient(restClient) // WS notifies REST on ready
|
||||||
|
├── restClient.SetWSClient(wsClient) // REST defers to WS
|
||||||
|
├── go wsClient.Start(ctx) // primary
|
||||||
|
└── go restClient.Start(ctx) // fallback (waits for WS)
|
||||||
|
```
|
||||||
|
|
||||||
|
## Key Design Decisions
|
||||||
|
- **Push over poll**: WS is preferred for real-time updates; REST is a safety net
|
||||||
|
- **DB first, then SSE**: All event handlers persist to DB before broadcasting
|
||||||
|
- **Graceful degradation**: System works without WS; REST provides basic functionality
|
||||||
|
- **No hard dependency on REST /api/agents**: If WS is connected, REST endpoint is never called
|
||||||
Reference in New Issue
Block a user