CUB-207: unit tests for event handlers and initial sync #44
516
go-backend/internal/gateway/events_test.go
Normal file
516
go-backend/internal/gateway/events_test.go
Normal file
@@ -0,0 +1,516 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
||||
)
|
||||
|
||||
// ── Mock AgentRepo ────────────────────────────────────────────────────────
|
||||
|
||||
type mockAgentRepo struct {
|
||||
mu sync.Mutex
|
||||
agents map[string]models.AgentCardData
|
||||
updateCalls []updateCall
|
||||
}
|
||||
|
||||
type updateCall struct {
|
||||
id string
|
||||
req models.UpdateAgentRequest
|
||||
}
|
||||
|
||||
func (m *mockAgentRepo) Get(_ context.Context, id string) (models.AgentCardData, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
a, ok := m.agents[id]
|
||||
if !ok {
|
||||
return models.AgentCardData{}, errNotFound
|
||||
}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mockAgentRepo) Update(_ context.Context, id string, req models.UpdateAgentRequest) (models.AgentCardData, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
a, ok := m.agents[id]
|
||||
if !ok {
|
||||
return models.AgentCardData{}, errNotFound
|
||||
}
|
||||
|
||||
if req.Status != nil {
|
||||
a.Status = *req.Status
|
||||
}
|
||||
if req.DisplayName != nil {
|
||||
a.DisplayName = *req.DisplayName
|
||||
}
|
||||
if req.Role != nil {
|
||||
a.Role = *req.Role
|
||||
}
|
||||
if req.Channel != nil {
|
||||
a.Channel = *req.Channel
|
||||
}
|
||||
if req.CurrentTask != nil {
|
||||
a.CurrentTask = req.CurrentTask
|
||||
}
|
||||
if req.TaskProgress != nil {
|
||||
a.TaskProgress = req.TaskProgress
|
||||
}
|
||||
if req.TaskElapsed != nil {
|
||||
a.TaskElapsed = req.TaskElapsed
|
||||
}
|
||||
if req.ErrorMessage != nil {
|
||||
a.ErrorMessage = req.ErrorMessage
|
||||
}
|
||||
if req.LastActivityAt != nil {
|
||||
a.LastActivity = *req.LastActivityAt
|
||||
}
|
||||
|
||||
m.agents[id] = a
|
||||
m.updateCalls = append(m.updateCalls, updateCall{id, req})
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mockAgentRepo) Create(_ context.Context, a models.AgentCardData) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.agents[a.ID] = a
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockAgentRepo) List(_ context.Context, statusFilter models.AgentStatus) ([]models.AgentCardData, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
var result []models.AgentCardData
|
||||
for _, a := range m.agents {
|
||||
if statusFilter == "" || a.Status == statusFilter {
|
||||
result = append(result, a)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (m *mockAgentRepo) Delete(_ context.Context, id string) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
delete(m.agents, id)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockAgentRepo) Count(_ context.Context) (int, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return len(m.agents), nil
|
||||
}
|
||||
|
||||
// errNotFound is returned by the mock repo when an agent is not found.
|
||||
var errNotFound = fmt.Errorf("not found")
|
||||
|
||||
// ── Broadcast capture helper ───────────────────────────────────────────────
|
||||
|
||||
// broadcastCapture wraps a real Broker and captures all broadcasts
|
||||
// via a subscribed channel. Use captured() to retrieve events that have
|
||||
// been received so far. Call close() to unsubscribe when done.
|
||||
type broadcastCapture struct {
|
||||
broker *handler.Broker
|
||||
ch chan handler.SSEEvent
|
||||
}
|
||||
|
||||
func newBroadcastCapture(broker *handler.Broker) *broadcastCapture {
|
||||
return &broadcastCapture{
|
||||
broker: broker,
|
||||
ch: broker.Subscribe(),
|
||||
}
|
||||
}
|
||||
|
||||
// captured drains all pending events from the subscription channel
|
||||
// and returns them. This is synchronous — it only returns events that
|
||||
// have already been sent to the channel.
|
||||
func (bc *broadcastCapture) captured() []handler.SSEEvent {
|
||||
var events []handler.SSEEvent
|
||||
for {
|
||||
select {
|
||||
case evt := <-bc.ch:
|
||||
events = append(events, evt)
|
||||
default:
|
||||
return events
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (bc *broadcastCapture) close() {
|
||||
bc.broker.Unsubscribe(bc.ch)
|
||||
}
|
||||
|
||||
// ── Test helpers ──────────────────────────────────────────────────────────
|
||||
|
||||
// newTestWSClient creates a WSClient wired to a mock repo and a real broker.
|
||||
// Returns the client, the mock repo, and a broadcast capture.
|
||||
func newTestWSClient() (*WSClient, *mockAgentRepo, *handler.Broker, *broadcastCapture) {
|
||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||
broker := handler.NewBroker()
|
||||
capture := newBroadcastCapture(broker)
|
||||
client := NewWSClient(WSConfig{}, repo, broker, slog.Default())
|
||||
return client, repo, broker, capture
|
||||
}
|
||||
|
||||
// ── Tests ─────────────────────────────────────────────────────────────────
|
||||
|
||||
func TestHandleSessionsChanged_Active(t *testing.T) {
|
||||
client, repo, broker, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
repo.agents["otto"] = models.AgentCardData{
|
||||
ID: "otto",
|
||||
DisplayName: "Otto",
|
||||
Status: models.AgentStatusIdle,
|
||||
}
|
||||
|
||||
payload := json.RawMessage(`{
|
||||
"sessionKey": "s1",
|
||||
"agentId": "otto",
|
||||
"status": "running",
|
||||
"totalTokens": 500,
|
||||
"currentTask": "Orchestrating tasks"
|
||||
}`)
|
||||
|
||||
client.handleSessionsChanged(payload)
|
||||
|
||||
// Verify: agent status updated to active
|
||||
repo.mu.Lock()
|
||||
agent := repo.agents["otto"]
|
||||
calls := make([]updateCall, len(repo.updateCalls))
|
||||
copy(calls, repo.updateCalls)
|
||||
repo.mu.Unlock()
|
||||
|
||||
if agent.Status != models.AgentStatusActive {
|
||||
t.Errorf("agent status = %q, want %q", agent.Status, models.AgentStatusActive)
|
||||
}
|
||||
|
||||
// Verify: update was called
|
||||
if len(calls) == 0 {
|
||||
t.Fatal("expected at least one update call")
|
||||
}
|
||||
if calls[0].id != "otto" {
|
||||
t.Errorf("update call agentId = %q, want %q", calls[0].id, "otto")
|
||||
}
|
||||
|
||||
// Verify: broker broadcast "agent.status"
|
||||
events := capture.captured()
|
||||
found := false
|
||||
for _, evt := range events {
|
||||
if evt.EventType == "agent.status" {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Error("expected broker broadcast with event type 'agent.status'")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSessionsChanged_Idle(t *testing.T) {
|
||||
client, repo, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
repo.agents["dex"] = models.AgentCardData{
|
||||
ID: "dex",
|
||||
DisplayName: "Dex",
|
||||
Status: models.AgentStatusActive,
|
||||
CurrentTask: strPtr("Writing API"),
|
||||
}
|
||||
|
||||
payload := json.RawMessage(`{
|
||||
"sessionKey": "s2",
|
||||
"agentId": "dex",
|
||||
"status": "done",
|
||||
"totalTokens": 1000
|
||||
}`)
|
||||
|
||||
client.handleSessionsChanged(payload)
|
||||
|
||||
repo.mu.Lock()
|
||||
agent := repo.agents["dex"]
|
||||
repo.mu.Unlock()
|
||||
|
||||
// Verify: agent goes idle
|
||||
if agent.Status != models.AgentStatusIdle {
|
||||
t.Errorf("agent status = %q, want %q", agent.Status, models.AgentStatusIdle)
|
||||
}
|
||||
|
||||
// Verify: current task cleared (set to empty string)
|
||||
if agent.CurrentTask != nil && *agent.CurrentTask != "" {
|
||||
t.Errorf("current task = %q, want empty (cleared on idle)", *agent.CurrentTask)
|
||||
}
|
||||
|
||||
// Verify: broker fires "agent.status"
|
||||
events := capture.captured()
|
||||
found := false
|
||||
for _, evt := range events {
|
||||
if evt.EventType == "agent.status" {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Error("expected broker broadcast with event type 'agent.status'")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSessionsChanged_ArrayPayload(t *testing.T) {
|
||||
client, repo, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusIdle}
|
||||
repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle}
|
||||
|
||||
payload := json.RawMessage(`[
|
||||
{"sessionKey":"s1","agentId":"otto","status":"running","totalTokens":100},
|
||||
{"sessionKey":"s2","agentId":"dex","status":"streaming","totalTokens":200}
|
||||
]`)
|
||||
|
||||
client.handleSessionsChanged(payload)
|
||||
|
||||
repo.mu.Lock()
|
||||
otto := repo.agents["otto"]
|
||||
dex := repo.agents["dex"]
|
||||
repo.mu.Unlock()
|
||||
|
||||
if otto.Status != models.AgentStatusActive {
|
||||
t.Errorf("otto status = %q, want active", otto.Status)
|
||||
}
|
||||
if dex.Status != models.AgentStatusActive {
|
||||
t.Errorf("dex status = %q, want active", dex.Status)
|
||||
}
|
||||
|
||||
// Both should produce broadcasts
|
||||
events := capture.captured()
|
||||
statusCount := 0
|
||||
for _, evt := range events {
|
||||
if evt.EventType == "agent.status" {
|
||||
statusCount++
|
||||
}
|
||||
}
|
||||
if statusCount < 2 {
|
||||
t.Errorf("expected at least 2 agent.status broadcasts, got %d", statusCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSessionsChanged_SkipsEmptyAgentID(t *testing.T) {
|
||||
client, _, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
payload := json.RawMessage(`{"sessionKey":"s1","agentId":"","status":"running"}`)
|
||||
client.handleSessionsChanged(payload)
|
||||
|
||||
events := capture.captured()
|
||||
if len(events) > 0 {
|
||||
t.Errorf("expected no broadcasts for empty agentId, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSessionsChanged_UnparseablePayload(t *testing.T) {
|
||||
client, _, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
payload := json.RawMessage(`not json at all`)
|
||||
client.handleSessionsChanged(payload)
|
||||
|
||||
events := capture.captured()
|
||||
if len(events) > 0 {
|
||||
t.Errorf("expected no broadcasts for unparseable payload, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandlePresence(t *testing.T) {
|
||||
client, repo, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
repo.agents["pip"] = models.AgentCardData{
|
||||
ID: "pip",
|
||||
DisplayName: "Pip",
|
||||
Status: models.AgentStatusActive,
|
||||
}
|
||||
|
||||
payload := json.RawMessage(`{
|
||||
"agentId": "pip",
|
||||
"connected": true,
|
||||
"lastActivityAt": "2025-01-01T00:00:00Z"
|
||||
}`)
|
||||
|
||||
client.handlePresence(payload)
|
||||
|
||||
repo.mu.Lock()
|
||||
agent := repo.agents["pip"]
|
||||
calls := make([]updateCall, len(repo.updateCalls))
|
||||
copy(calls, repo.updateCalls)
|
||||
repo.mu.Unlock()
|
||||
|
||||
// Agent should still be active (connected=true doesn't change status)
|
||||
if agent.Status != models.AgentStatusActive {
|
||||
t.Errorf("agent status = %q, want active", agent.Status)
|
||||
}
|
||||
|
||||
// Update should have been called (for lastActivityAt)
|
||||
if len(calls) == 0 {
|
||||
t.Fatal("expected at least one update call")
|
||||
}
|
||||
|
||||
// Verify broadcast
|
||||
events := capture.captured()
|
||||
found := false
|
||||
for _, evt := range events {
|
||||
if evt.EventType == "agent.status" {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Error("expected broker broadcast with event type 'agent.status'")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandlePresence_Disconnect(t *testing.T) {
|
||||
client, repo, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
repo.agents["pip"] = models.AgentCardData{
|
||||
ID: "pip",
|
||||
DisplayName: "Pip",
|
||||
Status: models.AgentStatusActive,
|
||||
}
|
||||
|
||||
payload := json.RawMessage(`{
|
||||
"agentId": "pip",
|
||||
"connected": false
|
||||
}`)
|
||||
|
||||
client.handlePresence(payload)
|
||||
|
||||
repo.mu.Lock()
|
||||
agent := repo.agents["pip"]
|
||||
repo.mu.Unlock()
|
||||
|
||||
// Agent should go idle on disconnect
|
||||
if agent.Status != models.AgentStatusIdle {
|
||||
t.Errorf("agent status = %q, want idle after disconnect", agent.Status)
|
||||
}
|
||||
|
||||
events := capture.captured()
|
||||
found := false
|
||||
for _, evt := range events {
|
||||
if evt.EventType == "agent.status" {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Error("expected broker broadcast with event type 'agent.status' on disconnect")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandlePresence_EmptyAgentID(t *testing.T) {
|
||||
client, _, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
payload := json.RawMessage(`{"agentId":"","connected":true}`)
|
||||
client.handlePresence(payload)
|
||||
|
||||
events := capture.captured()
|
||||
if len(events) > 0 {
|
||||
t.Errorf("expected no broadcasts for empty agentId, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleAgentConfig(t *testing.T) {
|
||||
client, repo, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
repo.agents["rex"] = models.AgentCardData{
|
||||
ID: "rex",
|
||||
DisplayName: "Rex",
|
||||
Role: "Frontend Dev",
|
||||
Status: models.AgentStatusIdle,
|
||||
Channel: "discord",
|
||||
}
|
||||
|
||||
payload := json.RawMessage(`{
|
||||
"id": "rex",
|
||||
"name": "Rex the Dev",
|
||||
"role": "Senior Frontend",
|
||||
"channel": "telegram"
|
||||
}`)
|
||||
|
||||
client.handleAgentConfig(payload)
|
||||
|
||||
repo.mu.Lock()
|
||||
agent := repo.agents["rex"]
|
||||
calls := make([]updateCall, len(repo.updateCalls))
|
||||
copy(calls, repo.updateCalls)
|
||||
repo.mu.Unlock()
|
||||
|
||||
// Verify DisplayName and Role updated
|
||||
if agent.DisplayName != "Rex the Dev" {
|
||||
t.Errorf("displayName = %q, want %q", agent.DisplayName, "Rex the Dev")
|
||||
}
|
||||
if agent.Role != "Senior Frontend" {
|
||||
t.Errorf("role = %q, want %q", agent.Role, "Senior Frontend")
|
||||
}
|
||||
if agent.Channel != "telegram" {
|
||||
t.Errorf("channel = %q, want %q", agent.Channel, "telegram")
|
||||
}
|
||||
|
||||
// Verify update was called
|
||||
if len(calls) == 0 {
|
||||
t.Fatal("expected at least one update call")
|
||||
}
|
||||
|
||||
// Verify broker fires "fleet.update"
|
||||
events := capture.captured()
|
||||
found := false
|
||||
for _, evt := range events {
|
||||
if evt.EventType == "fleet.update" {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Error("expected broker broadcast with event type 'fleet.update'")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleAgentConfig_EmptyID(t *testing.T) {
|
||||
client, _, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
payload := json.RawMessage(`{"id":"","name":"Ghost"}`)
|
||||
client.handleAgentConfig(payload)
|
||||
|
||||
events := capture.captured()
|
||||
if len(events) > 0 {
|
||||
t.Errorf("expected no broadcasts for empty id, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleAgentConfig_NotFound(t *testing.T) {
|
||||
client, _, _, capture := newTestWSClient()
|
||||
defer capture.close()
|
||||
|
||||
payload := json.RawMessage(`{"id":"unknown","name":"Ghost","role":"Phantom"}`)
|
||||
client.handleAgentConfig(payload)
|
||||
|
||||
// Agent doesn't exist in repo, so Update will fail → handler logs warning, returns early
|
||||
events := capture.captured()
|
||||
for _, evt := range events {
|
||||
if evt.EventType == "fleet.update" {
|
||||
t.Error("fleet.update should not be broadcast when agent update fails")
|
||||
}
|
||||
}
|
||||
}
|
||||
236
go-backend/internal/gateway/sync_test.go
Normal file
236
go-backend/internal/gateway/sync_test.go
Normal file
@@ -0,0 +1,236 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
||||
)
|
||||
|
||||
func TestInitialSync(t *testing.T) {
|
||||
_ = &mockAgentRepo{agents: make(map[string]models.AgentCardData)} // verify mock compiles
|
||||
broker := handler.NewBroker()
|
||||
capture := newBroadcastCapture(broker)
|
||||
defer capture.close()
|
||||
|
||||
// --- Test agentItemToCard + session merge (the core of initialSync) ---
|
||||
|
||||
agentItems := []agentListItem{
|
||||
{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"},
|
||||
{ID: "dex", Name: "Dex", Role: "Backend Dev", Channel: "telegram"},
|
||||
}
|
||||
|
||||
sessionItems := []sessionListItem{
|
||||
{SessionKey: "s1", AgentID: "otto", Status: "running", TotalTokens: 500, LastActivityAt: "2025-05-20T12:00:00Z"},
|
||||
{SessionKey: "s2", AgentID: "dex", Status: "done", TotalTokens: 1000, LastActivityAt: "2025-05-20T11:00:00Z"},
|
||||
}
|
||||
|
||||
// Build sessionByAgent map (mirrors initialSync logic)
|
||||
sessionByAgent := make(map[string]sessionListItem)
|
||||
for _, s := range sessionItems {
|
||||
if s.AgentID != "" {
|
||||
sessionByAgent[s.AgentID] = s
|
||||
}
|
||||
}
|
||||
|
||||
// Merge and verify
|
||||
merged := make([]models.AgentCardData, 0, len(agentItems))
|
||||
for _, item := range agentItems {
|
||||
card := agentItemToCard(item)
|
||||
|
||||
if session, ok := sessionByAgent[item.ID]; ok {
|
||||
card.SessionKey = session.SessionKey
|
||||
card.Status = mapSessionStatus(session.Status)
|
||||
card.LastActivity = session.LastActivityAt
|
||||
|
||||
if session.TotalTokens > 0 {
|
||||
prog := min(session.TotalTokens/100, 100)
|
||||
card.TaskProgress = &prog
|
||||
}
|
||||
}
|
||||
|
||||
merged = append(merged, card)
|
||||
}
|
||||
|
||||
// Verify otto: running → active
|
||||
if merged[0].ID != "otto" {
|
||||
t.Errorf("merged[0].ID = %q, want %q", merged[0].ID, "otto")
|
||||
}
|
||||
if merged[0].Status != models.AgentStatusActive {
|
||||
t.Errorf("otto status = %q, want %q (running → active)", merged[0].Status, models.AgentStatusActive)
|
||||
}
|
||||
if merged[0].SessionKey != "s1" {
|
||||
t.Errorf("otto sessionKey = %q, want %q", merged[0].SessionKey, "s1")
|
||||
}
|
||||
if merged[0].TaskProgress == nil || *merged[0].TaskProgress != 5 {
|
||||
t.Errorf("otto taskProgress = %v, want 5", merged[0].TaskProgress)
|
||||
}
|
||||
|
||||
// Verify dex: done → idle
|
||||
if merged[1].ID != "dex" {
|
||||
t.Errorf("merged[1].ID = %q, want %q", merged[1].ID, "dex")
|
||||
}
|
||||
if merged[1].Status != models.AgentStatusIdle {
|
||||
t.Errorf("dex status = %q, want %q (done → idle)", merged[1].Status, models.AgentStatusIdle)
|
||||
}
|
||||
if merged[1].SessionKey != "s2" {
|
||||
t.Errorf("dex sessionKey = %q, want %q", merged[1].SessionKey, "s2")
|
||||
}
|
||||
}
|
||||
|
||||
func TestInitialSync_PersistCreatesNew(t *testing.T) {
|
||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||
broker := handler.NewBroker()
|
||||
capture := newBroadcastCapture(broker)
|
||||
defer capture.close()
|
||||
|
||||
// Simulate the persist logic from initialSync:
|
||||
// new agents should be created
|
||||
card := agentItemToCard(agentListItem{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"})
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Agent doesn't exist → create
|
||||
_, err := repo.Get(ctx, card.ID)
|
||||
if err == nil {
|
||||
t.Fatal("expected agent to not exist yet")
|
||||
}
|
||||
|
||||
if err := repo.Create(ctx, card); err != nil {
|
||||
t.Fatalf("Create failed: %v", err)
|
||||
}
|
||||
|
||||
got, err := repo.Get(ctx, card.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("Get after Create failed: %v", err)
|
||||
}
|
||||
|
||||
if got.ID != "otto" {
|
||||
t.Errorf("got.ID = %q, want %q", got.ID, "otto")
|
||||
}
|
||||
if got.DisplayName != "Otto" {
|
||||
t.Errorf("got.DisplayName = %q, want %q", got.DisplayName, "Otto")
|
||||
}
|
||||
if got.Role != "Orchestrator" {
|
||||
t.Errorf("got.Role = %q, want %q", got.Role, "Orchestrator")
|
||||
}
|
||||
}
|
||||
|
||||
func TestInitialSync_PersistUpdatesExisting(t *testing.T) {
|
||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||
broker := handler.NewBroker()
|
||||
capture := newBroadcastCapture(broker)
|
||||
defer capture.close()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Pre-populate with existing agent
|
||||
repo.agents["otto"] = models.AgentCardData{
|
||||
ID: "otto",
|
||||
DisplayName: "Otto",
|
||||
Role: "Old Role",
|
||||
Status: models.AgentStatusIdle,
|
||||
}
|
||||
|
||||
// Simulate initialSync: agent exists, name/role changed → update
|
||||
newName := "Otto Prime"
|
||||
newRole := "Super Orchestrator"
|
||||
_, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{
|
||||
DisplayName: &newName,
|
||||
Role: &newRole,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Update failed: %v", err)
|
||||
}
|
||||
|
||||
got, err := repo.Get(ctx, "otto")
|
||||
if err != nil {
|
||||
t.Fatalf("Get after Update failed: %v", err)
|
||||
}
|
||||
|
||||
if got.DisplayName != "Otto Prime" {
|
||||
t.Errorf("displayName = %q, want %q", got.DisplayName, "Otto Prime")
|
||||
}
|
||||
if got.Role != "Super Orchestrator" {
|
||||
t.Errorf("role = %q, want %q", got.Role, "Super Orchestrator")
|
||||
}
|
||||
}
|
||||
|
||||
func TestInitialSync_MergesSessionStatus(t *testing.T) {
|
||||
// When initialSync merges session state, an agent whose existing status
|
||||
// differs from the session-derived status should be updated.
|
||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||
ctx := context.Background()
|
||||
|
||||
repo.agents["otto"] = models.AgentCardData{
|
||||
ID: "otto",
|
||||
DisplayName: "Otto",
|
||||
Role: "Orchestrator",
|
||||
Status: models.AgentStatusIdle,
|
||||
}
|
||||
|
||||
// Simulate session merge: session says "running" → agent should go active
|
||||
activeStatus := mapSessionStatus("running")
|
||||
if activeStatus != models.AgentStatusActive {
|
||||
t.Fatalf("mapSessionStatus(running) = %q, want active", activeStatus)
|
||||
}
|
||||
|
||||
_, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{
|
||||
Status: &activeStatus,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Update failed: %v", err)
|
||||
}
|
||||
|
||||
got, err := repo.Get(ctx, "otto")
|
||||
if err != nil {
|
||||
t.Fatalf("Get failed: %v", err)
|
||||
}
|
||||
|
||||
if got.Status != models.AgentStatusActive {
|
||||
t.Errorf("status after merge = %q, want %q", got.Status, models.AgentStatusActive)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInitialSync_BroadcastsFleet(t *testing.T) {
|
||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||
broker := handler.NewBroker()
|
||||
capture := newBroadcastCapture(broker)
|
||||
defer capture.close()
|
||||
|
||||
// Create some agents in the repo
|
||||
repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusActive}
|
||||
repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle}
|
||||
|
||||
// Simulate the final broadcast from initialSync
|
||||
mergedAgents := []models.AgentCardData{
|
||||
repo.agents["otto"],
|
||||
repo.agents["dex"],
|
||||
}
|
||||
broker.Broadcast("fleet.update", mergedAgents)
|
||||
|
||||
events := capture.captured()
|
||||
if len(events) == 0 {
|
||||
t.Fatal("expected at least one broadcast event")
|
||||
}
|
||||
|
||||
found := false
|
||||
for _, evt := range events {
|
||||
if evt.EventType == "fleet.update" {
|
||||
found = true
|
||||
// Verify data is the merged agents list
|
||||
agents, ok := evt.Data.([]models.AgentCardData)
|
||||
if !ok {
|
||||
t.Fatalf("fleet.update data type = %T, want []models.AgentCardData", evt.Data)
|
||||
}
|
||||
if len(agents) != 2 {
|
||||
t.Errorf("fleet.update agents count = %d, want 2", len(agents))
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Error("expected fleet.update broadcast event")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user