diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go index 9500809..db7f3b9 100644 --- a/go-backend/internal/gateway/wsclient.go +++ b/go-backend/internal/gateway/wsclient.go @@ -229,7 +229,29 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { c.connId = helloOK.ConnID c.connMu.Unlock() - // Notify REST client that WS is live so it stands down + // Step 2b: Start the read loop in a goroutine so that Send() in + // initialSync can receive responses. The read loop goroutine will + // continue running after initialSync completes, routing live events + // and any future RPC responses. + readLoopErrCh := make(chan error, 1) + go func() { + readLoopErrCh <- c.readLoop(ctx, conn) + }() + + // Step 2c: Initial sync — fetch agents + sessions from gateway. + // This now works because the read loop is active and will route + // response frames back to Send() via handleResponse. + if err := c.initialSync(ctx); err != nil { + c.logger.Warn("initial sync failed, will continue with read loop", "error", err) + } + + // Step 2d: Register live event handlers (read loop is already + // active, so events will be dispatched immediately) + c.registerEventHandlers() + + // Notify REST client that WS is live so it stands down. + // This happens AFTER initialSync so that the REST fallback is + // not told to stand down until the initial sync attempt is done. 
if c.restClient != nil { c.restClient.MarkWSReady() c.logger.Info("ws client notified REST fallback to stand down") @@ -238,16 +260,9 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { // Reset wsReadyOnce so MarkWSReady can fire again after a reconnect c.wsReadyOnce = sync.Once{} - // Step 2b: Initial sync — fetch agents + sessions from gateway - if err := c.initialSync(ctx); err != nil { - c.logger.Warn("initial sync failed, will continue with read loop", "error", err) - } - - // Step 2c: Register live event handlers - c.registerEventHandlers() - - // Step 3: Read loop - return c.readLoop(ctx, conn) + // Step 3: Wait for the read loop goroutine to finish (blocks + // until the connection drops or context is cancelled). + return <-readLoopErrCh } // readChallenge reads the first frame from the gateway, which must be a diff --git a/go-backend/internal/gateway/wsclient_test.go b/go-backend/internal/gateway/wsclient_test.go index 92a1d66..300a32d 100644 --- a/go-backend/internal/gateway/wsclient_test.go +++ b/go-backend/internal/gateway/wsclient_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler" "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" "github.com/gorilla/websocket" @@ -466,6 +467,104 @@ func TestAgentItemToCard(t *testing.T) { }) } +// ── 6. Test: Initial sync ordering (readLoop active before Send) ────────── + +// TestConnectAndRun_InitialSyncOrdering verifies that the WS client +// completes initial sync successfully. This test would hang/timeout if +// readLoop were NOT started before initialSync, because Send() relies on +// readLoop→routeFrame→handleResponse to deliver RPC responses. 
+func TestConnectAndRun_InitialSyncOrdering(t *testing.T) { + repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)} + broker := handler.NewBroker() + capture := newBroadcastCapture(broker) + defer capture.close() + + srv := newTestWSServer(t, func(conn *websocket.Conn) { + // Handshake must complete before any RPC is served + handleHandshake(t, conn) + + // After handshake, answer every RPC with a canned result until the connection errors + for { + var req map[string]any + if err := conn.ReadJSON(&req); err != nil { + break + } + reqID, _ := req["id"].(string) + method, _ := req["method"].(string) + + var result any + switch method { + case "agents.list": + result = []map[string]any{ + {"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"}, + {"id": "dex", "name": "Dex", "role": "Backend Dev", "channel": "telegram"}, + } + case "sessions.list": + result = []map[string]any{ + {"sessionKey": "s1", "agentId": "otto", "status": "running", "totalTokens": 500, "lastActivityAt": "2025-05-20T12:00:00Z"}, + } + default: + result = map[string]any{} + } + + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": result, + } + if err := conn.WriteJSON(res); err != nil { + break + } + } + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + done := make(chan struct{}) + go func() { + client.Start(ctx) + close(done) + }() + + // Poll repo state (every 50ms, up to 5s) until the initial sync lands. + // Both agents from the agents.list response should end up persisted. 
+ deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + repo.mu.Lock() + _, ottoOK := repo.agents["otto"] + _, dexOK := repo.agents["dex"] + repo.mu.Unlock() + if ottoOK && dexOK { + break + } + time.Sleep(50 * time.Millisecond) + } + + repo.mu.Lock() + _, ottoOK := repo.agents["otto"] + _, dexOK := repo.agents["dex"] + repo.mu.Unlock() + + if !ottoOK { + t.Error("otto not found in repo after initial sync — readLoop may not have been active before Send()") + } + if !dexOK { + t.Error("dex not found in repo after initial sync — readLoop may not have been active before Send()") + } + + cancel() + select { + case <-done: + case <-time.After(3 * time.Second): + t.Fatal("WSClient did not shut down cleanly") + } +} + func TestStrPtr(t *testing.T) { s := "hello" p := strPtr(s)