diff --git a/backend/ControlCenter/.gitignore b/backend/ControlCenter/.gitignore new file mode 100644 index 0000000..27e7e53 --- /dev/null +++ b/backend/ControlCenter/.gitignore @@ -0,0 +1,18 @@ +## .NET +bin/ +obj/ +*.user +*.suo +*.cache +*.dll +*.pdb +*.xml + +## IDE +.vs/ +.vscode/ +.idea/ + +## OS +.DS_Store +Thumbs.db \ No newline at end of file diff --git a/backend/ControlCenter/ControlCenter.csproj b/backend/ControlCenter/ControlCenter.csproj new file mode 100644 index 0000000..66ebc83 --- /dev/null +++ b/backend/ControlCenter/ControlCenter.csproj @@ -0,0 +1,17 @@ + + + + net9.0 + enable + enable + true + CS1591 + ControlCenter + ControlCenter + + + + + + + \ No newline at end of file diff --git a/backend/ControlCenter/Controllers/AgentsController.cs b/backend/ControlCenter/Controllers/AgentsController.cs new file mode 100644 index 0000000..92da9b3 --- /dev/null +++ b/backend/ControlCenter/Controllers/AgentsController.cs @@ -0,0 +1,71 @@ +using Microsoft.AspNetCore.Mvc; +using ControlCenter.Services; + +namespace ControlCenter.Controllers; + +/// +/// REST API for querying agent fleet status. +/// Provides the initial data load for the Command Hub, +/// while real-time updates flow through the AgentStatus SignalR hub. +/// +/// API contract for Rex (Frontend): +/// +/// GET /api/agents — Returns all known agents with current status +/// GET /api/agents/{agentId} — Returns a specific agent's status +/// +/// +[ApiController] +[Route("api/[controller]")] +public class AgentsController : ControllerBase +{ + private readonly ILogger _logger; + private readonly GatewayEventBridgeService _bridgeService; + + public AgentsController( + ILogger logger, + GatewayEventBridgeService bridgeService) + { + _logger = logger; + _bridgeService = bridgeService; + } + + /// + /// Gets the current fleet status — all known agents with their latest state. + /// This is the initial load endpoint; subsequent updates arrive via SignalR. + /// + /// An array of agent card data for the entire fleet. + /// Returns the fleet snapshot. + [HttpGet] + [ProducesResponseType(typeof(AgentCardData[]), StatusCodes.Status200OK)] + public IActionResult GetAgents() + { + var snapshot = _bridgeService.GetFleetSnapshot(); + _logger.LogDebug("Fleet snapshot requested: {Count} agents", snapshot.Length); + return Ok(snapshot); + } + + /// + /// Gets the current status of a specific agent. + /// + /// The agent identifier, e.g. "otto", "dex". + /// The agent's current card data. + /// Returns the agent's status. + /// Agent not found in the fleet state. + [HttpGet("{agentId}")] + [ProducesResponseType(typeof(AgentCardData), StatusCodes.Status200OK)] + [ProducesResponseType(StatusCodes.Status404NotFound)] + public IActionResult GetAgent(string agentId) + { + var snapshot = _bridgeService.GetFleetSnapshot(); + var agent = snapshot.FirstOrDefault(a => + a.Id.Equals(agentId, StringComparison.OrdinalIgnoreCase)); + + if (agent is null) + { + _logger.LogWarning("Agent not found: {AgentId}", agentId); + return NotFound(new { error = $"Agent '{agentId}' not found" }); + } + + return Ok(agent); + } +} \ No newline at end of file diff --git a/backend/ControlCenter/Controllers/CommandController.cs b/backend/ControlCenter/Controllers/CommandController.cs new file mode 100644 index 0000000..0dc75bc --- /dev/null +++ b/backend/ControlCenter/Controllers/CommandController.cs @@ -0,0 +1,122 @@ +using Microsoft.AspNetCore.Mvc; + +namespace ControlCenter.Controllers; + +/// +/// REST API for sending control commands to agents. +/// Provides the Command Hub's action endpoints for agent lifecycle control. +/// +/// API contract for Rex (Frontend): +/// +/// POST /api/command/stop/{agentId} — Stop/abort an agent's active session +/// POST /api/command/restart/{agentId} — Restart an agent +/// POST /api/command/steer/{agentId} — Inject a message into an agent's session +/// +/// +/// Commands are forwarded to the OpenClaw Gateway via the +/// WebSocket bridge service. The Gateway handles the actual execution. +/// +[ApiController] +[Route("api/[controller]")] +public class CommandController : ControllerBase +{ + private readonly ILogger _logger; + + public CommandController(ILogger logger) + { + _logger = logger; + } + + /// + /// Stops (aborts) an agent's active session. + /// Sends an abort command to the OpenClaw Gateway. + /// + /// The agent identifier to stop. + /// Confirmation of the stop command. + /// Stop command sent successfully. + /// No active session found for the agent. + [HttpPost("stop/{agentId}")] + [ProducesResponseType(StatusCodes.Status200OK)] + [ProducesResponseType(StatusCodes.Status404NotFound)] + public IActionResult StopAgent(string agentId) + { + _logger.LogInformation("Stop command received for agent {AgentId}", agentId); + + // TODO: Forward to Gateway via bridge service + // await _bridgeService.SendRpcAsync("sessions.abort", new { sessionKey = ... }); + + return Ok(new + { + agentId, + command = "stop", + status = "sent", + timestamp = DateTime.UtcNow.ToString("o") + }); + } + + /// + /// Restarts an agent by aborting the current session and allowing + /// a new one to start on the next incoming message. + /// + /// The agent identifier to restart. + /// Confirmation of the restart command. + /// Restart command sent successfully. + [HttpPost("restart/{agentId}")] + [ProducesResponseType(StatusCodes.Status200OK)] + public IActionResult RestartAgent(string agentId) + { + _logger.LogInformation("Restart command received for agent {AgentId}", agentId); + + // TODO: Forward to Gateway — abort current session + signal restart + + return Ok(new + { + agentId, + command = "restart", + status = "sent", + timestamp = DateTime.UtcNow.ToString("o") + }); + } + + /// + /// Steers (injects a message into) an agent's active session. + /// Used by operators to redirect an agent's task mid-execution. + /// + /// The agent identifier to steer. + /// The steering message to inject. + /// Confirmation of the steer command. + /// Steer command sent successfully. + /// Missing or empty message. + [HttpPost("steer/{agentId}")] + [ProducesResponseType(StatusCodes.Status200OK)] + [ProducesResponseType(StatusCodes.Status400BadRequest)] + public IActionResult SteerAgent(string agentId, [FromBody] SteerRequest request) + { + if (string.IsNullOrWhiteSpace(request.Message)) + { + return BadRequest(new { error = "Message is required" }); + } + + _logger.LogInformation("Steer command received for agent {AgentId}: {Message}", + agentId, request.Message.Length > 100 + ? request.Message[..100] + "..." : request.Message); + + // TODO: Forward to Gateway via bridge service + // await _bridgeService.SendRpcAsync("sessions.steer", new { sessionKey = ..., message = request.Message }); + + return Ok(new + { + agentId, + command = "steer", + message = request.Message, + status = "sent", + timestamp = DateTime.UtcNow.ToString("o") + }); + } +} + +/// +/// Request body for the steer command. +/// +/// The message to inject into the agent's session. +public record SteerRequest(string Message); \ No newline at end of file diff --git a/backend/ControlCenter/Controllers/LogsController.cs b/backend/ControlCenter/Controllers/LogsController.cs new file mode 100644 index 0000000..238c345 --- /dev/null +++ b/backend/ControlCenter/Controllers/LogsController.cs @@ -0,0 +1,87 @@ +using Microsoft.AspNetCore.Mvc; + +namespace ControlCenter.Controllers; + +/// +/// REST API for querying agent session logs. +/// Provides historical message and tool call logs for a specific agent. +/// +/// API contract for Rex (Frontend): +/// +/// GET /api/logs/{agentId} — Returns recent logs for an agent +/// GET /api/logs/{agentId}/tools — Returns recent tool calls for an agent +/// +/// +/// Log data is sourced from the OpenClaw Gateway's transcript files. +/// The Gateway's logs.tail RPC provides the raw data, and this +/// controller formats it for the frontend. +/// +[ApiController] +[Route("api/[controller]")] +public class LogsController : ControllerBase +{ + private readonly ILogger _logger; + + public LogsController(ILogger logger) + { + _logger = logger; + } + + /// + /// Gets recent session logs for a specific agent. + /// Returns the last N messages from the agent's active session transcript. + /// + /// The agent identifier, e.g. "otto", "dex". + /// Maximum number of log entries to return (default: 50, max: 200). + /// An array of log entries for the agent. + /// Returns the agent's recent logs. + /// No active session found for the agent. + [HttpGet("{agentId}")] + [ProducesResponseType(StatusCodes.Status200OK)] + [ProducesResponseType(StatusCodes.Status404NotFound)] + public IActionResult GetLogs(string agentId, [FromQuery] int limit = 50) + { + limit = Math.Clamp(limit, 1, 200); + + _logger.LogDebug("Logs requested for agent {AgentId}, limit {Limit}", agentId, limit); + + // TODO: Implement log retrieval by calling the Gateway's logs.tail RPC + // or reading transcript files. For now, return an empty array as the + // bridge service will provide this data when fully integrated. + return Ok(new + { + agentId, + logs = Array.Empty(), + count = 0, + hasMore = false + }); + } + + /// + /// Gets recent tool call logs for a specific agent. + /// Returns the last N tool invocations from the agent's session. + /// + /// The agent identifier. + /// Maximum number of tool entries to return (default: 20, max: 100). + /// An array of tool call entries for the agent. + /// Returns the agent's recent tool calls. + /// No active session found for the agent. + [HttpGet("{agentId}/tools")] + [ProducesResponseType(StatusCodes.Status200OK)] + [ProducesResponseType(StatusCodes.Status404NotFound)] + public IActionResult GetToolLogs(string agentId, [FromQuery] int limit = 20) + { + limit = Math.Clamp(limit, 1, 100); + + _logger.LogDebug("Tool logs requested for agent {AgentId}, limit {Limit}", agentId, limit); + + // TODO: Implement tool log retrieval. Return empty for now. + return Ok(new + { + agentId, + tools = Array.Empty(), + count = 0, + hasMore = false + }); + } +} \ No newline at end of file diff --git a/backend/ControlCenter/Hubs/AgentStatusHub.cs b/backend/ControlCenter/Hubs/AgentStatusHub.cs new file mode 100644 index 0000000..f0d4497 --- /dev/null +++ b/backend/ControlCenter/Hubs/AgentStatusHub.cs @@ -0,0 +1,184 @@ +using Microsoft.AspNetCore.SignalR; + +namespace ControlCenter.Hubs; + +/// +/// SignalR hub for real-time agent status updates in the Command Hub. +/// +/// Usage flow: +/// +/// Client connects to /hubs/agent-status +/// Client calls to subscribe to all agent updates +/// Client calls to subscribe to a specific agent +/// Server pushes +/// and events +/// Client calls for initial state on connect +/// +/// +/// Group naming: +/// +/// Fleet group: fleet — receives all agent updates +/// Agent group: agent:{agentId} — receives updates for one agent +/// +/// +/// Typed client: — all server-to-client +/// calls go through this interface for compile-time safety. +/// +/// Architecture note: This hub bridges OpenClaw Gateway WebSocket events +/// to SignalR clients. A background service () +/// subscribes to Gateway events and pushes them through this hub's extension methods. +/// +public class AgentStatusHub : Hub +{ + private readonly ILogger _logger; + + public AgentStatusHub(ILogger logger) + { + _logger = logger; + } + + /// + /// Adds the calling connection to the fleet group. + /// Once joined, the client will receive all agent status changes + /// and task progress updates across the entire fleet. + /// + public async Task JoinFleet() + { + await Groups.AddToGroupAsync(Context.ConnectionId, FleetGroupName); + _logger.LogDebug("Connection {ConnectionId} joined fleet group", Context.ConnectionId); + } + + /// + /// Removes the calling connection from the fleet group. + /// + public async Task LeaveFleet() + { + await Groups.RemoveFromGroupAsync(Context.ConnectionId, FleetGroupName); + _logger.LogDebug("Connection {ConnectionId} left fleet group", Context.ConnectionId); + } + + /// + /// Adds the calling connection to a specific agent's group. + /// Once joined, the client will receive updates only for that agent. + /// + /// The agent identifier, e.g. "otto", "dex". + /// Thrown if agentId is null or empty. + public async Task JoinAgentGroup(string agentId) + { + if (string.IsNullOrWhiteSpace(agentId)) + throw new HubException("agentId is required"); + + var groupName = AgentGroupName(agentId); + await Groups.AddToGroupAsync(Context.ConnectionId, groupName); + _logger.LogDebug("Connection {ConnectionId} joined agent group {GroupName}", + Context.ConnectionId, groupName); + } + + /// + /// Removes the calling connection from a specific agent's group. + /// + /// The agent identifier. + public async Task LeaveAgentGroup(string agentId) + { + if (string.IsNullOrWhiteSpace(agentId)) return; + + var groupName = AgentGroupName(agentId); + await Groups.RemoveFromGroupAsync(Context.ConnectionId, groupName); + } + + /// + /// Returns a snapshot of the current fleet state. + /// Called by clients on initial connection to get the full picture + /// before incremental updates begin arriving. + /// + /// An array of representing all known agents. + public Task GetFleetSnapshot() + { + // The fleet state is managed by the GatewayEventBridgeService. + // For now, return an empty array — the bridge service will push + // updates as they arrive from the Gateway. + _logger.LogDebug("Fleet snapshot requested by {ConnectionId}", Context.ConnectionId); + return Task.FromResult(Array.Empty()); + } + + /// + /// Overrides to perform cleanup. + /// SignalR automatically removes disconnected connections from all groups. + /// + /// Exception that caused the disconnection, if any. + public override Task OnDisconnectedAsync(Exception? exception) + { + _logger.LogDebug("Connection {ConnectionId} disconnected", Context.ConnectionId); + return base.OnDisconnectedAsync(exception); + } + + /// + /// The SignalR group name for the entire fleet (all agents). + /// + internal const string FleetGroupName = "fleet"; + + /// + /// Returns the SignalR group name for a specific agent. + /// Format: agent:{agentId} (lowercase for consistency). + /// + /// The agent identifier. + internal static string AgentGroupName(string agentId) => + $"agent:{agentId.ToLowerInvariant()}"; +} + +/// +/// Extension methods for pushing real-time agent updates through +/// the of . +/// +/// These methods are intended to be called from background services +/// (e.g., ) or other +/// server-side code that detects an agent state change. +/// +public static class AgentStatusHubExtensions +{ + /// + /// Pushes an agent status change to all clients subscribed to + /// the fleet group and the specific agent's group. + /// + /// Call this from any background service when an agent's + /// operational status changes (e.g., the Gateway reports a + /// session transition from "running" to "done"). + /// + /// The hub context injected via DI. + /// The agent status update payload. + /// A Task that completes when the message has been sent to all group members. + public static async Task PushAgentStatusAsync( + this IHubContext hubContext, + AgentStatusUpdate update) + { + // Broadcast to the fleet group (all subscribers) + await hubContext.Clients.Group(AgentStatusHub.FleetGroupName) + .AgentStatusChanged(update); + + // Also push to the specific agent's group + var agentGroup = AgentStatusHub.AgentGroupName(update.AgentId); + await hubContext.Clients.Group(agentGroup) + .AgentStatusChanged(update); + } + + /// + /// Pushes a task progress update to all clients subscribed to + /// the fleet group and the specific agent's group. + /// + /// The hub context injected via DI. + /// The task progress update payload. + /// A Task that completes when the message has been sent to all group members. + public static async Task PushTaskProgressAsync( + this IHubContext hubContext, + TaskProgressUpdate progress) + { + // Broadcast to the fleet group + await hubContext.Clients.Group(AgentStatusHub.FleetGroupName) + .AgentTaskProgress(progress); + + // Also push to the specific agent's group + var agentGroup = AgentStatusHub.AgentGroupName(progress.AgentId); + await hubContext.Clients.Group(agentGroup) + .AgentTaskProgress(progress); + } +} \ No newline at end of file diff --git a/backend/ControlCenter/Hubs/IAgentStatusClient.cs b/backend/ControlCenter/Hubs/IAgentStatusClient.cs new file mode 100644 index 0000000..4f3e1ea --- /dev/null +++ b/backend/ControlCenter/Hubs/IAgentStatusClient.cs @@ -0,0 +1,30 @@ +namespace ControlCenter.Hubs; + +/// +/// Strongly-typed client interface for the AgentStatus SignalR hub. +/// Defines the methods the server can invoke on connected clients +/// to push real-time agent status and task progress updates. +/// +/// All server-to-client calls go through this interface for +/// compile-time safety — matching the pattern used by the +/// Extrudex PrinterHub. +/// +public interface IAgentStatusClient +{ + /// + /// Pushes an agent status change to all subscribed clients. + /// Fired whenever an agent's operational status changes + /// (e.g., idle → active, active → thinking, active → error). + /// + /// The full status update payload. + /// A Task that completes when the client has processed the update. + Task AgentStatusChanged(AgentStatusUpdate update); + + /// + /// Pushes a task progress update to all subscribed clients. + /// Fired when an agent reports progress on its current task. + /// + /// The task progress update payload. + /// A Task that completes when the client has processed the update. + Task AgentTaskProgress(TaskProgressUpdate progress); +} \ No newline at end of file diff --git a/backend/ControlCenter/Hubs/Models/AgentStatusModels.cs b/backend/ControlCenter/Hubs/Models/AgentStatusModels.cs new file mode 100644 index 0000000..3c9c97d --- /dev/null +++ b/backend/ControlCenter/Hubs/Models/AgentStatusModels.cs @@ -0,0 +1,92 @@ +namespace ControlCenter; + +/// +/// Agent operational status derived from OpenClaw Gateway session activity. +/// Maps to the frontend AgentStatus type: 'active' | 'idle' | 'thinking' | 'error'. +/// +public enum AgentStatus +{ + /// Agent is currently processing a turn. + Active, + + /// Agent completed its last turn; no active work. + Idle, + + /// LLM call in flight; tokens streaming. + Thinking, + + /// Agent encountered an unhandled error. + Error +} + +/// +/// Extended lifecycle status including offline — not all agents have active sessions. +/// Used internally; clients only see (offline maps to idle). +/// +public enum AgentLifecycleStatus +{ + Active, + Idle, + Thinking, + Error, + Offline +} + +/// +/// Pushed to SignalR clients when an agent's status changes. +/// Matches the TypeScript AgentStatusUpdate interface from the design spec. +/// +/// Agent identifier, e.g. "otto", "dex". +/// Human-readable name, e.g. "Otto". +/// Role description, e.g. "Orchestrator Agent". +/// Current operational status. +/// Description of the current task, if any. +/// Full session key, e.g. "agent:otto:telegram:direct:8787451565". +/// Channel the agent is operating on, e.g. "telegram". +/// ISO 8601 timestamp of last activity. +/// Error message when status is 'error'. +public record AgentStatusUpdate( + string AgentId, + string DisplayName, + string Role, + string Status, + string? CurrentTask, + string SessionKey, + string Channel, + string LastActivity, + string? ErrorMessage = null +); + +/// +/// Pushed to SignalR clients when an agent's task progress updates. +/// Matches the TypeScript TaskProgressUpdate interface from the design spec. +/// +/// Agent identifier. +/// Description of the current task. +/// Task progress percentage (0–100), if trackable. +/// Elapsed time string, e.g. "04m 12s". +public record TaskProgressUpdate( + string AgentId, + string TaskDescription, + int? Progress, + string? Elapsed +); + +/// +/// Snapshot of an agent's full card data, sent on initial connection +/// or when the fleet state is requested. +/// Matches the TypeScript AgentCardData interface from the design spec. +/// +public record AgentCardData( + string Id, + string DisplayName, + string Role, + string Status, + string? CurrentTask, + int? TaskProgress, + string? TaskElapsed, + string SessionKey, + string Channel, + string LastActivity, + string? ErrorMessage +); \ No newline at end of file diff --git a/backend/ControlCenter/Program.cs b/backend/ControlCenter/Program.cs new file mode 100644 index 0000000..757b20a --- /dev/null +++ b/backend/ControlCenter/Program.cs @@ -0,0 +1,72 @@ +using System.Reflection; +using ControlCenter.Hubs; +using ControlCenter.Services; + +var builder = WebApplication.CreateBuilder(args); + +// ── API Services ─────────────────────────────────────────── +builder.Services.AddControllers(); +builder.Services.AddEndpointsApiExplorer(); +builder.Services.AddSwaggerGen(c => +{ + c.SwaggerDoc("v1", new() + { + Title = "Control Center API", + Version = "v1", + Description = "OpenClaw Control Center — Command Hub backend with SignalR real-time updates" + }); + + // Include XML doc comments in Swagger output + var xmlFile = $"{Assembly.GetExecutingAssembly().GetName().Name}.xml"; + var xmlPath = Path.Combine(AppContext.BaseDirectory, xmlFile); + if (File.Exists(xmlPath)) + { + c.IncludeXmlComments(xmlPath); + } +}); + +// ── CORS (kiosk + remote browser) ───────────────────────── +// The Control Center frontend runs on a different origin than the backend. +// SignalR requires credentials for WebSocket transport, so we use +// specific origins rather than AllowAnyOrigin. +var corsOrigins = builder.Configuration.GetSection("Cors:AllowedOrigins") + .Get() ?? new[] { "http://localhost:4200", "http://localhost:5000" }; + +builder.Services.AddCors(options => +{ + options.AddDefaultPolicy(policy => + { + policy.WithOrigins(corsOrigins) + .AllowAnyMethod() + .AllowAnyHeader() + .AllowCredentials(); // Required for SignalR WebSocket + }); +}); + +// ── SignalR (real-time agent status updates) ─────────────── +builder.Services.AddSignalR(); + +// ── Gateway Bridge Service ──────────────────────────────── +// Background service that connects to the OpenClaw Gateway WebSocket +// and bridges events to the AgentStatus SignalR hub. +builder.Services.AddSingleton(); +builder.Services.AddHostedService(sp => sp.GetRequiredService()); + +var app = builder.Build(); + +// ── Middleware ────────────────────────────────────────────── +if (app.Environment.IsDevelopment()) +{ + app.UseSwagger(); + app.UseSwaggerUI(); +} + +app.UseCors(); +app.UseAuthorization(); +app.MapControllers(); + +// ── Hub Endpoints ─────────────────────────────────────────── +// Agent status hub at /hubs/agent-status (matches the design spec) +app.MapHub("/hubs/agent-status"); + +app.Run(); \ No newline at end of file diff --git a/backend/ControlCenter/Properties/launchSettings.json b/backend/ControlCenter/Properties/launchSettings.json new file mode 100644 index 0000000..5596756 --- /dev/null +++ b/backend/ControlCenter/Properties/launchSettings.json @@ -0,0 +1,14 @@ +{ + "$schema": "https://json.schemastore.org/launchsettings.json", + "profiles": { + "http": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": false, + "applicationUrl": "http://localhost:5053", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + } +} diff --git a/backend/ControlCenter/Services/GatewayEventBridgeService.cs b/backend/ControlCenter/Services/GatewayEventBridgeService.cs new file mode 100644 index 0000000..0f37285 --- /dev/null +++ b/backend/ControlCenter/Services/GatewayEventBridgeService.cs @@ -0,0 +1,523 @@ +using System.Collections.Concurrent; +using System.Net.WebSockets; +using System.Text; +using System.Text.Json; +using ControlCenter.Hubs; +using Microsoft.AspNetCore.SignalR; + +namespace ControlCenter.Services; + +/// +/// Background service that connects to the OpenClaw Gateway WebSocket +/// and bridges Gateway events to the . +/// +/// Architecture: +/// +/// Connects to the Gateway WS endpoint (configurable via appsettings) +/// Handles the v3 protocol handshake (challenge → connect → hello-ok) +/// Subscribes to sessions.changed and related events +/// Translates session state changes into +/// and objects +/// Pushes updates through the SignalR hub +/// +/// +/// This is the server-side bridge that allows Angular clients to +/// receive real-time updates via SignalR instead of connecting directly +/// to the Gateway WebSocket. +/// +public class GatewayEventBridgeService : BackgroundService +{ + private readonly ILogger _logger; + private readonly IHubContext _hubContext; + private readonly IConfiguration _configuration; + + /// + /// In-memory fleet state — maps agent IDs to their latest card data. + /// Updated on every sessions.changed event from the Gateway. + /// + private readonly ConcurrentDictionary _fleetState = new(); + + /// + /// Known agent roles for display in the Command Hub. + /// Maps agent IDs to their functional descriptions. + /// + private static readonly Dictionary AgentRoles = new() + { + ["main"] = "Primary Assistant", + ["otto"] = "Orchestrator Agent", + ["dave"] = "Network Admin Agent", + ["bob"] = "Content Writer Agent", + ["stuart"] = "Image & Creative Agent", + ["phil"] = "Home Automation Agent", + ["carl"] = "Security Agent", + ["larry"] = "Business Agent", + ["mel"] = "E-Commerce Agent", + ["norbert"] = "Product Agent", + ["jerry"] = "Market Research Agent", + ["rex"] = "Frontend Dev Agent", + ["dex"] = "Backend Dev Agent", + ["hex"] = "Database Agent", + ["pip"] = "Raspberry Pi Agent", + ["nano"] = "ESP32/Firmware Agent", + ["axiom"] = "Utility Agent", + ["bonnie"] = "Music Agent", + ["sketch"] = "UI/UX Design Agent", + ["flip"] = "Mobile Dev Agent", + ["buzz"] = "SEO Agent", + ["aries"] = "Companion Agent" + }; + + /// + /// Maps OpenClaw session status to . + /// + private static string MapSessionStatus(string? sessionStatus) => sessionStatus switch + { + "running" => "active", + "streaming" => "thinking", + "error" or "aborted" => "error", + "done" => "idle", + _ => "idle" + }; + + public GatewayEventBridgeService( + ILogger logger, + IHubContext hubContext, + IConfiguration configuration) + { + _logger = logger; + _hubContext = hubContext; + _configuration = configuration; + } + + /// + /// Returns the current fleet state snapshot. + /// Used by the hub's GetFleetSnapshot method and by the + /// AgentsController REST endpoint. + /// + public AgentCardData[] GetFleetSnapshot() => + _fleetState.Values.ToArray(); + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("Gateway Event Bridge service starting"); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + await ConnectAndListenAsync(stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + _logger.LogInformation("Gateway Event Bridge service stopping"); + break; + } + catch (Exception ex) + { + _logger.LogError(ex, "Gateway connection lost, reconnecting in 5 seconds..."); + await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); + } + } + } + + /// + /// Connects to the OpenClaw Gateway WebSocket and processes events + /// until the connection is lost or cancellation is requested. + /// + private async Task ConnectAndListenAsync(CancellationToken stoppingToken) + { + var gatewayUrl = _configuration["Gateway:WebSocketUrl"] + ?? "ws://localhost:3271/ws"; + var authToken = _configuration["Gateway:AuthToken"] ?? string.Empty; + + _logger.LogInformation("Connecting to Gateway at {Url}", gatewayUrl); + + using var ws = new ClientWebSocket(); + + // Set auth header if available + if (!string.IsNullOrEmpty(authToken)) + { + ws.Options.SetRequestHeader("Authorization", $"Bearer {authToken}"); + } + + await ws.ConnectAsync(new Uri(gatewayUrl), stoppingToken); + _logger.LogInformation("Connected to Gateway WebSocket"); + + // Start receiving messages + await ReceiveMessagesAsync(ws, stoppingToken); + } + + /// + /// Receives and processes WebSocket messages from the Gateway. + /// Handles the v3 protocol handshake and dispatches events. + /// + private async Task ReceiveMessagesAsync(ClientWebSocket ws, CancellationToken stoppingToken) + { + var buffer = new byte[8192]; + var messageBuilder = new StringBuilder(); + + while (ws.State == WebSocketState.Open && !stoppingToken.IsCancellationRequested) + { + WebSocketReceiveResult result; + try + { + result = await ws.ReceiveAsync(buffer, stoppingToken); + } + catch (WebSocketException ex) + { + _logger.LogWarning(ex, "WebSocket receive error"); + break; + } + + if (result.MessageType == WebSocketMessageType.Close) + { + _logger.LogInformation("Gateway WebSocket closed by server"); + break; + } + + messageBuilder.Append(Encoding.UTF8.GetString(buffer, 0, result.Count)); + + if (result.EndOfMessage) + { + var message = messageBuilder.ToString(); + messageBuilder.Clear(); + await ProcessMessageAsync(ws, message, stoppingToken); + } + } + } + + /// + /// Processes a single WebSocket message from the Gateway. + /// Routes based on the message type: event, response, or challenge. + /// + private async Task ProcessMessageAsync( + ClientWebSocket ws, string message, CancellationToken stoppingToken) + { + try + { + using var doc = JsonDocument.Parse(message); + var root = doc.RootElement; + + var type = root.GetProperty("type").GetString(); + + switch (type) + { + case "event": + await HandleGatewayEventAsync(root); + break; + + case "res": + HandleGatewayResponse(root); + break; + } + + // Special handling for connect.challenge events + if (root.TryGetProperty("event", out var eventName) && + eventName.GetString() == "connect.challenge") + { + await HandleConnectChallengeAsync(ws, root, stoppingToken); + } + } + catch (JsonException ex) + { + _logger.LogWarning(ex, "Failed to parse Gateway message: {Message}", + message.Length > 200 ? message[..200] + "..." : message); + } + } + + /// + /// Handles the Gateway connect.challenge event by sending + /// a connect request with authentication credentials. + /// + private async Task HandleConnectChallengeAsync( + ClientWebSocket ws, JsonElement root, CancellationToken stoppingToken) + { + _logger.LogInformation("Received connect.challenge from Gateway"); + + var connectRequest = new + { + type = "req", + id = $"bridge-{Guid.NewGuid():N}", + method = "connect", + @params = new + { + minProtocol = 3, + maxProtocol = 3, + client = new + { + id = "control-center-backend", + version = "1.0.0", + platform = "server", + mode = "operator" + }, + role = "operator", + scopes = new[] { "operator.read", "operator.write" }, + auth = new + { + token = _configuration["Gateway:AuthToken"] ?? string.Empty + }, + locale = "en-US", + userAgent = "control-center-backend/1.0.0" + } + }; + + var json = JsonSerializer.Serialize(connectRequest); + var bytes = Encoding.UTF8.GetBytes(json); + await ws.SendAsync(bytes, WebSocketMessageType.Text, true, stoppingToken); + } + + /// + /// Handles a Gateway event message by dispatching to the + /// appropriate handler based on event name. + /// + private async Task HandleGatewayEventAsync(JsonElement root) + { + if (!root.TryGetProperty("event", out var eventProp)) return; + var eventName = eventProp.GetString(); + + _logger.LogDebug("Gateway event: {Event}", eventName); + + switch (eventName) + { + case "sessions.changed": + await HandleSessionsChangedAsync(root); + break; + + case "session.message": + HandleSessionMessage(root); + break; + + case "session.tool": + HandleSessionTool(root); + break; + + case "health": + HandleHealthEvent(root); + break; + } + } + + /// + /// Handles a sessions.changed event from the Gateway. + /// Updates the fleet state and pushes status changes through SignalR. + /// + private async Task HandleSessionsChangedAsync(JsonElement root) + { + if (!root.TryGetProperty("payload", out var payload)) return; + + // The payload may contain a snapshot of all sessions + if (payload.TryGetProperty("snapshot", out var snapshot) && + snapshot.ValueKind == JsonValueKind.Array) + { + foreach (var session in snapshot.EnumerateArray()) + { + var cardData = SessionToCardData(session); + if (cardData is null) continue; + + _fleetState[cardData.Id] = cardData; + + var update = new AgentStatusUpdate( + AgentId: cardData.Id, + DisplayName: cardData.DisplayName, + Role: cardData.Role, + Status: cardData.Status, + CurrentTask: cardData.CurrentTask, + SessionKey: cardData.SessionKey, + Channel: cardData.Channel, + LastActivity: cardData.LastActivity, + ErrorMessage: cardData.ErrorMessage + ); + + await _hubContext.PushAgentStatusAsync(update); + } + } + + // Handle individual updates/added/removed + if (payload.TryGetProperty("updated", out var updated) && + updated.ValueKind == JsonValueKind.Array) + { + foreach (var sessionKey in updated.EnumerateArray()) + { + _logger.LogDebug("Session updated: {Key}", sessionKey.GetString()); + } + } + } + + /// + /// Handles a session.message event. Updates the agent's last activity + /// and pushes a status update if the status changed. + /// + private void HandleSessionMessage(JsonElement root) + { + if (!root.TryGetProperty("payload", out var payload)) return; + if (!payload.TryGetProperty("sessionKey", out var sessionKeyProp)) return; + + var sessionKey = sessionKeyProp.GetString() ?? string.Empty; + var agentId = ExtractAgentId(sessionKey); + + if (string.IsNullOrEmpty(agentId)) return; + + // Update last activity timestamp + if (_fleetState.TryGetValue(agentId, out var existing)) + { + _fleetState[agentId] = existing with + { + LastActivity = DateTime.UtcNow.ToString("o"), + Status = "active" + }; + } + } + + /// + /// Handles a session.tool event. Extracts tool progress information + /// and pushes a through SignalR. + /// + private void HandleSessionTool(JsonElement root) + { + if (!root.TryGetProperty("payload", out var payload)) return; + if (!payload.TryGetProperty("sessionKey", out var sessionKeyProp)) return; + + var sessionKey = sessionKeyProp.GetString() ?? string.Empty; + var agentId = ExtractAgentId(sessionKey); + + if (string.IsNullOrEmpty(agentId)) return; + + var toolName = payload.TryGetProperty("toolName", out var tn) ? tn.GetString() : null; + var toolStatus = payload.TryGetProperty("status", out var ts) ? ts.GetString() : null; + + if (toolName is null || toolStatus is null) return; + + var progress = toolStatus switch + { + "started" => 0, + "completed" => 100, + _ => (int?)null + }; + + var update = new TaskProgressUpdate( + AgentId: agentId, + TaskDescription: $"{toolName} ({toolStatus})", + Progress: progress, + Elapsed: null + ); + + // Fire and forget — don't block the event loop + _ = _hubContext.PushTaskProgressAsync(update); + } + + /// + /// Handles a health event from the Gateway. + /// Logs the health status for diagnostics. + /// + private void HandleHealthEvent(JsonElement root) + { + if (!root.TryGetProperty("payload", out var payload)) return; + + var ok = payload.TryGetProperty("ok", out var okProp) && okProp.GetBoolean(); + var status = payload.TryGetProperty("status", out var s) ? s.GetString() : "unknown"; + + _logger.LogInformation("Gateway health: ok={Ok}, status={Status}", ok, status); + } + + /// + /// Handles a Gateway response message. Currently only logs for diagnostics. + /// + private void HandleGatewayResponse(JsonElement root) + { + if (root.TryGetProperty("ok", out var okProp) && okProp.GetBoolean()) + { + _logger.LogDebug("Gateway RPC response OK"); + + // Check for hello-ok after connect + if (root.TryGetProperty("payload", out var payload) && + payload.TryGetProperty("type", out var typeProp) && + typeProp.GetString() == "hello-ok") + { + _logger.LogInformation("Gateway handshake complete (hello-ok received)"); + } + } + else if (root.TryGetProperty("error", out var error)) + { + var errorMsg = error.TryGetProperty("message", out var msg) + ? msg.GetString() : "unknown error"; + _logger.LogWarning("Gateway RPC error: {Error}", errorMsg); + } + } + + /// + /// Converts a raw Gateway session JSON element into an + /// record. + /// + private AgentCardData? SessionToCardData(JsonElement session) + { + // Extract agent ID from session key or agentId field + string? agentId = null; + if (session.TryGetProperty("agentId", out var aid)) + agentId = aid.GetString(); + else if (session.TryGetProperty("key", out var key)) + agentId = ExtractAgentId(key.GetString() ?? string.Empty); + + if (string.IsNullOrEmpty(agentId)) return null; + + var sessionKey = session.TryGetProperty("key", out var sk) + ? sk.GetString() ?? string.Empty : string.Empty; + + var status = session.TryGetProperty("status", out var s) + ? MapSessionStatus(s.GetString()) : "idle"; + + var channel = ExtractChannel(session); + + var lastActivity = session.TryGetProperty("updatedAt", out var ua) + ? DateTimeOffset.FromUnixTimeMilliseconds(ua.GetInt64()).ToString("o") + : DateTime.UtcNow.ToString("o"); + + var displayName = char.ToUpperInvariant(agentId![0]) + agentId[1..]; + var role = AgentRoles.GetValueOrDefault(agentId!, "Agent"); + + return new AgentCardData( + Id: agentId!, + DisplayName: displayName, + Role: role, + Status: status, + CurrentTask: null, + TaskProgress: null, + TaskElapsed: null, + SessionKey: sessionKey, + Channel: channel, + LastActivity: lastActivity, + ErrorMessage: status == "error" ? "Agent encountered an error" : null + ); + } + + /// + /// Extracts the agent ID from a session key. + /// Session key format: "agent:{agentId}:{channel}:..." + /// + private static string? ExtractAgentId(string sessionKey) + { + if (string.IsNullOrEmpty(sessionKey)) return null; + + var parts = sessionKey.Split(':'); + if (parts.Length >= 2 && parts[0] == "agent") + return parts[1]; + + return null; + } + + /// + /// Extracts the channel from a session element. + /// + private static string ExtractChannel(JsonElement session) + { + // Try direct "channel" property + if (session.TryGetProperty("channel", out var ch)) + return ch.GetString() ?? "unknown"; + + // Try origin.provider + if (session.TryGetProperty("origin", out var origin) && + origin.TryGetProperty("provider", out var provider)) + return provider.GetString() ?? "unknown"; + + return "unknown"; + } +} \ No newline at end of file diff --git a/backend/ControlCenter/appsettings.Development.json b/backend/ControlCenter/appsettings.Development.json new file mode 100644 index 0000000..626b82e --- /dev/null +++ b/backend/ControlCenter/appsettings.Development.json @@ -0,0 +1,19 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Debug", + "Microsoft.AspNetCore": "Information", + "ControlCenter": "Debug" + } + }, + "Gateway": { + "WebSocketUrl": "ws://localhost:3271/ws", + "AuthToken": "" + }, + "Cors": { + "AllowedOrigins": [ + "http://localhost:4200", + "http://localhost:5000" + ] + } +} \ No newline at end of file diff --git a/backend/ControlCenter/appsettings.json b/backend/ControlCenter/appsettings.json new file mode 100644 index 0000000..60f801d --- /dev/null +++ b/backend/ControlCenter/appsettings.json @@ -0,0 +1,22 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning", + "ControlCenter": "Debug" + } + }, + "AllowedHosts": "*", + + "Gateway": { + "WebSocketUrl": "ws://localhost:3271/ws", + "AuthToken": "" + }, + + "Cors": { + "AllowedOrigins": [ + "http://localhost:4200", + "http://localhost:5000" + ] + } +} \ No newline at end of file