CUB-206: WSClient integration tests with mock WebSocket server #43
285
go-backend/internal/gateway/events.go
Normal file
285
go-backend/internal/gateway/events.go
Normal file
@@ -0,0 +1,285 @@
|
||||
// Package gateway provides real-time event handlers for the Control Center
|
||||
// WebSocket client. Handlers process gateway events (sessions.changed,
|
||||
// presence, agent.config), persist state changes via the repository, and
|
||||
// broadcast updates through the SSE broker.
|
||||
//
|
||||
// Rule: DB update first, then SSE broadcast. This keeps REST API responses
|
||||
// consistent with SSE events.
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
||||
)
|
||||
|
||||
// ── Event payload types ──────────────────────────────────────────────────
|
||||
|
||||
// sessionChangedPayload represents a single session delta from a
|
||||
// sessions.changed event. Fields are optional; use json.RawMessage for
|
||||
// anything we don't strictly need.
|
||||
type sessionChangedPayload struct {
|
||||
SessionKey string `json:"sessionKey"`
|
||||
AgentID string `json:"agentId"`
|
||||
Status string `json:"status"` // running, streaming, done, error
|
||||
TotalTokens int `json:"totalTokens"`
|
||||
LastActivityAt string `json:"lastActivityAt"`
|
||||
CurrentTask string `json:"currentTask"`
|
||||
TaskProgress *int `json:"taskProgress,omitempty"`
|
||||
TaskElapsed string `json:"taskElapsed"`
|
||||
ErrorMessage string `json:"errorMessage"`
|
||||
Extra json.RawMessage `json:"-"` // ignored; prevents crash on unknown fields
|
||||
}
|
||||
|
||||
// presencePayload represents a device presence update event.
|
||||
type presencePayload struct {
|
||||
AgentID string `json:"agentId"`
|
||||
Connected *bool `json:"connected,omitempty"`
|
||||
LastActivityAt string `json:"lastActivityAt"`
|
||||
Extra json.RawMessage `json:"-"` // ignored
|
||||
}
|
||||
|
||||
// agentConfigPayload represents an agent configuration change event.
|
||||
type agentConfigPayload struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Role string `json:"role"`
|
||||
Model string `json:"model"`
|
||||
Channel string `json:"channel"`
|
||||
Metadata json.RawMessage `json:"metadata"`
|
||||
Extra json.RawMessage `json:"-"` // ignored
|
||||
}
|
||||
|
||||
// ── Handler registration ─────────────────────────────────────────────────
|
||||
|
||||
// registerEventHandlers sets up all live event handlers on the WSClient.
|
||||
// Call this once after a successful handshake + initial sync.
|
||||
func (c *WSClient) registerEventHandlers() {
|
||||
c.OnEvent("sessions.changed", c.handleSessionsChanged)
|
||||
c.OnEvent("presence", c.handlePresence)
|
||||
c.OnEvent("agent.config", c.handleAgentConfig)
|
||||
|
||||
c.logger.Info("event handlers registered",
|
||||
"events", []string{"sessions.changed", "presence", "agent.config"})
|
||||
}
|
||||
|
||||
// ── sessions.changed ─────────────────────────────────────────────────────
|
||||
|
||||
// handleSessionsChanged processes sessions.changed events from the gateway.
|
||||
// The payload may be a single session object or an array of session deltas.
|
||||
// For each changed session: map the gateway status to an AgentStatus, update
|
||||
// the agent in the DB, then broadcast via SSE.
|
||||
func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
|
||||
c.logger.Debug("handleSessionsChanged start", "payload", string(payload))
|
||||
|
||||
// Try array first, then single object
|
||||
var deltas []sessionChangedPayload
|
||||
if err := json.Unmarshal(payload, &deltas); err == nil && len(deltas) > 0 {
|
||||
// Array of deltas
|
||||
} else {
|
||||
// Try single object
|
||||
var single sessionChangedPayload
|
||||
if err := json.Unmarshal(payload, &single); err != nil {
|
||||
c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err)
|
||||
return
|
||||
}
|
||||
deltas = []sessionChangedPayload{single}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
for _, d := range deltas {
|
||||
if d.AgentID == "" {
|
||||
c.logger.Debug("sessions.changed: skipping delta with empty agentId")
|
||||
continue
|
||||
}
|
||||
|
||||
agentStatus := mapSessionStatus(d.Status)
|
||||
|
||||
// Build partial update
|
||||
update := models.UpdateAgentRequest{
|
||||
Status: &agentStatus,
|
||||
}
|
||||
|
||||
// Session key
|
||||
if d.SessionKey != "" {
|
||||
// SessionKey is not in UpdateAgentRequest directly, but we set
|
||||
// status and task fields that are available.
|
||||
}
|
||||
|
||||
// Current task
|
||||
if d.CurrentTask != "" {
|
||||
update.CurrentTask = &d.CurrentTask
|
||||
}
|
||||
|
||||
// Task progress
|
||||
if d.TaskProgress != nil {
|
||||
update.TaskProgress = d.TaskProgress
|
||||
} else if d.TotalTokens > 0 {
|
||||
// Derive progress from token count as fallback
|
||||
prog := min(d.TotalTokens/100, 100)
|
||||
update.TaskProgress = &prog
|
||||
}
|
||||
|
||||
// Task elapsed
|
||||
if d.TaskElapsed != "" {
|
||||
update.TaskElapsed = &d.TaskElapsed
|
||||
}
|
||||
|
||||
// Error message
|
||||
if d.ErrorMessage != "" {
|
||||
update.ErrorMessage = &d.ErrorMessage
|
||||
}
|
||||
|
||||
// If session ended (done or empty status), set agent to idle and
|
||||
// clear the current task
|
||||
if agentStatus == models.AgentStatusIdle {
|
||||
emptyTask := ""
|
||||
update.CurrentTask = &emptyTask
|
||||
zeroProg := 0
|
||||
update.TaskProgress = &zeroProg
|
||||
}
|
||||
|
||||
// Update DB first
|
||||
updated, err := c.agents.Update(ctx, d.AgentID, update)
|
||||
if err != nil {
|
||||
c.logger.Warn("sessions.changed: DB update failed",
|
||||
"agentId", d.AgentID, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Then broadcast
|
||||
c.broker.Broadcast("agent.status", updated)
|
||||
if d.TaskProgress != nil || d.CurrentTask != "" {
|
||||
c.broker.Broadcast("agent.progress", updated)
|
||||
}
|
||||
|
||||
c.logger.Debug("sessions.changed: agent updated",
|
||||
"agentId", d.AgentID,
|
||||
"status", string(agentStatus))
|
||||
}
|
||||
|
||||
c.logger.Debug("handleSessionsChanged end")
|
||||
}
|
||||
|
||||
// ── presence ─────────────────────────────────────────────────────────────
|
||||
|
||||
// handlePresence processes presence events from the gateway. Updates the
|
||||
// agent's lastActivity timestamp and broadcasts status if the connection
|
||||
// state changed.
|
||||
func (c *WSClient) handlePresence(payload json.RawMessage) {
|
||||
c.logger.Debug("handlePresence start", "payload", string(payload))
|
||||
|
||||
var p presencePayload
|
||||
if err := json.Unmarshal(payload, &p); err != nil {
|
||||
c.logger.Warn("presence: unparseable payload, skipping", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
if p.AgentID == "" {
|
||||
c.logger.Debug("presence: skipping event with empty agentId")
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// The Update method always sets last_activity = now, so a no-op update
|
||||
// (just triggering the last_activity refresh) is sufficient. We send
|
||||
// an empty-ish update — the repo always bumps last_activity.
|
||||
// If connection state is reported, also update status.
|
||||
update := models.UpdateAgentRequest{}
|
||||
|
||||
if p.Connected != nil && !*p.Connected {
|
||||
// Device disconnected — set agent to idle
|
||||
idle := models.AgentStatusIdle
|
||||
update.Status = &idle
|
||||
}
|
||||
|
||||
// Update DB first
|
||||
updated, err := c.agents.Update(ctx, p.AgentID, update)
|
||||
if err != nil {
|
||||
c.logger.Warn("presence: DB update failed",
|
||||
"agentId", p.AgentID, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Use reported timestamp if available
|
||||
if p.LastActivityAt != "" {
|
||||
updated.LastActivity = p.LastActivityAt
|
||||
}
|
||||
|
||||
// Then broadcast
|
||||
c.broker.Broadcast("agent.status", updated)
|
||||
|
||||
c.logger.Debug("presence: agent updated",
|
||||
"agentId", p.AgentID,
|
||||
"connected", p.Connected)
|
||||
}
|
||||
|
||||
// ── agent.config ─────────────────────────────────────────────────────────
|
||||
|
||||
// handleAgentConfig processes agent.config events from the gateway. Updates
|
||||
// agent metadata (name, channel) in the DB and broadcasts a fleet.update
|
||||
// with the full fleet snapshot.
|
||||
func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
|
||||
c.logger.Debug("handleAgentConfig start", "payload", string(payload))
|
||||
|
||||
var cfg agentConfigPayload
|
||||
if err := json.Unmarshal(payload, &cfg); err != nil {
|
||||
c.logger.Warn("agent.config: unparseable payload, skipping", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
if cfg.ID == "" {
|
||||
c.logger.Debug("agent.config: skipping event with empty id")
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Build partial update with available fields.
|
||||
// Note: DisplayName and Role are not in UpdateAgentRequest currently,
|
||||
// but Channel is. We update what we can and note the gap.
|
||||
update := models.UpdateAgentRequest{}
|
||||
|
||||
if cfg.Channel != "" {
|
||||
update.Channel = &cfg.Channel
|
||||
}
|
||||
|
||||
// Update DB first
|
||||
updated, err := c.agents.Update(ctx, cfg.ID, update)
|
||||
if err != nil {
|
||||
c.logger.Warn("agent.config: DB update failed",
|
||||
"agentId", cfg.ID, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Apply display name from config if the repo returned the default
|
||||
if cfg.Name != "" {
|
||||
updated.DisplayName = cfg.Name
|
||||
}
|
||||
if cfg.Role != "" {
|
||||
updated.Role = cfg.Role
|
||||
}
|
||||
|
||||
// Then broadcast fleet snapshot
|
||||
allAgents, err := c.agents.List(ctx, "")
|
||||
if err != nil {
|
||||
c.logger.Warn("agent.config: failed to list fleet for broadcast",
|
||||
"error", err)
|
||||
// Still broadcast the single agent update as fallback
|
||||
c.broker.Broadcast("agent.status", updated)
|
||||
return
|
||||
}
|
||||
|
||||
c.broker.Broadcast("fleet.update", allAgents)
|
||||
|
||||
c.logger.Debug("agent.config: fleet updated",
|
||||
"agentId", cfg.ID,
|
||||
"name", cfg.Name)
|
||||
}
|
||||
@@ -213,6 +213,9 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
||||
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
|
||||
}
|
||||
|
||||
// Step 2c: Register live event handlers
|
||||
c.registerEventHandlers()
|
||||
|
||||
// Step 3: Read loop
|
||||
return c.readLoop(ctx, conn)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user