- Add AgentStatusHub with typed IAgentStatusClient interface
  - Hub at /hubs/agent-status (matches design spec)
  - Fleet group + per-agent group subscription
  - AgentStatusChanged and AgentTaskProgress push events
  - Extension methods for server-side push via IHubContext
- Add GatewayEventBridgeService background service
  - Connects to OpenClaw Gateway WebSocket (v3 protocol)
  - Handles challenge → connect → hello-ok handshake
  - Bridges sessions.changed, session.message, session.tool events
  - Translates Gateway session status to AgentStatus enum
  - Maintains in-memory fleet state for snapshot queries
- Add REST API controllers
  - GET /api/agents — fleet status snapshot
  - GET /api/agents/{agentId} — single agent status
  - GET /api/logs/{agentId} — agent session logs (stub)
  - POST /api/command/stop/{agentId} — stop agent
  - POST /api/command/restart/{agentId} — restart agent
  - POST /api/command/steer/{agentId} — inject message
- Add models matching TypeScript spec interfaces
  - AgentStatusUpdate, TaskProgressUpdate, AgentCardData
  - AgentStatus enum (active/idle/thinking/error)
- Configure CORS with credentials for SignalR WebSocket
- Configure Swagger/OpenAPI with XML doc comments
- Agent role map matching frontend AGENT_ROLES constant
523 lines
18 KiB
C#
523 lines
18 KiB
C#
using System.Collections.Concurrent;
|
|
using System.Net.WebSockets;
|
|
using System.Text;
|
|
using System.Text.Json;
|
|
using ControlCenter.Hubs;
|
|
using Microsoft.AspNetCore.SignalR;
|
|
|
|
namespace ControlCenter.Services;
|
|
|
|
/// <summary>
|
|
/// Background service that connects to the OpenClaw Gateway WebSocket
|
|
/// and bridges Gateway events to the <see cref="Hubs.AgentStatusHub"/>.
|
|
///
|
|
/// <para>Architecture:</para>
|
|
/// <list type="number">
|
|
/// <item>Connects to the Gateway WS endpoint (configurable via appsettings)</item>
|
|
/// <item>Handles the v3 protocol handshake (challenge → connect → hello-ok)</item>
|
|
/// <item>Subscribes to <c>sessions.changed</c> and related events</item>
|
|
/// <item>Translates session state changes into <see cref="AgentStatusUpdate"/>
|
|
/// and <see cref="TaskProgressUpdate"/> objects</item>
|
|
/// <item>Pushes updates through the <see cref="Hubs.AgentStatusHub"/> SignalR hub</item>
|
|
/// </list>
|
|
///
|
|
/// <para>This is the server-side bridge that allows Angular clients to
|
|
/// receive real-time updates via SignalR instead of connecting directly
|
|
/// to the Gateway WebSocket.</para>
|
|
/// </summary>
|
|
public class GatewayEventBridgeService : BackgroundService
|
|
{
|
|
private readonly ILogger<GatewayEventBridgeService> _logger;
|
|
private readonly IHubContext<Hubs.AgentStatusHub, Hubs.IAgentStatusClient> _hubContext;
|
|
private readonly IConfiguration _configuration;
|
|
|
|
/// <summary>
|
|
/// In-memory fleet state — maps agent IDs to their latest card data.
|
|
/// Updated on every <c>sessions.changed</c> event from the Gateway.
|
|
/// </summary>
|
|
private readonly ConcurrentDictionary<string, AgentCardData> _fleetState = new();
|
|
|
|
/// <summary>
/// Lookup table from agent ID to the role description shown for that
/// agent in the Command Hub.
/// </summary>
private static readonly Dictionary<string, string> AgentRoles = new()
{
    { "main", "Primary Assistant" },
    { "otto", "Orchestrator Agent" },
    { "dave", "Network Admin Agent" },
    { "bob", "Content Writer Agent" },
    { "stuart", "Image & Creative Agent" },
    { "phil", "Home Automation Agent" },
    { "carl", "Security Agent" },
    { "larry", "Business Agent" },
    { "mel", "E-Commerce Agent" },
    { "norbert", "Product Agent" },
    { "jerry", "Market Research Agent" },
    { "rex", "Frontend Dev Agent" },
    { "dex", "Backend Dev Agent" },
    { "hex", "Database Agent" },
    { "pip", "Raspberry Pi Agent" },
    { "nano", "ESP32/Firmware Agent" },
    { "axiom", "Utility Agent" },
    { "bonnie", "Music Agent" },
    { "sketch", "UI/UX Design Agent" },
    { "flip", "Mobile Dev Agent" },
    { "buzz", "SEO Agent" },
    { "aries", "Companion Agent" }
};
|
|
|
|
/// <summary>
/// Translates a Gateway session status into the status string reported
/// for an agent.
/// </summary>
/// <param name="sessionStatus">Raw session status from the Gateway; may be <c>null</c>.</param>
/// <returns>
/// <c>"active"</c>, <c>"thinking"</c>, <c>"error"</c> or <c>"idle"</c>.
/// </returns>
private static string MapSessionStatus(string? sessionStatus)
{
    switch (sessionStatus)
    {
        case "running":
            return "active";

        case "streaming":
            return "thinking";

        case "error":
        case "aborted":
            return "error";

        default:
            // "done", null and every unrecognised status are all reported as idle.
            return "idle";
    }
}
|
|
|
|
/// <summary>
/// Creates the bridge service with its injected dependencies.
/// </summary>
/// <param name="logger">Logger used for connection and event diagnostics.</param>
/// <param name="hubContext">SignalR hub context used to push updates to clients.</param>
/// <param name="configuration">Configuration source for the <c>Gateway:*</c> settings.</param>
public GatewayEventBridgeService(
    ILogger<GatewayEventBridgeService> logger,
    IHubContext<Hubs.AgentStatusHub, Hubs.IAgentStatusClient> hubContext,
    IConfiguration configuration)
{
    (_logger, _hubContext, _configuration) = (logger, hubContext, configuration);
}
|
|
|
|
/// <summary>
/// Returns a copy of the current fleet state, one entry per known agent.
/// Used by the hub's <c>GetFleetSnapshot</c> method and by the
/// <c>AgentsController</c> REST endpoint.
/// </summary>
/// <returns>An array holding the latest card data of every tracked agent.</returns>
public AgentCardData[] GetFleetSnapshot()
{
    var cards = _fleetState.Values;
    return cards.ToArray();
}
|
|
|
|
/// <summary>
/// Runs the bridge for the lifetime of the host: connects to the Gateway,
/// listens until the connection ends, then reconnects after a fixed delay.
/// </summary>
/// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
/// <returns>A task that completes once shutdown has been requested.</returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var reconnectDelay = TimeSpan.FromSeconds(5);

    _logger.LogInformation("Gateway Event Bridge service starting");

    while (!stoppingToken.IsCancellationRequested)
    {
        try
        {
            await ConnectAndListenAsync(stoppingToken);

            if (stoppingToken.IsCancellationRequested)
            {
                break;
            }

            // ConnectAndListenAsync also returns without throwing when the
            // server closes the socket or a receive fails. Without the delay
            // below that path would reconnect in a tight loop.
            _logger.LogWarning("Gateway connection closed, reconnecting in 5 seconds...");
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            break;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Gateway connection lost, reconnecting in 5 seconds...");
        }

        try
        {
            await Task.Delay(reconnectDelay, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Shutdown was requested while waiting to reconnect.
            break;
        }
    }

    _logger.LogInformation("Gateway Event Bridge service stopping");
}
|
|
|
|
/// <summary>
/// Opens a WebSocket to the OpenClaw Gateway and processes its messages
/// until the connection ends or cancellation is requested.
/// </summary>
/// <param name="stoppingToken">Cancels the connect attempt and the receive loop.</param>
/// <remarks>
/// The endpoint is read from <c>Gateway:WebSocketUrl</c> and the optional
/// bearer token from <c>Gateway:AuthToken</c>.
/// </remarks>
private async Task ConnectAndListenAsync(CancellationToken stoppingToken)
{
    var endpoint = _configuration["Gateway:WebSocketUrl"] ?? "ws://localhost:3271/ws";
    var bearerToken = _configuration["Gateway:AuthToken"];

    _logger.LogInformation("Connecting to Gateway at {Url}", endpoint);

    using var socket = new ClientWebSocket();

    // Only send an Authorization header when a token is actually configured.
    if (bearerToken is { Length: > 0 })
    {
        socket.Options.SetRequestHeader("Authorization", $"Bearer {bearerToken}");
    }

    var endpointUri = new Uri(endpoint);
    await socket.ConnectAsync(endpointUri, stoppingToken);
    _logger.LogInformation("Connected to Gateway WebSocket");

    // Blocks here for as long as the connection stays up.
    await ReceiveMessagesAsync(socket, stoppingToken);
}
|
|
|
|
/// <summary>
/// Receives WebSocket frames from the Gateway, reassembles them into
/// complete messages and hands each message to
/// <see cref="ProcessMessageAsync"/>.
/// </summary>
/// <param name="ws">The connected Gateway socket.</param>
/// <param name="stoppingToken">Cancels the pending receive and ends the loop.</param>
/// <remarks>
/// Returns (without throwing) when the server sends a close frame, when a
/// <see cref="WebSocketException"/> occurs, or when the socket leaves the
/// open state.
/// </remarks>
private async Task ReceiveMessagesAsync(ClientWebSocket ws, CancellationToken stoppingToken)
{
    var buffer = new byte[8192];
    var messageBuilder = new StringBuilder();

    // A message may arrive in several fragments and a fragment boundary can
    // fall in the middle of a multi-byte UTF-8 sequence. A stateful decoder
    // carries the incomplete bytes over to the next fragment instead of
    // turning them into replacement characters.
    var decoder = Encoding.UTF8.GetDecoder();
    var chars = new char[Encoding.UTF8.GetMaxCharCount(buffer.Length)];

    while (ws.State == WebSocketState.Open && !stoppingToken.IsCancellationRequested)
    {
        WebSocketReceiveResult result;
        try
        {
            result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer), stoppingToken);
        }
        catch (WebSocketException ex)
        {
            _logger.LogWarning(ex, "WebSocket receive error");
            break;
        }

        if (result.MessageType == WebSocketMessageType.Close)
        {
            _logger.LogInformation("Gateway WebSocket closed by server");
            break;
        }

        // Flushing on the final fragment resets the decoder for the next message.
        var charCount = decoder.GetChars(
            buffer, 0, result.Count, chars, 0, flush: result.EndOfMessage);
        messageBuilder.Append(chars, 0, charCount);

        if (result.EndOfMessage)
        {
            var message = messageBuilder.ToString();
            messageBuilder.Clear();
            await ProcessMessageAsync(ws, message, stoppingToken);
        }
    }
}
|
|
|
|
/// <summary>
/// Parses a single Gateway message and routes it by its <c>type</c>:
/// <c>event</c> messages go to <see cref="HandleGatewayEventAsync"/>,
/// <c>res</c> messages to <see cref="HandleGatewayResponse"/>. A message
/// whose <c>event</c> is <c>connect.challenge</c> additionally triggers
/// the connect handshake.
/// </summary>
/// <param name="ws">Socket used to answer a connect challenge.</param>
/// <param name="message">The complete message text.</param>
/// <param name="stoppingToken">Cancels the handshake send.</param>
/// <remarks>
/// Messages that are not valid JSON, are not JSON objects, or carry no
/// string <c>type</c> are logged or ignored rather than allowed to tear
/// down the connection.
/// </remarks>
private async Task ProcessMessageAsync(
    ClientWebSocket ws, string message, CancellationToken stoppingToken)
{
    // Keeps log entries bounded when a message is very large.
    static string Preview(string text) =>
        text.Length > 200 ? text[..200] + "..." : text;

    try
    {
        using var doc = JsonDocument.Parse(message);
        var root = doc.RootElement;

        // TryGetProperty throws on anything but an object, so reject early.
        if (root.ValueKind != JsonValueKind.Object)
        {
            _logger.LogWarning("Ignoring non-object Gateway message: {Message}", Preview(message));
            return;
        }

        var type = root.TryGetProperty("type", out var typeProp) &&
                   typeProp.ValueKind == JsonValueKind.String
            ? typeProp.GetString()
            : null;

        switch (type)
        {
            case "event":
                await HandleGatewayEventAsync(root);
                break;

            case "res":
                HandleGatewayResponse(root);
                break;
        }

        // Special handling for connect.challenge events
        if (root.TryGetProperty("event", out var eventName) &&
            eventName.ValueKind == JsonValueKind.String &&
            eventName.GetString() == "connect.challenge")
        {
            await HandleConnectChallengeAsync(ws, root, stoppingToken);
        }
    }
    catch (JsonException ex)
    {
        _logger.LogWarning(ex, "Failed to parse Gateway message: {Message}", Preview(message));
    }
}
|
|
|
|
/// <summary>
/// Answers a Gateway <c>connect.challenge</c> by sending a protocol v3
/// <c>connect</c> request that identifies this backend as an operator
/// client and carries the configured auth token.
/// </summary>
/// <param name="ws">Socket the request is sent on.</param>
/// <param name="root">The challenge message; its contents are not inspected here.</param>
/// <param name="stoppingToken">Cancels the send.</param>
private async Task HandleConnectChallengeAsync(
    ClientWebSocket ws, JsonElement root, CancellationToken stoppingToken)
{
    _logger.LogInformation("Received connect.challenge from Gateway");

    var clientInfo = new
    {
        id = "control-center-backend",
        version = "1.0.0",
        platform = "server",
        mode = "operator"
    };

    var credentials = new
    {
        token = _configuration["Gateway:AuthToken"] ?? string.Empty
    };

    // Property names and order define the JSON that goes on the wire.
    var handshake = new
    {
        type = "req",
        id = $"bridge-{Guid.NewGuid():N}",
        method = "connect",
        @params = new
        {
            minProtocol = 3,
            maxProtocol = 3,
            client = clientInfo,
            role = "operator",
            scopes = new[] { "operator.read", "operator.write" },
            auth = credentials,
            locale = "en-US",
            userAgent = "control-center-backend/1.0.0"
        }
    };

    var payload = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(handshake));
    await ws.SendAsync(payload, WebSocketMessageType.Text, true, stoppingToken);
}
|
|
|
|
/// <summary>
/// Dispatches a Gateway event message to the handler that matches its
/// <c>event</c> name. Events without a matching handler are only logged
/// at debug level.
/// </summary>
/// <param name="root">Root element of the event message.</param>
private async Task HandleGatewayEventAsync(JsonElement root)
{
    if (root.TryGetProperty("event", out var eventElement) == false)
    {
        return;
    }

    var name = eventElement.GetString();
    _logger.LogDebug("Gateway event: {Event}", name);

    if (name == "sessions.changed")
    {
        await HandleSessionsChangedAsync(root);
    }
    else if (name == "session.message")
    {
        HandleSessionMessage(root);
    }
    else if (name == "session.tool")
    {
        HandleSessionTool(root);
    }
    else if (name == "health")
    {
        HandleHealthEvent(root);
    }
}
|
|
|
|
/// <summary>
/// Handles a <c>sessions.changed</c> event. Every session in the payload's
/// <c>snapshot</c> array is converted to card data, stored in the fleet
/// state and pushed to SignalR clients; keys listed under <c>updated</c>
/// are logged at debug level.
/// </summary>
/// <param name="root">Root element of the event message.</param>
private async Task HandleSessionsChangedAsync(JsonElement root)
{
    if (!root.TryGetProperty("payload", out var body))
    {
        return;
    }

    // Full snapshot of sessions, when the payload carries one.
    var hasSnapshot = body.TryGetProperty("snapshot", out var sessions)
        && sessions.ValueKind == JsonValueKind.Array;

    if (hasSnapshot)
    {
        foreach (var entry in sessions.EnumerateArray())
        {
            // Sessions that cannot be attributed to an agent are skipped.
            if (SessionToCardData(entry) is not { } card)
            {
                continue;
            }

            _fleetState[card.Id] = card;

            await _hubContext.PushAgentStatusAsync(new AgentStatusUpdate(
                AgentId: card.Id,
                DisplayName: card.DisplayName,
                Role: card.Role,
                Status: card.Status,
                CurrentTask: card.CurrentTask,
                SessionKey: card.SessionKey,
                Channel: card.Channel,
                LastActivity: card.LastActivity,
                ErrorMessage: card.ErrorMessage));
        }
    }

    // Incremental notifications are only logged for now.
    var hasUpdatedKeys = body.TryGetProperty("updated", out var updatedKeys)
        && updatedKeys.ValueKind == JsonValueKind.Array;

    if (hasUpdatedKeys)
    {
        foreach (var keyElement in updatedKeys.EnumerateArray())
        {
            _logger.LogDebug("Session updated: {Key}", keyElement.GetString());
        }
    }
}
|
|
|
|
/// <summary>
/// Handles a <c>session.message</c> event by marking the owning agent as
/// <c>active</c> and refreshing its last-activity timestamp in the
/// in-memory fleet state.
/// </summary>
/// <param name="root">Root element of the event message.</param>
/// <remarks>
/// Only the fleet state is updated; no SignalR push is sent from this
/// handler. Events for agents that are not yet in the fleet state, or
/// whose payload lacks a usable string <c>sessionKey</c>, are ignored.
/// </remarks>
private void HandleSessionMessage(JsonElement root)
{
    // A null or otherwise non-object payload cannot be queried for properties.
    if (!root.TryGetProperty("payload", out var payload) ||
        payload.ValueKind != JsonValueKind.Object)
    {
        return;
    }

    if (!payload.TryGetProperty("sessionKey", out var sessionKeyProp) ||
        sessionKeyProp.ValueKind != JsonValueKind.String)
    {
        return;
    }

    var sessionKey = sessionKeyProp.GetString() ?? string.Empty;
    var agentId = ExtractAgentId(sessionKey);

    if (string.IsNullOrEmpty(agentId))
    {
        return;
    }

    // Update last activity timestamp
    if (_fleetState.TryGetValue(agentId, out var existing))
    {
        _fleetState[agentId] = existing with
        {
            LastActivity = DateTime.UtcNow.ToString("o"),
            Status = "active"
        };
    }
}
|
|
|
|
/// <summary>
/// Handles a <c>session.tool</c> event. Builds a
/// <see cref="TaskProgressUpdate"/> from the tool name and status and
/// pushes it through SignalR without waiting for the push to finish.
/// </summary>
/// <param name="root">Root element of the event message.</param>
/// <remarks>
/// Progress is 0 for <c>started</c>, 100 for <c>completed</c> and unset for
/// any other status. Events missing a usable session key, tool name or
/// status are ignored.
/// </remarks>
private void HandleSessionTool(JsonElement root)
{
    // Reads a property only when it is present and actually a JSON string.
    static string? ReadString(JsonElement obj, string name) =>
        obj.TryGetProperty(name, out var value) && value.ValueKind == JsonValueKind.String
            ? value.GetString()
            : null;

    if (!root.TryGetProperty("payload", out var payload) ||
        payload.ValueKind != JsonValueKind.Object)
    {
        return;
    }

    var agentId = ExtractAgentId(ReadString(payload, "sessionKey") ?? string.Empty);
    if (string.IsNullOrEmpty(agentId))
    {
        return;
    }

    var toolName = ReadString(payload, "toolName");
    var toolStatus = ReadString(payload, "status");
    if (toolName is null || toolStatus is null)
    {
        return;
    }

    var progress = toolStatus switch
    {
        "started" => 0,
        "completed" => 100,
        _ => (int?)null
    };

    var update = new TaskProgressUpdate(
        AgentId: agentId,
        TaskDescription: $"{toolName} ({toolStatus})",
        Progress: progress,
        Elapsed: null
    );

    // Fire and forget — don't block the event loop. The wrapper observes
    // the task so that a failed push is logged instead of silently lost.
    _ = PushAndLogFailureAsync();

    async Task PushAndLogFailureAsync()
    {
        try
        {
            await _hubContext.PushTaskProgressAsync(update);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Failed to push task progress for agent {AgentId}", agentId);
        }
    }
}
|
|
|
|
/// <summary>
/// Handles a Gateway <c>health</c> event by logging the reported
/// <c>ok</c> flag and <c>status</c> text.
/// </summary>
/// <param name="root">Root element of the event message.</param>
private void HandleHealthEvent(JsonElement root)
{
    if (!root.TryGetProperty("payload", out var health))
    {
        return;
    }

    // A missing "ok" flag is reported as false.
    var isOk = false;
    if (health.TryGetProperty("ok", out var okElement))
    {
        isOk = okElement.GetBoolean();
    }

    // A missing "status" is reported as "unknown".
    string? statusText = "unknown";
    if (health.TryGetProperty("status", out var statusElement))
    {
        statusText = statusElement.GetString();
    }

    _logger.LogInformation("Gateway health: ok={Ok}, status={Status}", isOk, statusText);
}
|
|
|
|
/// <summary>
/// Handles a Gateway <c>res</c> message. Successful responses are logged,
/// with an extra entry when the payload is the <c>hello-ok</c> that
/// completes the handshake; failed responses log the error message.
/// </summary>
/// <param name="root">Root element of the response message.</param>
private void HandleGatewayResponse(JsonElement root)
{
    var succeeded = root.TryGetProperty("ok", out var okElement) && okElement.GetBoolean();

    if (!succeeded)
    {
        // Failure path: report the error text when the Gateway supplied one.
        if (root.TryGetProperty("error", out var failure))
        {
            string? reason = "unknown error";
            if (failure.TryGetProperty("message", out var messageElement))
            {
                reason = messageElement.GetString();
            }

            _logger.LogWarning("Gateway RPC error: {Error}", reason);
        }

        return;
    }

    _logger.LogDebug("Gateway RPC response OK");

    // Check for hello-ok after connect
    var isHelloOk = root.TryGetProperty("payload", out var body)
        && body.TryGetProperty("type", out var kind)
        && kind.GetString() == "hello-ok";

    if (isHelloOk)
    {
        _logger.LogInformation("Gateway handshake complete (hello-ok received)");
    }
}
|
|
|
|
/// <summary>
/// Converts a raw Gateway session JSON element into an
/// <see cref="AgentCardData"/> record.
/// </summary>
/// <param name="session">One session element from a Gateway snapshot.</param>
/// <returns>
/// The card data, or <c>null</c> when the element is not a JSON object or
/// no agent ID can be determined from its <c>agentId</c> or <c>key</c>.
/// </returns>
/// <remarks>
/// Fields with an unexpected JSON type are treated as absent instead of
/// throwing, so one malformed session cannot abort processing of the
/// whole snapshot.
/// </remarks>
private AgentCardData? SessionToCardData(JsonElement session)
{
    // Reads a property only when it is present and actually a JSON string.
    static string? ReadString(JsonElement obj, string name) =>
        obj.TryGetProperty(name, out var value) && value.ValueKind == JsonValueKind.String
            ? value.GetString()
            : null;

    // Reads a Unix-milliseconds timestamp, rejecting non-integers and
    // values outside the range DateTimeOffset can represent.
    static bool TryReadUnixMilliseconds(JsonElement obj, string name, out DateTimeOffset timestamp)
    {
        timestamp = default;

        if (!obj.TryGetProperty(name, out var value) ||
            value.ValueKind != JsonValueKind.Number ||
            !value.TryGetInt64(out var millis) ||
            millis < DateTimeOffset.MinValue.ToUnixTimeMilliseconds() ||
            millis > DateTimeOffset.MaxValue.ToUnixTimeMilliseconds())
        {
            return false;
        }

        timestamp = DateTimeOffset.FromUnixTimeMilliseconds(millis);
        return true;
    }

    if (session.ValueKind != JsonValueKind.Object)
    {
        return null;
    }

    var sessionKey = ReadString(session, "key") ?? string.Empty;

    // Prefer the explicit agentId field; fall back to the session key when
    // it is missing or empty.
    var agentId = ReadString(session, "agentId");
    if (string.IsNullOrEmpty(agentId))
    {
        agentId = ExtractAgentId(sessionKey);
    }

    if (string.IsNullOrEmpty(agentId))
    {
        return null;
    }

    // A missing status maps to "idle" via MapSessionStatus(null).
    var status = MapSessionStatus(ReadString(session, "status"));

    var channel = ExtractChannel(session);

    // Without a usable updatedAt, the time of processing is used instead.
    var lastActivity = TryReadUnixMilliseconds(session, "updatedAt", out var updatedAt)
        ? updatedAt.ToString("o")
        : DateTime.UtcNow.ToString("o");

    // Display name is the agent ID with its first character upper-cased.
    var displayName = char.ToUpperInvariant(agentId[0]) + agentId[1..];
    var role = AgentRoles.GetValueOrDefault(agentId, "Agent");

    return new AgentCardData(
        Id: agentId,
        DisplayName: displayName,
        Role: role,
        Status: status,
        CurrentTask: null,
        TaskProgress: null,
        TaskElapsed: null,
        SessionKey: sessionKey,
        Channel: channel,
        LastActivity: lastActivity,
        ErrorMessage: status == "error" ? "Agent encountered an error" : null
    );
}
|
|
|
|
/// <summary>
/// Pulls the agent ID out of a session key of the form
/// <c>agent:{agentId}:{channel}:...</c>.
/// </summary>
/// <param name="sessionKey">The session key to inspect.</param>
/// <returns>
/// The second colon-separated segment when the first segment is
/// <c>agent</c>; otherwise <c>null</c>.
/// </returns>
private static string? ExtractAgentId(string sessionKey)
{
    const string Prefix = "agent:";

    // Also covers null/empty keys, which cannot start with the prefix.
    if (string.IsNullOrEmpty(sessionKey) ||
        !sessionKey.StartsWith(Prefix, StringComparison.Ordinal))
    {
        return null;
    }

    // The ID runs from just after the prefix to the next colon, or to the
    // end of the key when there are no further segments.
    var idEnd = sessionKey.IndexOf(':', Prefix.Length);
    return idEnd < 0
        ? sessionKey.Substring(Prefix.Length)
        : sessionKey.Substring(Prefix.Length, idEnd - Prefix.Length);
}
|
|
|
|
/// <summary>
/// Determines the channel of a session: the <c>channel</c> property when
/// it is a string, otherwise <c>origin.provider</c> when that is a string.
/// </summary>
/// <param name="session">The session element to inspect.</param>
/// <returns>The channel name, or <c>"unknown"</c> when none can be read.</returns>
/// <remarks>
/// Values of an unexpected JSON type (for example <c>"origin": null</c>)
/// are treated as absent rather than causing an exception.
/// </remarks>
private static string ExtractChannel(JsonElement session)
{
    const string Unknown = "unknown";

    // TryGetProperty throws on anything but a JSON object.
    if (session.ValueKind != JsonValueKind.Object)
    {
        return Unknown;
    }

    // Try direct "channel" property
    if (session.TryGetProperty("channel", out var channel) &&
        channel.ValueKind == JsonValueKind.String)
    {
        return channel.GetString() ?? Unknown;
    }

    // Try origin.provider
    if (session.TryGetProperty("origin", out var origin) &&
        origin.ValueKind == JsonValueKind.Object &&
        origin.TryGetProperty("provider", out var provider) &&
        provider.ValueKind == JsonValueKind.String)
    {
        return provider.GetString() ?? Unknown;
    }

    return Unknown;
}
|
|
} |