CUB-200: fix event-loss race — register handlers before readLoop starts
Move the registerEventHandlers() call so that it runs before the readLoop goroutine is started in connectAndRun(). This eliminates the startup window in which live gateway events were read and then dropped as 'unhandled' because handler registration happened only after initialSync completed. The handlers depend only on c.agents and c.broker, which are wired in the constructor — they do not require initialSync to have completed. Also adds the TestConnectAndRun_EventNotLostDuringSync regression test, which sends a live presence event during initial sync and asserts that it is not lost. All gateway tests pass with -race.
This commit is contained in:
@@ -229,26 +229,31 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
||||
c.connId = helloOK.ConnID
|
||||
c.connMu.Unlock()
|
||||
|
||||
// Step 2b: Start the read loop in a goroutine so that Send() in
|
||||
// Step 2b: Register live event handlers BEFORE starting the read
|
||||
// loop. This eliminates the race window where readLoop dispatches
|
||||
// live events as "unhandled" because no handlers are registered yet.
|
||||
// The handlers only depend on c.agents and c.broker, which are wired
|
||||
// in the constructor — they do not need initialSync to have completed.
|
||||
c.registerEventHandlers()
|
||||
|
||||
// Step 2c: Start the read loop in a goroutine so that Send() in
|
||||
// initialSync can receive responses. The read loop goroutine will
|
||||
// continue running after initialSync completes, routing live events
|
||||
// and any future RPC responses.
|
||||
// and any future RPC responses. Because handlers are already
|
||||
// registered, any events arriving during or after initialSync are
|
||||
// dispatched correctly.
|
||||
readLoopErrCh := make(chan error, 1)
|
||||
go func() {
|
||||
readLoopErrCh <- c.readLoop(ctx, conn)
|
||||
}()
|
||||
|
||||
// Step 2c: Initial sync — fetch agents + sessions from gateway.
|
||||
// This now works because the read loop is active and will route
|
||||
// Step 2d: Initial sync — fetch agents + sessions from gateway.
|
||||
// This works because the read loop is active and will route
|
||||
// response frames back to Send() via handleResponse.
|
||||
if err := c.initialSync(ctx); err != nil {
|
||||
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
|
||||
}
|
||||
|
||||
// Step 2d: Register live event handlers (read loop is already
|
||||
// active, so events will be dispatched immediately)
|
||||
c.registerEventHandlers()
|
||||
|
||||
// Notify REST client that WS is live so it stands down.
|
||||
// This must happen AFTER initialSync so that the REST poller
|
||||
// doesn't start polling while we're still syncing.
|
||||
|
||||
@@ -565,6 +565,138 @@ func TestConnectAndRun_InitialSyncOrdering(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// ── 7. Test: Event not lost during initial sync (regression) ───────────────
|
||||
|
||||
// TestConnectAndRun_EventNotLostDuringSync verifies that live gateway events
|
||||
// arriving during initial sync are NOT dropped. This is a regression test
|
||||
// for the race where readLoop started before registerEventHandlers(),
|
||||
// causing events read during that window to be logged as "unhandled" and lost.
|
||||
//
|
||||
// The mock server sends a live event (presence) right after the
|
||||
// handshake, interleaved with the RPC responses for agents.list and
|
||||
// sessions.list. The test asserts the event is received by the handler.
|
||||
func TestConnectAndRun_EventNotLostDuringSync(t *testing.T) {
|
||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
||||
broker := handler.NewBroker()
|
||||
capture := newBroadcastCapture(broker)
|
||||
defer capture.close()
|
||||
|
||||
// Pre-seed an agent so the event handler can update it.
|
||||
repo.agents["otto"] = models.AgentCardData{
|
||||
ID: "otto",
|
||||
DisplayName: "Otto",
|
||||
Status: models.AgentStatusIdle,
|
||||
}
|
||||
|
||||
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
||||
// Handshake
|
||||
handleHandshake(t, conn)
|
||||
|
||||
// After handshake, process RPCs and inject a live event.
|
||||
for {
|
||||
var req map[string]any
|
||||
if err := conn.ReadJSON(&req); err != nil {
|
||||
break
|
||||
}
|
||||
reqID, _ := req["id"].(string)
|
||||
method, _ := req["method"].(string)
|
||||
|
||||
// Respond to agents.list RPC
|
||||
if method == "agents.list" {
|
||||
// Before responding, inject a live event — simulates
|
||||
// a gateway pushing a presence update during sync.
|
||||
evt := map[string]any{
|
||||
"type": "event",
|
||||
"event": "presence",
|
||||
"params": map[string]any{"agentId": "otto", "connected": true, "lastActivityAt": "2025-05-20T12:30:00Z"},
|
||||
}
|
||||
if err := conn.WriteJSON(evt); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
// Now send the RPC response
|
||||
res := map[string]any{
|
||||
"type": "res",
|
||||
"id": reqID,
|
||||
"ok": true,
|
||||
"result": []map[string]any{
|
||||
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
|
||||
},
|
||||
}
|
||||
if err := conn.WriteJSON(res); err != nil {
|
||||
break
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Respond to sessions.list RPC
|
||||
if method == "sessions.list" {
|
||||
res := map[string]any{
|
||||
"type": "res",
|
||||
"id": reqID,
|
||||
"ok": true,
|
||||
"result": []map[string]any{},
|
||||
}
|
||||
if err := conn.WriteJSON(res); err != nil {
|
||||
break
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Default response for other methods
|
||||
res := map[string]any{
|
||||
"type": "res",
|
||||
"id": reqID,
|
||||
"ok": true,
|
||||
"result": map[string]any{},
|
||||
}
|
||||
if err := conn.WriteJSON(res); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
})
|
||||
defer srv.Close()
|
||||
|
||||
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
client.Start(ctx)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Wait for the presence event to be processed by checking the repo.
|
||||
// The presence handler updates the agent, so we check for the
|
||||
// lastActivityAt change.
|
||||
deadline := time.Now().Add(5 * time.Second)
|
||||
var lastActivity string
|
||||
for time.Now().Before(deadline) {
|
||||
repo.mu.Lock()
|
||||
if a, ok := repo.agents["otto"]; ok {
|
||||
lastActivity = a.LastActivity
|
||||
}
|
||||
repo.mu.Unlock()
|
||||
if lastActivity == "2025-05-20T12:30:00Z" {
|
||||
break
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
|
||||
if lastActivity != "2025-05-20T12:30:00Z" {
|
||||
t.Errorf("presence event during sync was lost: lastActivity = %q, want %q", lastActivity, "2025-05-20T12:30:00Z")
|
||||
}
|
||||
|
||||
cancel()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("WSClient did not shut down cleanly")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStrPtr(t *testing.T) {
|
||||
s := "hello"
|
||||
p := strPtr(s)
|
||||
|
||||
Reference in New Issue
Block a user