using Microsoft.AspNetCore.SignalR;

namespace ControlCenter.Hubs;

// NOTE(review): angle-bracketed text was not visible in the copy of this file that was
// reviewed, so generic type arguments (on Hub, ILogger, IHubContext, Task and Array.Empty)
// are reproduced exactly as seen. Confirm them against the repository copy before merging.

/// <summary>
/// SignalR hub for real-time agent status updates in the Command Hub.
/// </summary>
/// <remarks>
/// <para>Usage flow:</para>
/// <list type="number">
/// <item><description>Client connects to <c>/hubs/agent-status</c>.</description></item>
/// <item><description>Client calls <see cref="JoinFleet"/> to subscribe to all agent updates.</description></item>
/// <item><description>Client calls <see cref="JoinAgentGroup"/> to subscribe to a specific agent.</description></item>
/// <item><description>Server pushes <c>AgentStatusChanged</c> and <c>AgentTaskProgress</c> events.</description></item>
/// <item><description>Client calls <see cref="GetFleetSnapshot"/> for initial state on connect.</description></item>
/// </list>
/// <para>Group naming:</para>
/// <list type="bullet">
/// <item><description>Fleet group: <c>fleet</c> — receives all agent updates.</description></item>
/// <item><description>Agent group: <c>agent:{agentId}</c> — receives updates for one agent.</description></item>
/// </list>
/// <para>
/// Server-to-client calls are intended to go through a typed client interface for
/// compile-time safety.
/// </para>
/// <para>
/// Architecture note: this hub bridges OpenClaw Gateway WebSocket events to SignalR clients.
/// A background service (GatewayEventBridgeService) subscribes to Gateway events and pushes
/// them through the extension methods in <see cref="AgentStatusHubExtensions"/>.
/// </para>
/// </remarks>
public class AgentStatusHub : Hub
{
    /// <summary>
    /// The SignalR group name for the entire fleet (all agents).
    /// </summary>
    internal const string FleetGroupName = "fleet";

    private readonly ILogger _logger;

    /// <summary>
    /// Creates the hub with the logger used for connection and subscription diagnostics.
    /// </summary>
    /// <param name="logger">Logger that receives the hub's debug messages.</param>
    public AgentStatusHub(ILogger logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Adds the calling connection to the fleet group.
    /// Once joined, the client receives all agent status changes and task progress
    /// updates across the entire fleet.
    /// </summary>
    /// <returns>A task that completes once the connection has been added to the group.</returns>
    public async Task JoinFleet()
    {
        await Groups.AddToGroupAsync(Context.ConnectionId, FleetGroupName);
        _logger.LogDebug("Connection {ConnectionId} joined fleet group", Context.ConnectionId);
    }

    /// <summary>
    /// Removes the calling connection from the fleet group.
    /// </summary>
    /// <returns>A task that completes once the connection has been removed from the group.</returns>
    public async Task LeaveFleet()
    {
        await Groups.RemoveFromGroupAsync(Context.ConnectionId, FleetGroupName);
        _logger.LogDebug("Connection {ConnectionId} left fleet group", Context.ConnectionId);
    }

    /// <summary>
    /// Adds the calling connection to a specific agent's group.
    /// Once joined, the client receives the updates pushed to that agent's group.
    /// </summary>
    /// <param name="agentId">The agent identifier, e.g. "otto", "dex".</param>
    /// <returns>A task that completes once the connection has been added to the group.</returns>
    /// <exception cref="HubException">
    /// Thrown if <paramref name="agentId"/> is null, empty, or whitespace.
    /// </exception>
    public async Task JoinAgentGroup(string agentId)
    {
        if (string.IsNullOrWhiteSpace(agentId))
        {
            throw new HubException("agentId is required");
        }

        var groupName = AgentGroupName(agentId);
        await Groups.AddToGroupAsync(Context.ConnectionId, groupName);
        _logger.LogDebug(
            "Connection {ConnectionId} joined agent group {GroupName}",
            Context.ConnectionId,
            groupName);
    }

    /// <summary>
    /// Removes the calling connection from a specific agent's group.
    /// </summary>
    /// <remarks>
    /// Unlike <see cref="JoinAgentGroup"/>, a null, empty, or whitespace
    /// <paramref name="agentId"/> is ignored rather than rejected, so leaving is best-effort.
    /// </remarks>
    /// <param name="agentId">The agent identifier.</param>
    /// <returns>A task that completes once the connection has been removed from the group.</returns>
    public async Task LeaveAgentGroup(string agentId)
    {
        if (string.IsNullOrWhiteSpace(agentId))
        {
            return;
        }

        var groupName = AgentGroupName(agentId);
        await Groups.RemoveFromGroupAsync(Context.ConnectionId, groupName);

        // Logged for parity with the other join/leave methods.
        _logger.LogDebug(
            "Connection {ConnectionId} left agent group {GroupName}",
            Context.ConnectionId,
            groupName);
    }

    /// <summary>
    /// Returns a snapshot of the current fleet state.
    /// Intended to be called by clients on initial connection to get the full picture
    /// before incremental updates begin arriving.
    /// </summary>
    /// <returns>
    /// An array describing all known agents. The current implementation always
    /// returns an empty array.
    /// </returns>
    public Task GetFleetSnapshot()
    {
        // The fleet state is managed by the GatewayEventBridgeService.
        // For now, return an empty array — the bridge service will push
        // updates as they arrive from the Gateway.
        _logger.LogDebug("Fleet snapshot requested by {ConnectionId}", Context.ConnectionId);
        return Task.FromResult(Array.Empty());
    }

    /// <summary>
    /// Logs the disconnection and defers to the base implementation.
    /// No explicit group cleanup is done here: SignalR automatically removes
    /// disconnected connections from all groups.
    /// </summary>
    /// <param name="exception">Exception that caused the disconnection, if any.</param>
    /// <returns>The task returned by the base implementation.</returns>
    public override Task OnDisconnectedAsync(Exception? exception)
    {
        _logger.LogDebug("Connection {ConnectionId} disconnected", Context.ConnectionId);
        return base.OnDisconnectedAsync(exception);
    }

    /// <summary>
    /// Returns the SignalR group name for a specific agent.
    /// Format: <c>agent:{agentId}</c>, with the identifier lower-cased (invariant culture)
    /// so that the group name does not depend on the caller's casing.
    /// </summary>
    /// <param name="agentId">The agent identifier.</param>
    /// <returns>The group name for <paramref name="agentId"/>.</returns>
    /// <exception cref="ArgumentNullException">
    /// Thrown if <paramref name="agentId"/> is null.
    /// </exception>
    internal static string AgentGroupName(string agentId)
    {
        // Fail with a named argument instead of a bare NullReferenceException.
        ArgumentNullException.ThrowIfNull(agentId);
        return $"agent:{agentId.ToLowerInvariant()}";
    }
}

/// <summary>
/// Extension methods for pushing real-time agent updates through the hub context of
/// <see cref="AgentStatusHub"/>.
/// </summary>
/// <remarks>
/// These methods are intended to be called from background services
/// (e.g., GatewayEventBridgeService) or other server-side code that detects an
/// agent state change.
/// </remarks>
public static class AgentStatusHubExtensions
{
    /// <summary>
    /// Pushes an agent status change to the fleet group and to the specific agent's group.
    /// </summary>
    /// <remarks>
    /// Call this from any background service when an agent's operational status changes
    /// (e.g., the Gateway reports a session transition from "running" to "done").
    /// </remarks>
    /// <param name="hubContext">The hub context injected via DI.</param>
    /// <param name="update">The agent status update payload.</param>
    /// <returns>A task that completes when both group sends have completed.</returns>
    /// <exception cref="ArgumentNullException">
    /// Thrown if <paramref name="hubContext"/>, <paramref name="update"/>, or the
    /// payload's agent identifier is null.
    /// </exception>
    public static async Task PushAgentStatusAsync(
        this IHubContext hubContext,
        AgentStatusUpdate update)
    {
        ArgumentNullException.ThrowIfNull(hubContext);
        ArgumentNullException.ThrowIfNull(update);

        // Resolve the agent group before sending anything, so an invalid payload
        // fails before the fleet broadcast instead of after it.
        var agentGroup = AgentStatusHub.AgentGroupName(update.AgentId);

        // Broadcast to the fleet group (all subscribers).
        await hubContext.Clients.Group(AgentStatusHub.FleetGroupName)
            .AgentStatusChanged(update);

        // Also push to the specific agent's group.
        // NOTE(review): the two sends are independent, so a connection that joined both
        // groups presumably receives the update twice — confirm clients tolerate this.
        await hubContext.Clients.Group(agentGroup)
            .AgentStatusChanged(update);
    }

    /// <summary>
    /// Pushes a task progress update to the fleet group and to the specific agent's group.
    /// </summary>
    /// <param name="hubContext">The hub context injected via DI.</param>
    /// <param name="progress">The task progress update payload.</param>
    /// <returns>A task that completes when both group sends have completed.</returns>
    /// <exception cref="ArgumentNullException">
    /// Thrown if <paramref name="hubContext"/>, <paramref name="progress"/>, or the
    /// payload's agent identifier is null.
    /// </exception>
    public static async Task PushTaskProgressAsync(
        this IHubContext hubContext,
        TaskProgressUpdate progress)
    {
        ArgumentNullException.ThrowIfNull(hubContext);
        ArgumentNullException.ThrowIfNull(progress);

        // Resolve the agent group before sending anything, so an invalid payload
        // fails before the fleet broadcast instead of after it.
        var agentGroup = AgentStatusHub.AgentGroupName(progress.AgentId);

        // Broadcast to the fleet group.
        await hubContext.Clients.Group(AgentStatusHub.FleetGroupName)
            .AgentTaskProgress(progress);

        // Also push to the specific agent's group.
        // NOTE(review): the two sends are independent, so a connection that joined both
        // groups presumably receives the update twice — confirm clients tolerate this.
        await hubContext.Clients.Group(agentGroup)
            .AgentTaskProgress(progress);
    }
}