diff --git a/go-backend/internal/gateway/events.go b/go-backend/internal/gateway/events.go new file mode 100644 index 0000000..d6544b6 --- /dev/null +++ b/go-backend/internal/gateway/events.go @@ -0,0 +1,285 @@ +// Package gateway provides real-time event handlers for the Control Center +// WebSocket client. Handlers process gateway events (sessions.changed, +// presence, agent.config), persist state changes via the repository, and +// broadcast updates through the SSE broker. +// +// Rule: DB update first, then SSE broadcast. This keeps REST API responses +// consistent with SSE events. +package gateway + +import ( + "context" + "encoding/json" + "time" + + "code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models" +) + +// ── Event payload types ────────────────────────────────────────────────── + +// sessionChangedPayload represents a single session delta from a +// sessions.changed event. Fields are optional; use json.RawMessage for +// anything we don't strictly need. +type sessionChangedPayload struct { + SessionKey string `json:"sessionKey"` + AgentID string `json:"agentId"` + Status string `json:"status"` // running, streaming, done, error + TotalTokens int `json:"totalTokens"` + LastActivityAt string `json:"lastActivityAt"` + CurrentTask string `json:"currentTask"` + TaskProgress *int `json:"taskProgress,omitempty"` + TaskElapsed string `json:"taskElapsed"` + ErrorMessage string `json:"errorMessage"` + Extra json.RawMessage `json:"-"` // ignored; prevents crash on unknown fields +} + +// presencePayload represents a device presence update event. +type presencePayload struct { + AgentID string `json:"agentId"` + Connected *bool `json:"connected,omitempty"` + LastActivityAt string `json:"lastActivityAt"` + Extra json.RawMessage `json:"-"` // ignored +} + +// agentConfigPayload represents an agent configuration change event. +type agentConfigPayload struct { + ID string `json:"id"` + Name string `json:"name"` + Role string `json:"role"` + Model string `json:"model"` + Channel string `json:"channel"` + Metadata json.RawMessage `json:"metadata"` + Extra json.RawMessage `json:"-"` // ignored +} + +// ── Handler registration ───────────────────────────────────────────────── + +// registerEventHandlers sets up all live event handlers on the WSClient. +// Call this once after a successful handshake + initial sync. +func (c *WSClient) registerEventHandlers() { + c.OnEvent("sessions.changed", c.handleSessionsChanged) + c.OnEvent("presence", c.handlePresence) + c.OnEvent("agent.config", c.handleAgentConfig) + + c.logger.Info("event handlers registered", + "events", []string{"sessions.changed", "presence", "agent.config"}) +} + +// ── sessions.changed ───────────────────────────────────────────────────── + +// handleSessionsChanged processes sessions.changed events from the gateway. +// The payload may be a single session object or an array of session deltas. +// For each changed session: map the gateway status to an AgentStatus, update +// the agent in the DB, then broadcast via SSE. +func (c *WSClient) handleSessionsChanged(payload json.RawMessage) { + c.logger.Debug("handleSessionsChanged start", "payload", string(payload)) + + // Try array first, then single object + var deltas []sessionChangedPayload + if err := json.Unmarshal(payload, &deltas); err == nil && len(deltas) > 0 { + // Array of deltas + } else { + // Try single object + var single sessionChangedPayload + if err := json.Unmarshal(payload, &single); err != nil { + c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err) + return + } + deltas = []sessionChangedPayload{single} + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + for _, d := range deltas { + if d.AgentID == "" { + c.logger.Debug("sessions.changed: skipping delta with empty agentId") + continue + } + + agentStatus := mapSessionStatus(d.Status) + + // Build partial update + update := models.UpdateAgentRequest{ + Status: &agentStatus, + } + + // Session key + if d.SessionKey != "" { + // SessionKey is not in UpdateAgentRequest directly, but we set + // status and task fields that are available. + } + + // Current task + if d.CurrentTask != "" { + update.CurrentTask = &d.CurrentTask + } + + // Task progress + if d.TaskProgress != nil { + update.TaskProgress = d.TaskProgress + } else if d.TotalTokens > 0 { + // Derive progress from token count as fallback + prog := min(d.TotalTokens/100, 100) + update.TaskProgress = &prog + } + + // Task elapsed + if d.TaskElapsed != "" { + update.TaskElapsed = &d.TaskElapsed + } + + // Error message + if d.ErrorMessage != "" { + update.ErrorMessage = &d.ErrorMessage + } + + // If session ended (done or empty status), set agent to idle and + // clear the current task + if agentStatus == models.AgentStatusIdle { + emptyTask := "" + update.CurrentTask = &emptyTask + zeroProg := 0 + update.TaskProgress = &zeroProg + } + + // Update DB first + updated, err := c.agents.Update(ctx, d.AgentID, update) + if err != nil { + c.logger.Warn("sessions.changed: DB update failed", + "agentId", d.AgentID, "error", err) + continue + } + + // Then broadcast + c.broker.Broadcast("agent.status", updated) + if d.TaskProgress != nil || d.CurrentTask != "" { + c.broker.Broadcast("agent.progress", updated) + } + + c.logger.Debug("sessions.changed: agent updated", + "agentId", d.AgentID, + "status", string(agentStatus)) + } + + c.logger.Debug("handleSessionsChanged end") +} + +// ── presence ───────────────────────────────────────────────────────────── + +// handlePresence processes presence events from the gateway. Updates the +// agent's lastActivity timestamp and broadcasts status if the connection +// state changed. +func (c *WSClient) handlePresence(payload json.RawMessage) { + c.logger.Debug("handlePresence start", "payload", string(payload)) + + var p presencePayload + if err := json.Unmarshal(payload, &p); err != nil { + c.logger.Warn("presence: unparseable payload, skipping", "error", err) + return + } + + if p.AgentID == "" { + c.logger.Debug("presence: skipping event with empty agentId") + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // The Update method always sets last_activity = now, so a no-op update + // (just triggering the last_activity refresh) is sufficient. We send + // an empty-ish update — the repo always bumps last_activity. + // If connection state is reported, also update status. + update := models.UpdateAgentRequest{} + + if p.Connected != nil && !*p.Connected { + // Device disconnected — set agent to idle + idle := models.AgentStatusIdle + update.Status = &idle + } + + // Update DB first + updated, err := c.agents.Update(ctx, p.AgentID, update) + if err != nil { + c.logger.Warn("presence: DB update failed", + "agentId", p.AgentID, "error", err) + return + } + + // Use reported timestamp if available + if p.LastActivityAt != "" { + updated.LastActivity = p.LastActivityAt + } + + // Then broadcast + c.broker.Broadcast("agent.status", updated) + + c.logger.Debug("presence: agent updated", + "agentId", p.AgentID, + "connected", p.Connected) +} + +// ── agent.config ───────────────────────────────────────────────────────── + +// handleAgentConfig processes agent.config events from the gateway. Updates +// agent metadata (name, channel) in the DB and broadcasts a fleet.update +// with the full fleet snapshot. +func (c *WSClient) handleAgentConfig(payload json.RawMessage) { + c.logger.Debug("handleAgentConfig start", "payload", string(payload)) + + var cfg agentConfigPayload + if err := json.Unmarshal(payload, &cfg); err != nil { + c.logger.Warn("agent.config: unparseable payload, skipping", "error", err) + return + } + + if cfg.ID == "" { + c.logger.Debug("agent.config: skipping event with empty id") + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Build partial update with available fields. + // Note: DisplayName and Role are not in UpdateAgentRequest currently, + // but Channel is. We update what we can and note the gap. + update := models.UpdateAgentRequest{} + + if cfg.Channel != "" { + update.Channel = &cfg.Channel + } + + // Update DB first + updated, err := c.agents.Update(ctx, cfg.ID, update) + if err != nil { + c.logger.Warn("agent.config: DB update failed", + "agentId", cfg.ID, "error", err) + return + } + + // Apply display name from config if the repo returned the default + if cfg.Name != "" { + updated.DisplayName = cfg.Name + } + if cfg.Role != "" { + updated.Role = cfg.Role + } + + // Then broadcast fleet snapshot + allAgents, err := c.agents.List(ctx, "") + if err != nil { + c.logger.Warn("agent.config: failed to list fleet for broadcast", + "error", err) + // Still broadcast the single agent update as fallback + c.broker.Broadcast("agent.status", updated) + return + } + + c.broker.Broadcast("fleet.update", allAgents) + + c.logger.Debug("agent.config: fleet updated", + "agentId", cfg.ID, + "name", cfg.Name) +} \ No newline at end of file diff --git a/go-backend/internal/gateway/wsclient.go b/go-backend/internal/gateway/wsclient.go index 1fae44a..195720b 100644 --- a/go-backend/internal/gateway/wsclient.go +++ b/go-backend/internal/gateway/wsclient.go @@ -213,6 +213,9 @@ func (c *WSClient) connectAndRun(ctx context.Context) error { c.logger.Warn("initial sync failed, will continue with read loop", "error", err) } + // Step 2c: Register live event handlers + c.registerEventHandlers() + // Step 3: Read loop return c.readLoop(ctx, conn) }