using System.Collections.Concurrent; using System.Net.WebSockets; using System.Text; using System.Text.Json; using ControlCenter.Hubs; using Microsoft.AspNetCore.SignalR; namespace ControlCenter.Services; /// /// Background service that connects to the OpenClaw Gateway WebSocket /// and bridges Gateway events to the . /// /// Architecture: /// /// Connects to the Gateway WS endpoint (configurable via appsettings) /// Handles the v3 protocol handshake (challenge → connect → hello-ok) /// Subscribes to sessions.changed and related events /// Translates session state changes into /// and objects /// Pushes updates through the SignalR hub /// /// /// This is the server-side bridge that allows Angular clients to /// receive real-time updates via SignalR instead of connecting directly /// to the Gateway WebSocket. /// public class GatewayEventBridgeService : BackgroundService { private readonly ILogger _logger; private readonly IHubContext _hubContext; private readonly IConfiguration _configuration; /// /// In-memory fleet state — maps agent IDs to their latest card data. /// Updated on every sessions.changed event from the Gateway. /// private readonly ConcurrentDictionary _fleetState = new(); /// /// Known agent roles for display in the Command Hub. /// Maps agent IDs to their functional descriptions. /// private static readonly Dictionary AgentRoles = new() { ["main"] = "Primary Assistant", ["otto"] = "Orchestrator Agent", ["dave"] = "Network Admin Agent", ["bob"] = "Content Writer Agent", ["stuart"] = "Image & Creative Agent", ["phil"] = "Home Automation Agent", ["carl"] = "Security Agent", ["larry"] = "Business Agent", ["mel"] = "E-Commerce Agent", ["norbert"] = "Product Agent", ["jerry"] = "Market Research Agent", ["rex"] = "Frontend Dev Agent", ["dex"] = "Backend Dev Agent", ["hex"] = "Database Agent", ["pip"] = "Raspberry Pi Agent", ["nano"] = "ESP32/Firmware Agent", ["axiom"] = "Utility Agent", ["bonnie"] = "Music Agent", ["sketch"] = "UI/UX Design Agent", ["flip"] = "Mobile Dev Agent", ["buzz"] = "SEO Agent", ["aries"] = "Companion Agent" }; /// /// Maps OpenClaw session status to . /// private static string MapSessionStatus(string? sessionStatus) => sessionStatus switch { "running" => "active", "streaming" => "thinking", "error" or "aborted" => "error", "done" => "idle", _ => "idle" }; public GatewayEventBridgeService( ILogger logger, IHubContext hubContext, IConfiguration configuration) { _logger = logger; _hubContext = hubContext; _configuration = configuration; } /// /// Returns the current fleet state snapshot. /// Used by the hub's GetFleetSnapshot method and by the /// AgentsController REST endpoint. /// public AgentCardData[] GetFleetSnapshot() => _fleetState.Values.ToArray(); protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("Gateway Event Bridge service starting"); while (!stoppingToken.IsCancellationRequested) { try { await ConnectAndListenAsync(stoppingToken); } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { _logger.LogInformation("Gateway Event Bridge service stopping"); break; } catch (Exception ex) { _logger.LogError(ex, "Gateway connection lost, reconnecting in 5 seconds..."); await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); } } } /// /// Connects to the OpenClaw Gateway WebSocket and processes events /// until the connection is lost or cancellation is requested. /// private async Task ConnectAndListenAsync(CancellationToken stoppingToken) { var gatewayUrl = _configuration["Gateway:WebSocketUrl"] ?? "ws://localhost:3271/ws"; var authToken = _configuration["Gateway:AuthToken"] ?? string.Empty; _logger.LogInformation("Connecting to Gateway at {Url}", gatewayUrl); using var ws = new ClientWebSocket(); // Set auth header if available if (!string.IsNullOrEmpty(authToken)) { ws.Options.SetRequestHeader("Authorization", $"Bearer {authToken}"); } await ws.ConnectAsync(new Uri(gatewayUrl), stoppingToken); _logger.LogInformation("Connected to Gateway WebSocket"); // Start receiving messages await ReceiveMessagesAsync(ws, stoppingToken); } /// /// Receives and processes WebSocket messages from the Gateway. /// Handles the v3 protocol handshake and dispatches events. /// private async Task ReceiveMessagesAsync(ClientWebSocket ws, CancellationToken stoppingToken) { var buffer = new byte[8192]; var messageBuilder = new StringBuilder(); while (ws.State == WebSocketState.Open && !stoppingToken.IsCancellationRequested) { WebSocketReceiveResult result; try { result = await ws.ReceiveAsync(buffer, stoppingToken); } catch (WebSocketException ex) { _logger.LogWarning(ex, "WebSocket receive error"); break; } if (result.MessageType == WebSocketMessageType.Close) { _logger.LogInformation("Gateway WebSocket closed by server"); break; } messageBuilder.Append(Encoding.UTF8.GetString(buffer, 0, result.Count)); if (result.EndOfMessage) { var message = messageBuilder.ToString(); messageBuilder.Clear(); await ProcessMessageAsync(ws, message, stoppingToken); } } } /// /// Processes a single WebSocket message from the Gateway. /// Routes based on the message type: event, response, or challenge. /// private async Task ProcessMessageAsync( ClientWebSocket ws, string message, CancellationToken stoppingToken) { try { using var doc = JsonDocument.Parse(message); var root = doc.RootElement; var type = root.GetProperty("type").GetString(); switch (type) { case "event": await HandleGatewayEventAsync(root); break; case "res": HandleGatewayResponse(root); break; } // Special handling for connect.challenge events if (root.TryGetProperty("event", out var eventName) && eventName.GetString() == "connect.challenge") { await HandleConnectChallengeAsync(ws, root, stoppingToken); } } catch (JsonException ex) { _logger.LogWarning(ex, "Failed to parse Gateway message: {Message}", message.Length > 200 ? message[..200] + "..." : message); } } /// /// Handles the Gateway connect.challenge event by sending /// a connect request with authentication credentials. /// private async Task HandleConnectChallengeAsync( ClientWebSocket ws, JsonElement root, CancellationToken stoppingToken) { _logger.LogInformation("Received connect.challenge from Gateway"); var connectRequest = new { type = "req", id = $"bridge-{Guid.NewGuid():N}", method = "connect", @params = new { minProtocol = 3, maxProtocol = 3, client = new { id = "control-center-backend", version = "1.0.0", platform = "server", mode = "operator" }, role = "operator", scopes = new[] { "operator.read", "operator.write" }, auth = new { token = _configuration["Gateway:AuthToken"] ?? string.Empty }, locale = "en-US", userAgent = "control-center-backend/1.0.0" } }; var json = JsonSerializer.Serialize(connectRequest); var bytes = Encoding.UTF8.GetBytes(json); await ws.SendAsync(bytes, WebSocketMessageType.Text, true, stoppingToken); } /// /// Handles a Gateway event message by dispatching to the /// appropriate handler based on event name. /// private async Task HandleGatewayEventAsync(JsonElement root) { if (!root.TryGetProperty("event", out var eventProp)) return; var eventName = eventProp.GetString(); _logger.LogDebug("Gateway event: {Event}", eventName); switch (eventName) { case "sessions.changed": await HandleSessionsChangedAsync(root); break; case "session.message": HandleSessionMessage(root); break; case "session.tool": HandleSessionTool(root); break; case "health": HandleHealthEvent(root); break; } } /// /// Handles a sessions.changed event from the Gateway. /// Updates the fleet state and pushes status changes through SignalR. /// private async Task HandleSessionsChangedAsync(JsonElement root) { if (!root.TryGetProperty("payload", out var payload)) return; // The payload may contain a snapshot of all sessions if (payload.TryGetProperty("snapshot", out var snapshot) && snapshot.ValueKind == JsonValueKind.Array) { foreach (var session in snapshot.EnumerateArray()) { var cardData = SessionToCardData(session); if (cardData is null) continue; _fleetState[cardData.Id] = cardData; var update = new AgentStatusUpdate( AgentId: cardData.Id, DisplayName: cardData.DisplayName, Role: cardData.Role, Status: cardData.Status, CurrentTask: cardData.CurrentTask, SessionKey: cardData.SessionKey, Channel: cardData.Channel, LastActivity: cardData.LastActivity, ErrorMessage: cardData.ErrorMessage ); await _hubContext.PushAgentStatusAsync(update); } } // Handle individual updates/added/removed if (payload.TryGetProperty("updated", out var updated) && updated.ValueKind == JsonValueKind.Array) { foreach (var sessionKey in updated.EnumerateArray()) { _logger.LogDebug("Session updated: {Key}", sessionKey.GetString()); } } } /// /// Handles a session.message event. Updates the agent's last activity /// and pushes a status update if the status changed. /// private void HandleSessionMessage(JsonElement root) { if (!root.TryGetProperty("payload", out var payload)) return; if (!payload.TryGetProperty("sessionKey", out var sessionKeyProp)) return; var sessionKey = sessionKeyProp.GetString() ?? string.Empty; var agentId = ExtractAgentId(sessionKey); if (string.IsNullOrEmpty(agentId)) return; // Update last activity timestamp if (_fleetState.TryGetValue(agentId, out var existing)) { _fleetState[agentId] = existing with { LastActivity = DateTime.UtcNow.ToString("o"), Status = "active" }; } } /// /// Handles a session.tool event. Extracts tool progress information /// and pushes a through SignalR. /// private void HandleSessionTool(JsonElement root) { if (!root.TryGetProperty("payload", out var payload)) return; if (!payload.TryGetProperty("sessionKey", out var sessionKeyProp)) return; var sessionKey = sessionKeyProp.GetString() ?? string.Empty; var agentId = ExtractAgentId(sessionKey); if (string.IsNullOrEmpty(agentId)) return; var toolName = payload.TryGetProperty("toolName", out var tn) ? tn.GetString() : null; var toolStatus = payload.TryGetProperty("status", out var ts) ? ts.GetString() : null; if (toolName is null || toolStatus is null) return; var progress = toolStatus switch { "started" => 0, "completed" => 100, _ => (int?)null }; var update = new TaskProgressUpdate( AgentId: agentId, TaskDescription: $"{toolName} ({toolStatus})", Progress: progress, Elapsed: null ); // Fire and forget — don't block the event loop _ = _hubContext.PushTaskProgressAsync(update); } /// /// Handles a health event from the Gateway. /// Logs the health status for diagnostics. /// private void HandleHealthEvent(JsonElement root) { if (!root.TryGetProperty("payload", out var payload)) return; var ok = payload.TryGetProperty("ok", out var okProp) && okProp.GetBoolean(); var status = payload.TryGetProperty("status", out var s) ? s.GetString() : "unknown"; _logger.LogInformation("Gateway health: ok={Ok}, status={Status}", ok, status); } /// /// Handles a Gateway response message. Currently only logs for diagnostics. /// private void HandleGatewayResponse(JsonElement root) { if (root.TryGetProperty("ok", out var okProp) && okProp.GetBoolean()) { _logger.LogDebug("Gateway RPC response OK"); // Check for hello-ok after connect if (root.TryGetProperty("payload", out var payload) && payload.TryGetProperty("type", out var typeProp) && typeProp.GetString() == "hello-ok") { _logger.LogInformation("Gateway handshake complete (hello-ok received)"); } } else if (root.TryGetProperty("error", out var error)) { var errorMsg = error.TryGetProperty("message", out var msg) ? msg.GetString() : "unknown error"; _logger.LogWarning("Gateway RPC error: {Error}", errorMsg); } } /// /// Converts a raw Gateway session JSON element into an /// record. /// private AgentCardData? SessionToCardData(JsonElement session) { // Extract agent ID from session key or agentId field string? agentId = null; if (session.TryGetProperty("agentId", out var aid)) agentId = aid.GetString(); else if (session.TryGetProperty("key", out var key)) agentId = ExtractAgentId(key.GetString() ?? string.Empty); if (string.IsNullOrEmpty(agentId)) return null; var sessionKey = session.TryGetProperty("key", out var sk) ? sk.GetString() ?? string.Empty : string.Empty; var status = session.TryGetProperty("status", out var s) ? MapSessionStatus(s.GetString()) : "idle"; var channel = ExtractChannel(session); var lastActivity = session.TryGetProperty("updatedAt", out var ua) ? DateTimeOffset.FromUnixTimeMilliseconds(ua.GetInt64()).ToString("o") : DateTime.UtcNow.ToString("o"); var displayName = char.ToUpperInvariant(agentId![0]) + agentId[1..]; var role = AgentRoles.GetValueOrDefault(agentId!, "Agent"); return new AgentCardData( Id: agentId!, DisplayName: displayName, Role: role, Status: status, CurrentTask: null, TaskProgress: null, TaskElapsed: null, SessionKey: sessionKey, Channel: channel, LastActivity: lastActivity, ErrorMessage: status == "error" ? "Agent encountered an error" : null ); } /// /// Extracts the agent ID from a session key. /// Session key format: "agent:{agentId}:{channel}:..." /// private static string? ExtractAgentId(string sessionKey) { if (string.IsNullOrEmpty(sessionKey)) return null; var parts = sessionKey.Split(':'); if (parts.Length >= 2 && parts[0] == "agent") return parts[1]; return null; } /// /// Extracts the channel from a session element. /// private static string ExtractChannel(JsonElement session) { // Try direct "channel" property if (session.TryGetProperty("channel", out var ch)) return ch.GetString() ?? "unknown"; // Try origin.provider if (session.TryGetProperty("origin", out var origin) && origin.TryGetProperty("provider", out var provider)) return provider.GetString() ?? "unknown"; return "unknown"; } }