diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go index db7f3b9..709882e 100644 --- a/go-backend/internal/gateway/wsclient.go +++ b/go-backend/internal/gateway/wsclient.go @@ -229,26 +229,31 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { c.connId = helloOK.ConnID c.connMu.Unlock() - // Step 2b: Start the read loop in a goroutine so that Send() in + // Step 2b: Register live event handlers BEFORE starting the read + // loop. This eliminates the race window where readLoop dispatches + // live events as "unhandled" because no handlers are registered yet. + // The handlers only depend on c.agents and c.broker, which are wired + // in the constructor — they do not need initialSync to have completed. + c.registerEventHandlers() + + // Step 2c: Start the read loop in a goroutine so that Send() in // initialSync can receive responses. The read loop goroutine will // continue running after initialSync completes, routing live events - // and any future RPC responses. + // and any future RPC responses. Because handlers are already + // registered, any events arriving during or after initialSync are + // dispatched correctly. readLoopErrCh := make(chan error, 1) go func() { readLoopErrCh <- c.readLoop(ctx, conn) }() - // Step 2c: Initial sync — fetch agents + sessions from gateway. - // This now works because the read loop is active and will route + // Step 2d: Initial sync — fetch agents + sessions from gateway. + // This works because the read loop is active and will route // response frames back to Send() via handleResponse. if err := c.initialSync(ctx); err != nil { c.logger.Warn("initial sync failed, will continue with read loop", "error", err) } - // Step 2d: Register live event handlers (read loop is already - // active, so events will be dispatched immediately) - c.registerEventHandlers() - // Notify REST client that WS is live so it stands down. 
// This must happen AFTER initialSync so that the REST poller // doesn't start polling while we're still syncing. diff --git a/go-backend/internal/gateway/wsclient_test.go b/go-backend/internal/gateway/wsclient_test.go index 300a32d..8e37a6a 100644 --- a/go-backend/internal/gateway/wsclient_test.go +++ b/go-backend/internal/gateway/wsclient_test.go @@ -565,6 +565,138 @@ func TestConnectAndRun_InitialSyncOrdering(t *testing.T) { } } +// ── 7. Test: Event not lost during initial sync (regression) ─────────────── + +// TestConnectAndRun_EventNotLostDuringSync verifies that live gateway events +// arriving during initial sync are NOT dropped. This is a regression test +// for the race where readLoop started before registerEventHandlers(), +// causing events read during that window to be logged as "unhandled" and lost. +// +// The mock server sends a live event (presence) right after the +// handshake, interleaved with the RPC responses for agents.list and +// sessions.list. The test asserts the event is received by the handler. +func TestConnectAndRun_EventNotLostDuringSync(t *testing.T) { + repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)} + broker := handler.NewBroker() + capture := newBroadcastCapture(broker) + defer capture.close() + + // Pre-seed an agent so the event handler can update it. + repo.agents["otto"] = models.AgentCardData{ + ID: "otto", + DisplayName: "Otto", + Status: models.AgentStatusIdle, + } + + srv := newTestWSServer(t, func(conn *websocket.Conn) { + // Handshake + handleHandshake(t, conn) + + // After handshake, process RPCs and inject a live event. + for { + var req map[string]any + if err := conn.ReadJSON(&req); err != nil { + break + } + reqID, _ := req["id"].(string) + method, _ := req["method"].(string) + + // Respond to agents.list RPC + if method == "agents.list" { + // Before responding, inject a live event — simulates + // a gateway pushing a presence update during sync. 
+ evt := map[string]any{ + "type": "event", + "event": "presence", + "params": map[string]any{"agentId": "otto", "connected": true, "lastActivityAt": "2025-05-20T12:30:00Z"}, + } + if err := conn.WriteJSON(evt); err != nil { + break + } + + // Now send the RPC response + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": []map[string]any{ + {"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"}, + }, + } + if err := conn.WriteJSON(res); err != nil { + break + } + continue + } + + // Respond to sessions.list RPC + if method == "sessions.list" { + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": []map[string]any{}, + } + if err := conn.WriteJSON(res); err != nil { + break + } + continue + } + + // Default response for other methods + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": map[string]any{}, + } + if err := conn.WriteJSON(res); err != nil { + break + } + } + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + done := make(chan struct{}) + go func() { + client.Start(ctx) + close(done) + }() + + // Wait for the presence event to be processed by checking the repo. + // The presence handler updates the agent, so we check for the + // lastActivityAt change. 
+ deadline := time.Now().Add(5 * time.Second) + var lastActivity string + for time.Now().Before(deadline) { + repo.mu.Lock() + if a, ok := repo.agents["otto"]; ok { + lastActivity = a.LastActivity + } + repo.mu.Unlock() + if lastActivity == "2025-05-20T12:30:00Z" { + break + } + time.Sleep(50 * time.Millisecond) + } + + if lastActivity != "2025-05-20T12:30:00Z" { + t.Errorf("presence event during sync was lost: lastActivity = %q, want %q", lastActivity, "2025-05-20T12:30:00Z") + } + + cancel() + select { + case <-done: + case <-time.After(3 * time.Second): + t.Fatal("WSClient did not shut down cleanly") + } +} + func TestStrPtr(t *testing.T) { s := "hello" p := strPtr(s)