Files
Control-Center/go-backend/internal/gateway/events.go
Dex d28d6e8dac
Some checks are pending
Dev Build / build-test (pull_request) Waiting to run
Dev Build / deploy-dev (pull_request) Blocked by required conditions
CUB-200: implement WebSocket gateway client with v3 protocol
Replace REST poller with WebSocket client as primary gateway connection:

- wsclient.go: WebSocket client with v3 handshake (connect.challenge →
  connect → hello-ok), frame routing (req/res/event), JSON-RPC Send(),
  auto-reconnect with exponential backoff (1s → 30s max)
- sync.go: Initial sync via agents.list + sessions.list RPCs, merge
  session runtime state into AgentCardData, broadcast fleet.update
- events.go: Real-time event handlers for sessions.changed, presence,
  and agent.config — DB update first, then SSE broadcast
- client.go: REST poller retained as fallback (WS is primary)
- config.go: Add GATEWAY_WS_URL and OPENCLAW_GATEWAY_TOKEN env vars
- main.go: Wire WS client as primary, REST as fallback
- .env.example: Document new WS config vars

Fallback: If WS connection fails, seeded demo data + REST polling
remain available.
2026-05-20 11:33:17 +00:00

243 lines
7.7 KiB
Go

// Package gateway provides real-time event handlers for the Control Center
// WebSocket client. Handlers process gateway events (sessions.changed,
// presence, agent.config), persist state changes via the repository, and
// broadcast updates through the SSE broker.
//
// Rule: DB update first, then SSE broadcast. This keeps REST API responses
// consistent with SSE events.
package gateway
import (
"context"
"encoding/json"
"time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
)
// ── Event payload types ──────────────────────────────────────────────────
// sessionChangedPayload represents a single session delta from a
// sessions.changed event.
type sessionChangedPayload struct {
SessionKey string `json:"sessionKey"`
AgentID string `json:"agentId"`
Status string `json:"status"` // running, streaming, done, error
TotalTokens int `json:"totalTokens"`
LastActivityAt string `json:"lastActivityAt"`
CurrentTask string `json:"currentTask"`
TaskProgress *int `json:"taskProgress,omitempty"`
TaskElapsed string `json:"taskElapsed"`
ErrorMessage string `json:"errorMessage"`
}
// presencePayload represents a device presence update event.
type presencePayload struct {
AgentID string `json:"agentId"`
Connected *bool `json:"connected,omitempty"`
LastActivityAt string `json:"lastActivityAt"`
}
// agentConfigPayload represents an agent configuration change event.
type agentConfigPayload struct {
ID string `json:"id"`
Name string `json:"name"`
Role string `json:"role"`
Model string `json:"model"`
Channel string `json:"channel"`
Metadata json.RawMessage `json:"metadata"`
}
// ── Handler registration ─────────────────────────────────────────────────
// registerEventHandlers sets up all live event handlers on the WSClient.
// Called once after a successful handshake + initial sync.
func (c *WSClient) registerEventHandlers() {
c.OnEvent("sessions.changed", c.handleSessionsChanged)
c.OnEvent("presence", c.handlePresence)
c.OnEvent("agent.config", c.handleAgentConfig)
c.logger.Info("event handlers registered",
"events", []string{"sessions.changed", "presence", "agent.config"})
}
// ── sessions.changed ─────────────────────────────────────────────────────
// handleSessionsChanged processes sessions.changed events from the gateway.
// The payload may be a single session object or an array of session deltas.
// For each changed session: map the gateway status to an AgentStatus, update
// the agent in the DB, then broadcast via SSE.
func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
c.logger.Debug("handleSessionsChanged", "payload", string(payload))
// Try array first, then single object
var deltas []sessionChangedPayload
if err := json.Unmarshal(payload, &deltas); err != nil || len(deltas) == 0 {
var single sessionChangedPayload
if err := json.Unmarshal(payload, &single); err != nil {
c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err)
return
}
deltas = []sessionChangedPayload{single}
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for _, d := range deltas {
if d.AgentID == "" {
c.logger.Debug("sessions.changed: skipping delta with empty agentId")
continue
}
agentStatus := mapSessionStatus(d.Status)
update := models.UpdateAgentRequest{
Status: &agentStatus,
}
if d.CurrentTask != "" {
update.CurrentTask = &d.CurrentTask
}
if d.TaskProgress != nil {
update.TaskProgress = d.TaskProgress
}
if d.TaskElapsed != "" {
update.TaskElapsed = &d.TaskElapsed
}
if d.ErrorMessage != "" {
update.ErrorMessage = &d.ErrorMessage
}
// If session ended, clear task and progress
if agentStatus == models.AgentStatusIdle {
emptyTask := ""
update.CurrentTask = &emptyTask
zeroProg := 0
update.TaskProgress = &zeroProg
}
// DB update first
updated, err := c.agents.Update(ctx, d.AgentID, update)
if err != nil {
c.logger.Warn("sessions.changed: DB update failed",
"agentId", d.AgentID, "error", err)
continue
}
// Then SSE broadcast
c.broker.Broadcast("agent.status", updated)
if d.TaskProgress != nil || d.CurrentTask != "" {
c.broker.Broadcast("agent.progress", updated)
}
c.logger.Debug("sessions.changed: agent updated",
"agentId", d.AgentID, "status", string(agentStatus))
}
}
// ── presence ─────────────────────────────────────────────────────────────
// handlePresence processes presence events from the gateway. Updates the
// agent's lastActivity and broadcasts status if the connection state changed.
func (c *WSClient) handlePresence(payload json.RawMessage) {
c.logger.Debug("handlePresence", "payload", string(payload))
var p presencePayload
if err := json.Unmarshal(payload, &p); err != nil {
c.logger.Warn("presence: unparseable payload, skipping", "error", err)
return
}
if p.AgentID == "" {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
update := models.UpdateAgentRequest{}
// If device disconnected, set agent to idle
if p.Connected != nil && !*p.Connected {
idle := models.AgentStatusIdle
update.Status = &idle
}
// DB update first (Update always bumps last_activity)
updated, err := c.agents.Update(ctx, p.AgentID, update)
if err != nil {
c.logger.Warn("presence: DB update failed",
"agentId", p.AgentID, "error", err)
return
}
if p.LastActivityAt != "" {
updated.LastActivity = p.LastActivityAt
}
// Then SSE broadcast
c.broker.Broadcast("agent.status", updated)
c.logger.Debug("presence: agent updated",
"agentId", p.AgentID, "connected", p.Connected)
}
// ── agent.config ─────────────────────────────────────────────────────────
// handleAgentConfig processes agent.config events from the gateway. Updates
// agent metadata (channel) in the DB and broadcasts a fleet.update with the
// full fleet snapshot.
func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
c.logger.Debug("handleAgentConfig", "payload", string(payload))
var cfg agentConfigPayload
if err := json.Unmarshal(payload, &cfg); err != nil {
c.logger.Warn("agent.config: unparseable payload, skipping", "error", err)
return
}
if cfg.ID == "" {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
update := models.UpdateAgentRequest{}
if cfg.Channel != "" {
update.Channel = &cfg.Channel
}
// DB update first
updated, err := c.agents.Update(ctx, cfg.ID, update)
if err != nil {
c.logger.Warn("agent.config: DB update failed",
"agentId", cfg.ID, "error", err)
return
}
// Apply display name/role from config event
if cfg.Name != "" {
updated.DisplayName = cfg.Name
}
if cfg.Role != "" {
updated.Role = cfg.Role
}
// Broadcast full fleet snapshot so frontend gets updated agent info
allAgents, err := c.agents.List(ctx, "")
if err != nil {
c.logger.Warn("agent.config: fleet list failed, broadcasting single agent", "error", err)
c.broker.Broadcast("agent.status", updated)
return
}
c.broker.Broadcast("fleet.update", allAgents)
c.logger.Debug("agent.config: fleet updated", "agentId", cfg.ID, "name", cfg.Name)
}