// Package gateway provides WebSocket client integration with the OpenClaw // gateway using WS protocol v3. The WSClient handles connection, handshake, // frame routing, request/response correlation, and automatic reconnection // with exponential backoff. package gateway import ( "context" "encoding/json" "fmt" "log/slog" "sync" "time" "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler" "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository" "github.com/gorilla/websocket" "github.com/google/uuid" ) // WSConfig holds WebSocket client configuration, typically loaded from // environment variables. AuthToken must be set to a valid OpenClaw gateway // operator token. type WSConfig struct { URL string // e.g. "ws://host.docker.internal:18789/" AuthToken string // from OPENCLAW_GATEWAY_TOKEN } // DefaultWSConfig returns sensible defaults for local development. func DefaultWSConfig() WSConfig { return WSConfig{ URL: "ws://localhost:18789/", AuthToken: "", } } // eventHandler is a callback invoked when a named event arrives from the // gateway. type eventHandler func(json.RawMessage) // WSClient connects to the OpenClaw gateway over WebSocket, completes the // v3 handshake, routes incoming frames, and automatically reconnects on // disconnect with exponential backoff. type WSClient struct { config WSConfig conn *websocket.Conn connMu sync.Mutex // protects conn for writes pending map[string]chan<- json.RawMessage mu sync.Mutex // protects pending and handlers agents repository.AgentRepo broker *handler.Broker logger *slog.Logger handlers map[string][]eventHandler } // NewWSClient returns a WSClient wired to the given repository and broker. func NewWSClient(cfg WSConfig, agents repository.AgentRepo, broker *handler.Broker, logger *slog.Logger) *WSClient { if logger == nil { logger = slog.Default() } return &WSClient{ config: cfg, pending: make(map[string]chan<- json.RawMessage), agents: agents, broker: broker, logger: logger, handlers: make(map[string][]eventHandler), } } // OnEvent registers a handler for the given event name. Handlers are called // when an incoming frame with type "event" and matching event name is // received. This is safe to call before Start. func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) { c.mu.Lock() defer c.mu.Unlock() c.handlers[event] = append(c.handlers[event], handler) } // ── Frame types ────────────────────────────────────────────────────────── // wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol. type wsFrame struct { Type string `json:"type"` // "req", "res", "event" ID string `json:"id,omitempty"` // request/response correlation Method string `json:"method,omitempty"` // method name (req frames) Event string `json:"event,omitempty"` // event name (event frames) Params json.RawMessage `json:"params,omitempty"` Result json.RawMessage `json:"result,omitempty"` Error *wsError `json:"error,omitempty"` } // wsError represents an error in a response frame. type wsError struct { Code int `json:"code"` Message string `json:"message"` } // connectRequest builds the initial connect handshake payload. type connectRequest struct { MinProtocol int `json:"minProtocol"` MaxProtocol int `json:"maxProtocol"` Client connectClientInfo `json:"client"` Role string `json:"role"` Scopes []string `json:"scopes"` Auth connectAuth `json:"auth"` } type connectClientInfo struct { ID string `json:"id"` Version string `json:"version"` Platform string `json:"platform"` Mode string `json:"mode"` } type connectAuth struct { Token string `json:"token"` } // helloOKResponse represents the expected response to a successful connect. type helloOKResponse struct { ConnID string `json:"connId"` Features struct { Methods []string `json:"methods"` Events []string `json:"events"` } `json:"features"` } // ── Start loop ─────────────────────────────────────────────────────────── // Start connects to the gateway, completes the handshake, and begins the // read loop. On disconnect it reconnects with exponential backoff. On // ctx cancellation it performs a clean shutdown. func (c *WSClient) Start(ctx context.Context) { backoff := 1 * time.Second maxBackoff := 30 * time.Second for { err := c.connectAndRun(ctx) if err != nil { if ctx.Err() != nil { c.logger.Info("ws client stopped (context cancelled)") return } c.logger.Warn("ws client disconnected, reconnecting", "error", err, "backoff", backoff) } select { case <-ctx.Done(): c.logger.Info("ws client stopped during backoff (context cancelled)") return case <-time.After(backoff): // Exponential backoff: 1s, 2s, 4s, 8s, 16s, max 30s backoff = backoff * 2 if backoff > maxBackoff { backoff = maxBackoff } } } } // connectAndRun dials the gateway, completes the handshake, and runs the // read loop until an error occurs or ctx is cancelled. func (c *WSClient) connectAndRun(ctx context.Context) error { c.logger.Info("ws client connecting", "url", c.config.URL) dialer := websocket.Dialer{ HandshakeTimeout: 10 * time.Second, } conn, _, err := dialer.DialContext(ctx, c.config.URL, nil) if err != nil { return fmt.Errorf("dial failed: %w", err) } c.connMu.Lock() c.conn = conn c.connMu.Unlock() // Reset backoff on successful connect defer func() { conn.Close() }() // Step 1: Read the connect.challenge frame if err := c.readChallenge(conn); err != nil { return fmt.Errorf("handshake challenge: %w", err) } // Step 2: Send connect request helloOK, err := c.sendConnect(conn) if err != nil { return fmt.Errorf("handshake connect: %w", err) } c.logger.Info("ws client handshake complete", "connId", helloOK.ConnID, "methods", helloOK.Features.Methods, "events", helloOK.Features.Events) // Step 3: Read loop return c.readLoop(ctx, conn) } // readChallenge reads the first frame from the gateway, which must be a // connect.challenge event. func (c *WSClient) readChallenge(conn *websocket.Conn) error { var frame wsFrame if err := conn.ReadJSON(&frame); err != nil { return fmt.Errorf("read challenge: %w", err) } if frame.Type != "event" || frame.Event != "connect.challenge" { return fmt.Errorf("expected connect.challenge, got type=%s event=%s", frame.Type, frame.Event) } c.logger.Debug("received connect.challenge", "params", string(frame.Params)) return nil } // sendConnect sends the connect request and waits for the hello-ok response. func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) { reqID := uuid.New().String() params := connectRequest{ MinProtocol: 3, MaxProtocol: 3, Client: connectClientInfo{ ID: "control-center", Version: "1.0", Platform: "server", Mode: "operator", }, Role: "operator", Scopes: []string{"operator.read"}, Auth: connectAuth{ Token: c.config.AuthToken, }, } paramsJSON, err := json.Marshal(params) if err != nil { return nil, fmt.Errorf("marshal connect params: %w", err) } reqFrame := wsFrame{ Type: "req", ID: reqID, Method: "connect", Params: paramsJSON, } if err := conn.WriteJSON(reqFrame); err != nil { return nil, fmt.Errorf("write connect request: %w", err) } // Read response var resFrame wsFrame if err := conn.ReadJSON(&resFrame); err != nil { return nil, fmt.Errorf("read connect response: %w", err) } if resFrame.Error != nil { return nil, fmt.Errorf("connect rejected: code=%d msg=%s", resFrame.Error.Code, resFrame.Error.Message) } if resFrame.ID != reqID { return nil, fmt.Errorf("response id mismatch: expected %s, got %s", reqID, resFrame.ID) } // Check for hello-ok method in the result // The gateway responds with method "hello-ok" on success var helloOK helloOKResponse if err := json.Unmarshal(resFrame.Result, &helloOK); err != nil { return nil, fmt.Errorf("parse hello-ok: %w", err) } return &helloOK, nil } // readLoop continuously reads frames from the connection and routes them. // It returns on read error or context cancellation. func (c *WSClient) readLoop(ctx context.Context, conn *websocket.Conn) error { for { select { case <-ctx.Done(): // Clean shutdown: send close frame c.connMu.Lock() c.conn.WriteControl( websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "shutdown"), time.Now().Add(5*time.Second), ) c.connMu.Unlock() return ctx.Err() default: } var frame wsFrame if err := conn.ReadJSON(&frame); err != nil { // Check if it's a close error if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { c.logger.Info("ws connection closed by server") return nil } if websocket.IsUnexpectedCloseError(err) { c.logger.Warn("ws connection unexpectedly closed", "error", err) return err } return fmt.Errorf("read frame: %w", err) } c.routeFrame(frame) } } // routeFrame dispatches a received frame to the appropriate handler. func (c *WSClient) routeFrame(frame wsFrame) { switch frame.Type { case "res": c.handleResponse(frame) case "event": c.handleEvent(frame) default: c.logger.Warn("unknown frame type", "type", frame.Type, "id", frame.ID) } } // handleResponse correlates a response frame to a pending request channel. func (c *WSClient) handleResponse(frame wsFrame) { c.mu.Lock() ch, ok := c.pending[frame.ID] if ok { delete(c.pending, frame.ID) } c.mu.Unlock() if !ok { c.logger.Warn("received response for unknown request", "id", frame.ID) return } if frame.Error != nil { // Send nil to signal error; caller checks via Send return ch <- nil return } ch <- frame.Result } // handleEvent dispatches an event frame to registered handlers. func (c *WSClient) handleEvent(frame wsFrame) { c.mu.Lock() handlers := c.handlers[frame.Event] c.mu.Unlock() if len(handlers) == 0 { c.logger.Debug("unhandled event", "event", frame.Event) return } for _, h := range handlers { h(frame.Params) } } // ── Send ───────────────────────────────────────────────────────────────── // Send sends a JSON request to the gateway and returns the response payload. // It is safe for concurrent use. The caller should check for errors in the // returned payload. A nil payload with nil error means the gateway sent an // error response (check via the response frame's error field, which is logged). func (c *WSClient) Send(method string, params any) (json.RawMessage, error) { reqID := uuid.New().String() paramsJSON, err := json.Marshal(params) if err != nil { return nil, fmt.Errorf("marshal params: %w", err) } // Register pending response channel respCh := make(chan json.RawMessage, 1) c.mu.Lock() c.pending[reqID] = respCh c.mu.Unlock() defer func() { c.mu.Lock() delete(c.pending, reqID) c.mu.Unlock() }() // Build and send frame frame := wsFrame{ Type: "req", ID: reqID, Method: method, Params: paramsJSON, } c.connMu.Lock() err = c.conn.WriteJSON(frame) c.connMu.Unlock() if err != nil { return nil, fmt.Errorf("write request: %w", err) } // Wait for response with timeout select { case resp := <-respCh: if resp == nil { return nil, fmt.Errorf("gateway returned error for request %s", reqID) } return resp, nil case <-time.After(30 * time.Second): return nil, fmt.Errorf("request %s timed out", reqID) } }