Files
Control-Center/go-backend/internal/gateway/sync.go

191 lines
6.0 KiB
Go
Raw Normal View History

// Package gateway provides the initial sync logic that fetches agent and
// session data from the OpenClaw gateway via WS RPCs after handshake,
// persists to the repository, merges session state into agent cards, and
// broadcasts the merged fleet to SSE clients.
package gateway
import (
"context"
"encoding/json"
"fmt"
"time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
)
// ── RPC response types ───────────────────────────────────────────────────
// agentListItem represents a single agent returned by the agents.list RPC.
// Fields are extracted gracefully from json.RawMessage so unknown fields
// from the gateway are silently ignored.
type agentListItem struct {
ID string `json:"id"`
Name string `json:"name"`
Model string `json:"model"`
Role string `json:"role"`
Channel string `json:"channel"`
Metadata json.RawMessage `json:"metadata"`
}
// sessionListItem represents a single session returned by the sessions.list RPC.
type sessionListItem struct {
SessionKey string `json:"sessionKey"`
AgentID string `json:"agentId"`
Status string `json:"status"` // running, done, streaming, error
TotalTokens int `json:"totalTokens"`
LastActivityAt string `json:"lastActivityAt"`
}
// ── Sync logic ──────────────────────────────────────────────────────────
// initialSync fetches agents and sessions from the gateway via WS RPCs,
// persists them, merges session state into agent cards, and broadcasts
// the merged fleet as a fleet.update event.
func (c *WSClient) initialSync(ctx context.Context) error {
c.logger.Info("initial sync starting")
// 1. Fetch agents
agentsRaw, err := c.Send("agents.list", nil)
if err != nil {
return fmt.Errorf("agents.list RPC: %w", err)
}
var agentItems []agentListItem
if err := json.Unmarshal(agentsRaw, &agentItems); err != nil {
return fmt.Errorf("parse agents.list response: %w", err)
}
c.logger.Info("agents.list received", "count", len(agentItems))
// 2. Persist each agent
for _, item := range agentItems {
card := agentItemToCard(item)
existing, err := c.agents.Get(ctx, card.ID)
if err != nil {
// Agent doesn't exist — create it
if createErr := c.agents.Create(ctx, card); createErr != nil {
c.logger.Warn("sync: agent create failed", "id", card.ID, "error", createErr)
continue
}
c.logger.Info("sync: agent created", "id", card.ID)
continue
}
// Agent exists — update if display name or role changed
if existing.DisplayName != card.DisplayName || existing.Role != card.Role {
newName := card.DisplayName
newRole := card.Role
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
CurrentTask: &newName, // reuse field for display name update
})
if updateErr != nil {
c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr)
}
_ = newRole // role not in UpdateAgentRequest yet, skip silently
}
}
// 3. Fetch sessions
sessionsRaw, err := c.Send("sessions.list", nil)
if err != nil {
return fmt.Errorf("sessions.list RPC: %w", err)
}
var sessionItems []sessionListItem
if err := json.Unmarshal(sessionsRaw, &sessionItems); err != nil {
return fmt.Errorf("parse sessions.list response: %w", err)
}
c.logger.Info("sessions.list received", "count", len(sessionItems))
// 4. Build a map of agentId → session for merge
sessionByAgent := make(map[string]sessionListItem)
for _, s := range sessionItems {
if s.AgentID != "" {
sessionByAgent[s.AgentID] = s
}
}
// 5. Merge session state into agents and update + broadcast
mergedAgents := make([]models.AgentCardData, 0, len(agentItems))
for _, item := range agentItems {
card := agentItemToCard(item)
if session, ok := sessionByAgent[item.ID]; ok {
// Merge session state
card.SessionKey = session.SessionKey
card.Status = mapSessionStatus(session.Status)
card.LastActivity = session.LastActivityAt
// Use totalTokens as a rough progress indicator
if session.TotalTokens > 0 {
prog := min(session.TotalTokens/100, 100) // normalize to 0-100
card.TaskProgress = &prog
}
}
// Persist merged state
existing, err := c.agents.Get(ctx, card.ID)
if err == nil && existing.Status != card.Status {
status := card.Status
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
Status: &status,
})
if updateErr != nil {
c.logger.Warn("sync: agent status update failed", "id", card.ID, "error", updateErr)
}
}
mergedAgents = append(mergedAgents, card)
}
// 6. Broadcast the full merged fleet
c.broker.Broadcast("fleet.update", mergedAgents)
c.logger.Info("initial sync complete", "agents", len(mergedAgents))
return nil
}
// mapSessionStatus converts a gateway session status string to an AgentStatus.
// - "running" / "streaming" → active
// - "error" → error
// - "done" / "" / other → idle
func mapSessionStatus(status string) models.AgentStatus {
switch status {
case "running", "streaming":
return models.AgentStatusActive
case "error":
return models.AgentStatusError
default:
return models.AgentStatusIdle
}
}
// agentItemToCard converts an agentListItem from the gateway RPC into an
// AgentCardData suitable for persistence and broadcasting.
func agentItemToCard(item agentListItem) models.AgentCardData {
role := item.Role
if role == "" {
role = "agent"
}
channel := item.Channel
if channel == "" {
channel = "unknown"
}
name := item.Name
if name == "" {
name = item.ID
}
return models.AgentCardData{
ID: item.ID,
DisplayName: name,
Role: role,
Status: models.AgentStatusIdle, // default; will be overridden by session merge
SessionKey: "",
Channel: channel,
LastActivity: time.Now().UTC().Format(time.RFC3339),
}
}