feat(CUB-19): Implement AgentStatus SignalR Hub for Real-time Updates #1
18
backend/ControlCenter/.gitignore
vendored
Normal file
18
backend/ControlCenter/.gitignore
vendored
Normal file
@@ -0,0 +1,18 @@
|
||||
## .NET
|
||||
bin/
|
||||
obj/
|
||||
*.user
|
||||
*.suo
|
||||
*.cache
|
||||
*.dll
|
||||
*.pdb
|
||||
*.xml
|
||||
|
||||
## IDE
|
||||
.vs/
|
||||
.vscode/
|
||||
.idea/
|
||||
|
||||
## OS
|
||||
.DS_Store
|
||||
Thumbs.db
|
||||
17
backend/ControlCenter/ControlCenter.csproj
Normal file
17
backend/ControlCenter/ControlCenter.csproj
Normal file
@@ -0,0 +1,17 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk.Web">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net9.0</TargetFramework>
|
||||
<Nullable>enable</Nullable>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<GenerateDocumentationFile>true</GenerateDocumentationFile>
|
||||
<NoWarn>CS1591</NoWarn>
|
||||
<RootNamespace>ControlCenter</RootNamespace>
|
||||
<AssemblyName>ControlCenter</AssemblyName>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Swashbuckle.AspNetCore" Version="10.1.7" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
71
backend/ControlCenter/Controllers/AgentsController.cs
Normal file
71
backend/ControlCenter/Controllers/AgentsController.cs
Normal file
@@ -0,0 +1,71 @@
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using ControlCenter.Services;
|
||||
|
||||
namespace ControlCenter.Controllers;
|
||||
|
||||
/// <summary>
|
||||
/// REST API for querying agent fleet status.
|
||||
/// Provides the initial data load for the Command Hub,
|
||||
/// while real-time updates flow through the AgentStatus SignalR hub.
|
||||
///
|
||||
/// <para>API contract for Rex (Frontend):</para>
|
||||
/// <list type="bullet">
|
||||
/// <item><c>GET /api/agents</c> — Returns all known agents with current status</item>
|
||||
/// <item><c>GET /api/agents/{agentId}</c> — Returns a specific agent's status</item>
|
||||
/// </list>
|
||||
/// </summary>
|
||||
[ApiController]
|
||||
[Route("api/[controller]")]
|
||||
public class AgentsController : ControllerBase
|
||||
{
|
||||
private readonly ILogger<AgentsController> _logger;
|
||||
private readonly GatewayEventBridgeService _bridgeService;
|
||||
|
||||
public AgentsController(
|
||||
ILogger<AgentsController> logger,
|
||||
GatewayEventBridgeService bridgeService)
|
||||
{
|
||||
_logger = logger;
|
||||
_bridgeService = bridgeService;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets the current fleet status — all known agents with their latest state.
|
||||
/// This is the initial load endpoint; subsequent updates arrive via SignalR.
|
||||
/// </summary>
|
||||
/// <returns>An array of agent card data for the entire fleet.</returns>
|
||||
/// <response code="200">Returns the fleet snapshot.</response>
|
||||
[HttpGet]
|
||||
[ProducesResponseType(typeof(AgentCardData[]), StatusCodes.Status200OK)]
|
||||
public IActionResult GetAgents()
|
||||
{
|
||||
var snapshot = _bridgeService.GetFleetSnapshot();
|
||||
_logger.LogDebug("Fleet snapshot requested: {Count} agents", snapshot.Length);
|
||||
return Ok(snapshot);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets the current status of a specific agent.
|
||||
/// </summary>
|
||||
/// <param name="agentId">The agent identifier, e.g. "otto", "dex".</param>
|
||||
/// <returns>The agent's current card data.</returns>
|
||||
/// <response code="200">Returns the agent's status.</response>
|
||||
/// <response code="404">Agent not found in the fleet state.</response>
|
||||
[HttpGet("{agentId}")]
|
||||
[ProducesResponseType(typeof(AgentCardData), StatusCodes.Status200OK)]
|
||||
[ProducesResponseType(StatusCodes.Status404NotFound)]
|
||||
public IActionResult GetAgent(string agentId)
|
||||
{
|
||||
var snapshot = _bridgeService.GetFleetSnapshot();
|
||||
var agent = snapshot.FirstOrDefault(a =>
|
||||
a.Id.Equals(agentId, StringComparison.OrdinalIgnoreCase));
|
||||
|
||||
if (agent is null)
|
||||
{
|
||||
_logger.LogWarning("Agent not found: {AgentId}", agentId);
|
||||
return NotFound(new { error = $"Agent '{agentId}' not found" });
|
||||
}
|
||||
|
||||
return Ok(agent);
|
||||
}
|
||||
}
|
||||
122
backend/ControlCenter/Controllers/CommandController.cs
Normal file
122
backend/ControlCenter/Controllers/CommandController.cs
Normal file
@@ -0,0 +1,122 @@
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
|
||||
namespace ControlCenter.Controllers;
|
||||
|
||||
/// <summary>
|
||||
/// REST API for sending control commands to agents.
|
||||
/// Provides the Command Hub's action endpoints for agent lifecycle control.
|
||||
///
|
||||
/// <para>API contract for Rex (Frontend):</para>
|
||||
/// <list type="bullet">
|
||||
/// <item><c>POST /api/command/stop/{agentId}</c> — Stop/abort an agent's active session</item>
|
||||
/// <item><c>POST /api/command/restart/{agentId}</c> — Restart an agent</item>
|
||||
/// <item><c>POST /api/command/steer/{agentId}</c> — Inject a message into an agent's session</item>
|
||||
/// </list>
|
||||
///
|
||||
/// <para>Commands are forwarded to the OpenClaw Gateway via the
|
||||
/// WebSocket bridge service. The Gateway handles the actual execution.</para>
|
||||
/// </summary>
|
||||
[ApiController]
|
||||
[Route("api/[controller]")]
|
||||
public class CommandController : ControllerBase
|
||||
{
|
||||
private readonly ILogger<CommandController> _logger;
|
||||
|
||||
public CommandController(ILogger<CommandController> logger)
|
||||
{
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stops (aborts) an agent's active session.
|
||||
/// Sends an abort command to the OpenClaw Gateway.
|
||||
/// </summary>
|
||||
/// <param name="agentId">The agent identifier to stop.</param>
|
||||
/// <returns>Confirmation of the stop command.</returns>
|
||||
/// <response code="200">Stop command sent successfully.</response>
|
||||
/// <response code="404">No active session found for the agent.</response>
|
||||
[HttpPost("stop/{agentId}")]
|
||||
[ProducesResponseType(StatusCodes.Status200OK)]
|
||||
[ProducesResponseType(StatusCodes.Status404NotFound)]
|
||||
public IActionResult StopAgent(string agentId)
|
||||
{
|
||||
_logger.LogInformation("Stop command received for agent {AgentId}", agentId);
|
||||
|
||||
// TODO: Forward to Gateway via bridge service
|
||||
// await _bridgeService.SendRpcAsync("sessions.abort", new { sessionKey = ... });
|
||||
|
||||
return Ok(new
|
||||
{
|
||||
agentId,
|
||||
command = "stop",
|
||||
status = "sent",
|
||||
timestamp = DateTime.UtcNow.ToString("o")
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Restarts an agent by aborting the current session and allowing
|
||||
/// a new one to start on the next incoming message.
|
||||
/// </summary>
|
||||
/// <param name="agentId">The agent identifier to restart.</param>
|
||||
/// <returns>Confirmation of the restart command.</returns>
|
||||
/// <response code="200">Restart command sent successfully.</response>
|
||||
[HttpPost("restart/{agentId}")]
|
||||
[ProducesResponseType(StatusCodes.Status200OK)]
|
||||
public IActionResult RestartAgent(string agentId)
|
||||
{
|
||||
_logger.LogInformation("Restart command received for agent {AgentId}", agentId);
|
||||
|
||||
// TODO: Forward to Gateway — abort current session + signal restart
|
||||
|
||||
return Ok(new
|
||||
{
|
||||
agentId,
|
||||
command = "restart",
|
||||
status = "sent",
|
||||
timestamp = DateTime.UtcNow.ToString("o")
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Steers (injects a message into) an agent's active session.
|
||||
/// Used by operators to redirect an agent's task mid-execution.
|
||||
/// </summary>
|
||||
/// <param name="agentId">The agent identifier to steer.</param>
|
||||
/// <param name="request">The steering message to inject.</param>
|
||||
/// <returns>Confirmation of the steer command.</returns>
|
||||
/// <response code="200">Steer command sent successfully.</response>
|
||||
/// <response code="400">Missing or empty message.</response>
|
||||
[HttpPost("steer/{agentId}")]
|
||||
[ProducesResponseType(StatusCodes.Status200OK)]
|
||||
[ProducesResponseType(StatusCodes.Status400BadRequest)]
|
||||
public IActionResult SteerAgent(string agentId, [FromBody] SteerRequest request)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(request.Message))
|
||||
{
|
||||
return BadRequest(new { error = "Message is required" });
|
||||
}
|
||||
|
||||
_logger.LogInformation("Steer command received for agent {AgentId}: {Message}",
|
||||
agentId, request.Message.Length > 100
|
||||
? request.Message[..100] + "..." : request.Message);
|
||||
|
||||
// TODO: Forward to Gateway via bridge service
|
||||
// await _bridgeService.SendRpcAsync("sessions.steer", new { sessionKey = ..., message = request.Message });
|
||||
|
||||
return Ok(new
|
||||
{
|
||||
agentId,
|
||||
command = "steer",
|
||||
message = request.Message,
|
||||
status = "sent",
|
||||
timestamp = DateTime.UtcNow.ToString("o")
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Request body for the steer command.
|
||||
/// </summary>
|
||||
/// <param name="Message">The message to inject into the agent's session.</param>
|
||||
public record SteerRequest(string Message);
|
||||
87
backend/ControlCenter/Controllers/LogsController.cs
Normal file
87
backend/ControlCenter/Controllers/LogsController.cs
Normal file
@@ -0,0 +1,87 @@
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
|
||||
namespace ControlCenter.Controllers;
|
||||
|
||||
/// <summary>
|
||||
/// REST API for querying agent session logs.
|
||||
/// Provides historical message and tool call logs for a specific agent.
|
||||
///
|
||||
/// <para>API contract for Rex (Frontend):</para>
|
||||
/// <list type="bullet">
|
||||
/// <item><c>GET /api/logs/{agentId}</c> — Returns recent logs for an agent</item>
|
||||
/// <item><c>GET /api/logs/{agentId}/tools</c> — Returns recent tool calls for an agent</item>
|
||||
/// </list>
|
||||
///
|
||||
/// <para>Log data is sourced from the OpenClaw Gateway's transcript files.
|
||||
/// The Gateway's <c>logs.tail</c> RPC provides the raw data, and this
|
||||
/// controller formats it for the frontend.</para>
|
||||
/// </summary>
|
||||
[ApiController]
|
||||
[Route("api/[controller]")]
|
||||
public class LogsController : ControllerBase
|
||||
{
|
||||
private readonly ILogger<LogsController> _logger;
|
||||
|
||||
public LogsController(ILogger<LogsController> logger)
|
||||
{
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets recent session logs for a specific agent.
|
||||
/// Returns the last N messages from the agent's active session transcript.
|
||||
/// </summary>
|
||||
/// <param name="agentId">The agent identifier, e.g. "otto", "dex".</param>
|
||||
/// <param name="limit">Maximum number of log entries to return (default: 50, max: 200).</param>
|
||||
/// <returns>An array of log entries for the agent.</returns>
|
||||
/// <response code="200">Returns the agent's recent logs.</response>
|
||||
/// <response code="404">No active session found for the agent.</response>
|
||||
[HttpGet("{agentId}")]
|
||||
[ProducesResponseType(StatusCodes.Status200OK)]
|
||||
[ProducesResponseType(StatusCodes.Status404NotFound)]
|
||||
public IActionResult GetLogs(string agentId, [FromQuery] int limit = 50)
|
||||
{
|
||||
limit = Math.Clamp(limit, 1, 200);
|
||||
|
||||
_logger.LogDebug("Logs requested for agent {AgentId}, limit {Limit}", agentId, limit);
|
||||
|
||||
// TODO: Implement log retrieval by calling the Gateway's logs.tail RPC
|
||||
// or reading transcript files. For now, return an empty array as the
|
||||
// bridge service will provide this data when fully integrated.
|
||||
return Ok(new
|
||||
{
|
||||
agentId,
|
||||
logs = Array.Empty<object>(),
|
||||
count = 0,
|
||||
hasMore = false
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets recent tool call logs for a specific agent.
|
||||
/// Returns the last N tool invocations from the agent's session.
|
||||
/// </summary>
|
||||
/// <param name="agentId">The agent identifier.</param>
|
||||
/// <param name="limit">Maximum number of tool entries to return (default: 20, max: 100).</param>
|
||||
/// <returns>An array of tool call entries for the agent.</returns>
|
||||
/// <response code="200">Returns the agent's recent tool calls.</response>
|
||||
/// <response code="404">No active session found for the agent.</response>
|
||||
[HttpGet("{agentId}/tools")]
|
||||
[ProducesResponseType(StatusCodes.Status200OK)]
|
||||
[ProducesResponseType(StatusCodes.Status404NotFound)]
|
||||
public IActionResult GetToolLogs(string agentId, [FromQuery] int limit = 20)
|
||||
{
|
||||
limit = Math.Clamp(limit, 1, 100);
|
||||
|
||||
_logger.LogDebug("Tool logs requested for agent {AgentId}, limit {Limit}", agentId, limit);
|
||||
|
||||
// TODO: Implement tool log retrieval. Return empty for now.
|
||||
return Ok(new
|
||||
{
|
||||
agentId,
|
||||
tools = Array.Empty<object>(),
|
||||
count = 0,
|
||||
hasMore = false
|
||||
});
|
||||
}
|
||||
}
|
||||
184
backend/ControlCenter/Hubs/AgentStatusHub.cs
Normal file
184
backend/ControlCenter/Hubs/AgentStatusHub.cs
Normal file
@@ -0,0 +1,184 @@
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
|
||||
namespace ControlCenter.Hubs;
|
||||
|
||||
/// <summary>
|
||||
/// SignalR hub for real-time agent status updates in the Command Hub.
|
||||
///
|
||||
/// <para>Usage flow:</para>
|
||||
/// <list type="number">
|
||||
/// <item>Client connects to <c>/hubs/agent-status</c></item>
|
||||
/// <item>Client calls <see cref="JoinFleet"/> to subscribe to all agent updates</item>
|
||||
/// <item>Client calls <see cref="JoinAgentGroup"/> to subscribe to a specific agent</item>
|
||||
/// <item>Server pushes <see cref="IAgentStatusClient.AgentStatusChanged"/>
|
||||
/// and <see cref="IAgentStatusClient.AgentTaskProgress"/> events</item>
|
||||
/// <item>Client calls <see cref="GetFleetSnapshot"/> for initial state on connect</item>
|
||||
/// </list>
|
||||
///
|
||||
/// <para>Group naming:</para>
|
||||
/// <list type="bullet">
|
||||
/// <item>Fleet group: <c>fleet</c> — receives all agent updates</item>
|
||||
/// <item>Agent group: <c>agent:{agentId}</c> — receives updates for one agent</item>
|
||||
/// </list>
|
||||
///
|
||||
/// <para>Typed client: <see cref="IAgentStatusClient"/> — all server-to-client
|
||||
/// calls go through this interface for compile-time safety.</para>
|
||||
///
|
||||
/// <para>Architecture note: This hub bridges OpenClaw Gateway WebSocket events
|
||||
/// to SignalR clients. A background service (<see cref="Services.GatewayEventBridgeService"/>)
|
||||
/// subscribes to Gateway events and pushes them through this hub's extension methods.</para>
|
||||
/// </summary>
|
||||
public class AgentStatusHub : Hub<IAgentStatusClient>
|
||||
{
|
||||
private readonly ILogger<AgentStatusHub> _logger;
|
||||
|
||||
public AgentStatusHub(ILogger<AgentStatusHub> logger)
|
||||
{
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds the calling connection to the fleet group.
|
||||
/// Once joined, the client will receive all agent status changes
|
||||
/// and task progress updates across the entire fleet.
|
||||
/// </summary>
|
||||
public async Task JoinFleet()
|
||||
{
|
||||
await Groups.AddToGroupAsync(Context.ConnectionId, FleetGroupName);
|
||||
_logger.LogDebug("Connection {ConnectionId} joined fleet group", Context.ConnectionId);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes the calling connection from the fleet group.
|
||||
/// </summary>
|
||||
public async Task LeaveFleet()
|
||||
{
|
||||
await Groups.RemoveFromGroupAsync(Context.ConnectionId, FleetGroupName);
|
||||
_logger.LogDebug("Connection {ConnectionId} left fleet group", Context.ConnectionId);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds the calling connection to a specific agent's group.
|
||||
/// Once joined, the client will receive updates only for that agent.
|
||||
/// </summary>
|
||||
/// <param name="agentId">The agent identifier, e.g. "otto", "dex".</param>
|
||||
/// <exception cref="HubException">Thrown if agentId is null or empty.</exception>
|
||||
public async Task JoinAgentGroup(string agentId)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(agentId))
|
||||
throw new HubException("agentId is required");
|
||||
|
||||
var groupName = AgentGroupName(agentId);
|
||||
await Groups.AddToGroupAsync(Context.ConnectionId, groupName);
|
||||
_logger.LogDebug("Connection {ConnectionId} joined agent group {GroupName}",
|
||||
Context.ConnectionId, groupName);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes the calling connection from a specific agent's group.
|
||||
/// </summary>
|
||||
/// <param name="agentId">The agent identifier.</param>
|
||||
public async Task LeaveAgentGroup(string agentId)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(agentId)) return;
|
||||
|
||||
var groupName = AgentGroupName(agentId);
|
||||
await Groups.RemoveFromGroupAsync(Context.ConnectionId, groupName);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns a snapshot of the current fleet state.
|
||||
/// Called by clients on initial connection to get the full picture
|
||||
/// before incremental updates begin arriving.
|
||||
/// </summary>
|
||||
/// <returns>An array of <see cref="AgentCardData"/> representing all known agents.</returns>
|
||||
public Task<AgentCardData[]> GetFleetSnapshot()
|
||||
{
|
||||
// The fleet state is managed by the GatewayEventBridgeService.
|
||||
// For now, return an empty array — the bridge service will push
|
||||
// updates as they arrive from the Gateway.
|
||||
_logger.LogDebug("Fleet snapshot requested by {ConnectionId}", Context.ConnectionId);
|
||||
return Task.FromResult(Array.Empty<AgentCardData>());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Overrides <see cref="Hub.OnDisconnectedAsync"/> to perform cleanup.
|
||||
/// SignalR automatically removes disconnected connections from all groups.
|
||||
/// </summary>
|
||||
/// <param name="exception">Exception that caused the disconnection, if any.</param>
|
||||
public override Task OnDisconnectedAsync(Exception? exception)
|
||||
{
|
||||
_logger.LogDebug("Connection {ConnectionId} disconnected", Context.ConnectionId);
|
||||
return base.OnDisconnectedAsync(exception);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The SignalR group name for the entire fleet (all agents).
|
||||
/// </summary>
|
||||
internal const string FleetGroupName = "fleet";
|
||||
|
||||
/// <summary>
|
||||
/// Returns the SignalR group name for a specific agent.
|
||||
/// Format: <c>agent:{agentId}</c> (lowercase for consistency).
|
||||
/// </summary>
|
||||
/// <param name="agentId">The agent identifier.</param>
|
||||
internal static string AgentGroupName(string agentId) =>
|
||||
$"agent:{agentId.ToLowerInvariant()}";
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Extension methods for pushing real-time agent updates through
|
||||
/// the <see cref="IHubContext{T}"/> of <see cref="AgentStatusHub"/>.
|
||||
///
|
||||
/// <para>These methods are intended to be called from background services
|
||||
/// (e.g., <see cref="Services.GatewayEventBridgeService"/>) or other
|
||||
/// server-side code that detects an agent state change.</para>
|
||||
/// </summary>
|
||||
public static class AgentStatusHubExtensions
|
||||
{
|
||||
/// <summary>
|
||||
/// Pushes an agent status change to all clients subscribed to
|
||||
/// the fleet group and the specific agent's group.
|
||||
///
|
||||
/// <para>Call this from any background service when an agent's
|
||||
/// operational status changes (e.g., the Gateway reports a
|
||||
/// session transition from "running" to "done").</para>
|
||||
/// </summary>
|
||||
/// <param name="hubContext">The hub context injected via DI.</param>
|
||||
/// <param name="update">The agent status update payload.</param>
|
||||
/// <returns>A Task that completes when the message has been sent to all group members.</returns>
|
||||
public static async Task PushAgentStatusAsync(
|
||||
this IHubContext<AgentStatusHub, IAgentStatusClient> hubContext,
|
||||
AgentStatusUpdate update)
|
||||
{
|
||||
// Broadcast to the fleet group (all subscribers)
|
||||
await hubContext.Clients.Group(AgentStatusHub.FleetGroupName)
|
||||
.AgentStatusChanged(update);
|
||||
|
||||
// Also push to the specific agent's group
|
||||
var agentGroup = AgentStatusHub.AgentGroupName(update.AgentId);
|
||||
await hubContext.Clients.Group(agentGroup)
|
||||
.AgentStatusChanged(update);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Pushes a task progress update to all clients subscribed to
|
||||
/// the fleet group and the specific agent's group.
|
||||
/// </summary>
|
||||
/// <param name="hubContext">The hub context injected via DI.</param>
|
||||
/// <param name="progress">The task progress update payload.</param>
|
||||
/// <returns>A Task that completes when the message has been sent to all group members.</returns>
|
||||
public static async Task PushTaskProgressAsync(
|
||||
this IHubContext<AgentStatusHub, IAgentStatusClient> hubContext,
|
||||
TaskProgressUpdate progress)
|
||||
{
|
||||
// Broadcast to the fleet group
|
||||
await hubContext.Clients.Group(AgentStatusHub.FleetGroupName)
|
||||
.AgentTaskProgress(progress);
|
||||
|
||||
// Also push to the specific agent's group
|
||||
var agentGroup = AgentStatusHub.AgentGroupName(progress.AgentId);
|
||||
await hubContext.Clients.Group(agentGroup)
|
||||
.AgentTaskProgress(progress);
|
||||
}
|
||||
}
|
||||
30
backend/ControlCenter/Hubs/IAgentStatusClient.cs
Normal file
30
backend/ControlCenter/Hubs/IAgentStatusClient.cs
Normal file
@@ -0,0 +1,30 @@
|
||||
namespace ControlCenter.Hubs;
|
||||
|
||||
/// <summary>
|
||||
/// Strongly-typed client interface for the AgentStatus SignalR hub.
|
||||
/// Defines the methods the server can invoke on connected clients
|
||||
/// to push real-time agent status and task progress updates.
|
||||
///
|
||||
/// <para>All server-to-client calls go through this interface for
|
||||
/// compile-time safety — matching the pattern used by the
|
||||
/// Extrudex PrinterHub.</para>
|
||||
/// </summary>
|
||||
public interface IAgentStatusClient
|
||||
{
|
||||
/// <summary>
|
||||
/// Pushes an agent status change to all subscribed clients.
|
||||
/// Fired whenever an agent's operational status changes
|
||||
/// (e.g., idle → active, active → thinking, active → error).
|
||||
/// </summary>
|
||||
/// <param name="update">The full status update payload.</param>
|
||||
/// <returns>A Task that completes when the client has processed the update.</returns>
|
||||
Task AgentStatusChanged(AgentStatusUpdate update);
|
||||
|
||||
/// <summary>
|
||||
/// Pushes a task progress update to all subscribed clients.
|
||||
/// Fired when an agent reports progress on its current task.
|
||||
/// </summary>
|
||||
/// <param name="progress">The task progress update payload.</param>
|
||||
/// <returns>A Task that completes when the client has processed the update.</returns>
|
||||
Task AgentTaskProgress(TaskProgressUpdate progress);
|
||||
}
|
||||
92
backend/ControlCenter/Hubs/Models/AgentStatusModels.cs
Normal file
92
backend/ControlCenter/Hubs/Models/AgentStatusModels.cs
Normal file
@@ -0,0 +1,92 @@
|
||||
namespace ControlCenter;
|
||||
|
||||
/// <summary>
|
||||
/// Agent operational status derived from OpenClaw Gateway session activity.
|
||||
/// Maps to the frontend AgentStatus type: 'active' | 'idle' | 'thinking' | 'error'.
|
||||
/// </summary>
|
||||
public enum AgentStatus
|
||||
{
|
||||
/// <summary>Agent is currently processing a turn.</summary>
|
||||
Active,
|
||||
|
||||
/// <summary>Agent completed its last turn; no active work.</summary>
|
||||
Idle,
|
||||
|
||||
/// <summary>LLM call in flight; tokens streaming.</summary>
|
||||
Thinking,
|
||||
|
||||
/// <summary>Agent encountered an unhandled error.</summary>
|
||||
Error
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Extended lifecycle status including offline — not all agents have active sessions.
|
||||
/// Used internally; clients only see <see cref="AgentStatus"/> (offline maps to idle).
|
||||
/// </summary>
|
||||
public enum AgentLifecycleStatus
|
||||
{
|
||||
Active,
|
||||
Idle,
|
||||
Thinking,
|
||||
Error,
|
||||
Offline
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Pushed to SignalR clients when an agent's status changes.
|
||||
/// Matches the TypeScript <c>AgentStatusUpdate</c> interface from the design spec.
|
||||
/// </summary>
|
||||
/// <param name="AgentId">Agent identifier, e.g. "otto", "dex".</param>
|
||||
/// <param name="DisplayName">Human-readable name, e.g. "Otto".</param>
|
||||
/// <param name="Role">Role description, e.g. "Orchestrator Agent".</param>
|
||||
/// <param name="Status">Current operational status.</param>
|
||||
/// <param name="CurrentTask">Description of the current task, if any.</param>
|
||||
/// <param name="SessionKey">Full session key, e.g. "agent:otto:telegram:direct:8787451565".</param>
|
||||
/// <param name="Channel">Channel the agent is operating on, e.g. "telegram".</param>
|
||||
/// <param name="LastActivity">ISO 8601 timestamp of last activity.</param>
|
||||
/// <param name="ErrorMessage">Error message when status is 'error'.</param>
|
||||
public record AgentStatusUpdate(
|
||||
string AgentId,
|
||||
string DisplayName,
|
||||
string Role,
|
||||
string Status,
|
||||
string? CurrentTask,
|
||||
string SessionKey,
|
||||
string Channel,
|
||||
string LastActivity,
|
||||
string? ErrorMessage = null
|
||||
);
|
||||
|
||||
/// <summary>
|
||||
/// Pushed to SignalR clients when an agent's task progress updates.
|
||||
/// Matches the TypeScript <c>TaskProgressUpdate</c> interface from the design spec.
|
||||
/// </summary>
|
||||
/// <param name="AgentId">Agent identifier.</param>
|
||||
/// <param name="TaskDescription">Description of the current task.</param>
|
||||
/// <param name="Progress">Task progress percentage (0–100), if trackable.</param>
|
||||
/// <param name="Elapsed">Elapsed time string, e.g. "04m 12s".</param>
|
||||
public record TaskProgressUpdate(
|
||||
string AgentId,
|
||||
string TaskDescription,
|
||||
int? Progress,
|
||||
string? Elapsed
|
||||
);
|
||||
|
||||
/// <summary>
|
||||
/// Snapshot of an agent's full card data, sent on initial connection
|
||||
/// or when the fleet state is requested.
|
||||
/// Matches the TypeScript <c>AgentCardData</c> interface from the design spec.
|
||||
/// </summary>
|
||||
public record AgentCardData(
|
||||
string Id,
|
||||
string DisplayName,
|
||||
string Role,
|
||||
string Status,
|
||||
string? CurrentTask,
|
||||
int? TaskProgress,
|
||||
string? TaskElapsed,
|
||||
string SessionKey,
|
||||
string Channel,
|
||||
string LastActivity,
|
||||
string? ErrorMessage
|
||||
);
|
||||
72
backend/ControlCenter/Program.cs
Normal file
72
backend/ControlCenter/Program.cs
Normal file
@@ -0,0 +1,72 @@
|
||||
using System.Reflection;
|
||||
using ControlCenter.Hubs;
|
||||
using ControlCenter.Services;
|
||||
|
||||
var builder = WebApplication.CreateBuilder(args);
|
||||
|
||||
// ── API Services ───────────────────────────────────────────
|
||||
builder.Services.AddControllers();
|
||||
builder.Services.AddEndpointsApiExplorer();
|
||||
builder.Services.AddSwaggerGen(c =>
|
||||
{
|
||||
c.SwaggerDoc("v1", new()
|
||||
{
|
||||
Title = "Control Center API",
|
||||
Version = "v1",
|
||||
Description = "OpenClaw Control Center — Command Hub backend with SignalR real-time updates"
|
||||
});
|
||||
|
||||
// Include XML doc comments in Swagger output
|
||||
var xmlFile = $"{Assembly.GetExecutingAssembly().GetName().Name}.xml";
|
||||
var xmlPath = Path.Combine(AppContext.BaseDirectory, xmlFile);
|
||||
if (File.Exists(xmlPath))
|
||||
{
|
||||
c.IncludeXmlComments(xmlPath);
|
||||
}
|
||||
});
|
||||
|
||||
// ── CORS (kiosk + remote browser) ─────────────────────────
|
||||
// The Control Center frontend runs on a different origin than the backend.
|
||||
// SignalR requires credentials for WebSocket transport, so we use
|
||||
// specific origins rather than AllowAnyOrigin.
|
||||
var corsOrigins = builder.Configuration.GetSection("Cors:AllowedOrigins")
|
||||
.Get<string[]>() ?? new[] { "http://localhost:4200", "http://localhost:5000" };
|
||||
|
||||
builder.Services.AddCors(options =>
|
||||
{
|
||||
options.AddDefaultPolicy(policy =>
|
||||
{
|
||||
policy.WithOrigins(corsOrigins)
|
||||
.AllowAnyMethod()
|
||||
.AllowAnyHeader()
|
||||
.AllowCredentials(); // Required for SignalR WebSocket
|
||||
});
|
||||
});
|
||||
|
||||
// ── SignalR (real-time agent status updates) ───────────────
|
||||
builder.Services.AddSignalR();
|
||||
|
||||
// ── Gateway Bridge Service ────────────────────────────────
|
||||
// Background service that connects to the OpenClaw Gateway WebSocket
|
||||
// and bridges events to the AgentStatus SignalR hub.
|
||||
builder.Services.AddSingleton<GatewayEventBridgeService>();
|
||||
builder.Services.AddHostedService(sp => sp.GetRequiredService<GatewayEventBridgeService>());
|
||||
|
||||
var app = builder.Build();
|
||||
|
||||
// ── Middleware ──────────────────────────────────────────────
|
||||
if (app.Environment.IsDevelopment())
|
||||
{
|
||||
app.UseSwagger();
|
||||
app.UseSwaggerUI();
|
||||
}
|
||||
|
||||
app.UseCors();
|
||||
app.UseAuthorization();
|
||||
app.MapControllers();
|
||||
|
||||
// ── Hub Endpoints ───────────────────────────────────────────
|
||||
// Agent status hub at /hubs/agent-status (matches the design spec)
|
||||
app.MapHub<AgentStatusHub>("/hubs/agent-status");
|
||||
|
||||
app.Run();
|
||||
14
backend/ControlCenter/Properties/launchSettings.json
Normal file
14
backend/ControlCenter/Properties/launchSettings.json
Normal file
@@ -0,0 +1,14 @@
|
||||
{
|
||||
"$schema": "https://json.schemastore.org/launchsettings.json",
|
||||
"profiles": {
|
||||
"http": {
|
||||
"commandName": "Project",
|
||||
"dotnetRunMessages": true,
|
||||
"launchBrowser": false,
|
||||
"applicationUrl": "http://localhost:5053",
|
||||
"environmentVariables": {
|
||||
"ASPNETCORE_ENVIRONMENT": "Development"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
523
backend/ControlCenter/Services/GatewayEventBridgeService.cs
Normal file
523
backend/ControlCenter/Services/GatewayEventBridgeService.cs
Normal file
@@ -0,0 +1,523 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Net.WebSockets;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using ControlCenter.Hubs;
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
|
||||
namespace ControlCenter.Services;
|
||||
|
||||
/// <summary>
|
||||
/// Background service that connects to the OpenClaw Gateway WebSocket
|
||||
/// and bridges Gateway events to the <see cref="Hubs.AgentStatusHub"/>.
|
||||
///
|
||||
/// <para>Architecture:</para>
|
||||
/// <list type="number">
|
||||
/// <item>Connects to the Gateway WS endpoint (configurable via appsettings)</item>
|
||||
/// <item>Handles the v3 protocol handshake (challenge → connect → hello-ok)</item>
|
||||
/// <item>Subscribes to <c>sessions.changed</c> and related events</item>
|
||||
/// <item>Translates session state changes into <see cref="AgentStatusUpdate"/>
|
||||
/// and <see cref="TaskProgressUpdate"/> objects</item>
|
||||
/// <item>Pushes updates through the <see cref="Hubs.AgentStatusHub"/> SignalR hub</item>
|
||||
/// </list>
|
||||
///
|
||||
/// <para>This is the server-side bridge that allows Angular clients to
|
||||
/// receive real-time updates via SignalR instead of connecting directly
|
||||
/// to the Gateway WebSocket.</para>
|
||||
/// </summary>
|
||||
public class GatewayEventBridgeService : BackgroundService
|
||||
{
|
||||
private readonly ILogger<GatewayEventBridgeService> _logger;
|
||||
private readonly IHubContext<Hubs.AgentStatusHub, Hubs.IAgentStatusClient> _hubContext;
|
||||
private readonly IConfiguration _configuration;
|
||||
|
||||
/// <summary>
|
||||
/// In-memory fleet state — maps agent IDs to their latest card data.
|
||||
/// Updated on every <c>sessions.changed</c> event from the Gateway.
|
||||
/// </summary>
|
||||
private readonly ConcurrentDictionary<string, AgentCardData> _fleetState = new();
|
||||
|
||||
/// <summary>
|
||||
/// Known agent roles for display in the Command Hub.
|
||||
/// Maps agent IDs to their functional descriptions.
|
||||
/// </summary>
|
||||
private static readonly Dictionary<string, string> AgentRoles = new()
|
||||
{
|
||||
["main"] = "Primary Assistant",
|
||||
["otto"] = "Orchestrator Agent",
|
||||
["dave"] = "Network Admin Agent",
|
||||
["bob"] = "Content Writer Agent",
|
||||
["stuart"] = "Image & Creative Agent",
|
||||
["phil"] = "Home Automation Agent",
|
||||
["carl"] = "Security Agent",
|
||||
["larry"] = "Business Agent",
|
||||
["mel"] = "E-Commerce Agent",
|
||||
["norbert"] = "Product Agent",
|
||||
["jerry"] = "Market Research Agent",
|
||||
["rex"] = "Frontend Dev Agent",
|
||||
["dex"] = "Backend Dev Agent",
|
||||
["hex"] = "Database Agent",
|
||||
["pip"] = "Raspberry Pi Agent",
|
||||
["nano"] = "ESP32/Firmware Agent",
|
||||
["axiom"] = "Utility Agent",
|
||||
["bonnie"] = "Music Agent",
|
||||
["sketch"] = "UI/UX Design Agent",
|
||||
["flip"] = "Mobile Dev Agent",
|
||||
["buzz"] = "SEO Agent",
|
||||
["aries"] = "Companion Agent"
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Maps OpenClaw session status to <see cref="AgentStatus"/>.
|
||||
/// </summary>
|
||||
private static string MapSessionStatus(string? sessionStatus) => sessionStatus switch
|
||||
{
|
||||
"running" => "active",
|
||||
"streaming" => "thinking",
|
||||
"error" or "aborted" => "error",
|
||||
"done" => "idle",
|
||||
_ => "idle"
|
||||
};
|
||||
|
||||
public GatewayEventBridgeService(
|
||||
ILogger<GatewayEventBridgeService> logger,
|
||||
IHubContext<Hubs.AgentStatusHub, Hubs.IAgentStatusClient> hubContext,
|
||||
IConfiguration configuration)
|
||||
{
|
||||
_logger = logger;
|
||||
_hubContext = hubContext;
|
||||
_configuration = configuration;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the current fleet state snapshot.
|
||||
/// Used by the hub's <c>GetFleetSnapshot</c> method and by the
|
||||
/// <c>AgentsController</c> REST endpoint.
|
||||
/// </summary>
|
||||
public AgentCardData[] GetFleetSnapshot() =>
|
||||
_fleetState.Values.ToArray();
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_logger.LogInformation("Gateway Event Bridge service starting");
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await ConnectAndListenAsync(stoppingToken);
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
_logger.LogInformation("Gateway Event Bridge service stopping");
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Gateway connection lost, reconnecting in 5 seconds...");
|
||||
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Connects to the OpenClaw Gateway WebSocket and processes events
|
||||
/// until the connection is lost or cancellation is requested.
|
||||
/// </summary>
|
||||
private async Task ConnectAndListenAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
var gatewayUrl = _configuration["Gateway:WebSocketUrl"]
|
||||
?? "ws://localhost:3271/ws";
|
||||
var authToken = _configuration["Gateway:AuthToken"] ?? string.Empty;
|
||||
|
||||
_logger.LogInformation("Connecting to Gateway at {Url}", gatewayUrl);
|
||||
|
||||
using var ws = new ClientWebSocket();
|
||||
|
||||
// Set auth header if available
|
||||
if (!string.IsNullOrEmpty(authToken))
|
||||
{
|
||||
ws.Options.SetRequestHeader("Authorization", $"Bearer {authToken}");
|
||||
}
|
||||
|
||||
await ws.ConnectAsync(new Uri(gatewayUrl), stoppingToken);
|
||||
_logger.LogInformation("Connected to Gateway WebSocket");
|
||||
|
||||
// Start receiving messages
|
||||
await ReceiveMessagesAsync(ws, stoppingToken);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Receives and processes WebSocket messages from the Gateway.
|
||||
/// Handles the v3 protocol handshake and dispatches events.
|
||||
/// </summary>
|
||||
private async Task ReceiveMessagesAsync(ClientWebSocket ws, CancellationToken stoppingToken)
|
||||
{
|
||||
var buffer = new byte[8192];
|
||||
var messageBuilder = new StringBuilder();
|
||||
|
||||
while (ws.State == WebSocketState.Open && !stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
WebSocketReceiveResult result;
|
||||
try
|
||||
{
|
||||
result = await ws.ReceiveAsync(buffer, stoppingToken);
|
||||
}
|
||||
catch (WebSocketException ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "WebSocket receive error");
|
||||
break;
|
||||
}
|
||||
|
||||
if (result.MessageType == WebSocketMessageType.Close)
|
||||
{
|
||||
_logger.LogInformation("Gateway WebSocket closed by server");
|
||||
break;
|
||||
}
|
||||
|
||||
messageBuilder.Append(Encoding.UTF8.GetString(buffer, 0, result.Count));
|
||||
|
||||
if (result.EndOfMessage)
|
||||
{
|
||||
var message = messageBuilder.ToString();
|
||||
messageBuilder.Clear();
|
||||
await ProcessMessageAsync(ws, message, stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Processes a single WebSocket message from the Gateway.
|
||||
/// Routes based on the message type: event, response, or challenge.
|
||||
/// </summary>
|
||||
private async Task ProcessMessageAsync(
|
||||
ClientWebSocket ws, string message, CancellationToken stoppingToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
using var doc = JsonDocument.Parse(message);
|
||||
var root = doc.RootElement;
|
||||
|
||||
var type = root.GetProperty("type").GetString();
|
||||
|
||||
switch (type)
|
||||
{
|
||||
case "event":
|
||||
await HandleGatewayEventAsync(root);
|
||||
break;
|
||||
|
||||
case "res":
|
||||
HandleGatewayResponse(root);
|
||||
break;
|
||||
}
|
||||
|
||||
// Special handling for connect.challenge events
|
||||
if (root.TryGetProperty("event", out var eventName) &&
|
||||
eventName.GetString() == "connect.challenge")
|
||||
{
|
||||
await HandleConnectChallengeAsync(ws, root, stoppingToken);
|
||||
}
|
||||
}
|
||||
catch (JsonException ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Failed to parse Gateway message: {Message}",
|
||||
message.Length > 200 ? message[..200] + "..." : message);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles the Gateway connect.challenge event by sending
|
||||
/// a connect request with authentication credentials.
|
||||
/// </summary>
|
||||
private async Task HandleConnectChallengeAsync(
|
||||
ClientWebSocket ws, JsonElement root, CancellationToken stoppingToken)
|
||||
{
|
||||
_logger.LogInformation("Received connect.challenge from Gateway");
|
||||
|
||||
var connectRequest = new
|
||||
{
|
||||
type = "req",
|
||||
id = $"bridge-{Guid.NewGuid():N}",
|
||||
method = "connect",
|
||||
@params = new
|
||||
{
|
||||
minProtocol = 3,
|
||||
maxProtocol = 3,
|
||||
client = new
|
||||
{
|
||||
id = "control-center-backend",
|
||||
version = "1.0.0",
|
||||
platform = "server",
|
||||
mode = "operator"
|
||||
},
|
||||
role = "operator",
|
||||
scopes = new[] { "operator.read", "operator.write" },
|
||||
auth = new
|
||||
{
|
||||
token = _configuration["Gateway:AuthToken"] ?? string.Empty
|
||||
},
|
||||
locale = "en-US",
|
||||
userAgent = "control-center-backend/1.0.0"
|
||||
}
|
||||
};
|
||||
|
||||
var json = JsonSerializer.Serialize(connectRequest);
|
||||
var bytes = Encoding.UTF8.GetBytes(json);
|
||||
await ws.SendAsync(bytes, WebSocketMessageType.Text, true, stoppingToken);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles a Gateway event message by dispatching to the
|
||||
/// appropriate handler based on event name.
|
||||
/// </summary>
|
||||
private async Task HandleGatewayEventAsync(JsonElement root)
|
||||
{
|
||||
if (!root.TryGetProperty("event", out var eventProp)) return;
|
||||
var eventName = eventProp.GetString();
|
||||
|
||||
_logger.LogDebug("Gateway event: {Event}", eventName);
|
||||
|
||||
switch (eventName)
|
||||
{
|
||||
case "sessions.changed":
|
||||
await HandleSessionsChangedAsync(root);
|
||||
break;
|
||||
|
||||
case "session.message":
|
||||
HandleSessionMessage(root);
|
||||
break;
|
||||
|
||||
case "session.tool":
|
||||
HandleSessionTool(root);
|
||||
break;
|
||||
|
||||
case "health":
|
||||
HandleHealthEvent(root);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles a sessions.changed event from the Gateway.
|
||||
/// Updates the fleet state and pushes status changes through SignalR.
|
||||
/// </summary>
|
||||
private async Task HandleSessionsChangedAsync(JsonElement root)
|
||||
{
|
||||
if (!root.TryGetProperty("payload", out var payload)) return;
|
||||
|
||||
// The payload may contain a snapshot of all sessions
|
||||
if (payload.TryGetProperty("snapshot", out var snapshot) &&
|
||||
snapshot.ValueKind == JsonValueKind.Array)
|
||||
{
|
||||
foreach (var session in snapshot.EnumerateArray())
|
||||
{
|
||||
var cardData = SessionToCardData(session);
|
||||
if (cardData is null) continue;
|
||||
|
||||
_fleetState[cardData.Id] = cardData;
|
||||
|
||||
var update = new AgentStatusUpdate(
|
||||
AgentId: cardData.Id,
|
||||
DisplayName: cardData.DisplayName,
|
||||
Role: cardData.Role,
|
||||
Status: cardData.Status,
|
||||
CurrentTask: cardData.CurrentTask,
|
||||
SessionKey: cardData.SessionKey,
|
||||
Channel: cardData.Channel,
|
||||
LastActivity: cardData.LastActivity,
|
||||
ErrorMessage: cardData.ErrorMessage
|
||||
);
|
||||
|
||||
await _hubContext.PushAgentStatusAsync(update);
|
||||
}
|
||||
}
|
||||
|
||||
// Handle individual updates/added/removed
|
||||
if (payload.TryGetProperty("updated", out var updated) &&
|
||||
updated.ValueKind == JsonValueKind.Array)
|
||||
{
|
||||
foreach (var sessionKey in updated.EnumerateArray())
|
||||
{
|
||||
_logger.LogDebug("Session updated: {Key}", sessionKey.GetString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles a session.message event. Updates the agent's last activity
|
||||
/// and pushes a status update if the status changed.
|
||||
/// </summary>
|
||||
private void HandleSessionMessage(JsonElement root)
|
||||
{
|
||||
if (!root.TryGetProperty("payload", out var payload)) return;
|
||||
if (!payload.TryGetProperty("sessionKey", out var sessionKeyProp)) return;
|
||||
|
||||
var sessionKey = sessionKeyProp.GetString() ?? string.Empty;
|
||||
var agentId = ExtractAgentId(sessionKey);
|
||||
|
||||
if (string.IsNullOrEmpty(agentId)) return;
|
||||
|
||||
// Update last activity timestamp
|
||||
if (_fleetState.TryGetValue(agentId, out var existing))
|
||||
{
|
||||
_fleetState[agentId] = existing with
|
||||
{
|
||||
LastActivity = DateTime.UtcNow.ToString("o"),
|
||||
Status = "active"
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles a session.tool event. Extracts tool progress information
|
||||
/// and pushes a <see cref="TaskProgressUpdate"/> through SignalR.
|
||||
/// </summary>
|
||||
private void HandleSessionTool(JsonElement root)
|
||||
{
|
||||
if (!root.TryGetProperty("payload", out var payload)) return;
|
||||
if (!payload.TryGetProperty("sessionKey", out var sessionKeyProp)) return;
|
||||
|
||||
var sessionKey = sessionKeyProp.GetString() ?? string.Empty;
|
||||
var agentId = ExtractAgentId(sessionKey);
|
||||
|
||||
if (string.IsNullOrEmpty(agentId)) return;
|
||||
|
||||
var toolName = payload.TryGetProperty("toolName", out var tn) ? tn.GetString() : null;
|
||||
var toolStatus = payload.TryGetProperty("status", out var ts) ? ts.GetString() : null;
|
||||
|
||||
if (toolName is null || toolStatus is null) return;
|
||||
|
||||
var progress = toolStatus switch
|
||||
{
|
||||
"started" => 0,
|
||||
"completed" => 100,
|
||||
_ => (int?)null
|
||||
};
|
||||
|
||||
var update = new TaskProgressUpdate(
|
||||
AgentId: agentId,
|
||||
TaskDescription: $"{toolName} ({toolStatus})",
|
||||
Progress: progress,
|
||||
Elapsed: null
|
||||
);
|
||||
|
||||
// Fire and forget — don't block the event loop
|
||||
_ = _hubContext.PushTaskProgressAsync(update);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles a health event from the Gateway.
|
||||
/// Logs the health status for diagnostics.
|
||||
/// </summary>
|
||||
private void HandleHealthEvent(JsonElement root)
|
||||
{
|
||||
if (!root.TryGetProperty("payload", out var payload)) return;
|
||||
|
||||
var ok = payload.TryGetProperty("ok", out var okProp) && okProp.GetBoolean();
|
||||
var status = payload.TryGetProperty("status", out var s) ? s.GetString() : "unknown";
|
||||
|
||||
_logger.LogInformation("Gateway health: ok={Ok}, status={Status}", ok, status);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles a Gateway response message. Currently only logs for diagnostics.
|
||||
/// </summary>
|
||||
private void HandleGatewayResponse(JsonElement root)
|
||||
{
|
||||
if (root.TryGetProperty("ok", out var okProp) && okProp.GetBoolean())
|
||||
{
|
||||
_logger.LogDebug("Gateway RPC response OK");
|
||||
|
||||
// Check for hello-ok after connect
|
||||
if (root.TryGetProperty("payload", out var payload) &&
|
||||
payload.TryGetProperty("type", out var typeProp) &&
|
||||
typeProp.GetString() == "hello-ok")
|
||||
{
|
||||
_logger.LogInformation("Gateway handshake complete (hello-ok received)");
|
||||
}
|
||||
}
|
||||
else if (root.TryGetProperty("error", out var error))
|
||||
{
|
||||
var errorMsg = error.TryGetProperty("message", out var msg)
|
||||
? msg.GetString() : "unknown error";
|
||||
_logger.LogWarning("Gateway RPC error: {Error}", errorMsg);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Converts a raw Gateway session JSON element into an
|
||||
/// <see cref="AgentCardData"/> record.
|
||||
/// </summary>
|
||||
private AgentCardData? SessionToCardData(JsonElement session)
|
||||
{
|
||||
// Extract agent ID from session key or agentId field
|
||||
string? agentId = null;
|
||||
if (session.TryGetProperty("agentId", out var aid))
|
||||
agentId = aid.GetString();
|
||||
else if (session.TryGetProperty("key", out var key))
|
||||
agentId = ExtractAgentId(key.GetString() ?? string.Empty);
|
||||
|
||||
if (string.IsNullOrEmpty(agentId)) return null;
|
||||
|
||||
var sessionKey = session.TryGetProperty("key", out var sk)
|
||||
? sk.GetString() ?? string.Empty : string.Empty;
|
||||
|
||||
var status = session.TryGetProperty("status", out var s)
|
||||
? MapSessionStatus(s.GetString()) : "idle";
|
||||
|
||||
var channel = ExtractChannel(session);
|
||||
|
||||
var lastActivity = session.TryGetProperty("updatedAt", out var ua)
|
||||
? DateTimeOffset.FromUnixTimeMilliseconds(ua.GetInt64()).ToString("o")
|
||||
: DateTime.UtcNow.ToString("o");
|
||||
|
||||
var displayName = char.ToUpperInvariant(agentId![0]) + agentId[1..];
|
||||
var role = AgentRoles.GetValueOrDefault(agentId!, "Agent");
|
||||
|
||||
return new AgentCardData(
|
||||
Id: agentId!,
|
||||
DisplayName: displayName,
|
||||
Role: role,
|
||||
Status: status,
|
||||
CurrentTask: null,
|
||||
TaskProgress: null,
|
||||
TaskElapsed: null,
|
||||
SessionKey: sessionKey,
|
||||
Channel: channel,
|
||||
LastActivity: lastActivity,
|
||||
ErrorMessage: status == "error" ? "Agent encountered an error" : null
|
||||
);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Extracts the agent ID from a session key.
|
||||
/// Session key format: "agent:{agentId}:{channel}:..."
|
||||
/// </summary>
|
||||
private static string? ExtractAgentId(string sessionKey)
|
||||
{
|
||||
if (string.IsNullOrEmpty(sessionKey)) return null;
|
||||
|
||||
var parts = sessionKey.Split(':');
|
||||
if (parts.Length >= 2 && parts[0] == "agent")
|
||||
return parts[1];
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Extracts the channel from a session element.
|
||||
/// </summary>
|
||||
private static string ExtractChannel(JsonElement session)
|
||||
{
|
||||
// Try direct "channel" property
|
||||
if (session.TryGetProperty("channel", out var ch))
|
||||
return ch.GetString() ?? "unknown";
|
||||
|
||||
// Try origin.provider
|
||||
if (session.TryGetProperty("origin", out var origin) &&
|
||||
origin.TryGetProperty("provider", out var provider))
|
||||
return provider.GetString() ?? "unknown";
|
||||
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
19
backend/ControlCenter/appsettings.Development.json
Normal file
19
backend/ControlCenter/appsettings.Development.json
Normal file
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"Logging": {
|
||||
"LogLevel": {
|
||||
"Default": "Debug",
|
||||
"Microsoft.AspNetCore": "Information",
|
||||
"ControlCenter": "Debug"
|
||||
}
|
||||
},
|
||||
"Gateway": {
|
||||
"WebSocketUrl": "ws://localhost:3271/ws",
|
||||
"AuthToken": ""
|
||||
},
|
||||
"Cors": {
|
||||
"AllowedOrigins": [
|
||||
"http://localhost:4200",
|
||||
"http://localhost:5000"
|
||||
]
|
||||
}
|
||||
}
|
||||
22
backend/ControlCenter/appsettings.json
Normal file
22
backend/ControlCenter/appsettings.json
Normal file
@@ -0,0 +1,22 @@
|
||||
{
|
||||
"Logging": {
|
||||
"LogLevel": {
|
||||
"Default": "Information",
|
||||
"Microsoft.AspNetCore": "Warning",
|
||||
"ControlCenter": "Debug"
|
||||
}
|
||||
},
|
||||
"AllowedHosts": "*",
|
||||
|
||||
"Gateway": {
|
||||
"WebSocketUrl": "ws://localhost:3271/ws",
|
||||
"AuthToken": ""
|
||||
},
|
||||
|
||||
"Cors": {
|
||||
"AllowedOrigins": [
|
||||
"http://localhost:4200",
|
||||
"http://localhost:5000"
|
||||
]
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user