191 lines
6.0 KiB
Go
191 lines
6.0 KiB
Go
|
|
// Package gateway provides the initial sync logic that fetches agent and
|
||
|
|
// session data from the OpenClaw gateway via WS RPCs after handshake,
|
||
|
|
// persists to the repository, merges session state into agent cards, and
|
||
|
|
// broadcasts the merged fleet to SSE clients.
|
||
|
|
package gateway
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"encoding/json"
|
||
|
|
"fmt"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
||
|
|
)
|
||
|
|
|
||
|
|
// ── RPC response types ───────────────────────────────────────────────────
|
||
|
|
|
||
|
|
// agentListItem is the decoded form of one element of the agents.list RPC
// result. Only the keys named in the struct tags below are read; any other
// keys the gateway sends are dropped by encoding/json during decoding.
type agentListItem struct {
	// ID identifies the agent; it becomes the card ID and is the fallback
	// display name when Name is empty.
	ID string `json:"id"`

	// Name is the human-readable agent name.
	Name string `json:"name"`

	// Model is decoded from the "model" key.
	Model string `json:"model"`

	// Role is the agent's role; an empty value is replaced with a default
	// when the item is converted to a card.
	Role string `json:"role"`

	// Channel is the agent's channel; an empty value is replaced with a
	// default when the item is converted to a card.
	Channel string `json:"channel"`

	// Metadata is kept as raw, undecoded JSON.
	Metadata json.RawMessage `json:"metadata"`
}
|
||
|
|
|
||
|
|
// sessionListItem is the decoded form of one element of the sessions.list
// RPC result.
type sessionListItem struct {
	// SessionKey is the gateway's key for the session.
	SessionKey string `json:"sessionKey"`

	// AgentID names the agent the session belongs to; sessions with an
	// empty AgentID are skipped during the merge.
	AgentID string `json:"agentId"`

	// Status is one of: running, done, streaming, error.
	Status string `json:"status"`

	// TotalTokens is the token count reported for the session.
	TotalTokens int `json:"totalTokens"`

	// LastActivityAt is copied verbatim into the agent card's last-activity
	// field.
	LastActivityAt string `json:"lastActivityAt"`
}
|
||
|
|
|
||
|
|
// ── Sync logic ──────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
// initialSync fetches agents and sessions from the gateway via WS RPCs,
|
||
|
|
// persists them, merges session state into agent cards, and broadcasts
|
||
|
|
// the merged fleet as a fleet.update event.
|
||
|
|
func (c *WSClient) initialSync(ctx context.Context) error {
|
||
|
|
c.logger.Info("initial sync starting")
|
||
|
|
|
||
|
|
// 1. Fetch agents
|
||
|
|
agentsRaw, err := c.Send("agents.list", nil)
|
||
|
|
if err != nil {
|
||
|
|
return fmt.Errorf("agents.list RPC: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
var agentItems []agentListItem
|
||
|
|
if err := json.Unmarshal(agentsRaw, &agentItems); err != nil {
|
||
|
|
return fmt.Errorf("parse agents.list response: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
c.logger.Info("agents.list received", "count", len(agentItems))
|
||
|
|
|
||
|
|
// 2. Persist each agent
|
||
|
|
for _, item := range agentItems {
|
||
|
|
card := agentItemToCard(item)
|
||
|
|
|
||
|
|
existing, err := c.agents.Get(ctx, card.ID)
|
||
|
|
if err != nil {
|
||
|
|
// Agent doesn't exist — create it
|
||
|
|
if createErr := c.agents.Create(ctx, card); createErr != nil {
|
||
|
|
c.logger.Warn("sync: agent create failed", "id", card.ID, "error", createErr)
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
c.logger.Info("sync: agent created", "id", card.ID)
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
|
||
|
|
// Agent exists — update if display name or role changed
|
||
|
|
if existing.DisplayName != card.DisplayName || existing.Role != card.Role {
|
||
|
|
newName := card.DisplayName
|
||
|
|
newRole := card.Role
|
||
|
|
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
|
||
|
|
CurrentTask: &newName, // reuse field for display name update
|
||
|
|
})
|
||
|
|
if updateErr != nil {
|
||
|
|
c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr)
|
||
|
|
}
|
||
|
|
_ = newRole // role not in UpdateAgentRequest yet, skip silently
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// 3. Fetch sessions
|
||
|
|
sessionsRaw, err := c.Send("sessions.list", nil)
|
||
|
|
if err != nil {
|
||
|
|
return fmt.Errorf("sessions.list RPC: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
var sessionItems []sessionListItem
|
||
|
|
if err := json.Unmarshal(sessionsRaw, &sessionItems); err != nil {
|
||
|
|
return fmt.Errorf("parse sessions.list response: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
c.logger.Info("sessions.list received", "count", len(sessionItems))
|
||
|
|
|
||
|
|
// 4. Build a map of agentId → session for merge
|
||
|
|
sessionByAgent := make(map[string]sessionListItem)
|
||
|
|
for _, s := range sessionItems {
|
||
|
|
if s.AgentID != "" {
|
||
|
|
sessionByAgent[s.AgentID] = s
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// 5. Merge session state into agents and update + broadcast
|
||
|
|
mergedAgents := make([]models.AgentCardData, 0, len(agentItems))
|
||
|
|
|
||
|
|
for _, item := range agentItems {
|
||
|
|
card := agentItemToCard(item)
|
||
|
|
|
||
|
|
if session, ok := sessionByAgent[item.ID]; ok {
|
||
|
|
// Merge session state
|
||
|
|
card.SessionKey = session.SessionKey
|
||
|
|
card.Status = mapSessionStatus(session.Status)
|
||
|
|
card.LastActivity = session.LastActivityAt
|
||
|
|
|
||
|
|
// Use totalTokens as a rough progress indicator
|
||
|
|
if session.TotalTokens > 0 {
|
||
|
|
prog := min(session.TotalTokens/100, 100) // normalize to 0-100
|
||
|
|
card.TaskProgress = &prog
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Persist merged state
|
||
|
|
existing, err := c.agents.Get(ctx, card.ID)
|
||
|
|
if err == nil && existing.Status != card.Status {
|
||
|
|
status := card.Status
|
||
|
|
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
|
||
|
|
Status: &status,
|
||
|
|
})
|
||
|
|
if updateErr != nil {
|
||
|
|
c.logger.Warn("sync: agent status update failed", "id", card.ID, "error", updateErr)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
mergedAgents = append(mergedAgents, card)
|
||
|
|
}
|
||
|
|
|
||
|
|
// 6. Broadcast the full merged fleet
|
||
|
|
c.broker.Broadcast("fleet.update", mergedAgents)
|
||
|
|
c.logger.Info("initial sync complete", "agents", len(mergedAgents))
|
||
|
|
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// mapSessionStatus converts a gateway session status string to an AgentStatus.
|
||
|
|
// - "running" / "streaming" → active
|
||
|
|
// - "error" → error
|
||
|
|
// - "done" / "" / other → idle
|
||
|
|
func mapSessionStatus(status string) models.AgentStatus {
|
||
|
|
switch status {
|
||
|
|
case "running", "streaming":
|
||
|
|
return models.AgentStatusActive
|
||
|
|
case "error":
|
||
|
|
return models.AgentStatusError
|
||
|
|
default:
|
||
|
|
return models.AgentStatusIdle
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// agentItemToCard converts an agentListItem from the gateway RPC into an
|
||
|
|
// AgentCardData suitable for persistence and broadcasting.
|
||
|
|
func agentItemToCard(item agentListItem) models.AgentCardData {
|
||
|
|
role := item.Role
|
||
|
|
if role == "" {
|
||
|
|
role = "agent"
|
||
|
|
}
|
||
|
|
channel := item.Channel
|
||
|
|
if channel == "" {
|
||
|
|
channel = "discord"
|
||
|
|
}
|
||
|
|
name := item.Name
|
||
|
|
if name == "" {
|
||
|
|
name = item.ID
|
||
|
|
}
|
||
|
|
|
||
|
|
return models.AgentCardData{
|
||
|
|
ID: item.ID,
|
||
|
|
DisplayName: name,
|
||
|
|
Role: role,
|
||
|
|
Status: models.AgentStatusIdle, // default; will be overridden by session merge
|
||
|
|
SessionKey: "",
|
||
|
|
Channel: channel,
|
||
|
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||
|
|
}
|
||
|
|
}
|