CUB-203: WebSocket client scaffold for OpenClaw gateway v3 #41
191
go-backend/internal/gateway/sync.go
Normal file
191
go-backend/internal/gateway/sync.go
Normal file
@@ -0,0 +1,191 @@
|
||||
// Package gateway provides the initial sync logic that fetches agent and
|
||||
// session data from the OpenClaw gateway via WS RPCs after handshake,
|
||||
// persists to the repository, merges session state into agent cards, and
|
||||
// broadcasts the merged fleet to SSE clients.
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
||||
)
|
||||
|
||||
// ── RPC response types ───────────────────────────────────────────────────
|
||||
|
||||
// agentListItem represents a single agent returned by the agents.list RPC.
|
||||
// Fields are extracted gracefully from json.RawMessage so unknown fields
|
||||
// from the gateway are silently ignored.
|
||||
type agentListItem struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Model string `json:"model"`
|
||||
Role string `json:"role"`
|
||||
Channel string `json:"channel"`
|
||||
Metadata json.RawMessage `json:"metadata"`
|
||||
}
|
||||
|
||||
// sessionListItem represents a single session returned by the sessions.list RPC.
|
||||
type sessionListItem struct {
|
||||
SessionKey string `json:"sessionKey"`
|
||||
AgentID string `json:"agentId"`
|
||||
Status string `json:"status"` // running, done, streaming, error
|
||||
TotalTokens int `json:"totalTokens"`
|
||||
LastActivityAt string `json:"lastActivityAt"`
|
||||
}
|
||||
|
||||
// ── Sync logic ──────────────────────────────────────────────────────────
|
||||
|
||||
// initialSync fetches agents and sessions from the gateway via WS RPCs,
|
||||
// persists them, merges session state into agent cards, and broadcasts
|
||||
// the merged fleet as a fleet.update event.
|
||||
func (c *WSClient) initialSync(ctx context.Context) error {
|
||||
c.logger.Info("initial sync starting")
|
||||
|
||||
// 1. Fetch agents
|
||||
agentsRaw, err := c.Send("agents.list", nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("agents.list RPC: %w", err)
|
||||
}
|
||||
|
||||
var agentItems []agentListItem
|
||||
if err := json.Unmarshal(agentsRaw, &agentItems); err != nil {
|
||||
return fmt.Errorf("parse agents.list response: %w", err)
|
||||
}
|
||||
|
||||
c.logger.Info("agents.list received", "count", len(agentItems))
|
||||
|
||||
// 2. Persist each agent
|
||||
for _, item := range agentItems {
|
||||
card := agentItemToCard(item)
|
||||
|
||||
existing, err := c.agents.Get(ctx, card.ID)
|
||||
if err != nil {
|
||||
// Agent doesn't exist — create it
|
||||
if createErr := c.agents.Create(ctx, card); createErr != nil {
|
||||
c.logger.Warn("sync: agent create failed", "id", card.ID, "error", createErr)
|
||||
continue
|
||||
}
|
||||
c.logger.Info("sync: agent created", "id", card.ID)
|
||||
continue
|
||||
}
|
||||
|
||||
// Agent exists — update if display name or role changed
|
||||
if existing.DisplayName != card.DisplayName || existing.Role != card.Role {
|
||||
newName := card.DisplayName
|
||||
newRole := card.Role
|
||||
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
|
||||
CurrentTask: &newName, // reuse field for display name update
|
||||
})
|
||||
if updateErr != nil {
|
||||
c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr)
|
||||
}
|
||||
_ = newRole // role not in UpdateAgentRequest yet, skip silently
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Fetch sessions
|
||||
sessionsRaw, err := c.Send("sessions.list", nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sessions.list RPC: %w", err)
|
||||
}
|
||||
|
||||
var sessionItems []sessionListItem
|
||||
if err := json.Unmarshal(sessionsRaw, &sessionItems); err != nil {
|
||||
return fmt.Errorf("parse sessions.list response: %w", err)
|
||||
}
|
||||
|
||||
c.logger.Info("sessions.list received", "count", len(sessionItems))
|
||||
|
||||
// 4. Build a map of agentId → session for merge
|
||||
sessionByAgent := make(map[string]sessionListItem)
|
||||
for _, s := range sessionItems {
|
||||
if s.AgentID != "" {
|
||||
sessionByAgent[s.AgentID] = s
|
||||
}
|
||||
}
|
||||
|
||||
// 5. Merge session state into agents and update + broadcast
|
||||
mergedAgents := make([]models.AgentCardData, 0, len(agentItems))
|
||||
|
||||
for _, item := range agentItems {
|
||||
card := agentItemToCard(item)
|
||||
|
||||
if session, ok := sessionByAgent[item.ID]; ok {
|
||||
// Merge session state
|
||||
card.SessionKey = session.SessionKey
|
||||
card.Status = mapSessionStatus(session.Status)
|
||||
card.LastActivity = session.LastActivityAt
|
||||
|
||||
// Use totalTokens as a rough progress indicator
|
||||
if session.TotalTokens > 0 {
|
||||
prog := min(session.TotalTokens/100, 100) // normalize to 0-100
|
||||
card.TaskProgress = &prog
|
||||
}
|
||||
}
|
||||
|
||||
// Persist merged state
|
||||
existing, err := c.agents.Get(ctx, card.ID)
|
||||
if err == nil && existing.Status != card.Status {
|
||||
status := card.Status
|
||||
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
|
||||
Status: &status,
|
||||
})
|
||||
if updateErr != nil {
|
||||
c.logger.Warn("sync: agent status update failed", "id", card.ID, "error", updateErr)
|
||||
}
|
||||
}
|
||||
|
||||
mergedAgents = append(mergedAgents, card)
|
||||
}
|
||||
|
||||
// 6. Broadcast the full merged fleet
|
||||
c.broker.Broadcast("fleet.update", mergedAgents)
|
||||
c.logger.Info("initial sync complete", "agents", len(mergedAgents))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// mapSessionStatus converts a gateway session status string to an AgentStatus.
|
||||
// - "running" / "streaming" → active
|
||||
// - "error" → error
|
||||
// - "done" / "" / other → idle
|
||||
func mapSessionStatus(status string) models.AgentStatus {
|
||||
switch status {
|
||||
case "running", "streaming":
|
||||
return models.AgentStatusActive
|
||||
case "error":
|
||||
return models.AgentStatusError
|
||||
default:
|
||||
return models.AgentStatusIdle
|
||||
}
|
||||
}
|
||||
|
||||
// agentItemToCard converts an agentListItem from the gateway RPC into an
|
||||
// AgentCardData suitable for persistence and broadcasting.
|
||||
func agentItemToCard(item agentListItem) models.AgentCardData {
|
||||
role := item.Role
|
||||
if role == "" {
|
||||
role = "agent"
|
||||
}
|
||||
channel := item.Channel
|
||||
if channel == "" {
|
||||
channel = "discord"
|
||||
}
|
||||
name := item.Name
|
||||
if name == "" {
|
||||
name = item.ID
|
||||
}
|
||||
|
||||
return models.AgentCardData{
|
||||
ID: item.ID,
|
||||
DisplayName: name,
|
||||
Role: role,
|
||||
Status: models.AgentStatusIdle, // default; will be overridden by session merge
|
||||
SessionKey: "",
|
||||
Channel: channel,
|
||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
}
|
||||
@@ -53,6 +53,7 @@ type WSClient struct {
|
||||
logger *slog.Logger
|
||||
|
||||
handlers map[string][]eventHandler
|
||||
connId string // set after successful hello-ok
|
||||
}
|
||||
|
||||
// NewWSClient returns a WSClient wired to the given repository and broker.
|
||||
@@ -202,6 +203,16 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
||||
"methods", helloOK.Features.Methods,
|
||||
"events", helloOK.Features.Events)
|
||||
|
||||
// Store connId for reference
|
||||
c.connMu.Lock()
|
||||
c.connId = helloOK.ConnID
|
||||
c.connMu.Unlock()
|
||||
|
||||
// Step 2b: Initial sync — fetch agents + sessions from gateway
|
||||
if err := c.initialSync(ctx); err != nil {
|
||||
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
|
||||
}
|
||||
|
||||
// Step 3: Read loop
|
||||
return c.readLoop(ctx, conn)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user