From b7b05bb4e341201a31122a2d0cb2aae397d70fc8 Mon Sep 17 00:00:00 2001 From: rex-bot Date: Wed, 20 May 2026 21:52:39 +0000 Subject: [PATCH] =?UTF-8?q?CUB-200:=20fix=20event-loss=20race=20=E2=80=94?= =?UTF-8?q?=20register=20handlers=20before=20readLoop=20starts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move registerEventHandlers() call before the readLoop goroutine starts in connectAndRun(). This eliminates the startup window where live gateway events were actively read and dropped as 'unhandled' because handler registration happened only after initialSync completed. The handlers only depend on c.agents and c.broker, which are wired in the constructor — they do not require initialSync to have completed. Also adds TestConnectAndRun_EventNotLostDuringSync regression test that sends a live presence event during initial sync and asserts it is not lost. All gateway tests pass with -race. --- go-backend/internal/gateway/wsclient.go | 21 +-- go-backend/internal/gateway/wsclient_test.go | 132 +++++++++++++++++++ 2 files changed, 145 insertions(+), 8 deletions(-) diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go index db7f3b9..709882e 100644 --- a/go-backend/internal/gateway/wsclient.go +++ b/go-backend/internal/gateway/wsclient.go @@ -229,26 +229,31 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { c.connId = helloOK.ConnID c.connMu.Unlock() - // Step 2b: Start the read loop in a goroutine so that Send() in + // Step 2b: Register live event handlers BEFORE starting the read + // loop. This eliminates the race window where readLoop dispatches + // live events as "unhandled" because no handlers are registered yet. + // The handlers only depend on c.agents and c.broker, which are wired + // in the constructor — they do not need initialSync to have completed. 
+ c.registerEventHandlers() + + // Step 2c: Start the read loop in a goroutine so that Send() in // initialSync can receive responses. The read loop goroutine will // continue running after initialSync completes, routing live events - // and any future RPC responses. + // and any future RPC responses. Because handlers are already + // registered, any events arriving during or after initialSync are + // dispatched correctly. readLoopErrCh := make(chan error, 1) go func() { readLoopErrCh <- c.readLoop(ctx, conn) }() - // Step 2c: Initial sync — fetch agents + sessions from gateway. - // This now works because the read loop is active and will route + // Step 2d: Initial sync — fetch agents + sessions from gateway. + // This works because the read loop is active and will route // response frames back to Send() via handleResponse. if err := c.initialSync(ctx); err != nil { c.logger.Warn("initial sync failed, will continue with read loop", "error", err) } - // Step 2d: Register live event handlers (read loop is already - // active, so events will be dispatched immediately) - c.registerEventHandlers() - // Notify REST client that WS is live so it stands down. // This must happen AFTER initialSync so that the REST poller // doesn't start polling while we're still syncing. diff --git a/go-backend/internal/gateway/wsclient_test.go b/go-backend/internal/gateway/wsclient_test.go index 300a32d..8e37a6a 100644 --- a/go-backend/internal/gateway/wsclient_test.go +++ b/go-backend/internal/gateway/wsclient_test.go @@ -565,6 +565,138 @@ func TestConnectAndRun_InitialSyncOrdering(t *testing.T) { } } +// ── 7. Test: Event not lost during initial sync (regression) ─────────────── + +// TestConnectAndRun_EventNotLostDuringSync verifies that live gateway events +// arriving during initial sync are NOT dropped. This is a regression test +// for the race where readLoop started before registerEventHandlers(), +// causing events read during that window to be logged as "unhandled" and lost. 
+// +// The mock server injects a live presence event while initial sync is in +// flight: it is written just before the agents.list RPC response. The test +// asserts the presence handler applies it to the agent repo (not dropped). +func TestConnectAndRun_EventNotLostDuringSync(t *testing.T) { + repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)} + broker := handler.NewBroker() + capture := newBroadcastCapture(broker) + defer capture.close() + + // Pre-seed an agent so the event handler can update it. + repo.agents["otto"] = models.AgentCardData{ + ID: "otto", + DisplayName: "Otto", + Status: models.AgentStatusIdle, + } + + srv := newTestWSServer(t, func(conn *websocket.Conn) { + // Handshake + handleHandshake(t, conn) + + // After handshake, process RPCs and inject a live event. + for { + var req map[string]any + if err := conn.ReadJSON(&req); err != nil { + break + } + reqID, _ := req["id"].(string) + method, _ := req["method"].(string) + + // Respond to agents.list RPC + if method == "agents.list" { + // Before responding, inject a live event — simulates + // a gateway pushing a presence update during sync. 
+ evt := map[string]any{ + "type": "event", + "event": "presence", + "params": map[string]any{"agentId": "otto", "connected": true, "lastActivityAt": "2025-05-20T12:30:00Z"}, + } + if err := conn.WriteJSON(evt); err != nil { + break + } + + // Now send the RPC response + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": []map[string]any{ + {"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"}, + }, + } + if err := conn.WriteJSON(res); err != nil { + break + } + continue + } + + // Respond to sessions.list RPC + if method == "sessions.list" { + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": []map[string]any{}, + } + if err := conn.WriteJSON(res); err != nil { + break + } + continue + } + + // Default response for other methods + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": map[string]any{}, + } + if err := conn.WriteJSON(res); err != nil { + break + } + } + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + done := make(chan struct{}) + go func() { + client.Start(ctx) + close(done) + }() + + // Wait for the presence event to be processed by checking the repo. + // The presence handler updates the agent, so we check for the + // lastActivityAt change. 
+ deadline := time.Now().Add(5 * time.Second) + var lastActivity string + for time.Now().Before(deadline) { + repo.mu.Lock() + if a, ok := repo.agents["otto"]; ok { + lastActivity = a.LastActivity + } + repo.mu.Unlock() + if lastActivity == "2025-05-20T12:30:00Z" { + break + } + time.Sleep(50 * time.Millisecond) + } + + if lastActivity != "2025-05-20T12:30:00Z" { + t.Errorf("presence event during sync was lost: lastActivity = %q, want %q", lastActivity, "2025-05-20T12:30:00Z") + } + + cancel() + select { + case <-done: + case <-time.After(3 * time.Second): + t.Fatal("WSClient did not shut down cleanly") + } +} + func TestStrPtr(t *testing.T) { s := "hello" p := strPtr(s)