// Package gateway provides real-time event handlers for the Control Center
// WebSocket client. Handlers process gateway events (sessions.changed,
// presence, agent.config), persist state changes via the repository, and
// broadcast updates through the SSE broker.
//
// Rule: DB update first, then SSE broadcast. This keeps REST API responses
// consistent with SSE events.
package gateway

import (
	"context"
	"encoding/json"
	"time"

	"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
)

// ── Event payload types ──────────────────────────────────────────────────

// sessionChangedPayload represents a single session delta from a
// sessions.changed event. The event payload may carry one of these or an
// array of them; handleSessionsChanged accepts both shapes.
type sessionChangedPayload struct {
	// Session identifier from the event. The handlers in this file do not
	// persist it.
	SessionKey string `json:"sessionKey"`
	// Agent the delta applies to. Deltas with an empty agentId are skipped.
	AgentID string `json:"agentId"`
	// Gateway session status: running, streaming, done, error. Translated
	// to an agent status via mapSessionStatus.
	Status string `json:"status"`
	// Token count. Used only as a fallback to derive task progress when
	// TaskProgress is absent.
	TotalTokens int `json:"totalTokens"`
	// Activity timestamp as sent by the gateway. Decoded but not read by
	// handleSessionsChanged.
	// NOTE(review): timestamp format is not established here — confirm.
	LastActivityAt string `json:"lastActivityAt"`
	// Copied into the agent update when non-empty.
	CurrentTask string `json:"currentTask"`
	// Explicit progress value; nil when the event omits it, which lets the
	// handler tell "absent" apart from an explicit 0.
	TaskProgress *int `json:"taskProgress,omitempty"`
	// Copied into the agent update when non-empty.
	TaskElapsed string `json:"taskElapsed"`
	// Copied into the agent update when non-empty.
	ErrorMessage string `json:"errorMessage"`
}

// presencePayload represents a device presence update event.
type presencePayload struct {
	// Agent the presence event applies to. Events with an empty agentId
	// are skipped.
	AgentID string `json:"agentId"`
	// Connection state; nil when the event does not report it. Only an
	// explicit false makes handlePresence set the agent to idle.
	Connected *bool `json:"connected,omitempty"`
	// Forwarded into the agent update when non-empty.
	LastActivityAt string `json:"lastActivityAt"`
}

// agentConfigPayload represents an agent configuration change event.
type agentConfigPayload struct {
	// Agent identifier used as the update key. Events with an empty id are
	// skipped.
	ID string `json:"id"`
	// Written to the agent's display name when non-empty.
	Name string `json:"name"`
	// Written to the agent's role when non-empty.
	Role string `json:"role"`
	// Decoded but not applied by handleAgentConfig.
	Model string `json:"model"`
	// Written to the agent's channel when non-empty.
	Channel string `json:"channel"`
	// Kept as raw JSON; decoded but not applied by handleAgentConfig.
	Metadata json.RawMessage `json:"metadata"`
}

// ── Handler registration ─────────────────────────────────────────────────

// registerEventHandlers sets up all live event handlers on the WSClient.
// Call this after each successful handshake + initial sync: it discards any
// previously registered handlers first, so calling it again on reconnect
// does not register duplicates. Registration is skipped (with an info log)
// when the client has no agent repository or no broker.
func (c *WSClient) registerEventHandlers() { if c.agents == nil || c.broker == nil { c.logger.Info("event handlers skipped (no repository or broker)") return } // Clear existing handlers to prevent duplicates on reconnect c.mu.Lock() c.handlers = make(map[string][]eventHandler) c.mu.Unlock() c.OnEvent("sessions.changed", c.handleSessionsChanged) c.OnEvent("presence", c.handlePresence) c.OnEvent("agent.config", c.handleAgentConfig) c.logger.Info("event handlers registered", "events", []string{"sessions.changed", "presence", "agent.config"}) } // ── sessions.changed ───────────────────────────────────────────────────── // handleSessionsChanged processes sessions.changed events from the gateway. // The payload may be a single session object or an array of session deltas. // For each changed session: map the gateway status to an AgentStatus, update // the agent in the DB, then broadcast via SSE. func (c *WSClient) handleSessionsChanged(payload json.RawMessage) { c.logger.Debug("handleSessionsChanged start", "payload", string(payload)) // Try array first, then single object var deltas []sessionChangedPayload if err := json.Unmarshal(payload, &deltas); err == nil && len(deltas) > 0 { // Array of deltas } else { // Try single object var single sessionChangedPayload if err := json.Unmarshal(payload, &single); err != nil { c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err) return } deltas = []sessionChangedPayload{single} } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() for _, d := range deltas { if d.AgentID == "" { c.logger.Debug("sessions.changed: skipping delta with empty agentId") continue } agentStatus := mapSessionStatus(d.Status) // Build partial update update := models.UpdateAgentRequest{ Status: &agentStatus, } // Session key if d.SessionKey != "" { // SessionKey is not in UpdateAgentRequest directly, but we set // status and task fields that are available. 
} // Current task if d.CurrentTask != "" { update.CurrentTask = &d.CurrentTask } // Task progress if d.TaskProgress != nil { update.TaskProgress = d.TaskProgress } else if d.TotalTokens > 0 { // Derive progress from token count as fallback prog := min(d.TotalTokens/100, 100) update.TaskProgress = &prog } // Task elapsed if d.TaskElapsed != "" { update.TaskElapsed = &d.TaskElapsed } // Error message if d.ErrorMessage != "" { update.ErrorMessage = &d.ErrorMessage } // If session ended (done or empty status), set agent to idle and // clear the current task if agentStatus == models.AgentStatusIdle { emptyTask := "" update.CurrentTask = &emptyTask zeroProg := 0 update.TaskProgress = &zeroProg } // Update DB first updated, err := c.agents.Update(ctx, d.AgentID, update) if err != nil { c.logger.Warn("sessions.changed: DB update failed", "agentId", d.AgentID, "error", err) continue } // Then broadcast c.broker.Broadcast("agent.status", updated) if d.TaskProgress != nil || d.CurrentTask != "" { c.broker.Broadcast("agent.progress", updated) } c.logger.Debug("sessions.changed: agent updated", "agentId", d.AgentID, "status", string(agentStatus)) } c.logger.Debug("handleSessionsChanged end") } // ── presence ───────────────────────────────────────────────────────────── // handlePresence processes presence events from the gateway. Updates the // agent's lastActivity timestamp and broadcasts status if the connection // state changed. 
func (c *WSClient) handlePresence(payload json.RawMessage) { c.logger.Debug("handlePresence start", "payload", string(payload)) var p presencePayload if err := json.Unmarshal(payload, &p); err != nil { c.logger.Warn("presence: unparseable payload, skipping", "error", err) return } if p.AgentID == "" { c.logger.Debug("presence: skipping event with empty agentId") return } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // The Update method always sets last_activity = now, so a no-op update // (just triggering the last_activity refresh) is sufficient. We send // an empty-ish update — the repo always bumps last_activity. // If connection state is reported, also update status. update := models.UpdateAgentRequest{} if p.Connected != nil && !*p.Connected { // Device disconnected — set agent to idle idle := models.AgentStatusIdle update.Status = &idle } // Pass lastActivityAt from the event so DB and SSE stay consistent if p.LastActivityAt != "" { update.LastActivityAt = &p.LastActivityAt } // Update DB first updated, err := c.agents.Update(ctx, p.AgentID, update) if err != nil { c.logger.Warn("presence: DB update failed", "agentId", p.AgentID, "error", err) return } // Then broadcast c.broker.Broadcast("agent.status", updated) c.logger.Debug("presence: agent updated", "agentId", p.AgentID, "connected", p.Connected) } // ── agent.config ───────────────────────────────────────────────────────── // handleAgentConfig processes agent.config events from the gateway. Updates // agent metadata (name, channel) in the DB and broadcasts a fleet.update // with the full fleet snapshot. 
func (c *WSClient) handleAgentConfig(payload json.RawMessage) { c.logger.Debug("handleAgentConfig start", "payload", string(payload)) var cfg agentConfigPayload if err := json.Unmarshal(payload, &cfg); err != nil { c.logger.Warn("agent.config: unparseable payload, skipping", "error", err) return } if cfg.ID == "" { c.logger.Debug("agent.config: skipping event with empty id") return } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // Build partial update with available fields. update := models.UpdateAgentRequest{} if cfg.Name != "" { update.DisplayName = &cfg.Name } if cfg.Role != "" { update.Role = &cfg.Role } if cfg.Channel != "" { update.Channel = &cfg.Channel } // Update DB first updated, err := c.agents.Update(ctx, cfg.ID, update) if err != nil { c.logger.Warn("agent.config: DB update failed", "agentId", cfg.ID, "error", err) return } // Then broadcast fleet snapshot allAgents, err := c.agents.List(ctx, "") if err != nil { c.logger.Warn("agent.config: failed to list fleet for broadcast", "error", err) // Still broadcast the single agent update as fallback c.broker.Broadcast("agent.status", updated) return } c.broker.Broadcast("fleet.update", allAgents) c.logger.Debug("agent.config: fleet updated", "agentId", cfg.ID, "name", cfg.Name) }