diff --git a/.env.example b/.env.example index 3d4290c..73357ac 100644 --- a/.env.example +++ b/.env.example @@ -32,7 +32,7 @@ GATEWAY_POLL_INTERVAL=5s # When using docker-compose, these are set in the services section # See docker-compose.yml for service-specific environment variables -# ── Database Configuration ───────────────────────────────────────────── +# ── Database Configuration ─────────────────────────────────────────────── # Set in the db service environment section of docker-compose.yml # POSTGRES_USER=controlcenter # POSTGRES_PASSWORD=controlcenter @@ -47,4 +47,4 @@ GATEWAY_POLL_INTERVAL=5s # For Docker deployment: # 1. Copy .env.example to .env (backend only) # 2. Run: docker compose up -d -# 3. Access frontend at http://localhost:3000 +# 3. Access frontend at http://localhost:3000 \ No newline at end of file diff --git a/go-backend/cmd/server/main.go b/go-backend/cmd/server/main.go index cc96fb1..6426f5f 100644 --- a/go-backend/cmd/server/main.go +++ b/go-backend/cmd/server/main.go @@ -112,7 +112,7 @@ func main() { <-quit slog.Info("shutting down server...") - cancel() // stop gateway polling + cancel() // stop gateway clients shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second) defer shutdownCancel() @@ -136,4 +136,4 @@ func parseLogLevel(level string) slog.Level { default: return slog.LevelInfo } -} +} \ No newline at end of file diff --git a/go-backend/internal/config/config.go b/go-backend/internal/config/config.go index c6a26b3..2832b06 100644 --- a/go-backend/internal/config/config.go +++ b/go-backend/internal/config/config.go @@ -10,30 +10,30 @@ import ( // Config holds all application configuration. 
type Config struct { - Port int - DatabaseURL string - CORSOrigin string - LogLevel string - Environment string - GatewayRestURL string - GatewayRestPollInterval time.Duration - WSGatewayURL string - WSGatewayToken string + Port int + DatabaseURL string + CORSOrigin string + LogLevel string + Environment string + GatewayRestURL string + GatewayRestPollInterval time.Duration + WSGatewayURL string + WSGatewayToken string } // Load reads configuration from environment variables, applying defaults where // values are not set. All secrets come from the environment — nothing is hardcoded. func Load() *Config { return &Config{ - Port: getEnvInt("PORT", 8080), - DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"), - CORSOrigin: getEnv("CORS_ORIGIN", "*"), - LogLevel: getEnv("LOG_LEVEL", "info"), - Environment: getEnv("ENVIRONMENT", "development"), - GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"), - GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second), - WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"), - WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""), + Port: getEnvInt("PORT", 8080), + DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"), + CORSOrigin: getEnv("CORS_ORIGIN", "*"), + LogLevel: getEnv("LOG_LEVEL", "info"), + Environment: getEnv("ENVIRONMENT", "development"), + GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"), + GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second), + WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"), + WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""), } } @@ -60,4 +60,4 @@ func getEnvDuration(key string, fallback time.Duration) time.Duration { } } return fallback -} +} \ No newline at end of file diff --git 
a/go-backend/internal/gateway/client.go b/go-backend/internal/gateway/client.go index 4b2d520..5a8db94 100644 --- a/go-backend/internal/gateway/client.go +++ b/go-backend/internal/gateway/client.go @@ -1,6 +1,10 @@ // Package gateway provides an OpenClaw gateway integration client that // polls agent states, persists them via the repository layer, and broadcasts // changes through the SSE broker for real-time frontend updates. +// +// When a WSClient is wired via SetWSClient, the REST poller becomes a +// fallback: it waits for the WS client to signal readiness, and only starts +// polling if WS fails to connect within 30 seconds. package gateway import ( @@ -29,7 +33,7 @@ type Client struct { broker *handler.Broker wsClient *WSClient // optional WS client; when set, REST is fallback only wsReady chan struct{} // closed once WS connection is established - wsReadyOnce sync.Once // protects wsReady close from double-close race + wsReadyOnce sync.Once // protects wsReady close from double-close race } // Config holds gateway client configuration, typically loaded from environment. @@ -140,7 +144,6 @@ func (c *Client) poll(ctx context.Context) { } for _, ga := range agents { - // Check if agent already exists; if so, update; otherwise create. 
existing, err := c.agents.Get(ctx, ga.ID) if err != nil { // Not found — create it @@ -185,51 +188,51 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error { slog.Info("seeding demo agents") demoAgents := []models.AgentCardData{ { - ID: "otto", - DisplayName: "Otto", - Role: "Orchestrator", - Status: models.AgentStatusActive, + ID: "otto", + DisplayName: "Otto", + Role: "Orchestrator", + Status: models.AgentStatusActive, CurrentTask: strPtr("Orchestrating tasks"), SessionKey: "otto-session", - Channel: "discord", + Channel: "discord", LastActivity: time.Now().UTC().Format(time.RFC3339), }, { - ID: "rex", - DisplayName: "Rex", - Role: "Frontend Dev", - Status: models.AgentStatusIdle, + ID: "rex", + DisplayName: "Rex", + Role: "Frontend Dev", + Status: models.AgentStatusIdle, SessionKey: "rex-session", - Channel: "discord", + Channel: "discord", LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339), }, { - ID: "dex", - DisplayName: "Dex", - Role: "Backend Dev", - Status: models.AgentStatusThinking, + ID: "dex", + DisplayName: "Dex", + Role: "Backend Dev", + Status: models.AgentStatusThinking, CurrentTask: strPtr("Designing API contracts"), SessionKey: "dex-session", - Channel: "discord", + Channel: "discord", LastActivity: time.Now().UTC().Format(time.RFC3339), }, { - ID: "hex", - DisplayName: "Hex", - Role: "Database Specialist", - Status: models.AgentStatusActive, + ID: "hex", + DisplayName: "Hex", + Role: "Database Specialist", + Status: models.AgentStatusActive, CurrentTask: strPtr("Reviewing schema migrations"), SessionKey: "hex-session", - Channel: "discord", + Channel: "discord", LastActivity: time.Now().UTC().Format(time.RFC3339), }, { - ID: "pip", - DisplayName: "Pip", - Role: "Edge Device Dev", - Status: models.AgentStatusIdle, + ID: "pip", + DisplayName: "Pip", + Role: "Edge Device Dev", + Status: models.AgentStatusIdle, SessionKey: "pip-session", - Channel: "discord", + Channel: "discord", LastActivity: 
time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339), }, } @@ -243,4 +246,4 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error { return nil } -func strPtr(s string) *string { return &s } +func strPtr(s string) *string { return &s } \ No newline at end of file diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go index 322d551..709882e 100644 --- a/go-backend/internal/gateway/wsclient.go +++ b/go-backend/internal/gateway/wsclient.go @@ -92,10 +92,10 @@ func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) { // wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol. type wsFrame struct { - Type string `json:"type"` // "req", "res", "event" - ID string `json:"id,omitempty"` // request/response correlation - Method string `json:"method,omitempty"` // method name (req frames) - Event string `json:"event,omitempty"` // event name (event frames) + Type string `json:"type"` // "req", "res", "event" + ID string `json:"id,omitempty"` // request/response correlation + Method string `json:"method,omitempty"` // method name (req frames) + Event string `json:"event,omitempty"` // event name (event frames) Params json.RawMessage `json:"params,omitempty"` Result json.RawMessage `json:"result,omitempty"` Error *wsError `json:"error,omitempty"` @@ -229,7 +229,34 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { c.connId = helloOK.ConnID c.connMu.Unlock() - // Notify REST client that WS is live so it stands down + // Step 2b: Register live event handlers BEFORE starting the read + // loop. This eliminates the race window where readLoop dispatches + // live events as "unhandled" because no handlers are registered yet. + // The handlers only depend on c.agents and c.broker, which are wired + // in the constructor — they do not need initialSync to have completed. 
+ c.registerEventHandlers() + + // Step 2c: Start the read loop in a goroutine so that Send() in + // initialSync can receive responses. The read loop goroutine will + // continue running after initialSync completes, routing live events + // and any future RPC responses. Because handlers are already + // registered, any events arriving during or after initialSync are + // dispatched correctly. + readLoopErrCh := make(chan error, 1) + go func() { + readLoopErrCh <- c.readLoop(ctx, conn) + }() + + // Step 2d: Initial sync — fetch agents + sessions from gateway. + // This works because the read loop is active and will route + // response frames back to Send() via handleResponse. + if err := c.initialSync(ctx); err != nil { + c.logger.Warn("initial sync failed, will continue with read loop", "error", err) + } + + // Notify REST client that WS is live so it stands down. + // This must happen AFTER initialSync so that the REST poller + // doesn't start polling while we're still syncing. if c.restClient != nil { c.restClient.MarkWSReady() c.logger.Info("ws client notified REST fallback to stand down") @@ -238,16 +265,9 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { // Reset wsReadyOnce so MarkWSReady can fire again after a reconnect c.wsReadyOnce = sync.Once{} - // Step 2b: Initial sync — fetch agents + sessions from gateway - if err := c.initialSync(ctx); err != nil { - c.logger.Warn("initial sync failed, will continue with read loop", "error", err) - } - - // Step 2c: Register live event handlers - c.registerEventHandlers() - - // Step 3: Read loop - return c.readLoop(ctx, conn) + // Step 3: Wait for the read loop goroutine to finish (blocks + // until the connection drops or context is cancelled). 
+ return <-readLoopErrCh } // readChallenge reads the first frame from the gateway, which must be a diff --git a/go-backend/internal/gateway/wsclient_test.go b/go-backend/internal/gateway/wsclient_test.go index 92a1d66..8e37a6a 100644 --- a/go-backend/internal/gateway/wsclient_test.go +++ b/go-backend/internal/gateway/wsclient_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler" "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" "github.com/gorilla/websocket" @@ -466,6 +467,236 @@ func TestAgentItemToCard(t *testing.T) { }) } +// ── 6. Test: Initial sync ordering (readLoop active before Send) ────────── + +// TestConnectAndRun_InitialSyncOrdering verifies that the WS client +// completes initial sync successfully. This test would hang/timeout if +// readLoop were NOT started before initialSync, because Send() relies on +// readLoop→routeFrame→handleResponse to deliver RPC responses. 
+func TestConnectAndRun_InitialSyncOrdering(t *testing.T) { + repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)} + broker := handler.NewBroker() + capture := newBroadcastCapture(broker) + defer capture.close() + + srv := newTestWSServer(t, func(conn *websocket.Conn) { + // Handshake + handleHandshake(t, conn) + + // After handshake, respond to RPCs + for { + var req map[string]any + if err := conn.ReadJSON(&req); err != nil { + break + } + reqID, _ := req["id"].(string) + method, _ := req["method"].(string) + + var result any + switch method { + case "agents.list": + result = []map[string]any{ + {"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"}, + {"id": "dex", "name": "Dex", "role": "Backend Dev", "channel": "telegram"}, + } + case "sessions.list": + result = []map[string]any{ + {"sessionKey": "s1", "agentId": "otto", "status": "running", "totalTokens": 500, "lastActivityAt": "2025-05-20T12:00:00Z"}, + } + default: + result = map[string]any{} + } + + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": result, + } + if err := conn.WriteJSON(res); err != nil { + break + } + } + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + done := make(chan struct{}) + go func() { + client.Start(ctx) + close(done) + }() + + // Wait for initial sync to complete by checking repo state. + // The agents should be persisted from the RPC responses. 
+ deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + repo.mu.Lock() + _, ottoOK := repo.agents["otto"] + _, dexOK := repo.agents["dex"] + repo.mu.Unlock() + if ottoOK && dexOK { + break + } + time.Sleep(50 * time.Millisecond) + } + + repo.mu.Lock() + _, ottoOK := repo.agents["otto"] + _, dexOK := repo.agents["dex"] + repo.mu.Unlock() + + if !ottoOK { + t.Error("otto not found in repo after initial sync — readLoop may not have been active before Send()") + } + if !dexOK { + t.Error("dex not found in repo after initial sync — readLoop may not have been active before Send()") + } + + cancel() + select { + case <-done: + case <-time.After(3 * time.Second): + t.Fatal("WSClient did not shut down cleanly") + } +} + +// ── 7. Test: Event not lost during initial sync (regression) ─────────────── + +// TestConnectAndRun_EventNotLostDuringSync verifies that live gateway events +// arriving during initial sync are NOT dropped. This is a regression test +// for the race where readLoop started before registerEventHandlers(), +// causing events read during that window to be logged as "unhandled" and lost. +// +// The mock server sends a live event (presence) just before it answers the +// agents.list RPC, so the event is interleaved with the RPC responses for +// agents.list and sessions.list. The test asserts the handler applied it. +func TestConnectAndRun_EventNotLostDuringSync(t *testing.T) { + repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)} + broker := handler.NewBroker() + capture := newBroadcastCapture(broker) + defer capture.close() + + // Pre-seed an agent so the event handler can update it. + repo.agents["otto"] = models.AgentCardData{ + ID: "otto", + DisplayName: "Otto", + Status: models.AgentStatusIdle, + } + + srv := newTestWSServer(t, func(conn *websocket.Conn) { + // Handshake + handleHandshake(t, conn) + + // After handshake, process RPCs and inject a live event. 
+ for { + var req map[string]any + if err := conn.ReadJSON(&req); err != nil { + break + } + reqID, _ := req["id"].(string) + method, _ := req["method"].(string) + + // Respond to agents.list RPC + if method == "agents.list" { + // Before responding, inject a live event — simulates + // a gateway pushing a presence update during sync. + evt := map[string]any{ + "type": "event", + "event": "presence", + "params": map[string]any{"agentId": "otto", "connected": true, "lastActivityAt": "2025-05-20T12:30:00Z"}, + } + if err := conn.WriteJSON(evt); err != nil { + break + } + + // Now send the RPC response + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": []map[string]any{ + {"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"}, + }, + } + if err := conn.WriteJSON(res); err != nil { + break + } + continue + } + + // Respond to sessions.list RPC + if method == "sessions.list" { + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": []map[string]any{}, + } + if err := conn.WriteJSON(res); err != nil { + break + } + continue + } + + // Default response for other methods + res := map[string]any{ + "type": "res", + "id": reqID, + "ok": true, + "result": map[string]any{}, + } + if err := conn.WriteJSON(res); err != nil { + break + } + } + }) + defer srv.Close() + + client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + done := make(chan struct{}) + go func() { + client.Start(ctx) + close(done) + }() + + // Wait for the presence event to be processed by checking the repo. + // The presence handler updates the agent, so we check for the + // lastActivityAt change. 
+ deadline := time.Now().Add(5 * time.Second) + var lastActivity string + for time.Now().Before(deadline) { + repo.mu.Lock() + if a, ok := repo.agents["otto"]; ok { + lastActivity = a.LastActivity + } + repo.mu.Unlock() + if lastActivity == "2025-05-20T12:30:00Z" { + break + } + time.Sleep(50 * time.Millisecond) + } + + if lastActivity != "2025-05-20T12:30:00Z" { + t.Errorf("presence event during sync was lost: lastActivity = %q, want %q", lastActivity, "2025-05-20T12:30:00Z") + } + + cancel() + select { + case <-done: + case <-time.After(3 * time.Second): + t.Fatal("WSClient did not shut down cleanly") + } +} + func TestStrPtr(t *testing.T) { s := "hello" p := strPtr(s)