CUB-204: wire WS client as primary, REST poller as fallback
Some checks failed
Dev Build / build-test (pull_request) Failing after 1s
Build (Dev) / trigger-deploy (pull_request) Has been skipped
openclaw/grimm-review REJECTED — 6 blocking issues
Build (Dev) / build-go-backend (pull_request) Failing after 0s
Build (Dev) / build-frontend (pull_request) Failing after 1s
Some checks failed
Dev Build / build-test (pull_request) Failing after 1s
Build (Dev) / trigger-deploy (pull_request) Has been skipped
openclaw/grimm-review REJECTED — 6 blocking issues
Build (Dev) / build-go-backend (pull_request) Failing after 0s
Build (Dev) / build-frontend (pull_request) Failing after 1s
- Rename GatewayURL/GatewayPollInterval → GatewayRestURL/GatewayRestPollInterval - Change Docker-aware defaults (host.docker.internal instead of localhost) - Client.Start() waits for WS readiness (30s timeout), falls back to REST - Client.SetWSClient()/MarkWSReady() for WS→REST coordination - WSClient.SetRESTClient() so WS notifies REST on successful handshake - main.go wires both clients: WS primary, REST fallback with cross-references - .env.example documents WS_GATEWAY_URL, GATEWAY_TOKEN, REST fallback vars - docker-compose.yml adds WS_GATEWAY_URL and GATEWAY_TOKEN env vars - reference/CONTROL_CENTER_CONTEXT.md documents architecture and startup sequence
This commit is contained in:
@@ -63,25 +63,30 @@ func main() {
|
||||
Broker: broker,
|
||||
})
|
||||
|
||||
// ── Gateway client (polls OpenClaw for agent states) ───────────────────
|
||||
gwClient := gateway.NewClient(gateway.Config{
|
||||
URL: cfg.GatewayURL,
|
||||
PollInterval: cfg.GatewayPollInterval,
|
||||
}, agentRepo, broker)
|
||||
|
||||
// ── WebSocket client (connects to OpenClaw gateway WS v3) ─────────────
|
||||
// ── Gateway clients (WS primary, REST fallback) ───────────────────
|
||||
// WS gateway client (primary path)
|
||||
wsClient := gateway.NewWSClient(gateway.WSConfig{
|
||||
URL: cfg.WSGatewayURL,
|
||||
AuthToken: cfg.WSGatewayToken,
|
||||
}, agentRepo, broker, logger)
|
||||
|
||||
// REST gateway client (fallback — only polls if WS fails to connect)
|
||||
gwClient := gateway.NewClient(gateway.Config{
|
||||
URL: cfg.GatewayRestURL,
|
||||
PollInterval: cfg.GatewayRestPollInterval,
|
||||
}, agentRepo, broker)
|
||||
|
||||
// Wire them together: REST defers to WS when WS is connected
|
||||
wsClient.SetRESTClient(gwClient)
|
||||
gwClient.SetWSClient(wsClient)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go gwClient.Start(ctx)
|
||||
|
||||
// Start WS client in background; logs connection status
|
||||
// Start WS client first (primary)
|
||||
go wsClient.Start(ctx)
|
||||
// Start REST client (will wait for WS, then stand down or fall back)
|
||||
go gwClient.Start(ctx)
|
||||
|
||||
// ── Server ─────────────────────────────────────────────────────────────
|
||||
srv := &http.Server{
|
||||
|
||||
@@ -15,10 +15,10 @@ type Config struct {
|
||||
CORSOrigin string
|
||||
LogLevel string
|
||||
Environment string
|
||||
GatewayURL string
|
||||
GatewayPollInterval time.Duration
|
||||
WSGatewayURL string
|
||||
WSGatewayToken string
|
||||
GatewayRestURL string
|
||||
GatewayRestPollInterval time.Duration
|
||||
WSGatewayURL string
|
||||
WSGatewayToken string
|
||||
}
|
||||
|
||||
// Load reads configuration from environment variables, applying defaults where
|
||||
@@ -30,10 +30,10 @@ func Load() *Config {
|
||||
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
|
||||
LogLevel: getEnv("LOG_LEVEL", "info"),
|
||||
Environment: getEnv("ENVIRONMENT", "development"),
|
||||
GatewayURL: getEnv("GATEWAY_URL", "http://localhost:18789/api/agents"),
|
||||
GatewayPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
|
||||
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://localhost:18789/"),
|
||||
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
|
||||
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
|
||||
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
|
||||
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
|
||||
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,13 +17,17 @@ import (
|
||||
)
|
||||
|
||||
// Client polls the OpenClaw gateway for agent status and keeps the database
|
||||
// and SSE broker in sync.
|
||||
// and SSE broker in sync. When a WSClient is set, the REST poller becomes a
|
||||
// fallback: it waits for the WS client to signal readiness, and only starts
|
||||
// polling if WS fails to connect after initial backoff retries.
|
||||
type Client struct {
|
||||
url string
|
||||
pollInterval time.Duration
|
||||
httpClient *http.Client
|
||||
agents repository.AgentRepo
|
||||
broker *handler.Broker
|
||||
wsClient *WSClient // optional WS client; when set, REST is fallback only
|
||||
wsReady chan struct{} // closed once WS connection is established
|
||||
}
|
||||
|
||||
// Config holds gateway client configuration, typically loaded from environment.
|
||||
@@ -48,22 +52,67 @@ func NewClient(cfg Config, agents repository.AgentRepo, broker *handler.Broker)
|
||||
httpClient: &http.Client{Timeout: 10 * time.Second},
|
||||
agents: agents,
|
||||
broker: broker,
|
||||
wsReady: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins the polling loop. It runs until ctx is cancelled.
|
||||
func (c *Client) Start(ctx context.Context) {
|
||||
slog.Info("gateway client starting",
|
||||
"url", c.url,
|
||||
"pollInterval", c.pollInterval.String())
|
||||
// SetWSClient wires the WebSocket client so the REST poller knows to defer
|
||||
// to it. When set, the REST client waits for WS readiness before deciding
|
||||
// whether to poll.
|
||||
func (c *Client) SetWSClient(ws *WSClient) {
|
||||
c.wsClient = ws
|
||||
}
|
||||
|
||||
// MarkWSReady signals that the WS connection is live and the REST poller
|
||||
// should stand down. Called by WSClient after a successful handshake.
|
||||
func (c *Client) MarkWSReady() {
|
||||
select {
|
||||
case <-c.wsReady:
|
||||
// already closed
|
||||
default:
|
||||
close(c.wsReady)
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins the gateway client loop. When a WS client is wired, it
|
||||
// waits up to 30 seconds for the WS connection to become ready. If WS
|
||||
// connects, the REST poller stands down and only logs periodically. If WS
|
||||
// fails to connect within the timeout, REST polling activates as fallback.
|
||||
func (c *Client) Start(ctx context.Context) {
|
||||
if c.wsClient != nil {
|
||||
slog.Info("gateway client waiting for WS connection", "timeout", "30s")
|
||||
|
||||
select {
|
||||
case <-c.wsReady:
|
||||
slog.Info("gateway client using WS — REST poller standing down")
|
||||
// WS is live; keep this goroutine alive but idle. If WS
|
||||
// disconnects later, we could re-enter polling, but for now
|
||||
// the WS client handles its own reconnection.
|
||||
<-ctx.Done()
|
||||
slog.Info("gateway client stopped (WS mode)")
|
||||
return
|
||||
case <-time.After(30 * time.Second):
|
||||
slog.Warn("gateway client: WS not ready after 30s — falling back to REST polling",
|
||||
"url", c.url,
|
||||
"pollInterval", c.pollInterval.String())
|
||||
case <-ctx.Done():
|
||||
slog.Info("gateway client stopped while waiting for WS")
|
||||
return
|
||||
}
|
||||
} else {
|
||||
slog.Info("gateway client using REST polling (no WS client configured)",
|
||||
"url", c.url,
|
||||
"pollInterval", c.pollInterval.String())
|
||||
}
|
||||
|
||||
// REST fallback polling
|
||||
ticker := time.NewTicker(c.pollInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
slog.Info("gateway client stopped")
|
||||
slog.Info("gateway client stopped (REST fallback)")
|
||||
return
|
||||
case <-ticker.C:
|
||||
c.poll(ctx)
|
||||
|
||||
@@ -43,17 +43,18 @@ type eventHandler func(json.RawMessage)
|
||||
// v3 handshake, routes incoming frames, and automatically reconnects on
|
||||
// disconnect with exponential backoff.
|
||||
type WSClient struct {
|
||||
config WSConfig
|
||||
conn *websocket.Conn
|
||||
connMu sync.Mutex // protects conn for writes
|
||||
pending map[string]chan<- json.RawMessage
|
||||
mu sync.Mutex // protects pending and handlers
|
||||
agents repository.AgentRepo
|
||||
broker *handler.Broker
|
||||
logger *slog.Logger
|
||||
config WSConfig
|
||||
conn *websocket.Conn
|
||||
connMu sync.Mutex // protects conn for writes
|
||||
pending map[string]chan<- json.RawMessage
|
||||
mu sync.Mutex // protects pending and handlers
|
||||
agents repository.AgentRepo
|
||||
broker *handler.Broker
|
||||
logger *slog.Logger
|
||||
|
||||
handlers map[string][]eventHandler
|
||||
connId string // set after successful hello-ok
|
||||
handlers map[string][]eventHandler
|
||||
connId string // set after successful hello-ok
|
||||
restClient *Client // optional REST client to notify on WS ready
|
||||
}
|
||||
|
||||
// NewWSClient returns a WSClient wired to the given repository and broker.
|
||||
@@ -71,6 +72,12 @@ func NewWSClient(cfg WSConfig, agents repository.AgentRepo, broker *handler.Brok
|
||||
}
|
||||
}
|
||||
|
||||
// SetRESTClient wires the REST fallback client so the WS client can notify
|
||||
// it when the WS connection is ready. Call this before Start.
|
||||
func (c *WSClient) SetRESTClient(rest *Client) {
|
||||
c.restClient = rest
|
||||
}
|
||||
|
||||
// OnEvent registers a handler for the given event name. Handlers are called
|
||||
// when an incoming frame with type "event" and matching event name is
|
||||
// received. This is safe to call before Start.
|
||||
@@ -208,6 +215,12 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
||||
c.connId = helloOK.ConnID
|
||||
c.connMu.Unlock()
|
||||
|
||||
// Notify REST client that WS is live so it stands down
|
||||
if c.restClient != nil {
|
||||
c.restClient.MarkWSReady()
|
||||
c.logger.Info("ws client notified REST fallback to stand down")
|
||||
}
|
||||
|
||||
// Step 2b: Initial sync — fetch agents + sessions from gateway
|
||||
if err := c.initialSync(ctx); err != nil {
|
||||
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
|
||||
|
||||
Reference in New Issue
Block a user