243 lines
7.7 KiB
Go
243 lines
7.7 KiB
Go
|
|
// Package gateway provides real-time event handlers for the Control Center
|
||
|
|
// WebSocket client. Handlers process gateway events (sessions.changed,
|
||
|
|
// presence, agent.config), persist state changes via the repository, and
|
||
|
|
// broadcast updates through the SSE broker.
|
||
|
|
//
|
||
|
|
// Rule: DB update first, then SSE broadcast. This keeps REST API responses
|
||
|
|
// consistent with SSE events.
|
||
|
|
package gateway
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"encoding/json"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
||
|
|
)
|
||
|
|
|
||
|
|
// ── Event payload types ──────────────────────────────────────────────────
|
||
|
|
|
||
|
|
// sessionChangedPayload is the decoded form of one session delta carried by a
// sessions.changed gateway event. handleSessionsChanged accepts either a
// single object of this shape or a JSON array of them.
type sessionChangedPayload struct {
	// Identity of the session and of the agent it belongs to. AgentID is the
	// key used for the repository update; deltas without it are skipped.
	SessionKey string `json:"sessionKey"`
	AgentID    string `json:"agentId"`

	// Status is the gateway-side session state: running, streaming, done, error.
	Status string `json:"status"`

	// Usage and activity data as sent by the gateway. These are decoded but
	// not read by handleSessionsChanged.
	TotalTokens    int    `json:"totalTokens"`
	LastActivityAt string `json:"lastActivityAt"`

	// Task details. The string fields are forwarded only when non-empty;
	// TaskProgress is a pointer so that an absent value (nil) can be told
	// apart from an explicit zero.
	CurrentTask  string `json:"currentTask"`
	TaskProgress *int   `json:"taskProgress,omitempty"`
	TaskElapsed  string `json:"taskElapsed"`
	ErrorMessage string `json:"errorMessage"`
}
|
||
|
|
|
||
|
|
// presencePayload is the decoded form of a device presence event from the
// gateway.
type presencePayload struct {
	// AgentID names the agent the presence update applies to; handlePresence
	// ignores events where it is empty.
	AgentID string `json:"agentId"`

	// Connected is a pointer so that a missing field (nil) can be told apart
	// from an explicit false. handlePresence only reacts to an explicit false.
	Connected *bool `json:"connected,omitempty"`

	// LastActivityAt is the activity timestamp exactly as the gateway sent it
	// (kept as a string; no parsing happens in this file).
	LastActivityAt string `json:"lastActivityAt"`
}
|
||
|
|
|
||
|
|
// agentConfigPayload is the decoded form of an agent.config gateway event,
// describing a change to one agent's configuration.
type agentConfigPayload struct {
	// ID identifies the agent; handleAgentConfig ignores events without it.
	ID string `json:"id"`

	// Descriptive attributes. handleAgentConfig reads Name, Role and Channel;
	// Model is decoded but not read there.
	Name    string `json:"name"`
	Role    string `json:"role"`
	Model   string `json:"model"`
	Channel string `json:"channel"`

	// Metadata is kept as raw JSON, so its contents are not interpreted at
	// decode time.
	Metadata json.RawMessage `json:"metadata"`
}
|
||
|
|
|
||
|
|
// ── Handler registration ─────────────────────────────────────────────────
|
||
|
|
|
||
|
|
// registerEventHandlers sets up all live event handlers on the WSClient.
|
||
|
|
// Called once after a successful handshake + initial sync.
|
||
|
|
func (c *WSClient) registerEventHandlers() {
|
||
|
|
c.OnEvent("sessions.changed", c.handleSessionsChanged)
|
||
|
|
c.OnEvent("presence", c.handlePresence)
|
||
|
|
c.OnEvent("agent.config", c.handleAgentConfig)
|
||
|
|
|
||
|
|
c.logger.Info("event handlers registered",
|
||
|
|
"events", []string{"sessions.changed", "presence", "agent.config"})
|
||
|
|
}
|
||
|
|
|
||
|
|
// ── sessions.changed ─────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
// handleSessionsChanged processes sessions.changed events from the gateway.
|
||
|
|
// The payload may be a single session object or an array of session deltas.
|
||
|
|
// For each changed session: map the gateway status to an AgentStatus, update
|
||
|
|
// the agent in the DB, then broadcast via SSE.
|
||
|
|
func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
|
||
|
|
c.logger.Debug("handleSessionsChanged", "payload", string(payload))
|
||
|
|
|
||
|
|
// Try array first, then single object
|
||
|
|
var deltas []sessionChangedPayload
|
||
|
|
if err := json.Unmarshal(payload, &deltas); err != nil || len(deltas) == 0 {
|
||
|
|
var single sessionChangedPayload
|
||
|
|
if err := json.Unmarshal(payload, &single); err != nil {
|
||
|
|
c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
deltas = []sessionChangedPayload{single}
|
||
|
|
}
|
||
|
|
|
||
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||
|
|
defer cancel()
|
||
|
|
|
||
|
|
for _, d := range deltas {
|
||
|
|
if d.AgentID == "" {
|
||
|
|
c.logger.Debug("sessions.changed: skipping delta with empty agentId")
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
|
||
|
|
agentStatus := mapSessionStatus(d.Status)
|
||
|
|
|
||
|
|
update := models.UpdateAgentRequest{
|
||
|
|
Status: &agentStatus,
|
||
|
|
}
|
||
|
|
|
||
|
|
if d.CurrentTask != "" {
|
||
|
|
update.CurrentTask = &d.CurrentTask
|
||
|
|
}
|
||
|
|
|
||
|
|
if d.TaskProgress != nil {
|
||
|
|
update.TaskProgress = d.TaskProgress
|
||
|
|
}
|
||
|
|
|
||
|
|
if d.TaskElapsed != "" {
|
||
|
|
update.TaskElapsed = &d.TaskElapsed
|
||
|
|
}
|
||
|
|
|
||
|
|
if d.ErrorMessage != "" {
|
||
|
|
update.ErrorMessage = &d.ErrorMessage
|
||
|
|
}
|
||
|
|
|
||
|
|
// If session ended, clear task and progress
|
||
|
|
if agentStatus == models.AgentStatusIdle {
|
||
|
|
emptyTask := ""
|
||
|
|
update.CurrentTask = &emptyTask
|
||
|
|
zeroProg := 0
|
||
|
|
update.TaskProgress = &zeroProg
|
||
|
|
}
|
||
|
|
|
||
|
|
// DB update first
|
||
|
|
updated, err := c.agents.Update(ctx, d.AgentID, update)
|
||
|
|
if err != nil {
|
||
|
|
c.logger.Warn("sessions.changed: DB update failed",
|
||
|
|
"agentId", d.AgentID, "error", err)
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
|
||
|
|
// Then SSE broadcast
|
||
|
|
c.broker.Broadcast("agent.status", updated)
|
||
|
|
if d.TaskProgress != nil || d.CurrentTask != "" {
|
||
|
|
c.broker.Broadcast("agent.progress", updated)
|
||
|
|
}
|
||
|
|
|
||
|
|
c.logger.Debug("sessions.changed: agent updated",
|
||
|
|
"agentId", d.AgentID, "status", string(agentStatus))
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// ── presence ─────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
// handlePresence processes presence events from the gateway. Updates the
|
||
|
|
// agent's lastActivity and broadcasts status if the connection state changed.
|
||
|
|
func (c *WSClient) handlePresence(payload json.RawMessage) {
|
||
|
|
c.logger.Debug("handlePresence", "payload", string(payload))
|
||
|
|
|
||
|
|
var p presencePayload
|
||
|
|
if err := json.Unmarshal(payload, &p); err != nil {
|
||
|
|
c.logger.Warn("presence: unparseable payload, skipping", "error", err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
if p.AgentID == "" {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||
|
|
defer cancel()
|
||
|
|
|
||
|
|
update := models.UpdateAgentRequest{}
|
||
|
|
|
||
|
|
// If device disconnected, set agent to idle
|
||
|
|
if p.Connected != nil && !*p.Connected {
|
||
|
|
idle := models.AgentStatusIdle
|
||
|
|
update.Status = &idle
|
||
|
|
}
|
||
|
|
|
||
|
|
// DB update first (Update always bumps last_activity)
|
||
|
|
updated, err := c.agents.Update(ctx, p.AgentID, update)
|
||
|
|
if err != nil {
|
||
|
|
c.logger.Warn("presence: DB update failed",
|
||
|
|
"agentId", p.AgentID, "error", err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
if p.LastActivityAt != "" {
|
||
|
|
updated.LastActivity = p.LastActivityAt
|
||
|
|
}
|
||
|
|
|
||
|
|
// Then SSE broadcast
|
||
|
|
c.broker.Broadcast("agent.status", updated)
|
||
|
|
|
||
|
|
c.logger.Debug("presence: agent updated",
|
||
|
|
"agentId", p.AgentID, "connected", p.Connected)
|
||
|
|
}
|
||
|
|
|
||
|
|
// ── agent.config ─────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
// handleAgentConfig processes agent.config events from the gateway. Updates
|
||
|
|
// agent metadata (channel) in the DB and broadcasts a fleet.update with the
|
||
|
|
// full fleet snapshot.
|
||
|
|
func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
|
||
|
|
c.logger.Debug("handleAgentConfig", "payload", string(payload))
|
||
|
|
|
||
|
|
var cfg agentConfigPayload
|
||
|
|
if err := json.Unmarshal(payload, &cfg); err != nil {
|
||
|
|
c.logger.Warn("agent.config: unparseable payload, skipping", "error", err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
if cfg.ID == "" {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||
|
|
defer cancel()
|
||
|
|
|
||
|
|
update := models.UpdateAgentRequest{}
|
||
|
|
|
||
|
|
if cfg.Channel != "" {
|
||
|
|
update.Channel = &cfg.Channel
|
||
|
|
}
|
||
|
|
|
||
|
|
// DB update first
|
||
|
|
updated, err := c.agents.Update(ctx, cfg.ID, update)
|
||
|
|
if err != nil {
|
||
|
|
c.logger.Warn("agent.config: DB update failed",
|
||
|
|
"agentId", cfg.ID, "error", err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
// Apply display name/role from config event
|
||
|
|
if cfg.Name != "" {
|
||
|
|
updated.DisplayName = cfg.Name
|
||
|
|
}
|
||
|
|
if cfg.Role != "" {
|
||
|
|
updated.Role = cfg.Role
|
||
|
|
}
|
||
|
|
|
||
|
|
// Broadcast full fleet snapshot so frontend gets updated agent info
|
||
|
|
allAgents, err := c.agents.List(ctx, "")
|
||
|
|
if err != nil {
|
||
|
|
c.logger.Warn("agent.config: fleet list failed, broadcasting single agent", "error", err)
|
||
|
|
c.broker.Broadcast("agent.status", updated)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
c.broker.Broadcast("fleet.update", allAgents)
|
||
|
|
|
||
|
|
c.logger.Debug("agent.config: fleet updated", "agentId", cfg.ID, "name", cfg.Name)
|
||
|
|
}
|