feat(CUB-19): implement AgentStatus SignalR hub for real-time updates
- Add AgentStatusHub with typed IAgentStatusClient interface
- Hub at /hubs/agent-status (matches design spec)
- Fleet group + per-agent group subscription
- AgentStatusChanged and AgentTaskProgress push events
- Extension methods for server-side push via IHubContext
- Add GatewayEventBridgeService background service
- Connects to OpenClaw Gateway WebSocket (v3 protocol)
- Handles challenge → connect → hello-ok handshake
- Bridges sessions.changed, session.message, session.tool events
- Translates Gateway session status to AgentStatus enum
- Maintains in-memory fleet state for snapshot queries
- Add REST API controllers
- GET /api/agents — fleet status snapshot
- GET /api/agents/{agentId} — single agent status
- GET /api/logs/{agentId} — agent session logs (stub)
- POST /api/command/stop/{agentId} — stop agent
- POST /api/command/restart/{agentId} — restart agent
- POST /api/command/steer/{agentId} — inject message
- Add models matching TypeScript spec interfaces
- AgentStatusUpdate, TaskProgressUpdate, AgentCardData
- AgentStatus enum (active/idle/thinking/error)
- Configure CORS with credentials for SignalR WebSocket
- Configure Swagger/OpenAPI with XML doc comments
- Agent role map matching frontend AGENT_ROLES constant
This commit is contained in:
184
backend/ControlCenter/Hubs/AgentStatusHub.cs
Normal file
184
backend/ControlCenter/Hubs/AgentStatusHub.cs
Normal file
@@ -0,0 +1,184 @@
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
|
||||
namespace ControlCenter.Hubs;
|
||||
|
||||
/// <summary>
|
||||
/// SignalR hub for real-time agent status updates in the Command Hub.
|
||||
///
|
||||
/// <para>Usage flow:</para>
|
||||
/// <list type="number">
|
||||
/// <item>Client connects to <c>/hubs/agent-status</c></item>
|
||||
/// <item>Client calls <see cref="JoinFleet"/> to subscribe to all agent updates</item>
|
||||
/// <item>Client calls <see cref="JoinAgentGroup"/> to subscribe to a specific agent</item>
|
||||
/// <item>Server pushes <see cref="IAgentStatusClient.AgentStatusChanged"/>
|
||||
/// and <see cref="IAgentStatusClient.AgentTaskProgress"/> events</item>
|
||||
/// <item>Client calls <see cref="GetFleetSnapshot"/> for initial state on connect</item>
|
||||
/// </list>
|
||||
///
|
||||
/// <para>Group naming:</para>
|
||||
/// <list type="bullet">
|
||||
/// <item>Fleet group: <c>fleet</c> — receives all agent updates</item>
|
||||
/// <item>Agent group: <c>agent:{agentId}</c> — receives updates for one agent</item>
|
||||
/// </list>
|
||||
///
|
||||
/// <para>Typed client: <see cref="IAgentStatusClient"/> — all server-to-client
|
||||
/// calls go through this interface for compile-time safety.</para>
|
||||
///
|
||||
/// <para>Architecture note: This hub bridges OpenClaw Gateway WebSocket events
|
||||
/// to SignalR clients. A background service (<see cref="Services.GatewayEventBridgeService"/>)
|
||||
/// subscribes to Gateway events and pushes them through this hub's extension methods.</para>
|
||||
/// </summary>
|
||||
public class AgentStatusHub : Hub<IAgentStatusClient>
|
||||
{
|
||||
private readonly ILogger<AgentStatusHub> _logger;
|
||||
|
||||
public AgentStatusHub(ILogger<AgentStatusHub> logger)
|
||||
{
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds the calling connection to the fleet group.
|
||||
/// Once joined, the client will receive all agent status changes
|
||||
/// and task progress updates across the entire fleet.
|
||||
/// </summary>
|
||||
public async Task JoinFleet()
|
||||
{
|
||||
await Groups.AddToGroupAsync(Context.ConnectionId, FleetGroupName);
|
||||
_logger.LogDebug("Connection {ConnectionId} joined fleet group", Context.ConnectionId);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes the calling connection from the fleet group.
|
||||
/// </summary>
|
||||
public async Task LeaveFleet()
|
||||
{
|
||||
await Groups.RemoveFromGroupAsync(Context.ConnectionId, FleetGroupName);
|
||||
_logger.LogDebug("Connection {ConnectionId} left fleet group", Context.ConnectionId);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds the calling connection to a specific agent's group.
|
||||
/// Once joined, the client will receive updates only for that agent.
|
||||
/// </summary>
|
||||
/// <param name="agentId">The agent identifier, e.g. "otto", "dex".</param>
|
||||
/// <exception cref="HubException">Thrown if agentId is null or empty.</exception>
|
||||
public async Task JoinAgentGroup(string agentId)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(agentId))
|
||||
throw new HubException("agentId is required");
|
||||
|
||||
var groupName = AgentGroupName(agentId);
|
||||
await Groups.AddToGroupAsync(Context.ConnectionId, groupName);
|
||||
_logger.LogDebug("Connection {ConnectionId} joined agent group {GroupName}",
|
||||
Context.ConnectionId, groupName);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes the calling connection from a specific agent's group.
|
||||
/// </summary>
|
||||
/// <param name="agentId">The agent identifier.</param>
|
||||
public async Task LeaveAgentGroup(string agentId)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(agentId)) return;
|
||||
|
||||
var groupName = AgentGroupName(agentId);
|
||||
await Groups.RemoveFromGroupAsync(Context.ConnectionId, groupName);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns a snapshot of the current fleet state.
|
||||
/// Called by clients on initial connection to get the full picture
|
||||
/// before incremental updates begin arriving.
|
||||
/// </summary>
|
||||
/// <returns>An array of <see cref="AgentCardData"/> representing all known agents.</returns>
|
||||
public Task<AgentCardData[]> GetFleetSnapshot()
|
||||
{
|
||||
// The fleet state is managed by the GatewayEventBridgeService.
|
||||
// For now, return an empty array — the bridge service will push
|
||||
// updates as they arrive from the Gateway.
|
||||
_logger.LogDebug("Fleet snapshot requested by {ConnectionId}", Context.ConnectionId);
|
||||
return Task.FromResult(Array.Empty<AgentCardData>());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Overrides <see cref="Hub.OnDisconnectedAsync"/> to perform cleanup.
|
||||
/// SignalR automatically removes disconnected connections from all groups.
|
||||
/// </summary>
|
||||
/// <param name="exception">Exception that caused the disconnection, if any.</param>
|
||||
public override Task OnDisconnectedAsync(Exception? exception)
|
||||
{
|
||||
_logger.LogDebug("Connection {ConnectionId} disconnected", Context.ConnectionId);
|
||||
return base.OnDisconnectedAsync(exception);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The SignalR group name for the entire fleet (all agents).
|
||||
/// </summary>
|
||||
internal const string FleetGroupName = "fleet";
|
||||
|
||||
/// <summary>
|
||||
/// Returns the SignalR group name for a specific agent.
|
||||
/// Format: <c>agent:{agentId}</c> (lowercase for consistency).
|
||||
/// </summary>
|
||||
/// <param name="agentId">The agent identifier.</param>
|
||||
internal static string AgentGroupName(string agentId) =>
|
||||
$"agent:{agentId.ToLowerInvariant()}";
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Extension methods for pushing real-time agent updates through
|
||||
/// the <see cref="IHubContext{T}"/> of <see cref="AgentStatusHub"/>.
|
||||
///
|
||||
/// <para>These methods are intended to be called from background services
|
||||
/// (e.g., <see cref="Services.GatewayEventBridgeService"/>) or other
|
||||
/// server-side code that detects an agent state change.</para>
|
||||
/// </summary>
|
||||
public static class AgentStatusHubExtensions
|
||||
{
|
||||
/// <summary>
|
||||
/// Pushes an agent status change to all clients subscribed to
|
||||
/// the fleet group and the specific agent's group.
|
||||
///
|
||||
/// <para>Call this from any background service when an agent's
|
||||
/// operational status changes (e.g., the Gateway reports a
|
||||
/// session transition from "running" to "done").</para>
|
||||
/// </summary>
|
||||
/// <param name="hubContext">The hub context injected via DI.</param>
|
||||
/// <param name="update">The agent status update payload.</param>
|
||||
/// <returns>A Task that completes when the message has been sent to all group members.</returns>
|
||||
public static async Task PushAgentStatusAsync(
|
||||
this IHubContext<AgentStatusHub, IAgentStatusClient> hubContext,
|
||||
AgentStatusUpdate update)
|
||||
{
|
||||
// Broadcast to the fleet group (all subscribers)
|
||||
await hubContext.Clients.Group(AgentStatusHub.FleetGroupName)
|
||||
.AgentStatusChanged(update);
|
||||
|
||||
// Also push to the specific agent's group
|
||||
var agentGroup = AgentStatusHub.AgentGroupName(update.AgentId);
|
||||
await hubContext.Clients.Group(agentGroup)
|
||||
.AgentStatusChanged(update);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Pushes a task progress update to all clients subscribed to
|
||||
/// the fleet group and the specific agent's group.
|
||||
/// </summary>
|
||||
/// <param name="hubContext">The hub context injected via DI.</param>
|
||||
/// <param name="progress">The task progress update payload.</param>
|
||||
/// <returns>A Task that completes when the message has been sent to all group members.</returns>
|
||||
public static async Task PushTaskProgressAsync(
|
||||
this IHubContext<AgentStatusHub, IAgentStatusClient> hubContext,
|
||||
TaskProgressUpdate progress)
|
||||
{
|
||||
// Broadcast to the fleet group
|
||||
await hubContext.Clients.Group(AgentStatusHub.FleetGroupName)
|
||||
.AgentTaskProgress(progress);
|
||||
|
||||
// Also push to the specific agent's group
|
||||
var agentGroup = AgentStatusHub.AgentGroupName(progress.AgentId);
|
||||
await hubContext.Clients.Group(agentGroup)
|
||||
.AgentTaskProgress(progress);
|
||||
}
|
||||
}
|
||||
30
backend/ControlCenter/Hubs/IAgentStatusClient.cs
Normal file
30
backend/ControlCenter/Hubs/IAgentStatusClient.cs
Normal file
@@ -0,0 +1,30 @@
|
||||
namespace ControlCenter.Hubs;
|
||||
|
||||
/// <summary>
|
||||
/// Strongly-typed client interface for the AgentStatus SignalR hub.
|
||||
/// Defines the methods the server can invoke on connected clients
|
||||
/// to push real-time agent status and task progress updates.
|
||||
///
|
||||
/// <para>All server-to-client calls go through this interface for
|
||||
/// compile-time safety — matching the pattern used by the
|
||||
/// Extrudex PrinterHub.</para>
|
||||
/// </summary>
|
||||
public interface IAgentStatusClient
|
||||
{
|
||||
/// <summary>
|
||||
/// Pushes an agent status change to all subscribed clients.
|
||||
/// Fired whenever an agent's operational status changes
|
||||
/// (e.g., idle → active, active → thinking, active → error).
|
||||
/// </summary>
|
||||
/// <param name="update">The full status update payload.</param>
|
||||
/// <returns>A Task that completes when the client has processed the update.</returns>
|
||||
Task AgentStatusChanged(AgentStatusUpdate update);
|
||||
|
||||
/// <summary>
|
||||
/// Pushes a task progress update to all subscribed clients.
|
||||
/// Fired when an agent reports progress on its current task.
|
||||
/// </summary>
|
||||
/// <param name="progress">The task progress update payload.</param>
|
||||
/// <returns>A Task that completes when the client has processed the update.</returns>
|
||||
Task AgentTaskProgress(TaskProgressUpdate progress);
|
||||
}
|
||||
92
backend/ControlCenter/Hubs/Models/AgentStatusModels.cs
Normal file
92
backend/ControlCenter/Hubs/Models/AgentStatusModels.cs
Normal file
@@ -0,0 +1,92 @@
|
||||
namespace ControlCenter;
|
||||
|
||||
/// <summary>
|
||||
/// Agent operational status derived from OpenClaw Gateway session activity.
|
||||
/// Maps to the frontend AgentStatus type: 'active' | 'idle' | 'thinking' | 'error'.
|
||||
/// </summary>
|
||||
public enum AgentStatus
|
||||
{
|
||||
/// <summary>Agent is currently processing a turn.</summary>
|
||||
Active,
|
||||
|
||||
/// <summary>Agent completed its last turn; no active work.</summary>
|
||||
Idle,
|
||||
|
||||
/// <summary>LLM call in flight; tokens streaming.</summary>
|
||||
Thinking,
|
||||
|
||||
/// <summary>Agent encountered an unhandled error.</summary>
|
||||
Error
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Extended lifecycle status including offline — not all agents have active sessions.
|
||||
/// Used internally; clients only see <see cref="AgentStatus"/> (offline maps to idle).
|
||||
/// </summary>
|
||||
public enum AgentLifecycleStatus
|
||||
{
|
||||
Active,
|
||||
Idle,
|
||||
Thinking,
|
||||
Error,
|
||||
Offline
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Pushed to SignalR clients when an agent's status changes.
|
||||
/// Matches the TypeScript <c>AgentStatusUpdate</c> interface from the design spec.
|
||||
/// </summary>
|
||||
/// <param name="AgentId">Agent identifier, e.g. "otto", "dex".</param>
|
||||
/// <param name="DisplayName">Human-readable name, e.g. "Otto".</param>
|
||||
/// <param name="Role">Role description, e.g. "Orchestrator Agent".</param>
|
||||
/// <param name="Status">Current operational status.</param>
|
||||
/// <param name="CurrentTask">Description of the current task, if any.</param>
|
||||
/// <param name="SessionKey">Full session key, e.g. "agent:otto:telegram:direct:8787451565".</param>
|
||||
/// <param name="Channel">Channel the agent is operating on, e.g. "telegram".</param>
|
||||
/// <param name="LastActivity">ISO 8601 timestamp of last activity.</param>
|
||||
/// <param name="ErrorMessage">Error message when status is 'error'.</param>
|
||||
public record AgentStatusUpdate(
|
||||
string AgentId,
|
||||
string DisplayName,
|
||||
string Role,
|
||||
string Status,
|
||||
string? CurrentTask,
|
||||
string SessionKey,
|
||||
string Channel,
|
||||
string LastActivity,
|
||||
string? ErrorMessage = null
|
||||
);
|
||||
|
||||
/// <summary>
|
||||
/// Pushed to SignalR clients when an agent's task progress updates.
|
||||
/// Matches the TypeScript <c>TaskProgressUpdate</c> interface from the design spec.
|
||||
/// </summary>
|
||||
/// <param name="AgentId">Agent identifier.</param>
|
||||
/// <param name="TaskDescription">Description of the current task.</param>
|
||||
/// <param name="Progress">Task progress percentage (0–100), if trackable.</param>
|
||||
/// <param name="Elapsed">Elapsed time string, e.g. "04m 12s".</param>
|
||||
public record TaskProgressUpdate(
|
||||
string AgentId,
|
||||
string TaskDescription,
|
||||
int? Progress,
|
||||
string? Elapsed
|
||||
);
|
||||
|
||||
/// <summary>
|
||||
/// Snapshot of an agent's full card data, sent on initial connection
|
||||
/// or when the fleet state is requested.
|
||||
/// Matches the TypeScript <c>AgentCardData</c> interface from the design spec.
|
||||
/// </summary>
|
||||
public record AgentCardData(
|
||||
string Id,
|
||||
string DisplayName,
|
||||
string Role,
|
||||
string Status,
|
||||
string? CurrentTask,
|
||||
int? TaskProgress,
|
||||
string? TaskElapsed,
|
||||
string SessionKey,
|
||||
string Channel,
|
||||
string LastActivity,
|
||||
string? ErrorMessage
|
||||
);
|
||||
Reference in New Issue
Block a user