236 lines
6.9 KiB
Go
236 lines
6.9 KiB
Go
|
|
package gateway
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"testing"
|
||
|
|
|
||
|
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
||
|
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
||
|
|
)
|
||
|
|
|
||
|
|
func TestInitialSync(t *testing.T) {
|
||
|
|
_ = &mockAgentRepo{agents: make(map[string]models.AgentCardData)} // verify mock compiles
|
||
|
|
broker := handler.NewBroker()
|
||
|
|
capture := newBroadcastCapture(broker)
|
||
|
|
defer capture.close()
|
||
|
|
|
||
|
|
// --- Test agentItemToCard + session merge (the core of initialSync) ---
|
||
|
|
|
||
|
|
agentItems := []agentListItem{
|
||
|
|
{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"},
|
||
|
|
{ID: "dex", Name: "Dex", Role: "Backend Dev", Channel: "telegram"},
|
||
|
|
}
|
||
|
|
|
||
|
|
sessionItems := []sessionListItem{
|
||
|
|
{SessionKey: "s1", AgentID: "otto", Status: "running", TotalTokens: 500, LastActivityAt: "2025-05-20T12:00:00Z"},
|
||
|
|
{SessionKey: "s2", AgentID: "dex", Status: "done", TotalTokens: 1000, LastActivityAt: "2025-05-20T11:00:00Z"},
|
||
|
|
}
|
||
|
|
|
||
|
|
// Build sessionByAgent map (mirrors initialSync logic)
|
||
|
|
sessionByAgent := make(map[string]sessionListItem)
|
||
|
|
for _, s := range sessionItems {
|
||
|
|
if s.AgentID != "" {
|
||
|
|
sessionByAgent[s.AgentID] = s
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Merge and verify
|
||
|
|
merged := make([]models.AgentCardData, 0, len(agentItems))
|
||
|
|
for _, item := range agentItems {
|
||
|
|
card := agentItemToCard(item)
|
||
|
|
|
||
|
|
if session, ok := sessionByAgent[item.ID]; ok {
|
||
|
|
card.SessionKey = session.SessionKey
|
||
|
|
card.Status = mapSessionStatus(session.Status)
|
||
|
|
card.LastActivity = session.LastActivityAt
|
||
|
|
|
||
|
|
if session.TotalTokens > 0 {
|
||
|
|
prog := min(session.TotalTokens/100, 100)
|
||
|
|
card.TaskProgress = &prog
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
merged = append(merged, card)
|
||
|
|
}
|
||
|
|
|
||
|
|
// Verify otto: running → active
|
||
|
|
if merged[0].ID != "otto" {
|
||
|
|
t.Errorf("merged[0].ID = %q, want %q", merged[0].ID, "otto")
|
||
|
|
}
|
||
|
|
if merged[0].Status != models.AgentStatusActive {
|
||
|
|
t.Errorf("otto status = %q, want %q (running → active)", merged[0].Status, models.AgentStatusActive)
|
||
|
|
}
|
||
|
|
if merged[0].SessionKey != "s1" {
|
||
|
|
t.Errorf("otto sessionKey = %q, want %q", merged[0].SessionKey, "s1")
|
||
|
|
}
|
||
|
|
if merged[0].TaskProgress == nil || *merged[0].TaskProgress != 5 {
|
||
|
|
t.Errorf("otto taskProgress = %v, want 5", merged[0].TaskProgress)
|
||
|
|
}
|
||
|
|
|
||
|
|
// Verify dex: done → idle
|
||
|
|
if merged[1].ID != "dex" {
|
||
|
|
t.Errorf("merged[1].ID = %q, want %q", merged[1].ID, "dex")
|
||
|
|
}
|
||
|
|
if merged[1].Status != models.AgentStatusIdle {
|
||
|
|
t.Errorf("dex status = %q, want %q (done → idle)", merged[1].Status, models.AgentStatusIdle)
|
||
|
|
}
|
||
|
|
if merged[1].SessionKey != "s2" {
|
||
|
|
t.Errorf("dex sessionKey = %q, want %q", merged[1].SessionKey, "s2")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestInitialSync_PersistCreatesNew(t *testing.T) {
|
||
|
|
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||
|
|
broker := handler.NewBroker()
|
||
|
|
capture := newBroadcastCapture(broker)
|
||
|
|
defer capture.close()
|
||
|
|
|
||
|
|
// Simulate the persist logic from initialSync:
|
||
|
|
// new agents should be created
|
||
|
|
card := agentItemToCard(agentListItem{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"})
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
|
||
|
|
// Agent doesn't exist → create
|
||
|
|
_, err := repo.Get(ctx, card.ID)
|
||
|
|
if err == nil {
|
||
|
|
t.Fatal("expected agent to not exist yet")
|
||
|
|
}
|
||
|
|
|
||
|
|
if err := repo.Create(ctx, card); err != nil {
|
||
|
|
t.Fatalf("Create failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
got, err := repo.Get(ctx, card.ID)
|
||
|
|
if err != nil {
|
||
|
|
t.Fatalf("Get after Create failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
if got.ID != "otto" {
|
||
|
|
t.Errorf("got.ID = %q, want %q", got.ID, "otto")
|
||
|
|
}
|
||
|
|
if got.DisplayName != "Otto" {
|
||
|
|
t.Errorf("got.DisplayName = %q, want %q", got.DisplayName, "Otto")
|
||
|
|
}
|
||
|
|
if got.Role != "Orchestrator" {
|
||
|
|
t.Errorf("got.Role = %q, want %q", got.Role, "Orchestrator")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestInitialSync_PersistUpdatesExisting(t *testing.T) {
|
||
|
|
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||
|
|
broker := handler.NewBroker()
|
||
|
|
capture := newBroadcastCapture(broker)
|
||
|
|
defer capture.close()
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
|
||
|
|
// Pre-populate with existing agent
|
||
|
|
repo.agents["otto"] = models.AgentCardData{
|
||
|
|
ID: "otto",
|
||
|
|
DisplayName: "Otto",
|
||
|
|
Role: "Old Role",
|
||
|
|
Status: models.AgentStatusIdle,
|
||
|
|
}
|
||
|
|
|
||
|
|
// Simulate initialSync: agent exists, name/role changed → update
|
||
|
|
newName := "Otto Prime"
|
||
|
|
newRole := "Super Orchestrator"
|
||
|
|
_, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{
|
||
|
|
DisplayName: &newName,
|
||
|
|
Role: &newRole,
|
||
|
|
})
|
||
|
|
if err != nil {
|
||
|
|
t.Fatalf("Update failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
got, err := repo.Get(ctx, "otto")
|
||
|
|
if err != nil {
|
||
|
|
t.Fatalf("Get after Update failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
if got.DisplayName != "Otto Prime" {
|
||
|
|
t.Errorf("displayName = %q, want %q", got.DisplayName, "Otto Prime")
|
||
|
|
}
|
||
|
|
if got.Role != "Super Orchestrator" {
|
||
|
|
t.Errorf("role = %q, want %q", got.Role, "Super Orchestrator")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestInitialSync_MergesSessionStatus(t *testing.T) {
|
||
|
|
// When initialSync merges session state, an agent whose existing status
|
||
|
|
// differs from the session-derived status should be updated.
|
||
|
|
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||
|
|
ctx := context.Background()
|
||
|
|
|
||
|
|
repo.agents["otto"] = models.AgentCardData{
|
||
|
|
ID: "otto",
|
||
|
|
DisplayName: "Otto",
|
||
|
|
Role: "Orchestrator",
|
||
|
|
Status: models.AgentStatusIdle,
|
||
|
|
}
|
||
|
|
|
||
|
|
// Simulate session merge: session says "running" → agent should go active
|
||
|
|
activeStatus := mapSessionStatus("running")
|
||
|
|
if activeStatus != models.AgentStatusActive {
|
||
|
|
t.Fatalf("mapSessionStatus(running) = %q, want active", activeStatus)
|
||
|
|
}
|
||
|
|
|
||
|
|
_, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{
|
||
|
|
Status: &activeStatus,
|
||
|
|
})
|
||
|
|
if err != nil {
|
||
|
|
t.Fatalf("Update failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
got, err := repo.Get(ctx, "otto")
|
||
|
|
if err != nil {
|
||
|
|
t.Fatalf("Get failed: %v", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
if got.Status != models.AgentStatusActive {
|
||
|
|
t.Errorf("status after merge = %q, want %q", got.Status, models.AgentStatusActive)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestInitialSync_BroadcastsFleet(t *testing.T) {
|
||
|
|
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||
|
|
broker := handler.NewBroker()
|
||
|
|
capture := newBroadcastCapture(broker)
|
||
|
|
defer capture.close()
|
||
|
|
|
||
|
|
// Create some agents in the repo
|
||
|
|
repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusActive}
|
||
|
|
repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle}
|
||
|
|
|
||
|
|
// Simulate the final broadcast from initialSync
|
||
|
|
mergedAgents := []models.AgentCardData{
|
||
|
|
repo.agents["otto"],
|
||
|
|
repo.agents["dex"],
|
||
|
|
}
|
||
|
|
broker.Broadcast("fleet.update", mergedAgents)
|
||
|
|
|
||
|
|
events := capture.captured()
|
||
|
|
if len(events) == 0 {
|
||
|
|
t.Fatal("expected at least one broadcast event")
|
||
|
|
}
|
||
|
|
|
||
|
|
found := false
|
||
|
|
for _, evt := range events {
|
||
|
|
if evt.EventType == "fleet.update" {
|
||
|
|
found = true
|
||
|
|
// Verify data is the merged agents list
|
||
|
|
agents, ok := evt.Data.([]models.AgentCardData)
|
||
|
|
if !ok {
|
||
|
|
t.Fatalf("fleet.update data type = %T, want []models.AgentCardData", evt.Data)
|
||
|
|
}
|
||
|
|
if len(agents) != 2 {
|
||
|
|
t.Errorf("fleet.update agents count = %d, want 2", len(agents))
|
||
|
|
}
|
||
|
|
break
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if !found {
|
||
|
|
t.Error("expected fleet.update broadcast event")
|
||
|
|
}
|
||
|
|
}
|