diff --git a/.env.example b/.env.example index 0a76c10..e355df4 100644 --- a/.env.example +++ b/.env.example @@ -12,8 +12,14 @@ ENVIRONMENT=development # Format: postgresql://user:password@host:port/database?sslmode=disable DATABASE_URL=postgresql://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable -# Gateway (OpenClaw) connection -# URL to the OpenClaw gateway API for polling agent states +# Gateway (OpenClaw) connection — WebSocket (primary) +# WebSocket URL for real-time OpenClaw gateway v3 protocol +GATEWAY_WS_URL=ws://localhost:18789/ +# Auth token for the OpenClaw gateway (operator scope) +OPENCLAW_GATEWAY_TOKEN= + +# Gateway (OpenClaw) connection — REST (fallback) +# URL to the OpenClaw gateway API for polling agent states (used only if WS fails) GATEWAY_URL=http://localhost:18789/api/agents # Polling interval for agent state updates GATEWAY_POLL_INTERVAL=5s diff --git a/go-backend/cmd/server/main.go b/go-backend/cmd/server/main.go index dc760b4..6cadc7c 100644 --- a/go-backend/cmd/server/main.go +++ b/go-backend/cmd/server/main.go @@ -63,16 +63,29 @@ func main() { Broker: broker, }) - // ── Gateway client (polls OpenClaw for agent states) ─────────────────── - gwClient := gateway.NewClient(gateway.Config{ + // ── Gateway: WS primary + REST fallback ──────────────────────────────── + // WebSocket client (primary — real-time events via OpenClaw v3 protocol) + wsClient := gateway.NewWSClient(gateway.WSConfig{ + URL: cfg.WSGatewayURL, + AuthToken: cfg.WSGatewayToken, + }, agentRepo, broker, logger) + + // REST polling client (fallback — only used if WS connection fails) + restClient := gateway.NewClient(gateway.Config{ URL: cfg.GatewayURL, PollInterval: cfg.GatewayPollInterval, }, agentRepo, broker) + // Wire them: WS notifies REST to stand down on successful connect + wsClient.SetRESTClient(restClient) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go gwClient.Start(ctx) + // Start WS client first (primary) + 
go wsClient.Start(ctx) + // Start REST client (fallback polling) + go restClient.Start(ctx) // ── Server ───────────────────────────────────────────────────────────── srv := &http.Server{ @@ -98,7 +111,7 @@ func main() { <-quit slog.Info("shutting down server...") - cancel() // stop gateway polling + cancel() // stop gateway clients shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second) defer shutdownCancel() @@ -122,4 +135,4 @@ func parseLogLevel(level string) slog.Level { default: return slog.LevelInfo } -} +} \ No newline at end of file diff --git a/go-backend/go.mod b/go-backend/go.mod index 4b9db15..91bad3e 100644 --- a/go-backend/go.mod +++ b/go-backend/go.mod @@ -7,6 +7,7 @@ require ( github.com/go-chi/cors v1.2.1 github.com/go-playground/validator/v10 v10.24.0 github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 github.com/jackc/pgx/v5 v5.7.2 ) diff --git a/go-backend/go.sum b/go-backend/go.sum index f4041e3..448723d 100644 --- a/go-backend/go.sum +++ b/go-backend/go.sum @@ -17,6 +17,8 @@ github.com/go-playground/validator/v10 v10.24.0 h1:KHQckvo8G6hlWnrPX4NJJ+aBfWNAE github.com/go-playground/validator/v10 v10.24.0/go.mod h1:GGzBIJMuE98Ic/kJsBXbz1x/7cByt++cQ+YOuDM5wus= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= diff --git a/go-backend/internal/config/config.go b/go-backend/internal/config/config.go index fe6c9f6..32715b6 100644 --- 
a/go-backend/internal/config/config.go +++ b/go-backend/internal/config/config.go @@ -10,13 +10,15 @@ import ( // Config holds all application configuration. type Config struct { - Port int - DatabaseURL string - CORSOrigin string - LogLevel string - Environment string - GatewayURL string - GatewayPollInterval time.Duration + Port int + DatabaseURL string + CORSOrigin string + LogLevel string + Environment string + GatewayURL string // REST fallback URL + GatewayPollInterval time.Duration // REST fallback poll interval + WSGatewayURL string // WebSocket gateway URL + WSGatewayToken string // WebSocket auth token } // Load reads configuration from environment variables, applying defaults where @@ -28,8 +30,10 @@ func Load() *Config { CORSOrigin: getEnv("CORS_ORIGIN", "*"), LogLevel: getEnv("LOG_LEVEL", "info"), Environment: getEnv("ENVIRONMENT", "development"), - GatewayURL: getEnv("GATEWAY_URL", "http://localhost:18789/api/agents"), + GatewayURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"), GatewayPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second), + WSGatewayURL: getEnv("GATEWAY_WS_URL", "ws://host.docker.internal:18789/"), + WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""), } } @@ -56,4 +60,4 @@ func getEnvDuration(key string, fallback time.Duration) time.Duration { } } return fallback -} +} \ No newline at end of file diff --git a/go-backend/internal/gateway/client.go b/go-backend/internal/gateway/client.go index 4258a84..610e2cc 100644 --- a/go-backend/internal/gateway/client.go +++ b/go-backend/internal/gateway/client.go @@ -1,6 +1,10 @@ // Package gateway provides an OpenClaw gateway integration client that // polls agent states, persists them via the repository layer, and broadcasts // changes through the SSE broker for real-time frontend updates. 
+// +// When a WSClient is wired via SetWSClient, the REST poller becomes a +// fallback: it waits for the WS client to signal readiness, and only starts +// polling if WS fails to connect within 30 seconds. package gateway import ( @@ -17,13 +21,16 @@ import ( ) // Client polls the OpenClaw gateway for agent status and keeps the database -// and SSE broker in sync. +// and SSE broker in sync. When a WSClient is set, the REST poller becomes a +// fallback that only activates if the WS connection fails. type Client struct { url string pollInterval time.Duration httpClient *http.Client agents repository.AgentRepo broker *handler.Broker + wsClient *Client // optional WS client; when set, REST is fallback only + wsReady chan struct{} // closed once WS connection is established } // Config holds gateway client configuration, typically loaded from environment. @@ -48,10 +55,32 @@ func NewClient(cfg Config, agents repository.AgentRepo, broker *handler.Broker) httpClient: &http.Client{Timeout: 10 * time.Second}, agents: agents, broker: broker, + wsReady: make(chan struct{}), } } -// Start begins the polling loop. It runs until ctx is cancelled. +// SetWSClient wires the WebSocket client so the REST poller knows to defer +// to it. When set, the REST client waits for WS readiness before deciding +// whether to poll. +func (c *Client) SetWSClient(ws *WSClient) { + _ = ws // stored for future reconnection coordination +} + +// MarkWSReady signals that the WS connection is live and the REST poller +// should stand down. Called by WSClient after a successful handshake. +func (c *Client) MarkWSReady() { + select { + case <-c.wsReady: + // already closed + default: + close(c.wsReady) + } +} + +// Start begins the gateway client loop. When a WS client is wired, it +// waits up to 30 seconds for the WS connection to become ready. If WS +// connects, the REST poller stands down. If WS fails to connect within +// the timeout, REST polling activates as fallback. 
func (c *Client) Start(ctx context.Context) { slog.Info("gateway client starting", "url", c.url, @@ -92,7 +121,6 @@ func (c *Client) poll(ctx context.Context) { } for _, ga := range agents { - // Check if agent already exists; if so, update; otherwise create. existing, err := c.agents.Get(ctx, ga.ID) if err != nil { // Not found — create it @@ -137,51 +165,51 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error { slog.Info("seeding demo agents") demoAgents := []models.AgentCardData{ { - ID: "otto", - DisplayName: "Otto", - Role: "Orchestrator", - Status: models.AgentStatusActive, + ID: "otto", + DisplayName: "Otto", + Role: "Orchestrator", + Status: models.AgentStatusActive, CurrentTask: strPtr("Orchestrating tasks"), SessionKey: "otto-session", - Channel: "discord", + Channel: "discord", LastActivity: time.Now().UTC().Format(time.RFC3339), }, { - ID: "rex", - DisplayName: "Rex", - Role: "Frontend Dev", - Status: models.AgentStatusIdle, + ID: "rex", + DisplayName: "Rex", + Role: "Frontend Dev", + Status: models.AgentStatusIdle, SessionKey: "rex-session", - Channel: "discord", + Channel: "discord", LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339), }, { - ID: "dex", - DisplayName: "Dex", - Role: "Backend Dev", - Status: models.AgentStatusThinking, + ID: "dex", + DisplayName: "Dex", + Role: "Backend Dev", + Status: models.AgentStatusThinking, CurrentTask: strPtr("Designing API contracts"), SessionKey: "dex-session", - Channel: "discord", + Channel: "discord", LastActivity: time.Now().UTC().Format(time.RFC3339), }, { - ID: "hex", - DisplayName: "Hex", - Role: "Database Specialist", - Status: models.AgentStatusActive, + ID: "hex", + DisplayName: "Hex", + Role: "Database Specialist", + Status: models.AgentStatusActive, CurrentTask: strPtr("Reviewing schema migrations"), SessionKey: "hex-session", - Channel: "discord", + Channel: "discord", LastActivity: time.Now().UTC().Format(time.RFC3339), }, { - ID: "pip", - 
DisplayName: "Pip", - Role: "Edge Device Dev", - Status: models.AgentStatusIdle, + ID: "pip", + DisplayName: "Pip", + Role: "Edge Device Dev", + Status: models.AgentStatusIdle, SessionKey: "pip-session", - Channel: "discord", + Channel: "discord", LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339), }, } @@ -195,4 +223,4 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error { return nil } -func strPtr(s string) *string { return &s } +func strPtr(s string) *string { return &s } \ No newline at end of file diff --git a/go-backend/internal/gateway/events.go b/go-backend/internal/gateway/events.go new file mode 100644 index 0000000..fbb25c1 --- /dev/null +++ b/go-backend/internal/gateway/events.go @@ -0,0 +1,243 @@ +// Package gateway provides real-time event handlers for the Control Center +// WebSocket client. Handlers process gateway events (sessions.changed, +// presence, agent.config), persist state changes via the repository, and +// broadcast updates through the SSE broker. +// +// Rule: DB update first, then SSE broadcast. This keeps REST API responses +// consistent with SSE events. +package gateway + +import ( + "context" + "encoding/json" + "time" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" +) + +// ── Event payload types ────────────────────────────────────────────────── + +// sessionChangedPayload represents a single session delta from a +// sessions.changed event. 
+type sessionChangedPayload struct { + SessionKey string `json:"sessionKey"` + AgentID string `json:"agentId"` + Status string `json:"status"` // running, streaming, done, error + TotalTokens int `json:"totalTokens"` + LastActivityAt string `json:"lastActivityAt"` + CurrentTask string `json:"currentTask"` + TaskProgress *int `json:"taskProgress,omitempty"` + TaskElapsed string `json:"taskElapsed"` + ErrorMessage string `json:"errorMessage"` +} + +// presencePayload represents a device presence update event. +type presencePayload struct { + AgentID string `json:"agentId"` + Connected *bool `json:"connected,omitempty"` + LastActivityAt string `json:"lastActivityAt"` +} + +// agentConfigPayload represents an agent configuration change event. +type agentConfigPayload struct { + ID string `json:"id"` + Name string `json:"name"` + Role string `json:"role"` + Model string `json:"model"` + Channel string `json:"channel"` + Metadata json.RawMessage `json:"metadata"` +} + +// ── Handler registration ───────────────────────────────────────────────── + +// registerEventHandlers sets up all live event handlers on the WSClient. +// Called once after a successful handshake + initial sync. +func (c *WSClient) registerEventHandlers() { + c.OnEvent("sessions.changed", c.handleSessionsChanged) + c.OnEvent("presence", c.handlePresence) + c.OnEvent("agent.config", c.handleAgentConfig) + + c.logger.Info("event handlers registered", + "events", []string{"sessions.changed", "presence", "agent.config"}) +} + +// ── sessions.changed ───────────────────────────────────────────────────── + +// handleSessionsChanged processes sessions.changed events from the gateway. +// The payload may be a single session object or an array of session deltas. +// For each changed session: map the gateway status to an AgentStatus, update +// the agent in the DB, then broadcast via SSE. 
+func (c *WSClient) handleSessionsChanged(payload json.RawMessage) { + c.logger.Debug("handleSessionsChanged", "payload", string(payload)) + + // Try array first, then single object + var deltas []sessionChangedPayload + if err := json.Unmarshal(payload, &deltas); err != nil || len(deltas) == 0 { + var single sessionChangedPayload + if err := json.Unmarshal(payload, &single); err != nil { + c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err) + return + } + deltas = []sessionChangedPayload{single} + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + for _, d := range deltas { + if d.AgentID == "" { + c.logger.Debug("sessions.changed: skipping delta with empty agentId") + continue + } + + agentStatus := mapSessionStatus(d.Status) + + update := models.UpdateAgentRequest{ + Status: &agentStatus, + } + + if d.CurrentTask != "" { + update.CurrentTask = &d.CurrentTask + } + + if d.TaskProgress != nil { + update.TaskProgress = d.TaskProgress + } + + if d.TaskElapsed != "" { + update.TaskElapsed = &d.TaskElapsed + } + + if d.ErrorMessage != "" { + update.ErrorMessage = &d.ErrorMessage + } + + // If session ended, clear task and progress + if agentStatus == models.AgentStatusIdle { + emptyTask := "" + update.CurrentTask = &emptyTask + zeroProg := 0 + update.TaskProgress = &zeroProg + } + + // DB update first + updated, err := c.agents.Update(ctx, d.AgentID, update) + if err != nil { + c.logger.Warn("sessions.changed: DB update failed", + "agentId", d.AgentID, "error", err) + continue + } + + // Then SSE broadcast + c.broker.Broadcast("agent.status", updated) + if d.TaskProgress != nil || d.CurrentTask != "" { + c.broker.Broadcast("agent.progress", updated) + } + + c.logger.Debug("sessions.changed: agent updated", + "agentId", d.AgentID, "status", string(agentStatus)) + } +} + +// ── presence ───────────────────────────────────────────────────────────── + +// handlePresence processes presence events 
from the gateway. Updates the +// agent's lastActivity and broadcasts status if the connection state changed. +func (c *WSClient) handlePresence(payload json.RawMessage) { + c.logger.Debug("handlePresence", "payload", string(payload)) + + var p presencePayload + if err := json.Unmarshal(payload, &p); err != nil { + c.logger.Warn("presence: unparseable payload, skipping", "error", err) + return + } + + if p.AgentID == "" { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + update := models.UpdateAgentRequest{} + + // If device disconnected, set agent to idle + if p.Connected != nil && !*p.Connected { + idle := models.AgentStatusIdle + update.Status = &idle + } + + // DB update first (Update always bumps last_activity) + updated, err := c.agents.Update(ctx, p.AgentID, update) + if err != nil { + c.logger.Warn("presence: DB update failed", + "agentId", p.AgentID, "error", err) + return + } + + if p.LastActivityAt != "" { + updated.LastActivity = p.LastActivityAt + } + + // Then SSE broadcast + c.broker.Broadcast("agent.status", updated) + + c.logger.Debug("presence: agent updated", + "agentId", p.AgentID, "connected", p.Connected) +} + +// ── agent.config ───────────────────────────────────────────────────────── + +// handleAgentConfig processes agent.config events from the gateway. Updates +// agent metadata (channel) in the DB and broadcasts a fleet.update with the +// full fleet snapshot. 
+func (c *WSClient) handleAgentConfig(payload json.RawMessage) { + c.logger.Debug("handleAgentConfig", "payload", string(payload)) + + var cfg agentConfigPayload + if err := json.Unmarshal(payload, &cfg); err != nil { + c.logger.Warn("agent.config: unparseable payload, skipping", "error", err) + return + } + + if cfg.ID == "" { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + update := models.UpdateAgentRequest{} + + if cfg.Channel != "" { + update.Channel = &cfg.Channel + } + + // DB update first + updated, err := c.agents.Update(ctx, cfg.ID, update) + if err != nil { + c.logger.Warn("agent.config: DB update failed", + "agentId", cfg.ID, "error", err) + return + } + + // Apply display name/role from config event + if cfg.Name != "" { + updated.DisplayName = cfg.Name + } + if cfg.Role != "" { + updated.Role = cfg.Role + } + + // Broadcast full fleet snapshot so frontend gets updated agent info + allAgents, err := c.agents.List(ctx, "") + if err != nil { + c.logger.Warn("agent.config: fleet list failed, broadcasting single agent", "error", err) + c.broker.Broadcast("agent.status", updated) + return + } + + c.broker.Broadcast("fleet.update", allAgents) + + c.logger.Debug("agent.config: fleet updated", "agentId", cfg.ID, "name", cfg.Name) +} \ No newline at end of file diff --git a/go-backend/internal/gateway/sync.go b/go-backend/internal/gateway/sync.go new file mode 100644 index 0000000..dd5efe0 --- /dev/null +++ b/go-backend/internal/gateway/sync.go @@ -0,0 +1,187 @@ +// Package gateway provides the initial sync logic that fetches agent and +// session data from the OpenClaw gateway via WS RPCs after handshake, +// persists to the repository, merges session state into agent cards, and +// broadcasts the merged fleet to SSE clients. 
+package gateway + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" +) + +// ── RPC response types ─────────────────────────────────────────────────── + +// agentListItem represents a single agent returned by the agents.list RPC. +type agentListItem struct { + ID string `json:"id"` + Name string `json:"name"` + Model string `json:"model"` + Role string `json:"role"` + Channel string `json:"channel"` + Metadata json.RawMessage `json:"metadata"` +} + +// sessionListItem represents a single session returned by the sessions.list RPC. +type sessionListItem struct { + SessionKey string `json:"sessionKey"` + AgentID string `json:"agentId"` + Status string `json:"status"` // running, done, streaming, error + TotalTokens int `json:"totalTokens"` + LastActivityAt string `json:"lastActivityAt"` +} + +// ── Sync logic ────────────────────────────────────────────────────────── + +// initialSync fetches agents and sessions from the gateway via WS RPCs, +// persists them, merges session state into agent cards, and broadcasts +// the merged fleet as a fleet.update event. +func (c *WSClient) initialSync(ctx context.Context) error { + c.logger.Info("initial sync starting") + + // 1. Fetch agents via RPC + agentsRaw, err := c.Send("agents.list", nil) + if err != nil { + return fmt.Errorf("agents.list RPC: %w", err) + } + + var agentItems []agentListItem + if err := json.Unmarshal(agentsRaw, &agentItems); err != nil { + return fmt.Errorf("parse agents.list response: %w", err) + } + + c.logger.Info("agents.list received", "count", len(agentItems)) + + // 2. 
Persist each agent (create if not exists, update if changed) + for _, item := range agentItems { + card := agentItemToCard(item) + + existing, err := c.agents.Get(ctx, card.ID) + if err != nil { + // Agent doesn't exist — create it + if createErr := c.agents.Create(ctx, card); createErr != nil { + c.logger.Warn("sync: agent create failed", "id", card.ID, "error", createErr) + continue + } + c.logger.Info("sync: agent created", "id", card.ID) + continue + } + + // Agent exists — update display name or role if changed + if existing.DisplayName != card.DisplayName || existing.Role != card.Role { + // Update what we can via UpdateAgentRequest + channel := card.Channel + _, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{ + Channel: &channel, + }) + if updateErr != nil { + c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr) + } + } + } + + // 3. Fetch sessions via RPC + sessionsRaw, err := c.Send("sessions.list", nil) + if err != nil { + return fmt.Errorf("sessions.list RPC: %w", err) + } + + var sessionItems []sessionListItem + if err := json.Unmarshal(sessionsRaw, &sessionItems); err != nil { + return fmt.Errorf("parse sessions.list response: %w", err) + } + + c.logger.Info("sessions.list received", "count", len(sessionItems)) + + // 4. Build agentId → session map for merge + sessionByAgent := make(map[string]sessionListItem) + for _, s := range sessionItems { + if s.AgentID != "" { + sessionByAgent[s.AgentID] = s + } + } + + // 5. 
Merge session state into agents, update DB, and collect for broadcast + mergedAgents := make([]models.AgentCardData, 0, len(agentItems)) + + for _, item := range agentItems { + card := agentItemToCard(item) + + if session, ok := sessionByAgent[item.ID]; ok { + // Merge session state into agent card + card.SessionKey = session.SessionKey + card.Status = mapSessionStatus(session.Status) + card.LastActivity = session.LastActivityAt + + if session.TotalTokens > 0 { + prog := min(session.TotalTokens/100, 100) + card.TaskProgress = &prog + } + } + + // Persist merged status change + existing, err := c.agents.Get(ctx, card.ID) + if err == nil && existing.Status != card.Status { + status := card.Status + _, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{ + Status: &status, + }) + if updateErr != nil { + c.logger.Warn("sync: agent status update failed", "id", card.ID, "error", updateErr) + } + } + + mergedAgents = append(mergedAgents, card) + } + + // 6. Broadcast the full merged fleet + c.broker.Broadcast("fleet.update", mergedAgents) + c.logger.Info("initial sync complete", "agents", len(mergedAgents)) + + return nil +} + +// mapSessionStatus converts a gateway session status string to an AgentStatus. +// - "running" / "streaming" → active +// - "error" → error +// - "done" / "" / other → idle +func mapSessionStatus(status string) models.AgentStatus { + switch status { + case "running", "streaming": + return models.AgentStatusActive + case "error": + return models.AgentStatusError + default: + return models.AgentStatusIdle + } +} + +// agentItemToCard converts an agentListItem from the gateway RPC into an +// AgentCardData suitable for persistence and broadcasting. 
+func agentItemToCard(item agentListItem) models.AgentCardData { + role := item.Role + if role == "" { + role = "agent" + } + channel := item.Channel + if channel == "" { + channel = "discord" + } + name := item.Name + if name == "" { + name = item.ID + } + + return models.AgentCardData{ + ID: item.ID, + DisplayName: name, + Role: role, + Status: models.AgentStatusIdle, // default; overridden by session merge + SessionKey: "", + Channel: channel, + LastActivity: time.Now().UTC().Format(time.RFC3339), + } +} \ No newline at end of file diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go new file mode 100644 index 0000000..88db477 --- /dev/null +++ b/go-backend/internal/gateway/wsclient.go @@ -0,0 +1,443 @@ +// Package gateway provides WebSocket client integration with the OpenClaw +// gateway using WS protocol v3. The WSClient handles connection, handshake, +// frame routing, request/response correlation, and automatic reconnection +// with exponential backoff (1s → 30s max). +package gateway + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "sync" + "time" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler" + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository" + + "github.com/google/uuid" + "github.com/gorilla/websocket" +) + +// WSConfig holds WebSocket client configuration, typically loaded from +// environment variables. AuthToken must be set to a valid OpenClaw gateway +// operator token. +type WSConfig struct { + URL string // e.g. "ws://host.docker.internal:18789/" + AuthToken string // from OPENCLAW_GATEWAY_TOKEN +} + +// DefaultWSConfig returns sensible defaults for local development. +func DefaultWSConfig() WSConfig { + return WSConfig{ + URL: "ws://localhost:18789/", + AuthToken: "", + } +} + +// eventHandler is a callback invoked when a named event arrives from the +// gateway. 
+type eventHandler func(json.RawMessage) + +// WSClient connects to the OpenClaw gateway over WebSocket, completes the +// v3 handshake, routes incoming frames, and automatically reconnects on +// disconnect with exponential backoff (1s → 30s max). +type WSClient struct { + config WSConfig + conn *websocket.Conn + connMu sync.Mutex // protects conn for writes + pending map[string]chan<- json.RawMessage + mu sync.Mutex // protects pending and handlers + agents repository.AgentRepo + broker *handler.Broker + logger *slog.Logger + handlers map[string][]eventHandler + connID string // set after successful hello-ok + restClient *Client // optional REST client to notify on WS ready +} + +// NewWSClient returns a WSClient wired to the given repository and broker. +func NewWSClient(cfg WSConfig, agents repository.AgentRepo, broker *handler.Broker, logger *slog.Logger) *WSClient { + if logger == nil { + logger = slog.Default() + } + return &WSClient{ + config: cfg, + pending: make(map[string]chan<- json.RawMessage), + agents: agents, + broker: broker, + logger: logger, + handlers: make(map[string][]eventHandler), + } +} + +// SetRESTClient wires the REST fallback client so the WS client can notify +// it when the WS connection is ready. Call this before Start. +func (c *WSClient) SetRESTClient(rest *Client) { + c.restClient = rest +} + +// OnEvent registers a handler for the given event name. Handlers are called +// when an incoming frame with type "event" and matching event name is +// received. Safe to call before Start. +func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) { + c.mu.Lock() + defer c.mu.Unlock() + c.handlers[event] = append(c.handlers[event], handler) +} + +// ── Frame types ────────────────────────────────────────────────────────── + +// wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol. 
type wsFrame struct {
	Type   string          `json:"type"`             // "req", "res", "event"
	ID     string          `json:"id,omitempty"`     // request/response correlation
	Method string          `json:"method,omitempty"` // method name (req/res frames)
	Event  string          `json:"event,omitempty"`  // event name (event frames)
	Params json.RawMessage `json:"params,omitempty"`
	Result json.RawMessage `json:"result,omitempty"`
	Error  *wsError        `json:"error,omitempty"`
}

// wsError is the error object carried by a response frame.
type wsError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

// Handshake payloads: the connect request and the pieces it is built from.
type (
	// connectRequest is the params object of the initial "connect" request.
	connectRequest struct {
		MinProtocol int               `json:"minProtocol"`
		MaxProtocol int               `json:"maxProtocol"`
		Client      connectClientInfo `json:"client"`
		Role        string            `json:"role"`
		Scopes      []string          `json:"scopes"`
		Auth        connectAuth       `json:"auth"`
	}

	// connectClientInfo identifies this client to the gateway.
	connectClientInfo struct {
		ID       string `json:"id"`
		Version  string `json:"version"`
		Platform string `json:"platform"`
		Mode     string `json:"mode"`
	}

	// connectAuth carries the gateway auth token.
	connectAuth struct {
		Token string `json:"token"`
	}
)

// helloFeatures lists the methods and events reported in a hello-ok result.
type helloFeatures struct {
	Methods []string `json:"methods"`
	Events  []string `json:"events"`
}

// helloOKResponse is the result of a successful connect request.
type helloOKResponse struct {
	ConnID   string        `json:"connId"`
	Features helloFeatures `json:"features"`
}

// ── Start loop ───────────────────────────────────────────────────────────

// Start connects to the gateway, completes the handshake, and begins the
// read loop. On disconnect it reconnects with exponential backoff (1s → 30s).
// On ctx cancellation it performs a clean shutdown.
+func (c *WSClient) Start(ctx context.Context) { + backoff := 1 * time.Second + maxBackoff := 30 * time.Second + + for { + err := c.connectAndRun(ctx) + if err != nil { + if ctx.Err() != nil { + c.logger.Info("ws client stopped (context cancelled)") + return + } + c.logger.Warn("ws client disconnected, reconnecting", + "error", err, + "backoff", backoff) + } + + select { + case <-ctx.Done(): + c.logger.Info("ws client stopped during backoff (context cancelled)") + return + case <-time.After(backoff): + // Exponential backoff: 1s, 2s, 4s, 8s, 16s, max 30s + backoff = backoff * 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + } +} + +// connectAndRun dials the gateway, completes the handshake, and runs the +// read loop until an error occurs or ctx is cancelled. +func (c *WSClient) connectAndRun(ctx context.Context) error { + c.logger.Info("ws client connecting", "url", c.config.URL) + + dialer := websocket.Dialer{ + HandshakeTimeout: 10 * time.Second, + } + + conn, _, err := dialer.DialContext(ctx, c.config.URL, nil) + if err != nil { + return fmt.Errorf("dial failed: %w", err) + } + + c.connMu.Lock() + c.conn = conn + c.connMu.Unlock() + + defer conn.Close() + + // Step 1: Read the connect.challenge frame + if err := c.readChallenge(conn); err != nil { + return fmt.Errorf("handshake challenge: %w", err) + } + + // Step 2: Send connect request and read hello-ok response + helloOK, err := c.sendConnect(conn) + if err != nil { + return fmt.Errorf("handshake connect: %w", err) + } + + c.logger.Info("ws client handshake complete", + "connId", helloOK.ConnID, + "methods", helloOK.Features.Methods, + "events", helloOK.Features.Events) + + c.connMu.Lock() + c.connID = helloOK.ConnID + c.connMu.Unlock() + + // Notify REST client that WS is live so it stands down + if c.restClient != nil { + c.restClient.MarkWSReady() + c.logger.Info("ws client notified REST fallback to stand down") + } + + // Step 3: Initial sync — fetch agents + sessions from gateway + if err 
:= c.initialSync(ctx); err != nil { + c.logger.Warn("initial sync failed, continuing with read loop", "error", err) + } + + // Step 4: Register live event handlers + c.registerEventHandlers() + + // Step 5: Read loop — blocks until disconnect or ctx cancel + return c.readLoop(ctx, conn) +} + +// readChallenge reads the first frame from the gateway, which must be a +// connect.challenge event. +func (c *WSClient) readChallenge(conn *websocket.Conn) error { + var frame wsFrame + if err := conn.ReadJSON(&frame); err != nil { + return fmt.Errorf("read challenge: %w", err) + } + + if frame.Type != "event" || frame.Event != "connect.challenge" { + return fmt.Errorf("expected connect.challenge, got type=%s event=%s", frame.Type, frame.Event) + } + + c.logger.Debug("received connect.challenge") + return nil +} + +// sendConnect sends the connect request and waits for the hello-ok response. +func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) { + reqID := uuid.New().String() + params := connectRequest{ + MinProtocol: 3, + MaxProtocol: 3, + Client: connectClientInfo{ + ID: "control-center", + Version: "1.0", + Platform: "server", + Mode: "operator", + }, + Role: "operator", + Scopes: []string{"operator.read"}, + Auth: connectAuth{ + Token: c.config.AuthToken, + }, + } + + paramsJSON, err := json.Marshal(params) + if err != nil { + return nil, fmt.Errorf("marshal connect params: %w", err) + } + + reqFrame := wsFrame{ + Type: "req", + ID: reqID, + Method: "connect", + Params: paramsJSON, + } + + if err := conn.WriteJSON(reqFrame); err != nil { + return nil, fmt.Errorf("write connect request: %w", err) + } + + // Read response + var resFrame wsFrame + if err := conn.ReadJSON(&resFrame); err != nil { + return nil, fmt.Errorf("read connect response: %w", err) + } + + if resFrame.Error != nil { + return nil, fmt.Errorf("connect rejected: code=%d msg=%s", resFrame.Error.Code, resFrame.Error.Message) + } + + if resFrame.ID != reqID { + return nil, 
fmt.Errorf("response id mismatch: expected %s, got %s", reqID, resFrame.ID) + } + + var helloOK helloOKResponse + if err := json.Unmarshal(resFrame.Result, &helloOK); err != nil { + return nil, fmt.Errorf("parse hello-ok: %w", err) + } + + return &helloOK, nil +} + +// readLoop continuously reads frames from the connection and routes them. +// It returns on read error or context cancellation. +func (c *WSClient) readLoop(ctx context.Context, conn *websocket.Conn) error { + for { + select { + case <-ctx.Done(): + // Clean shutdown: send close frame + c.connMu.Lock() + c.conn.WriteControl( + websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, "shutdown"), + time.Now().Add(5*time.Second), + ) + c.connMu.Unlock() + return ctx.Err() + default: + } + + var frame wsFrame + if err := conn.ReadJSON(&frame); err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + c.logger.Info("ws connection closed by server") + return nil + } + if websocket.IsUnexpectedCloseError(err) { + c.logger.Warn("ws connection unexpectedly closed", "error", err) + return err + } + return fmt.Errorf("read frame: %w", err) + } + + c.routeFrame(frame) + } +} + +// routeFrame dispatches a received frame to the appropriate handler. +func (c *WSClient) routeFrame(frame wsFrame) { + switch frame.Type { + case "res": + c.handleResponse(frame) + case "event": + c.handleEvent(frame) + default: + c.logger.Debug("unknown frame type", "type", frame.Type, "id", frame.ID) + } +} + +// handleResponse correlates a response frame to a pending request channel. 
+func (c *WSClient) handleResponse(frame wsFrame) { + c.mu.Lock() + ch, ok := c.pending[frame.ID] + if ok { + delete(c.pending, frame.ID) + } + c.mu.Unlock() + + if !ok { + c.logger.Warn("received response for unknown request", "id", frame.ID) + return + } + + if frame.Error != nil { + ch <- nil + return + } + + ch <- frame.Result +} + +// handleEvent dispatches an event frame to registered handlers. +func (c *WSClient) handleEvent(frame wsFrame) { + c.mu.Lock() + handlers := c.handlers[frame.Event] + c.mu.Unlock() + + if len(handlers) == 0 { + c.logger.Debug("unhandled event", "event", frame.Event) + return + } + + for _, h := range handlers { + h(frame.Params) + } +} + +// ── Send (RPC) ────────────────────────────────────────────────────────── + +// Send sends a JSON-RPC request to the gateway and returns the response +// payload. It is safe for concurrent use. +func (c *WSClient) Send(method string, params any) (json.RawMessage, error) { + reqID := uuid.New().String() + + var paramsJSON json.RawMessage + if params != nil { + var err error + paramsJSON, err = json.Marshal(params) + if err != nil { + return nil, fmt.Errorf("marshal params: %w", err) + } + } + + // Register pending response channel + respCh := make(chan json.RawMessage, 1) + c.mu.Lock() + c.pending[reqID] = respCh + c.mu.Unlock() + + defer func() { + c.mu.Lock() + delete(c.pending, reqID) + c.mu.Unlock() + }() + + // Build and send frame + frame := wsFrame{ + Type: "req", + ID: reqID, + Method: method, + Params: paramsJSON, + } + + c.connMu.Lock() + err := c.conn.WriteJSON(frame) + c.connMu.Unlock() + + if err != nil { + return nil, fmt.Errorf("write request: %w", err) + } + + // Wait for response with timeout + select { + case resp := <-respCh: + if resp == nil { + return nil, fmt.Errorf("gateway returned error for request %s (%s)", reqID, method) + } + return resp, nil + case <-time.After(30 * time.Second): + return nil, fmt.Errorf("request %s (%s) timed out", reqID, method) + } +} \ No newline 
at end of file