Compare commits
3 Commits
agent/dex/
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
49b959aee5 | ||
|
|
ae37d79aa8 | ||
|
|
8fb4183abe |
10
.env.example
10
.env.example
@@ -12,14 +12,8 @@ ENVIRONMENT=development
|
|||||||
# Format: postgresql://user:password@host:port/database?sslmode=disable
|
# Format: postgresql://user:password@host:port/database?sslmode=disable
|
||||||
DATABASE_URL=postgresql://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable
|
DATABASE_URL=postgresql://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable
|
||||||
|
|
||||||
# Gateway (OpenClaw) connection — WebSocket (primary)
|
# Gateway (OpenClaw) connection
|
||||||
# WebSocket URL for real-time OpenClaw gateway v3 protocol
|
# URL to the OpenClaw gateway API for polling agent states
|
||||||
GATEWAY_WS_URL=ws://localhost:18789/
|
|
||||||
# Auth token for the OpenClaw gateway (operator scope)
|
|
||||||
OPENCLAW_GATEWAY_TOKEN=
|
|
||||||
|
|
||||||
# Gateway (OpenClaw) connection — REST (fallback)
|
|
||||||
# URL to the OpenClaw gateway API for polling agent states (used only if WS fails)
|
|
||||||
GATEWAY_URL=http://localhost:18789/api/agents
|
GATEWAY_URL=http://localhost:18789/api/agents
|
||||||
# Polling interval for agent state updates
|
# Polling interval for agent state updates
|
||||||
GATEWAY_POLL_INTERVAL=5s
|
GATEWAY_POLL_INTERVAL=5s
|
||||||
|
|||||||
@@ -13,16 +13,11 @@ env:
|
|||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
test-and-build:
|
test-and-build:
|
||||||
runs-on: ubuntu-latest
|
runs-on: go-react
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v4
|
- uses: actions/checkout@v4
|
||||||
|
|
||||||
- name: Setup Go
|
|
||||||
run: |
|
|
||||||
curl -sL https://go.dev/dl/go1.23.6.linux-amd64.tar.gz | tar -C /usr/local -xz
|
|
||||||
echo "/usr/local/go/bin" >> $GITHUB_PATH
|
|
||||||
|
|
||||||
- name: Run backend tests
|
- name: Run backend tests
|
||||||
run: go test ./...
|
run: go test ./...
|
||||||
working-directory: ./go-backend
|
working-directory: ./go-backend
|
||||||
@@ -31,11 +26,6 @@ jobs:
|
|||||||
run: go build -ldflags="-w -s" -o /tmp/server ./cmd/server
|
run: go build -ldflags="-w -s" -o /tmp/server ./cmd/server
|
||||||
working-directory: ./go-backend
|
working-directory: ./go-backend
|
||||||
|
|
||||||
- name: Setup Node
|
|
||||||
run: |
|
|
||||||
curl -sL https://deb.nodesource.com/setup_22.x | bash -
|
|
||||||
apt-get install -y nodejs
|
|
||||||
|
|
||||||
- name: Install frontend deps
|
- name: Install frontend deps
|
||||||
run: npm ci
|
run: npm ci
|
||||||
working-directory: ./frontend
|
working-directory: ./frontend
|
||||||
@@ -51,7 +41,7 @@ jobs:
|
|||||||
docker-build-push:
|
docker-build-push:
|
||||||
needs: test-and-build
|
needs: test-and-build
|
||||||
if: gitea.event_name == 'push'
|
if: gitea.event_name == 'push'
|
||||||
runs-on: ubuntu-latest
|
runs-on: go-react
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v4
|
- uses: actions/checkout@v4
|
||||||
|
|||||||
11
ci-image/Dockerfile
Normal file
11
ci-image/Dockerfile
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
FROM catthehacker/ubuntu:act-latest
|
||||||
|
|
||||||
|
# Install Go 1.23
|
||||||
|
RUN curl -sL https://go.dev/dl/go1.23.6.linux-amd64.tar.gz | tar -C /usr/local -xz
|
||||||
|
|
||||||
|
# Install Node 22
|
||||||
|
RUN curl -fsSL https://deb.nodesource.com/setup_22.x | bash - \
|
||||||
|
&& apt-get install -y nodejs \
|
||||||
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
ENV PATH="/usr/local/go/bin:${PATH}"
|
||||||
@@ -63,29 +63,16 @@ func main() {
|
|||||||
Broker: broker,
|
Broker: broker,
|
||||||
})
|
})
|
||||||
|
|
||||||
// ── Gateway: WS primary + REST fallback ────────────────────────────────
|
// ── Gateway client (polls OpenClaw for agent states) ───────────────────
|
||||||
// WebSocket client (primary — real-time events via OpenClaw v3 protocol)
|
gwClient := gateway.NewClient(gateway.Config{
|
||||||
wsClient := gateway.NewWSClient(gateway.WSConfig{
|
|
||||||
URL: cfg.WSGatewayURL,
|
|
||||||
AuthToken: cfg.WSGatewayToken,
|
|
||||||
}, agentRepo, broker, logger)
|
|
||||||
|
|
||||||
// REST polling client (fallback — only used if WS connection fails)
|
|
||||||
restClient := gateway.NewClient(gateway.Config{
|
|
||||||
URL: cfg.GatewayURL,
|
URL: cfg.GatewayURL,
|
||||||
PollInterval: cfg.GatewayPollInterval,
|
PollInterval: cfg.GatewayPollInterval,
|
||||||
}, agentRepo, broker)
|
}, agentRepo, broker)
|
||||||
|
|
||||||
// Wire them: WS notifies REST to stand down on successful connect
|
|
||||||
wsClient.SetRESTClient(restClient)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// Start WS client first (primary)
|
go gwClient.Start(ctx)
|
||||||
go wsClient.Start(ctx)
|
|
||||||
// Start REST client (fallback polling)
|
|
||||||
go restClient.Start(ctx)
|
|
||||||
|
|
||||||
// ── Server ─────────────────────────────────────────────────────────────
|
// ── Server ─────────────────────────────────────────────────────────────
|
||||||
srv := &http.Server{
|
srv := &http.Server{
|
||||||
@@ -111,7 +98,7 @@ func main() {
|
|||||||
<-quit
|
<-quit
|
||||||
slog.Info("shutting down server...")
|
slog.Info("shutting down server...")
|
||||||
|
|
||||||
cancel() // stop gateway clients
|
cancel() // stop gateway polling
|
||||||
|
|
||||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||||
defer shutdownCancel()
|
defer shutdownCancel()
|
||||||
@@ -135,4 +122,4 @@ func parseLogLevel(level string) slog.Level {
|
|||||||
default:
|
default:
|
||||||
return slog.LevelInfo
|
return slog.LevelInfo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ require (
|
|||||||
github.com/go-chi/cors v1.2.1
|
github.com/go-chi/cors v1.2.1
|
||||||
github.com/go-playground/validator/v10 v10.24.0
|
github.com/go-playground/validator/v10 v10.24.0
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
github.com/gorilla/websocket v1.5.3
|
|
||||||
github.com/jackc/pgx/v5 v5.7.2
|
github.com/jackc/pgx/v5 v5.7.2
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -17,8 +17,6 @@ github.com/go-playground/validator/v10 v10.24.0 h1:KHQckvo8G6hlWnrPX4NJJ+aBfWNAE
|
|||||||
github.com/go-playground/validator/v10 v10.24.0/go.mod h1:GGzBIJMuE98Ic/kJsBXbz1x/7cByt++cQ+YOuDM5wus=
|
github.com/go-playground/validator/v10 v10.24.0/go.mod h1:GGzBIJMuE98Ic/kJsBXbz1x/7cByt++cQ+YOuDM5wus=
|
||||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
|
||||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
|
||||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||||
|
|||||||
@@ -10,15 +10,13 @@ import (
|
|||||||
|
|
||||||
// Config holds all application configuration.
|
// Config holds all application configuration.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Port int
|
Port int
|
||||||
DatabaseURL string
|
DatabaseURL string
|
||||||
CORSOrigin string
|
CORSOrigin string
|
||||||
LogLevel string
|
LogLevel string
|
||||||
Environment string
|
Environment string
|
||||||
GatewayURL string // REST fallback URL
|
GatewayURL string
|
||||||
GatewayPollInterval time.Duration // REST fallback poll interval
|
GatewayPollInterval time.Duration
|
||||||
WSGatewayURL string // WebSocket gateway URL
|
|
||||||
WSGatewayToken string // WebSocket auth token
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load reads configuration from environment variables, applying defaults where
|
// Load reads configuration from environment variables, applying defaults where
|
||||||
@@ -30,10 +28,8 @@ func Load() *Config {
|
|||||||
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
|
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
|
||||||
LogLevel: getEnv("LOG_LEVEL", "info"),
|
LogLevel: getEnv("LOG_LEVEL", "info"),
|
||||||
Environment: getEnv("ENVIRONMENT", "development"),
|
Environment: getEnv("ENVIRONMENT", "development"),
|
||||||
GatewayURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
|
GatewayURL: getEnv("GATEWAY_URL", "http://localhost:18789/api/agents"),
|
||||||
GatewayPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
|
GatewayPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
|
||||||
WSGatewayURL: getEnv("GATEWAY_WS_URL", "ws://host.docker.internal:18789/"),
|
|
||||||
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -60,4 +56,4 @@ func getEnvDuration(key string, fallback time.Duration) time.Duration {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
return fallback
|
return fallback
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,10 +1,6 @@
|
|||||||
// Package gateway provides an OpenClaw gateway integration client that
|
// Package gateway provides an OpenClaw gateway integration client that
|
||||||
// polls agent states, persists them via the repository layer, and broadcasts
|
// polls agent states, persists them via the repository layer, and broadcasts
|
||||||
// changes through the SSE broker for real-time frontend updates.
|
// changes through the SSE broker for real-time frontend updates.
|
||||||
//
|
|
||||||
// When a WSClient is wired via SetWSClient, the REST poller becomes a
|
|
||||||
// fallback: it waits for the WS client to signal readiness, and only starts
|
|
||||||
// polling if WS fails to connect within 30 seconds.
|
|
||||||
package gateway
|
package gateway
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -21,16 +17,13 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Client polls the OpenClaw gateway for agent status and keeps the database
|
// Client polls the OpenClaw gateway for agent status and keeps the database
|
||||||
// and SSE broker in sync. When a WSClient is set, the REST poller becomes a
|
// and SSE broker in sync.
|
||||||
// fallback that only activates if the WS connection fails.
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
url string
|
url string
|
||||||
pollInterval time.Duration
|
pollInterval time.Duration
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
agents repository.AgentRepo
|
agents repository.AgentRepo
|
||||||
broker *handler.Broker
|
broker *handler.Broker
|
||||||
wsClient *Client // optional WS client; when set, REST is fallback only
|
|
||||||
wsReady chan struct{} // closed once WS connection is established
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config holds gateway client configuration, typically loaded from environment.
|
// Config holds gateway client configuration, typically loaded from environment.
|
||||||
@@ -55,32 +48,10 @@ func NewClient(cfg Config, agents repository.AgentRepo, broker *handler.Broker)
|
|||||||
httpClient: &http.Client{Timeout: 10 * time.Second},
|
httpClient: &http.Client{Timeout: 10 * time.Second},
|
||||||
agents: agents,
|
agents: agents,
|
||||||
broker: broker,
|
broker: broker,
|
||||||
wsReady: make(chan struct{}),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetWSClient wires the WebSocket client so the REST poller knows to defer
|
// Start begins the polling loop. It runs until ctx is cancelled.
|
||||||
// to it. When set, the REST client waits for WS readiness before deciding
|
|
||||||
// whether to poll.
|
|
||||||
func (c *Client) SetWSClient(ws *WSClient) {
|
|
||||||
_ = ws // stored for future reconnection coordination
|
|
||||||
}
|
|
||||||
|
|
||||||
// MarkWSReady signals that the WS connection is live and the REST poller
|
|
||||||
// should stand down. Called by WSClient after a successful handshake.
|
|
||||||
func (c *Client) MarkWSReady() {
|
|
||||||
select {
|
|
||||||
case <-c.wsReady:
|
|
||||||
// already closed
|
|
||||||
default:
|
|
||||||
close(c.wsReady)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start begins the gateway client loop. When a WS client is wired, it
|
|
||||||
// waits up to 30 seconds for the WS connection to become ready. If WS
|
|
||||||
// connects, the REST poller stands down. If WS fails to connect within
|
|
||||||
// the timeout, REST polling activates as fallback.
|
|
||||||
func (c *Client) Start(ctx context.Context) {
|
func (c *Client) Start(ctx context.Context) {
|
||||||
slog.Info("gateway client starting",
|
slog.Info("gateway client starting",
|
||||||
"url", c.url,
|
"url", c.url,
|
||||||
@@ -121,6 +92,7 @@ func (c *Client) poll(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, ga := range agents {
|
for _, ga := range agents {
|
||||||
|
// Check if agent already exists; if so, update; otherwise create.
|
||||||
existing, err := c.agents.Get(ctx, ga.ID)
|
existing, err := c.agents.Get(ctx, ga.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Not found — create it
|
// Not found — create it
|
||||||
@@ -165,51 +137,51 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
|
|||||||
slog.Info("seeding demo agents")
|
slog.Info("seeding demo agents")
|
||||||
demoAgents := []models.AgentCardData{
|
demoAgents := []models.AgentCardData{
|
||||||
{
|
{
|
||||||
ID: "otto",
|
ID: "otto",
|
||||||
DisplayName: "Otto",
|
DisplayName: "Otto",
|
||||||
Role: "Orchestrator",
|
Role: "Orchestrator",
|
||||||
Status: models.AgentStatusActive,
|
Status: models.AgentStatusActive,
|
||||||
CurrentTask: strPtr("Orchestrating tasks"),
|
CurrentTask: strPtr("Orchestrating tasks"),
|
||||||
SessionKey: "otto-session",
|
SessionKey: "otto-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "rex",
|
ID: "rex",
|
||||||
DisplayName: "Rex",
|
DisplayName: "Rex",
|
||||||
Role: "Frontend Dev",
|
Role: "Frontend Dev",
|
||||||
Status: models.AgentStatusIdle,
|
Status: models.AgentStatusIdle,
|
||||||
SessionKey: "rex-session",
|
SessionKey: "rex-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "dex",
|
ID: "dex",
|
||||||
DisplayName: "Dex",
|
DisplayName: "Dex",
|
||||||
Role: "Backend Dev",
|
Role: "Backend Dev",
|
||||||
Status: models.AgentStatusThinking,
|
Status: models.AgentStatusThinking,
|
||||||
CurrentTask: strPtr("Designing API contracts"),
|
CurrentTask: strPtr("Designing API contracts"),
|
||||||
SessionKey: "dex-session",
|
SessionKey: "dex-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "hex",
|
ID: "hex",
|
||||||
DisplayName: "Hex",
|
DisplayName: "Hex",
|
||||||
Role: "Database Specialist",
|
Role: "Database Specialist",
|
||||||
Status: models.AgentStatusActive,
|
Status: models.AgentStatusActive,
|
||||||
CurrentTask: strPtr("Reviewing schema migrations"),
|
CurrentTask: strPtr("Reviewing schema migrations"),
|
||||||
SessionKey: "hex-session",
|
SessionKey: "hex-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "pip",
|
ID: "pip",
|
||||||
DisplayName: "Pip",
|
DisplayName: "Pip",
|
||||||
Role: "Edge Device Dev",
|
Role: "Edge Device Dev",
|
||||||
Status: models.AgentStatusIdle,
|
Status: models.AgentStatusIdle,
|
||||||
SessionKey: "pip-session",
|
SessionKey: "pip-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -223,4 +195,4 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func strPtr(s string) *string { return &s }
|
func strPtr(s string) *string { return &s }
|
||||||
|
|||||||
@@ -1,243 +0,0 @@
|
|||||||
// Package gateway provides real-time event handlers for the Control Center
|
|
||||||
// WebSocket client. Handlers process gateway events (sessions.changed,
|
|
||||||
// presence, agent.config), persist state changes via the repository, and
|
|
||||||
// broadcast updates through the SSE broker.
|
|
||||||
//
|
|
||||||
// Rule: DB update first, then SSE broadcast. This keeps REST API responses
|
|
||||||
// consistent with SSE events.
|
|
||||||
package gateway
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ── Event payload types ──────────────────────────────────────────────────
|
|
||||||
|
|
||||||
// sessionChangedPayload represents a single session delta from a
|
|
||||||
// sessions.changed event.
|
|
||||||
type sessionChangedPayload struct {
|
|
||||||
SessionKey string `json:"sessionKey"`
|
|
||||||
AgentID string `json:"agentId"`
|
|
||||||
Status string `json:"status"` // running, streaming, done, error
|
|
||||||
TotalTokens int `json:"totalTokens"`
|
|
||||||
LastActivityAt string `json:"lastActivityAt"`
|
|
||||||
CurrentTask string `json:"currentTask"`
|
|
||||||
TaskProgress *int `json:"taskProgress,omitempty"`
|
|
||||||
TaskElapsed string `json:"taskElapsed"`
|
|
||||||
ErrorMessage string `json:"errorMessage"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// presencePayload represents a device presence update event.
|
|
||||||
type presencePayload struct {
|
|
||||||
AgentID string `json:"agentId"`
|
|
||||||
Connected *bool `json:"connected,omitempty"`
|
|
||||||
LastActivityAt string `json:"lastActivityAt"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// agentConfigPayload represents an agent configuration change event.
|
|
||||||
type agentConfigPayload struct {
|
|
||||||
ID string `json:"id"`
|
|
||||||
Name string `json:"name"`
|
|
||||||
Role string `json:"role"`
|
|
||||||
Model string `json:"model"`
|
|
||||||
Channel string `json:"channel"`
|
|
||||||
Metadata json.RawMessage `json:"metadata"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Handler registration ─────────────────────────────────────────────────
|
|
||||||
|
|
||||||
// registerEventHandlers sets up all live event handlers on the WSClient.
|
|
||||||
// Called once after a successful handshake + initial sync.
|
|
||||||
func (c *WSClient) registerEventHandlers() {
|
|
||||||
c.OnEvent("sessions.changed", c.handleSessionsChanged)
|
|
||||||
c.OnEvent("presence", c.handlePresence)
|
|
||||||
c.OnEvent("agent.config", c.handleAgentConfig)
|
|
||||||
|
|
||||||
c.logger.Info("event handlers registered",
|
|
||||||
"events", []string{"sessions.changed", "presence", "agent.config"})
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── sessions.changed ─────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
// handleSessionsChanged processes sessions.changed events from the gateway.
|
|
||||||
// The payload may be a single session object or an array of session deltas.
|
|
||||||
// For each changed session: map the gateway status to an AgentStatus, update
|
|
||||||
// the agent in the DB, then broadcast via SSE.
|
|
||||||
func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
|
|
||||||
c.logger.Debug("handleSessionsChanged", "payload", string(payload))
|
|
||||||
|
|
||||||
// Try array first, then single object
|
|
||||||
var deltas []sessionChangedPayload
|
|
||||||
if err := json.Unmarshal(payload, &deltas); err != nil || len(deltas) == 0 {
|
|
||||||
var single sessionChangedPayload
|
|
||||||
if err := json.Unmarshal(payload, &single); err != nil {
|
|
||||||
c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
deltas = []sessionChangedPayload{single}
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
for _, d := range deltas {
|
|
||||||
if d.AgentID == "" {
|
|
||||||
c.logger.Debug("sessions.changed: skipping delta with empty agentId")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
agentStatus := mapSessionStatus(d.Status)
|
|
||||||
|
|
||||||
update := models.UpdateAgentRequest{
|
|
||||||
Status: &agentStatus,
|
|
||||||
}
|
|
||||||
|
|
||||||
if d.CurrentTask != "" {
|
|
||||||
update.CurrentTask = &d.CurrentTask
|
|
||||||
}
|
|
||||||
|
|
||||||
if d.TaskProgress != nil {
|
|
||||||
update.TaskProgress = d.TaskProgress
|
|
||||||
}
|
|
||||||
|
|
||||||
if d.TaskElapsed != "" {
|
|
||||||
update.TaskElapsed = &d.TaskElapsed
|
|
||||||
}
|
|
||||||
|
|
||||||
if d.ErrorMessage != "" {
|
|
||||||
update.ErrorMessage = &d.ErrorMessage
|
|
||||||
}
|
|
||||||
|
|
||||||
// If session ended, clear task and progress
|
|
||||||
if agentStatus == models.AgentStatusIdle {
|
|
||||||
emptyTask := ""
|
|
||||||
update.CurrentTask = &emptyTask
|
|
||||||
zeroProg := 0
|
|
||||||
update.TaskProgress = &zeroProg
|
|
||||||
}
|
|
||||||
|
|
||||||
// DB update first
|
|
||||||
updated, err := c.agents.Update(ctx, d.AgentID, update)
|
|
||||||
if err != nil {
|
|
||||||
c.logger.Warn("sessions.changed: DB update failed",
|
|
||||||
"agentId", d.AgentID, "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Then SSE broadcast
|
|
||||||
c.broker.Broadcast("agent.status", updated)
|
|
||||||
if d.TaskProgress != nil || d.CurrentTask != "" {
|
|
||||||
c.broker.Broadcast("agent.progress", updated)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.logger.Debug("sessions.changed: agent updated",
|
|
||||||
"agentId", d.AgentID, "status", string(agentStatus))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── presence ─────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
// handlePresence processes presence events from the gateway. Updates the
|
|
||||||
// agent's lastActivity and broadcasts status if the connection state changed.
|
|
||||||
func (c *WSClient) handlePresence(payload json.RawMessage) {
|
|
||||||
c.logger.Debug("handlePresence", "payload", string(payload))
|
|
||||||
|
|
||||||
var p presencePayload
|
|
||||||
if err := json.Unmarshal(payload, &p); err != nil {
|
|
||||||
c.logger.Warn("presence: unparseable payload, skipping", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.AgentID == "" {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
update := models.UpdateAgentRequest{}
|
|
||||||
|
|
||||||
// If device disconnected, set agent to idle
|
|
||||||
if p.Connected != nil && !*p.Connected {
|
|
||||||
idle := models.AgentStatusIdle
|
|
||||||
update.Status = &idle
|
|
||||||
}
|
|
||||||
|
|
||||||
// DB update first (Update always bumps last_activity)
|
|
||||||
updated, err := c.agents.Update(ctx, p.AgentID, update)
|
|
||||||
if err != nil {
|
|
||||||
c.logger.Warn("presence: DB update failed",
|
|
||||||
"agentId", p.AgentID, "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.LastActivityAt != "" {
|
|
||||||
updated.LastActivity = p.LastActivityAt
|
|
||||||
}
|
|
||||||
|
|
||||||
// Then SSE broadcast
|
|
||||||
c.broker.Broadcast("agent.status", updated)
|
|
||||||
|
|
||||||
c.logger.Debug("presence: agent updated",
|
|
||||||
"agentId", p.AgentID, "connected", p.Connected)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── agent.config ─────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
// handleAgentConfig processes agent.config events from the gateway. Updates
|
|
||||||
// agent metadata (channel) in the DB and broadcasts a fleet.update with the
|
|
||||||
// full fleet snapshot.
|
|
||||||
func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
|
|
||||||
c.logger.Debug("handleAgentConfig", "payload", string(payload))
|
|
||||||
|
|
||||||
var cfg agentConfigPayload
|
|
||||||
if err := json.Unmarshal(payload, &cfg); err != nil {
|
|
||||||
c.logger.Warn("agent.config: unparseable payload, skipping", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if cfg.ID == "" {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
update := models.UpdateAgentRequest{}
|
|
||||||
|
|
||||||
if cfg.Channel != "" {
|
|
||||||
update.Channel = &cfg.Channel
|
|
||||||
}
|
|
||||||
|
|
||||||
// DB update first
|
|
||||||
updated, err := c.agents.Update(ctx, cfg.ID, update)
|
|
||||||
if err != nil {
|
|
||||||
c.logger.Warn("agent.config: DB update failed",
|
|
||||||
"agentId", cfg.ID, "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply display name/role from config event
|
|
||||||
if cfg.Name != "" {
|
|
||||||
updated.DisplayName = cfg.Name
|
|
||||||
}
|
|
||||||
if cfg.Role != "" {
|
|
||||||
updated.Role = cfg.Role
|
|
||||||
}
|
|
||||||
|
|
||||||
// Broadcast full fleet snapshot so frontend gets updated agent info
|
|
||||||
allAgents, err := c.agents.List(ctx, "")
|
|
||||||
if err != nil {
|
|
||||||
c.logger.Warn("agent.config: fleet list failed, broadcasting single agent", "error", err)
|
|
||||||
c.broker.Broadcast("agent.status", updated)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
c.broker.Broadcast("fleet.update", allAgents)
|
|
||||||
|
|
||||||
c.logger.Debug("agent.config: fleet updated", "agentId", cfg.ID, "name", cfg.Name)
|
|
||||||
}
|
|
||||||
@@ -1,187 +0,0 @@
|
|||||||
// Package gateway provides the initial sync logic that fetches agent and
|
|
||||||
// session data from the OpenClaw gateway via WS RPCs after handshake,
|
|
||||||
// persists to the repository, merges session state into agent cards, and
|
|
||||||
// broadcasts the merged fleet to SSE clients.
|
|
||||||
package gateway
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ── RPC response types ───────────────────────────────────────────────────
|
|
||||||
|
|
||||||
// agentListItem represents a single agent returned by the agents.list RPC.
|
|
||||||
type agentListItem struct {
|
|
||||||
ID string `json:"id"`
|
|
||||||
Name string `json:"name"`
|
|
||||||
Model string `json:"model"`
|
|
||||||
Role string `json:"role"`
|
|
||||||
Channel string `json:"channel"`
|
|
||||||
Metadata json.RawMessage `json:"metadata"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// sessionListItem represents a single session returned by the sessions.list RPC.
|
|
||||||
type sessionListItem struct {
|
|
||||||
SessionKey string `json:"sessionKey"`
|
|
||||||
AgentID string `json:"agentId"`
|
|
||||||
Status string `json:"status"` // running, done, streaming, error
|
|
||||||
TotalTokens int `json:"totalTokens"`
|
|
||||||
LastActivityAt string `json:"lastActivityAt"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Sync logic ──────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
// initialSync fetches agents and sessions from the gateway via WS RPCs,
|
|
||||||
// persists them, merges session state into agent cards, and broadcasts
|
|
||||||
// the merged fleet as a fleet.update event.
|
|
||||||
func (c *WSClient) initialSync(ctx context.Context) error {
|
|
||||||
c.logger.Info("initial sync starting")
|
|
||||||
|
|
||||||
// 1. Fetch agents via RPC
|
|
||||||
agentsRaw, err := c.Send("agents.list", nil)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("agents.list RPC: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var agentItems []agentListItem
|
|
||||||
if err := json.Unmarshal(agentsRaw, &agentItems); err != nil {
|
|
||||||
return fmt.Errorf("parse agents.list response: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.logger.Info("agents.list received", "count", len(agentItems))
|
|
||||||
|
|
||||||
// 2. Persist each agent (create if not exists, update if changed)
|
|
||||||
for _, item := range agentItems {
|
|
||||||
card := agentItemToCard(item)
|
|
||||||
|
|
||||||
existing, err := c.agents.Get(ctx, card.ID)
|
|
||||||
if err != nil {
|
|
||||||
// Agent doesn't exist — create it
|
|
||||||
if createErr := c.agents.Create(ctx, card); createErr != nil {
|
|
||||||
c.logger.Warn("sync: agent create failed", "id", card.ID, "error", createErr)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
c.logger.Info("sync: agent created", "id", card.ID)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Agent exists — update display name or role if changed
|
|
||||||
if existing.DisplayName != card.DisplayName || existing.Role != card.Role {
|
|
||||||
// Update what we can via UpdateAgentRequest
|
|
||||||
channel := card.Channel
|
|
||||||
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
|
|
||||||
Channel: &channel,
|
|
||||||
})
|
|
||||||
if updateErr != nil {
|
|
||||||
c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Fetch sessions via RPC
|
|
||||||
sessionsRaw, err := c.Send("sessions.list", nil)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("sessions.list RPC: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var sessionItems []sessionListItem
|
|
||||||
if err := json.Unmarshal(sessionsRaw, &sessionItems); err != nil {
|
|
||||||
return fmt.Errorf("parse sessions.list response: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.logger.Info("sessions.list received", "count", len(sessionItems))
|
|
||||||
|
|
||||||
// 4. Build agentId → session map for merge
|
|
||||||
sessionByAgent := make(map[string]sessionListItem)
|
|
||||||
for _, s := range sessionItems {
|
|
||||||
if s.AgentID != "" {
|
|
||||||
sessionByAgent[s.AgentID] = s
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 5. Merge session state into agents, update DB, and collect for broadcast
|
|
||||||
mergedAgents := make([]models.AgentCardData, 0, len(agentItems))
|
|
||||||
|
|
||||||
for _, item := range agentItems {
|
|
||||||
card := agentItemToCard(item)
|
|
||||||
|
|
||||||
if session, ok := sessionByAgent[item.ID]; ok {
|
|
||||||
// Merge session state into agent card
|
|
||||||
card.SessionKey = session.SessionKey
|
|
||||||
card.Status = mapSessionStatus(session.Status)
|
|
||||||
card.LastActivity = session.LastActivityAt
|
|
||||||
|
|
||||||
if session.TotalTokens > 0 {
|
|
||||||
prog := min(session.TotalTokens/100, 100)
|
|
||||||
card.TaskProgress = &prog
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Persist merged status change
|
|
||||||
existing, err := c.agents.Get(ctx, card.ID)
|
|
||||||
if err == nil && existing.Status != card.Status {
|
|
||||||
status := card.Status
|
|
||||||
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
|
|
||||||
Status: &status,
|
|
||||||
})
|
|
||||||
if updateErr != nil {
|
|
||||||
c.logger.Warn("sync: agent status update failed", "id", card.ID, "error", updateErr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
mergedAgents = append(mergedAgents, card)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 6. Broadcast the full merged fleet
|
|
||||||
c.broker.Broadcast("fleet.update", mergedAgents)
|
|
||||||
c.logger.Info("initial sync complete", "agents", len(mergedAgents))
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// mapSessionStatus converts a gateway session status string to an AgentStatus.
|
|
||||||
// - "running" / "streaming" → active
|
|
||||||
// - "error" → error
|
|
||||||
// - "done" / "" / other → idle
|
|
||||||
func mapSessionStatus(status string) models.AgentStatus {
|
|
||||||
switch status {
|
|
||||||
case "running", "streaming":
|
|
||||||
return models.AgentStatusActive
|
|
||||||
case "error":
|
|
||||||
return models.AgentStatusError
|
|
||||||
default:
|
|
||||||
return models.AgentStatusIdle
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// agentItemToCard converts an agentListItem from the gateway RPC into an
|
|
||||||
// AgentCardData suitable for persistence and broadcasting.
|
|
||||||
func agentItemToCard(item agentListItem) models.AgentCardData {
|
|
||||||
role := item.Role
|
|
||||||
if role == "" {
|
|
||||||
role = "agent"
|
|
||||||
}
|
|
||||||
channel := item.Channel
|
|
||||||
if channel == "" {
|
|
||||||
channel = "discord"
|
|
||||||
}
|
|
||||||
name := item.Name
|
|
||||||
if name == "" {
|
|
||||||
name = item.ID
|
|
||||||
}
|
|
||||||
|
|
||||||
return models.AgentCardData{
|
|
||||||
ID: item.ID,
|
|
||||||
DisplayName: name,
|
|
||||||
Role: role,
|
|
||||||
Status: models.AgentStatusIdle, // default; overridden by session merge
|
|
||||||
SessionKey: "",
|
|
||||||
Channel: channel,
|
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,443 +0,0 @@
|
|||||||
// Package gateway provides WebSocket client integration with the OpenClaw
|
|
||||||
// gateway using WS protocol v3. The WSClient handles connection, handshake,
|
|
||||||
// frame routing, request/response correlation, and automatic reconnection
|
|
||||||
// with exponential backoff (1s → 30s max).
|
|
||||||
package gateway
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"log/slog"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository"
|
|
||||||
|
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
)
|
|
||||||
|
|
||||||
// WSConfig holds WebSocket client configuration, typically loaded from
|
|
||||||
// environment variables. AuthToken must be set to a valid OpenClaw gateway
|
|
||||||
// operator token.
|
|
||||||
type WSConfig struct {
|
|
||||||
URL string // e.g. "ws://host.docker.internal:18789/"
|
|
||||||
AuthToken string // from OPENCLAW_GATEWAY_TOKEN
|
|
||||||
}
|
|
||||||
|
|
||||||
// DefaultWSConfig returns sensible defaults for local development.
|
|
||||||
func DefaultWSConfig() WSConfig {
|
|
||||||
return WSConfig{
|
|
||||||
URL: "ws://localhost:18789/",
|
|
||||||
AuthToken: "",
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// eventHandler is a callback invoked when a named event arrives from the
|
|
||||||
// gateway.
|
|
||||||
type eventHandler func(json.RawMessage)
|
|
||||||
|
|
||||||
// WSClient connects to the OpenClaw gateway over WebSocket, completes the
|
|
||||||
// v3 handshake, routes incoming frames, and automatically reconnects on
|
|
||||||
// disconnect with exponential backoff (1s → 30s max).
|
|
||||||
type WSClient struct {
|
|
||||||
config WSConfig
|
|
||||||
conn *websocket.Conn
|
|
||||||
connMu sync.Mutex // protects conn for writes
|
|
||||||
pending map[string]chan<- json.RawMessage
|
|
||||||
mu sync.Mutex // protects pending and handlers
|
|
||||||
agents repository.AgentRepo
|
|
||||||
broker *handler.Broker
|
|
||||||
logger *slog.Logger
|
|
||||||
handlers map[string][]eventHandler
|
|
||||||
connID string // set after successful hello-ok
|
|
||||||
restClient *Client // optional REST client to notify on WS ready
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewWSClient returns a WSClient wired to the given repository and broker.
|
|
||||||
func NewWSClient(cfg WSConfig, agents repository.AgentRepo, broker *handler.Broker, logger *slog.Logger) *WSClient {
|
|
||||||
if logger == nil {
|
|
||||||
logger = slog.Default()
|
|
||||||
}
|
|
||||||
return &WSClient{
|
|
||||||
config: cfg,
|
|
||||||
pending: make(map[string]chan<- json.RawMessage),
|
|
||||||
agents: agents,
|
|
||||||
broker: broker,
|
|
||||||
logger: logger,
|
|
||||||
handlers: make(map[string][]eventHandler),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetRESTClient wires the REST fallback client so the WS client can notify
|
|
||||||
// it when the WS connection is ready. Call this before Start.
|
|
||||||
func (c *WSClient) SetRESTClient(rest *Client) {
|
|
||||||
c.restClient = rest
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnEvent registers a handler for the given event name. Handlers are called
|
|
||||||
// when an incoming frame with type "event" and matching event name is
|
|
||||||
// received. Safe to call before Start.
|
|
||||||
func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
c.handlers[event] = append(c.handlers[event], handler)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Frame types ──────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
// wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol.
|
|
||||||
type wsFrame struct {
|
|
||||||
Type string `json:"type"` // "req", "res", "event"
|
|
||||||
ID string `json:"id,omitempty"` // request/response correlation
|
|
||||||
Method string `json:"method,omitempty"` // method name (req/res frames)
|
|
||||||
Event string `json:"event,omitempty"` // event name (event frames)
|
|
||||||
Params json.RawMessage `json:"params,omitempty"`
|
|
||||||
Result json.RawMessage `json:"result,omitempty"`
|
|
||||||
Error *wsError `json:"error,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// wsError represents an error in a response frame.
|
|
||||||
type wsError struct {
|
|
||||||
Code int `json:"code"`
|
|
||||||
Message string `json:"message"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// connectRequest builds the initial connect handshake payload.
|
|
||||||
type connectRequest struct {
|
|
||||||
MinProtocol int `json:"minProtocol"`
|
|
||||||
MaxProtocol int `json:"maxProtocol"`
|
|
||||||
Client connectClientInfo `json:"client"`
|
|
||||||
Role string `json:"role"`
|
|
||||||
Scopes []string `json:"scopes"`
|
|
||||||
Auth connectAuth `json:"auth"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type connectClientInfo struct {
|
|
||||||
ID string `json:"id"`
|
|
||||||
Version string `json:"version"`
|
|
||||||
Platform string `json:"platform"`
|
|
||||||
Mode string `json:"mode"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type connectAuth struct {
|
|
||||||
Token string `json:"token"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// helloOKResponse represents the expected response to a successful connect.
|
|
||||||
type helloOKResponse struct {
|
|
||||||
ConnID string `json:"connId"`
|
|
||||||
Features struct {
|
|
||||||
Methods []string `json:"methods"`
|
|
||||||
Events []string `json:"events"`
|
|
||||||
} `json:"features"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Start loop ───────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
// Start connects to the gateway, completes the handshake, and begins the
|
|
||||||
// read loop. On disconnect it reconnects with exponential backoff (1s → 30s).
|
|
||||||
// On ctx cancellation it performs a clean shutdown.
|
|
||||||
func (c *WSClient) Start(ctx context.Context) {
|
|
||||||
backoff := 1 * time.Second
|
|
||||||
maxBackoff := 30 * time.Second
|
|
||||||
|
|
||||||
for {
|
|
||||||
err := c.connectAndRun(ctx)
|
|
||||||
if err != nil {
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
c.logger.Info("ws client stopped (context cancelled)")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.logger.Warn("ws client disconnected, reconnecting",
|
|
||||||
"error", err,
|
|
||||||
"backoff", backoff)
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
c.logger.Info("ws client stopped during backoff (context cancelled)")
|
|
||||||
return
|
|
||||||
case <-time.After(backoff):
|
|
||||||
// Exponential backoff: 1s, 2s, 4s, 8s, 16s, max 30s
|
|
||||||
backoff = backoff * 2
|
|
||||||
if backoff > maxBackoff {
|
|
||||||
backoff = maxBackoff
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// connectAndRun dials the gateway, completes the handshake, and runs the
|
|
||||||
// read loop until an error occurs or ctx is cancelled.
|
|
||||||
func (c *WSClient) connectAndRun(ctx context.Context) error {
|
|
||||||
c.logger.Info("ws client connecting", "url", c.config.URL)
|
|
||||||
|
|
||||||
dialer := websocket.Dialer{
|
|
||||||
HandshakeTimeout: 10 * time.Second,
|
|
||||||
}
|
|
||||||
|
|
||||||
conn, _, err := dialer.DialContext(ctx, c.config.URL, nil)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("dial failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.connMu.Lock()
|
|
||||||
c.conn = conn
|
|
||||||
c.connMu.Unlock()
|
|
||||||
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
// Step 1: Read the connect.challenge frame
|
|
||||||
if err := c.readChallenge(conn); err != nil {
|
|
||||||
return fmt.Errorf("handshake challenge: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Step 2: Send connect request and read hello-ok response
|
|
||||||
helloOK, err := c.sendConnect(conn)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("handshake connect: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.logger.Info("ws client handshake complete",
|
|
||||||
"connId", helloOK.ConnID,
|
|
||||||
"methods", helloOK.Features.Methods,
|
|
||||||
"events", helloOK.Features.Events)
|
|
||||||
|
|
||||||
c.connMu.Lock()
|
|
||||||
c.connID = helloOK.ConnID
|
|
||||||
c.connMu.Unlock()
|
|
||||||
|
|
||||||
// Notify REST client that WS is live so it stands down
|
|
||||||
if c.restClient != nil {
|
|
||||||
c.restClient.MarkWSReady()
|
|
||||||
c.logger.Info("ws client notified REST fallback to stand down")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Step 3: Initial sync — fetch agents + sessions from gateway
|
|
||||||
if err := c.initialSync(ctx); err != nil {
|
|
||||||
c.logger.Warn("initial sync failed, continuing with read loop", "error", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Step 4: Register live event handlers
|
|
||||||
c.registerEventHandlers()
|
|
||||||
|
|
||||||
// Step 5: Read loop — blocks until disconnect or ctx cancel
|
|
||||||
return c.readLoop(ctx, conn)
|
|
||||||
}
|
|
||||||
|
|
||||||
// readChallenge reads the first frame from the gateway, which must be a
|
|
||||||
// connect.challenge event.
|
|
||||||
func (c *WSClient) readChallenge(conn *websocket.Conn) error {
|
|
||||||
var frame wsFrame
|
|
||||||
if err := conn.ReadJSON(&frame); err != nil {
|
|
||||||
return fmt.Errorf("read challenge: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if frame.Type != "event" || frame.Event != "connect.challenge" {
|
|
||||||
return fmt.Errorf("expected connect.challenge, got type=%s event=%s", frame.Type, frame.Event)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.logger.Debug("received connect.challenge")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// sendConnect sends the connect request and waits for the hello-ok response.
|
|
||||||
func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) {
|
|
||||||
reqID := uuid.New().String()
|
|
||||||
params := connectRequest{
|
|
||||||
MinProtocol: 3,
|
|
||||||
MaxProtocol: 3,
|
|
||||||
Client: connectClientInfo{
|
|
||||||
ID: "control-center",
|
|
||||||
Version: "1.0",
|
|
||||||
Platform: "server",
|
|
||||||
Mode: "operator",
|
|
||||||
},
|
|
||||||
Role: "operator",
|
|
||||||
Scopes: []string{"operator.read"},
|
|
||||||
Auth: connectAuth{
|
|
||||||
Token: c.config.AuthToken,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
paramsJSON, err := json.Marshal(params)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("marshal connect params: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
reqFrame := wsFrame{
|
|
||||||
Type: "req",
|
|
||||||
ID: reqID,
|
|
||||||
Method: "connect",
|
|
||||||
Params: paramsJSON,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := conn.WriteJSON(reqFrame); err != nil {
|
|
||||||
return nil, fmt.Errorf("write connect request: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read response
|
|
||||||
var resFrame wsFrame
|
|
||||||
if err := conn.ReadJSON(&resFrame); err != nil {
|
|
||||||
return nil, fmt.Errorf("read connect response: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if resFrame.Error != nil {
|
|
||||||
return nil, fmt.Errorf("connect rejected: code=%d msg=%s", resFrame.Error.Code, resFrame.Error.Message)
|
|
||||||
}
|
|
||||||
|
|
||||||
if resFrame.ID != reqID {
|
|
||||||
return nil, fmt.Errorf("response id mismatch: expected %s, got %s", reqID, resFrame.ID)
|
|
||||||
}
|
|
||||||
|
|
||||||
var helloOK helloOKResponse
|
|
||||||
if err := json.Unmarshal(resFrame.Result, &helloOK); err != nil {
|
|
||||||
return nil, fmt.Errorf("parse hello-ok: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &helloOK, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// readLoop continuously reads frames from the connection and routes them.
|
|
||||||
// It returns on read error or context cancellation.
|
|
||||||
func (c *WSClient) readLoop(ctx context.Context, conn *websocket.Conn) error {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
// Clean shutdown: send close frame
|
|
||||||
c.connMu.Lock()
|
|
||||||
c.conn.WriteControl(
|
|
||||||
websocket.CloseMessage,
|
|
||||||
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "shutdown"),
|
|
||||||
time.Now().Add(5*time.Second),
|
|
||||||
)
|
|
||||||
c.connMu.Unlock()
|
|
||||||
return ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
var frame wsFrame
|
|
||||||
if err := conn.ReadJSON(&frame); err != nil {
|
|
||||||
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
|
|
||||||
c.logger.Info("ws connection closed by server")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if websocket.IsUnexpectedCloseError(err) {
|
|
||||||
c.logger.Warn("ws connection unexpectedly closed", "error", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return fmt.Errorf("read frame: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.routeFrame(frame)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// routeFrame dispatches a received frame to the appropriate handler.
|
|
||||||
func (c *WSClient) routeFrame(frame wsFrame) {
|
|
||||||
switch frame.Type {
|
|
||||||
case "res":
|
|
||||||
c.handleResponse(frame)
|
|
||||||
case "event":
|
|
||||||
c.handleEvent(frame)
|
|
||||||
default:
|
|
||||||
c.logger.Debug("unknown frame type", "type", frame.Type, "id", frame.ID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleResponse correlates a response frame to a pending request channel.
|
|
||||||
func (c *WSClient) handleResponse(frame wsFrame) {
|
|
||||||
c.mu.Lock()
|
|
||||||
ch, ok := c.pending[frame.ID]
|
|
||||||
if ok {
|
|
||||||
delete(c.pending, frame.ID)
|
|
||||||
}
|
|
||||||
c.mu.Unlock()
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
c.logger.Warn("received response for unknown request", "id", frame.ID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if frame.Error != nil {
|
|
||||||
ch <- nil
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ch <- frame.Result
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleEvent dispatches an event frame to registered handlers.
|
|
||||||
func (c *WSClient) handleEvent(frame wsFrame) {
|
|
||||||
c.mu.Lock()
|
|
||||||
handlers := c.handlers[frame.Event]
|
|
||||||
c.mu.Unlock()
|
|
||||||
|
|
||||||
if len(handlers) == 0 {
|
|
||||||
c.logger.Debug("unhandled event", "event", frame.Event)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, h := range handlers {
|
|
||||||
h(frame.Params)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Send (RPC) ──────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
// Send sends a JSON-RPC request to the gateway and returns the response
|
|
||||||
// payload. It is safe for concurrent use.
|
|
||||||
func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
|
|
||||||
reqID := uuid.New().String()
|
|
||||||
|
|
||||||
var paramsJSON json.RawMessage
|
|
||||||
if params != nil {
|
|
||||||
var err error
|
|
||||||
paramsJSON, err = json.Marshal(params)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("marshal params: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register pending response channel
|
|
||||||
respCh := make(chan json.RawMessage, 1)
|
|
||||||
c.mu.Lock()
|
|
||||||
c.pending[reqID] = respCh
|
|
||||||
c.mu.Unlock()
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
c.mu.Lock()
|
|
||||||
delete(c.pending, reqID)
|
|
||||||
c.mu.Unlock()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Build and send frame
|
|
||||||
frame := wsFrame{
|
|
||||||
Type: "req",
|
|
||||||
ID: reqID,
|
|
||||||
Method: method,
|
|
||||||
Params: paramsJSON,
|
|
||||||
}
|
|
||||||
|
|
||||||
c.connMu.Lock()
|
|
||||||
err := c.conn.WriteJSON(frame)
|
|
||||||
c.connMu.Unlock()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("write request: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for response with timeout
|
|
||||||
select {
|
|
||||||
case resp := <-respCh:
|
|
||||||
if resp == nil {
|
|
||||||
return nil, fmt.Errorf("gateway returned error for request %s (%s)", reqID, method)
|
|
||||||
}
|
|
||||||
return resp, nil
|
|
||||||
case <-time.After(30 * time.Second):
|
|
||||||
return nil, fmt.Errorf("request %s (%s) timed out", reqID, method)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user