From 3c26b8debadd8d2f2ea42123ed448bf28d91783c Mon Sep 17 00:00:00 2001 From: Dex Date: Wed, 20 May 2026 11:58:42 +0000 Subject: [PATCH] CUB-207: add unit tests for event handlers and initial sync --- go-backend/internal/gateway/events_test.go | 516 +++++++++++++++++++++ go-backend/internal/gateway/sync_test.go | 236 ++++++++++ 2 files changed, 752 insertions(+) create mode 100644 go-backend/internal/gateway/events_test.go create mode 100644 go-backend/internal/gateway/sync_test.go diff --git a/go-backend/internal/gateway/events_test.go b/go-backend/internal/gateway/events_test.go new file mode 100644 index 0000000..cc5ce24 --- /dev/null +++ b/go-backend/internal/gateway/events_test.go @@ -0,0 +1,516 @@ +package gateway + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "sync" + "testing" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler" + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" +) + +// ── Mock AgentRepo ──────────────────────────────────────────────────────── + +type mockAgentRepo struct { + mu sync.Mutex + agents map[string]models.AgentCardData + updateCalls []updateCall +} + +type updateCall struct { + id string + req models.UpdateAgentRequest +} + +func (m *mockAgentRepo) Get(_ context.Context, id string) (models.AgentCardData, error) { + m.mu.Lock() + defer m.mu.Unlock() + a, ok := m.agents[id] + if !ok { + return models.AgentCardData{}, errNotFound + } + return a, nil +} + +func (m *mockAgentRepo) Update(_ context.Context, id string, req models.UpdateAgentRequest) (models.AgentCardData, error) { + m.mu.Lock() + defer m.mu.Unlock() + + a, ok := m.agents[id] + if !ok { + return models.AgentCardData{}, errNotFound + } + + if req.Status != nil { + a.Status = *req.Status + } + if req.DisplayName != nil { + a.DisplayName = *req.DisplayName + } + if req.Role != nil { + a.Role = *req.Role + } + if req.Channel != nil { + a.Channel = *req.Channel + } + if req.CurrentTask != nil { + a.CurrentTask = req.CurrentTask + } + if req.TaskProgress != nil { + a.TaskProgress = req.TaskProgress + } + if req.TaskElapsed != nil { + a.TaskElapsed = req.TaskElapsed + } + if req.ErrorMessage != nil { + a.ErrorMessage = req.ErrorMessage + } + if req.LastActivityAt != nil { + a.LastActivity = *req.LastActivityAt + } + + m.agents[id] = a + m.updateCalls = append(m.updateCalls, updateCall{id, req}) + return a, nil +} + +func (m *mockAgentRepo) Create(_ context.Context, a models.AgentCardData) error { + m.mu.Lock() + defer m.mu.Unlock() + m.agents[a.ID] = a + return nil +} + +func (m *mockAgentRepo) List(_ context.Context, statusFilter models.AgentStatus) ([]models.AgentCardData, error) { + m.mu.Lock() + defer m.mu.Unlock() + + var result []models.AgentCardData + for _, a := range m.agents { + if statusFilter == "" || a.Status == statusFilter { + result = append(result, a) + } + } + return result, nil +} + +func (m *mockAgentRepo) Delete(_ context.Context, id string) error { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.agents, id) + return nil +} + +func (m *mockAgentRepo) Count(_ context.Context) (int, error) { + m.mu.Lock() + defer m.mu.Unlock() + return len(m.agents), nil +} + +// errNotFound is returned by the mock repo when an agent is not found. +var errNotFound = fmt.Errorf("not found") + +// ── Broadcast capture helper ─────────────────────────────────────────────── + +// broadcastCapture wraps a real Broker and captures all broadcasts +// via a subscribed channel. Use captured() to retrieve events that have +// been received so far. Call close() to unsubscribe when done. +type broadcastCapture struct { + broker *handler.Broker + ch chan handler.SSEEvent +} + +func newBroadcastCapture(broker *handler.Broker) *broadcastCapture { + return &broadcastCapture{ + broker: broker, + ch: broker.Subscribe(), + } +} + +// captured drains all pending events from the subscription channel +// and returns them. This is synchronous — it only returns events that +// have already been sent to the channel. +func (bc *broadcastCapture) captured() []handler.SSEEvent { + var events []handler.SSEEvent + for { + select { + case evt := <-bc.ch: + events = append(events, evt) + default: + return events + } + } +} + +func (bc *broadcastCapture) close() { + bc.broker.Unsubscribe(bc.ch) +} + +// ── Test helpers ────────────────────────────────────────────────────────── + +// newTestWSClient creates a WSClient wired to a mock repo and a real broker. +// Returns the client, the mock repo, and a broadcast capture. +func newTestWSClient() (*WSClient, *mockAgentRepo, *handler.Broker, *broadcastCapture) { + repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)} + broker := handler.NewBroker() + capture := newBroadcastCapture(broker) + client := NewWSClient(WSConfig{}, repo, broker, slog.Default()) + return client, repo, broker, capture +} + +// ── Tests ───────────────────────────────────────────────────────────────── + +func TestHandleSessionsChanged_Active(t *testing.T) { + client, repo, broker, capture := newTestWSClient() + defer capture.close() + + repo.agents["otto"] = models.AgentCardData{ + ID: "otto", + DisplayName: "Otto", + Status: models.AgentStatusIdle, + } + + payload := json.RawMessage(`{ + "sessionKey": "s1", + "agentId": "otto", + "status": "running", + "totalTokens": 500, + "currentTask": "Orchestrating tasks" + }`) + + client.handleSessionsChanged(payload) + + // Verify: agent status updated to active + repo.mu.Lock() + agent := repo.agents["otto"] + calls := make([]updateCall, len(repo.updateCalls)) + copy(calls, repo.updateCalls) + repo.mu.Unlock() + + if agent.Status != models.AgentStatusActive { + t.Errorf("agent status = %q, want %q", agent.Status, models.AgentStatusActive) + } + + // Verify: update was called + if len(calls) == 0 { + t.Fatal("expected at least one update call") + } + if calls[0].id != "otto" { + t.Errorf("update call agentId = %q, want %q", calls[0].id, "otto") + } + + // Verify: broker broadcast "agent.status" + events := capture.captured() + found := false + for _, evt := range events { + if evt.EventType == "agent.status" { + found = true + break + } + } + if !found { + t.Error("expected broker broadcast with event type 'agent.status'") + } +} + +func TestHandleSessionsChanged_Idle(t *testing.T) { + client, repo, _, capture := newTestWSClient() + defer capture.close() + + repo.agents["dex"] = models.AgentCardData{ + ID: "dex", + DisplayName: "Dex", + Status: models.AgentStatusActive, + CurrentTask: strPtr("Writing API"), + } + + payload := json.RawMessage(`{ + "sessionKey": "s2", + "agentId": "dex", + "status": "done", + "totalTokens": 1000 + }`) + + client.handleSessionsChanged(payload) + + repo.mu.Lock() + agent := repo.agents["dex"] + repo.mu.Unlock() + + // Verify: agent goes idle + if agent.Status != models.AgentStatusIdle { + t.Errorf("agent status = %q, want %q", agent.Status, models.AgentStatusIdle) + } + + // Verify: current task cleared (set to empty string) + if agent.CurrentTask != nil && *agent.CurrentTask != "" { + t.Errorf("current task = %q, want empty (cleared on idle)", *agent.CurrentTask) + } + + // Verify: broker fires "agent.status" + events := capture.captured() + found := false + for _, evt := range events { + if evt.EventType == "agent.status" { + found = true + break + } + } + if !found { + t.Error("expected broker broadcast with event type 'agent.status'") + } +} + +func TestHandleSessionsChanged_ArrayPayload(t *testing.T) { + client, repo, _, capture := newTestWSClient() + defer capture.close() + + repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusIdle} + repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle} + + payload := json.RawMessage(`[ + {"sessionKey":"s1","agentId":"otto","status":"running","totalTokens":100}, + {"sessionKey":"s2","agentId":"dex","status":"streaming","totalTokens":200} + ]`) + + client.handleSessionsChanged(payload) + + repo.mu.Lock() + otto := repo.agents["otto"] + dex := repo.agents["dex"] + repo.mu.Unlock() + + if otto.Status != models.AgentStatusActive { + t.Errorf("otto status = %q, want active", otto.Status) + } + if dex.Status != models.AgentStatusActive { + t.Errorf("dex status = %q, want active", dex.Status) + } + + // Both should produce broadcasts + events := capture.captured() + statusCount := 0 + for _, evt := range events { + if evt.EventType == "agent.status" { + statusCount++ + } + } + if statusCount < 2 { + t.Errorf("expected at least 2 agent.status broadcasts, got %d", statusCount) + } +} + +func TestHandleSessionsChanged_SkipsEmptyAgentID(t *testing.T) { + client, _, _, capture := newTestWSClient() + defer capture.close() + + payload := json.RawMessage(`{"sessionKey":"s1","agentId":"","status":"running"}`) + client.handleSessionsChanged(payload) + + events := capture.captured() + if len(events) > 0 { + t.Errorf("expected no broadcasts for empty agentId, got %d", len(events)) + } +} + +func TestHandleSessionsChanged_UnparseablePayload(t *testing.T) { + client, _, _, capture := newTestWSClient() + defer capture.close() + + payload := json.RawMessage(`not json at all`) + client.handleSessionsChanged(payload) + + events := capture.captured() + if len(events) > 0 { + t.Errorf("expected no broadcasts for unparseable payload, got %d", len(events)) + } +} + +func TestHandlePresence(t *testing.T) { + client, repo, _, capture := newTestWSClient() + defer capture.close() + + repo.agents["pip"] = models.AgentCardData{ + ID: "pip", + DisplayName: "Pip", + Status: models.AgentStatusActive, + } + + payload := json.RawMessage(`{ + "agentId": "pip", + "connected": true, + "lastActivityAt": "2025-01-01T00:00:00Z" + }`) + + client.handlePresence(payload) + + repo.mu.Lock() + agent := repo.agents["pip"] + calls := make([]updateCall, len(repo.updateCalls)) + copy(calls, repo.updateCalls) + repo.mu.Unlock() + + // Agent should still be active (connected=true doesn't change status) + if agent.Status != models.AgentStatusActive { + t.Errorf("agent status = %q, want active", agent.Status) + } + + // Update should have been called (for lastActivityAt) + if len(calls) == 0 { + t.Fatal("expected at least one update call") + } + + // Verify broadcast + events := capture.captured() + found := false + for _, evt := range events { + if evt.EventType == "agent.status" { + found = true + break + } + } + if !found { + t.Error("expected broker broadcast with event type 'agent.status'") + } +} + +func TestHandlePresence_Disconnect(t *testing.T) { + client, repo, _, capture := newTestWSClient() + defer capture.close() + + repo.agents["pip"] = models.AgentCardData{ + ID: "pip", + DisplayName: "Pip", + Status: models.AgentStatusActive, + } + + payload := json.RawMessage(`{ + "agentId": "pip", + "connected": false + }`) + + client.handlePresence(payload) + + repo.mu.Lock() + agent := repo.agents["pip"] + repo.mu.Unlock() + + // Agent should go idle on disconnect + if agent.Status != models.AgentStatusIdle { + t.Errorf("agent status = %q, want idle after disconnect", agent.Status) + } + + events := capture.captured() + found := false + for _, evt := range events { + if evt.EventType == "agent.status" { + found = true + break + } + } + if !found { + t.Error("expected broker broadcast with event type 'agent.status' on disconnect") + } +} + +func TestHandlePresence_EmptyAgentID(t *testing.T) { + client, _, _, capture := newTestWSClient() + defer capture.close() + + payload := json.RawMessage(`{"agentId":"","connected":true}`) + client.handlePresence(payload) + + events := capture.captured() + if len(events) > 0 { + t.Errorf("expected no broadcasts for empty agentId, got %d", len(events)) + } +} + +func TestHandleAgentConfig(t *testing.T) { + client, repo, _, capture := newTestWSClient() + defer capture.close() + + repo.agents["rex"] = models.AgentCardData{ + ID: "rex", + DisplayName: "Rex", + Role: "Frontend Dev", + Status: models.AgentStatusIdle, + Channel: "discord", + } + + payload := json.RawMessage(`{ + "id": "rex", + "name": "Rex the Dev", + "role": "Senior Frontend", + "channel": "telegram" + }`) + + client.handleAgentConfig(payload) + + repo.mu.Lock() + agent := repo.agents["rex"] + calls := make([]updateCall, len(repo.updateCalls)) + copy(calls, repo.updateCalls) + repo.mu.Unlock() + + // Verify DisplayName and Role updated + if agent.DisplayName != "Rex the Dev" { + t.Errorf("displayName = %q, want %q", agent.DisplayName, "Rex the Dev") + } + if agent.Role != "Senior Frontend" { + t.Errorf("role = %q, want %q", agent.Role, "Senior Frontend") + } + if agent.Channel != "telegram" { + t.Errorf("channel = %q, want %q", agent.Channel, "telegram") + } + + // Verify update was called + if len(calls) == 0 { + t.Fatal("expected at least one update call") + } + + // Verify broker fires "fleet.update" + events := capture.captured() + found := false + for _, evt := range events { + if evt.EventType == "fleet.update" { + found = true + break + } + } + if !found { + t.Error("expected broker broadcast with event type 'fleet.update'") + } +} + +func TestHandleAgentConfig_EmptyID(t *testing.T) { + client, _, _, capture := newTestWSClient() + defer capture.close() + + payload := json.RawMessage(`{"id":"","name":"Ghost"}`) + client.handleAgentConfig(payload) + + events := capture.captured() + if len(events) > 0 { + t.Errorf("expected no broadcasts for empty id, got %d", len(events)) + } +} + +func TestHandleAgentConfig_NotFound(t *testing.T) { + client, _, _, capture := newTestWSClient() + defer capture.close() + + payload := json.RawMessage(`{"id":"unknown","name":"Ghost","role":"Phantom"}`) + client.handleAgentConfig(payload) + + // Agent doesn't exist in repo, so Update will fail → handler logs warning, returns early + events := capture.captured() + for _, evt := range events { + if evt.EventType == "fleet.update" { + t.Error("fleet.update should not be broadcast when agent update fails") + } + } +} \ No newline at end of file diff --git a/go-backend/internal/gateway/sync_test.go b/go-backend/internal/gateway/sync_test.go new file mode 100644 index 0000000..3d2b5c4 --- /dev/null +++ b/go-backend/internal/gateway/sync_test.go @@ -0,0 +1,236 @@ +package gateway + +import ( + "context" + "testing" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler" + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" +) + +func TestInitialSync(t *testing.T) { + _ = &mockAgentRepo{agents: make(map[string]models.AgentCardData)} // verify mock compiles + broker := handler.NewBroker() + capture := newBroadcastCapture(broker) + defer capture.close() + + // --- Test agentItemToCard + session merge (the core of initialSync) --- + + agentItems := []agentListItem{ + {ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"}, + {ID: "dex", Name: "Dex", Role: "Backend Dev", Channel: "telegram"}, + } + + sessionItems := []sessionListItem{ + {SessionKey: "s1", AgentID: "otto", Status: "running", TotalTokens: 500, LastActivityAt: "2025-05-20T12:00:00Z"}, + {SessionKey: "s2", AgentID: "dex", Status: "done", TotalTokens: 1000, LastActivityAt: "2025-05-20T11:00:00Z"}, + } + + // Build sessionByAgent map (mirrors initialSync logic) + sessionByAgent := make(map[string]sessionListItem) + for _, s := range sessionItems { + if s.AgentID != "" { + sessionByAgent[s.AgentID] = s + } + } + + // Merge and verify + merged := make([]models.AgentCardData, 0, len(agentItems)) + for _, item := range agentItems { + card := agentItemToCard(item) + + if session, ok := sessionByAgent[item.ID]; ok { + card.SessionKey = session.SessionKey + card.Status = mapSessionStatus(session.Status) + card.LastActivity = session.LastActivityAt + + if session.TotalTokens > 0 { + prog := min(session.TotalTokens/100, 100) + card.TaskProgress = &prog + } + } + + merged = append(merged, card) + } + + // Verify otto: running → active + if merged[0].ID != "otto" { + t.Errorf("merged[0].ID = %q, want %q", merged[0].ID, "otto") + } + if merged[0].Status != models.AgentStatusActive { + t.Errorf("otto status = %q, want %q (running → active)", merged[0].Status, models.AgentStatusActive) + } + if merged[0].SessionKey != "s1" { + t.Errorf("otto sessionKey = %q, want %q", merged[0].SessionKey, "s1") + } + if merged[0].TaskProgress == nil || *merged[0].TaskProgress != 5 { + t.Errorf("otto taskProgress = %v, want 5", merged[0].TaskProgress) + } + + // Verify dex: done → idle + if merged[1].ID != "dex" { + t.Errorf("merged[1].ID = %q, want %q", merged[1].ID, "dex") + } + if merged[1].Status != models.AgentStatusIdle { + t.Errorf("dex status = %q, want %q (done → idle)", merged[1].Status, models.AgentStatusIdle) + } + if merged[1].SessionKey != "s2" { + t.Errorf("dex sessionKey = %q, want %q", merged[1].SessionKey, "s2") + } +} + +func TestInitialSync_PersistCreatesNew(t *testing.T) { + repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)} + broker := handler.NewBroker() + capture := newBroadcastCapture(broker) + defer capture.close() + + // Simulate the persist logic from initialSync: + // new agents should be created + card := agentItemToCard(agentListItem{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"}) + + ctx := context.Background() + + // Agent doesn't exist → create + _, err := repo.Get(ctx, card.ID) + if err == nil { + t.Fatal("expected agent to not exist yet") + } + + if err := repo.Create(ctx, card); err != nil { + t.Fatalf("Create failed: %v", err) + } + + got, err := repo.Get(ctx, card.ID) + if err != nil { + t.Fatalf("Get after Create failed: %v", err) + } + + if got.ID != "otto" { + t.Errorf("got.ID = %q, want %q", got.ID, "otto") + } + if got.DisplayName != "Otto" { + t.Errorf("got.DisplayName = %q, want %q", got.DisplayName, "Otto") + } + if got.Role != "Orchestrator" { + t.Errorf("got.Role = %q, want %q", got.Role, "Orchestrator") + } +} + +func TestInitialSync_PersistUpdatesExisting(t *testing.T) { + repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)} + broker := handler.NewBroker() + capture := newBroadcastCapture(broker) + defer capture.close() + + ctx := context.Background() + + // Pre-populate with existing agent + repo.agents["otto"] = models.AgentCardData{ + ID: "otto", + DisplayName: "Otto", + Role: "Old Role", + Status: models.AgentStatusIdle, + } + + // Simulate initialSync: agent exists, name/role changed → update + newName := "Otto Prime" + newRole := "Super Orchestrator" + _, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{ + DisplayName: &newName, + Role: &newRole, + }) + if err != nil { + t.Fatalf("Update failed: %v", err) + } + + got, err := repo.Get(ctx, "otto") + if err != nil { + t.Fatalf("Get after Update failed: %v", err) + } + + if got.DisplayName != "Otto Prime" { + t.Errorf("displayName = %q, want %q", got.DisplayName, "Otto Prime") + } + if got.Role != "Super Orchestrator" { + t.Errorf("role = %q, want %q", got.Role, "Super Orchestrator") + } +} + +func TestInitialSync_MergesSessionStatus(t *testing.T) { + // When initialSync merges session state, an agent whose existing status + // differs from the session-derived status should be updated. + repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)} + ctx := context.Background() + + repo.agents["otto"] = models.AgentCardData{ + ID: "otto", + DisplayName: "Otto", + Role: "Orchestrator", + Status: models.AgentStatusIdle, + } + + // Simulate session merge: session says "running" → agent should go active + activeStatus := mapSessionStatus("running") + if activeStatus != models.AgentStatusActive { + t.Fatalf("mapSessionStatus(running) = %q, want active", activeStatus) + } + + _, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{ + Status: &activeStatus, + }) + if err != nil { + t.Fatalf("Update failed: %v", err) + } + + got, err := repo.Get(ctx, "otto") + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + if got.Status != models.AgentStatusActive { + t.Errorf("status after merge = %q, want %q", got.Status, models.AgentStatusActive) + } +} + +func TestInitialSync_BroadcastsFleet(t *testing.T) { + repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)} + broker := handler.NewBroker() + capture := newBroadcastCapture(broker) + defer capture.close() + + // Create some agents in the repo + repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusActive} + repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle} + + // Simulate the final broadcast from initialSync + mergedAgents := []models.AgentCardData{ + repo.agents["otto"], + repo.agents["dex"], + } + broker.Broadcast("fleet.update", mergedAgents) + + events := capture.captured() + if len(events) == 0 { + t.Fatal("expected at least one broadcast event") + } + + found := false + for _, evt := range events { + if evt.EventType == "fleet.update" { + found = true + // Verify data is the merged agents list + agents, ok := evt.Data.([]models.AgentCardData) + if !ok { + t.Fatalf("fleet.update data type = %T, want []models.AgentCardData", evt.Data) + } + if len(agents) != 2 { + t.Errorf("fleet.update agents count = %d, want 2", len(agents)) + } + break + } + } + if !found { + t.Error("expected fleet.update broadcast event") + } +} \ No newline at end of file