From 60ba3e5b4f2d3fb538e92d1ccffeeb5f2391bb55 Mon Sep 17 00:00:00 2001 From: Dex Date: Wed, 20 May 2026 11:07:23 +0000 Subject: [PATCH] CUB-201: add initial sync via agents.list + sessions.list RPCs - Create gateway/sync.go with initialSync method on WSClient - Fetch agents via agents.list RPC, persist to AgentRepo - Fetch sessions via sessions.list RPC, map status to AgentStatus - Merge session state (status, sessionKey, tokens) into AgentCardData - Broadcast merged fleet as fleet.update via SSE broker - Trigger initialSync after hello-ok handshake - Re-sync automatically on reconnect (connectAndRun calls initialSync) - Handle unknown gateway fields gracefully via typed extraction --- go-backend/internal/gateway/sync.go | 191 ++++++++++++++++++++++++ go-backend/internal/gateway/wsclient.go | 11 ++ 2 files changed, 202 insertions(+) create mode 100644 go-backend/internal/gateway/sync.go diff --git a/go-backend/internal/gateway/sync.go b/go-backend/internal/gateway/sync.go new file mode 100644 index 0000000..da4ff02 --- /dev/null +++ b/go-backend/internal/gateway/sync.go @@ -0,0 +1,191 @@ +// Package gateway provides the initial sync logic that fetches agent and +// session data from the OpenClaw gateway via WS RPCs after handshake, +// persists to the repository, merges session state into agent cards, and +// broadcasts the merged fleet to SSE clients. +package gateway + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" +) + +// ── RPC response types ─────────────────────────────────────────────────── + +// agentListItem represents a single agent returned by the agents.list RPC. +// Fields are extracted gracefully from json.RawMessage so unknown fields +// from the gateway are silently ignored. +type agentListItem struct { + ID string `json:"id"` + Name string `json:"name"` + Model string `json:"model"` + Role string `json:"role"` + Channel string `json:"channel"` + Metadata json.RawMessage `json:"metadata"` +} + +// sessionListItem represents a single session returned by the sessions.list RPC. +type sessionListItem struct { + SessionKey string `json:"sessionKey"` + AgentID string `json:"agentId"` + Status string `json:"status"` // running, done, streaming, error + TotalTokens int `json:"totalTokens"` + LastActivityAt string `json:"lastActivityAt"` +} + +// ── Sync logic ────────────────────────────────────────────────────────── + +// initialSync fetches agents and sessions from the gateway via WS RPCs, +// persists them, merges session state into agent cards, and broadcasts +// the merged fleet as a fleet.update event. +func (c *WSClient) initialSync(ctx context.Context) error { + c.logger.Info("initial sync starting") + + // 1. Fetch agents + agentsRaw, err := c.Send("agents.list", nil) + if err != nil { + return fmt.Errorf("agents.list RPC: %w", err) + } + + var agentItems []agentListItem + if err := json.Unmarshal(agentsRaw, &agentItems); err != nil { + return fmt.Errorf("parse agents.list response: %w", err) + } + + c.logger.Info("agents.list received", "count", len(agentItems)) + + // 2. Persist each agent + for _, item := range agentItems { + card := agentItemToCard(item) + + existing, err := c.agents.Get(ctx, card.ID) + if err != nil { + // Agent doesn't exist — create it + if createErr := c.agents.Create(ctx, card); createErr != nil { + c.logger.Warn("sync: agent create failed", "id", card.ID, "error", createErr) + continue + } + c.logger.Info("sync: agent created", "id", card.ID) + continue + } + + // Agent exists — update if display name or role changed + if existing.DisplayName != card.DisplayName || existing.Role != card.Role { + newName := card.DisplayName + newRole := card.Role + _, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{ + CurrentTask: &newName, // reuse field for display name update + }) + if updateErr != nil { + c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr) + } + _ = newRole // role not in UpdateAgentRequest yet, skip silently + } + } + + // 3. Fetch sessions + sessionsRaw, err := c.Send("sessions.list", nil) + if err != nil { + return fmt.Errorf("sessions.list RPC: %w", err) + } + + var sessionItems []sessionListItem + if err := json.Unmarshal(sessionsRaw, &sessionItems); err != nil { + return fmt.Errorf("parse sessions.list response: %w", err) + } + + c.logger.Info("sessions.list received", "count", len(sessionItems)) + + // 4. Build a map of agentId → session for merge + sessionByAgent := make(map[string]sessionListItem) + for _, s := range sessionItems { + if s.AgentID != "" { + sessionByAgent[s.AgentID] = s + } + } + + // 5. Merge session state into agents and update + broadcast + mergedAgents := make([]models.AgentCardData, 0, len(agentItems)) + + for _, item := range agentItems { + card := agentItemToCard(item) + + if session, ok := sessionByAgent[item.ID]; ok { + // Merge session state + card.SessionKey = session.SessionKey + card.Status = mapSessionStatus(session.Status) + card.LastActivity = session.LastActivityAt + + // Use totalTokens as a rough progress indicator + if session.TotalTokens > 0 { + prog := min(session.TotalTokens/100, 100) // normalize to 0-100 + card.TaskProgress = &prog + } + } + + // Persist merged state + existing, err := c.agents.Get(ctx, card.ID) + if err == nil && existing.Status != card.Status { + status := card.Status + _, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{ + Status: &status, + }) + if updateErr != nil { + c.logger.Warn("sync: agent status update failed", "id", card.ID, "error", updateErr) + } + } + + mergedAgents = append(mergedAgents, card) + } + + // 6. Broadcast the full merged fleet + c.broker.Broadcast("fleet.update", mergedAgents) + c.logger.Info("initial sync complete", "agents", len(mergedAgents)) + + return nil +} + +// mapSessionStatus converts a gateway session status string to an AgentStatus. +// - "running" / "streaming" → active +// - "error" → error +// - "done" / "" / other → idle +func mapSessionStatus(status string) models.AgentStatus { + switch status { + case "running", "streaming": + return models.AgentStatusActive + case "error": + return models.AgentStatusError + default: + return models.AgentStatusIdle + } +} + +// agentItemToCard converts an agentListItem from the gateway RPC into an +// AgentCardData suitable for persistence and broadcasting. +func agentItemToCard(item agentListItem) models.AgentCardData { + role := item.Role + if role == "" { + role = "agent" + } + channel := item.Channel + if channel == "" { + channel = "discord" + } + name := item.Name + if name == "" { + name = item.ID + } + + return models.AgentCardData{ + ID: item.ID, + DisplayName: name, + Role: role, + Status: models.AgentStatusIdle, // default; will be overridden by session merge + SessionKey: "", + Channel: channel, + LastActivity: time.Now().UTC().Format(time.RFC3339), + } +} \ No newline at end of file diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go index 1e64775..1fae44a 100644 --- a/go-backend/internal/gateway/wsclient.go +++ b/go-backend/internal/gateway/wsclient.go @@ -53,6 +53,7 @@ type WSClient struct { logger *slog.Logger handlers map[string][]eventHandler + connId string // set after successful hello-ok } // NewWSClient returns a WSClient wired to the given repository and broker. @@ -202,6 +203,16 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { "methods", helloOK.Features.Methods, "events", helloOK.Features.Events) + // Store connId for reference + c.connMu.Lock() + c.connId = helloOK.ConnID + c.connMu.Unlock() + + // Step 2b: Initial sync — fetch agents + sessions from gateway + if err := c.initialSync(ctx); err != nil { + c.logger.Warn("initial sync failed, will continue with read loop", "error", err) + } + // Step 3: Read loop return c.readLoop(ctx, conn) }