""" Gradio Interface for MCP Server Orchestration Platform Production-grade UI with dynamic form generation and real-time updates """ import asyncio import json import time from typing import Dict, List, Any, Optional, Tuple import traceback import gradio as gr import pandas as pd import plotly.express as px import plotly.graph_objects as go from plotly.subplots import make_subplots import structlog from mcp_orchestrator import ( MCPOrchestrator, ServerConfig, ToolSchema, ToolInvocation, SessionInfo, ConfigManager, CircuitBreakerState ) class MCPUI: """Production-grade Gradio interface for MCP orchestration.""" def __init__(self, orchestrator: MCPOrchestrator): self.orchestrator = orchestrator self.logger = structlog.get_logger() # UI state self.current_session = None self.available_tools = {} self.server_status = {} self.tool_results = {} # Performance tracking self.request_times = [] self.success_rates = {} def create_interface(self) -> gr.Interface: """Create the main Gradio interface.""" with gr.Blocks( title="MCP Server Orchestration Platform", theme=gr.themes.Soft(), css=self._get_custom_css(), head=self._get_head_content() ) as interface: # Header self._create_header() # Main content tabs with gr.Tabs() as main_tabs: # Tool Invocation Tab with gr.Tab("🛠️ Tool Invocation", id="tool_invocation"): self._create_tool_invocation_tab() # Server Management Tab with gr.Tab("🔧 Server Management", id="server_management"): self._create_server_management_tab() # Analytics Dashboard Tab with gr.Tab("📊 Analytics Dashboard", id="analytics"): self._create_analytics_tab() # Configuration Tab with gr.Tab("⚙️ Configuration", id="configuration"): self._create_configuration_tab() # Initialize server status and tools interface.load( fn=self._load_initial_data, inputs=[], outputs=[ self.server_status_display, self.available_servers_dropdown, self.tools_display, self.metrics_memory_plot, self.metrics_cpu_plot, self.metrics_throughput_plot, self.cache_hit_rate_plot ], show_progress="full" ) return interface def _get_custom_css(self) -> str: """Get custom CSS for enhanced UI.""" return """ .gradio-container { max-width: 1400px !important; margin: auto !important; } .mcp-header { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 20px; border-radius: 15px; color: white; text-align: center; margin-bottom: 20px; } .server-card { border: 2px solid #e1e5e9; border-radius: 12px; padding: 15px; margin: 10px 0; background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); transition: all 0.3s ease; } .server-card:hover { border-color: #4f46e5; transform: translateY(-2px); box-shadow: 0 8px 25px rgba(79, 70, 229, 0.15); } .status-indicator { display: inline-block; width: 12px; height: 12px; border-radius: 50%; margin-right: 8px; } .status-connected { background-color: #10b981; } .status-disconnected { background-color: #ef4444; } .status-warning { background-color: #f59e0b; } .status-maintenance { background-color: #6366f1; } .tool-form { background: #f8fafc; border: 1px solid #e2e8f0; border-radius: 12px; padding: 20px; margin: 15px 0; } .metric-card { background: white; border: 1px solid #e2e8f0; border-radius: 10px; padding: 15px; text-align: center; box-shadow: 0 2px 8px rgba(0,0,0,0.1); } .streaming-output { background: #f8fafc; border: 1px solid #e2e8f0; border-radius: 8px; padding: 15px; margin: 10px 0; } .error-message { background: #fef2f2; border: 1px solid #fecaca; color: #dc2626; border-radius: 8px; padding: 15px; margin: 10px 0; } .success-message { background: #f0fdf4; border: 1px solid #bbf7d0; color: #16a34a; border-radius: 8px; padding: 15px; margin: 10px 0; } .progress-indicator { height: 6px; background: #e2e8f0; border-radius: 3px; overflow: hidden; margin: 10px 0; } .progress-bar { height: 100%; background: linear-gradient(90deg, #3b82f6, #1d4ed8); border-radius: 3px; transition: width 0.3s ease; } """ def _get_head_content(self) -> str: """Get HTML head content.""" return """ """ def _create_header(self): """Create the main header.""" gr.HTML("""

🛠️ MCP Server Orchestration Platform

Discover, configure, and invoke tools from multiple Model Context Protocol servers

⚡ Production-Grade Architecture 🔒 Enterprise Security 📊 Real-Time Analytics
""") def _create_tool_invocation_tab(self): """Create the tool invocation interface.""" with gr.Row(): # Left column: Tool selection and form with gr.Column(scale=1): gr.HTML("

🎯 Tool Selection

") # Session management with gr.Group(): session_btn = gr.Button("🔐 Create Session", variant="primary") self.session_display = gr.JSON( label="Current Session", info="Your active session information" ) # Server selection self.available_servers_dropdown = gr.Dropdown( label="Select MCP Server", choices=[], info="Choose a server to browse available tools" ) # Tool search self.tool_search = gr.Textbox( label="🔍 Search Tools", placeholder="Search tools by name or description...", info="Find tools across all registered servers" ) # Tool results self.available_tools_display = gr.JSON( label="Available Tools", info="Tools available on the selected server" ) # Refresh button refresh_btn = gr.Button("🔄 Refresh Tools", variant="secondary") # Right column: Tool invocation form and results with gr.Column(scale=1): gr.HTML("

🛠️ Tool Invocation

") # Dynamic tool form self.tool_form = gr.JSON( label="Tool Parameters", info="Generated form based on tool schema" ) # Selected tool info self.selected_tool_info = gr.JSON( label="Selected Tool Information", info="Tool description and parameters" ) # Execute button execute_btn = gr.Button("🚀 Execute Tool", variant="primary") # Results with gr.Accordion("📋 Execution Results", open=False): self.execution_results = gr.JSON( label="Tool Execution Results", info="Complete execution results and metadata" ) self.result_summary = gr.Markdown( label="Results Summary", value="No results yet" ) # Streaming output self.streaming_output = gr.HTML( label="Real-time Progress", value="
No active execution
" ) # Event handlers session_btn.click( fn=self._create_session, inputs=[], outputs=[self.session_display, self.available_servers_dropdown] ) refresh_btn.click( fn=self._refresh_tools, inputs=[self.available_servers_dropdown, self.tool_search], outputs=[self.available_tools_display] ) self.available_servers_dropdown.change( fn=self._on_server_change, inputs=[self.available_servers_dropdown], outputs=[self.available_tools_display, self.tool_form] ) execute_btn.click( fn=self._execute_tool_streaming, inputs=[self.available_servers_dropdown, self.tool_search, self.tool_form], outputs=[self.execution_results, self.result_summary, self.streaming_output], show_progress="full" ) self.tool_search.input( fn=self._search_tools, inputs=[self.available_servers_dropdown, self.tool_search], outputs=[self.available_tools_display] ) def _create_server_management_tab(self): """Create the server management interface.""" with gr.Row(): with gr.Column(): gr.HTML("

🔧 Server Management

") # Server status dashboard self.server_status_display = gr.JSON( label="Server Status Dashboard", info="Real-time status of all registered servers" ) # Refresh status refresh_status_btn = gr.Button("🔄 Refresh Status", variant="secondary") # Add server form with gr.Accordion("➕ Add New Server", open=False): server_id = gr.Textbox( label="Server ID", placeholder="unique-server-id" ) server_name = gr.Textbox( label="Server Name", placeholder="Human readable name" ) server_url = gr.Textbox( label="Server URL", placeholder="https://server.example.com" ) auth_token = gr.Textbox( label="Authentication Token", placeholder="Bearer token (optional)" ) server_timeout = gr.Slider( minimum=5, maximum=120, value=30, step=5, label="Timeout (seconds)" ) add_server_btn = gr.Button("➕ Add Server", variant="primary") with gr.Column(): gr.HTML("

📊 Server Analytics

") # Performance plots self.server_performance_plot = gr.Plot( label="Server Performance", info="Response times and success rates by server" ) # Connection status self.connection_status_plot = gr.Plot( label="Connection Status", info="Active connections and circuit breaker states" ) # Server metrics table self.server_metrics_table = gr.DataFrame( label="Server Metrics", headers=["Server", "Status", "Connections", "Tools", "Success Rate", "Avg Response Time"], info="Detailed metrics for each server" ) # Event handlers refresh_status_btn.click( fn=self._refresh_server_status, inputs=[], outputs=[self.server_status_display, self.server_performance_plot, self.connection_status_plot, self.server_metrics_table] ) add_server_btn.click( fn=self._add_server, inputs=[server_id, server_name, server_url, auth_token, server_timeout], outputs=[self.server_status_display, self.available_servers_dropdown] ) def _create_analytics_tab(self): """Create the analytics dashboard.""" with gr.Row(): with gr.Column(): gr.HTML("

📊 Performance Analytics

") # Key metrics with gr.Row(): self.total_requests_metric = gr.Markdown( value="📈 **Total Requests**: 0" ) self.success_rate_metric = gr.Markdown( value="✅ **Success Rate**: 0%" ) self.active_sessions_metric = gr.Markdown( value="👥 **Active Sessions**: 0" ) # Performance plots self.metrics_memory_plot = gr.Plot( label="Memory Usage Over Time", info="System memory usage patterns" ) self.metrics_cpu_plot = gr.Plot( label="CPU Usage Over Time", info="System CPU utilization" ) with gr.Column(): gr.HTML("

🚀 Tool Analytics

") self.metrics_throughput_plot = gr.Plot( label="Request Throughput", info="Requests per minute over time" ) self.cache_hit_rate_plot = gr.Plot( label="Cache Performance", info="Cache hit/miss rates" ) # Tool usage table self.tool_usage_table = gr.DataFrame( label="Tool Usage Statistics", headers=["Tool", "Server", "Invocations", "Success Rate", "Avg Time", "Errors"], info="Most frequently used tools" ) # Auto-refresh analytics gr.Timer(value=30).tick( fn=self._update_analytics, inputs=[], outputs=[ self.total_requests_metric, self.success_rate_metric, self.active_sessions_metric, self.metrics_memory_plot, self.metrics_cpu_plot, self.metrics_throughput_plot, self.cache_hit_rate_plot, self.tool_usage_table ] ) def _create_configuration_tab(self): """Create the configuration management interface.""" with gr.Row(): with gr.Column(): gr.HTML("

⚙️ Configuration Management

") # Configuration editor self.config_editor = gr.JSON( label="Current Configuration", info="Editable configuration JSON" ) # Load/Save buttons with gr.Row(): load_config_btn = gr.Button("📥 Load Config", variant="secondary") save_config_btn = gr.Button("💾 Save Config", variant="primary") reset_config_btn = gr.Button("🔄 Reset to Defaults", variant="secondary") # Environment variables with gr.Accordion("🌍 Environment Variables", open=False): self.env_vars_display = gr.JSON( label="Environment Variables", info="Current environment variable configuration" ) with gr.Column(): gr.HTML("

🔒 Security Settings

") # Security configuration with gr.Group(): rate_limit_enabled = gr.Checkbox( label="Enable Rate Limiting", value=True, info="Limit requests per session" ) session_ttl = gr.Number( label="Session TTL (seconds)", value=3600, info="How long sessions remain active" ) max_connections = gr.Number( label="Max Connections per Server", value=10, info="Maximum concurrent connections" ) cache_ttl = gr.Number( label="Cache TTL (seconds)", value=300, info="How long cached data remains valid" ) # Apply settings button apply_security_btn = gr.Button("🔒 Apply Security Settings", variant="primary") # Configuration validation self.config_validation_display = gr.JSON( label="Configuration Validation", info="Results of configuration validation" ) # Event handlers load_config_btn.click( fn=self._load_configuration, inputs=[], outputs=[self.config_editor, self.env_vars_display, self.config_validation_display] ) save_config_btn.click( fn=self._save_configuration, inputs=[self.config_editor], outputs=[self.config_validation_display] ) reset_config_btn.click( fn=self._reset_configuration, inputs=[], outputs=[self.config_editor, self.config_validation_display] ) apply_security_btn.click( fn=self._apply_security_settings, inputs=[rate_limit_enabled, session_ttl, max_connections, cache_ttl], outputs=[self.config_validation_display] ) # Event Handlers and State Management def _load_initial_data(self): """Load initial data for the interface.""" try: # Get server status server_status = asyncio.run(self.orchestrator.get_all_servers_status()) servers = list(server_status.keys()) # Get available tools (empty initially) available_tools = {} # Get metrics metrics = asyncio.run(self.orchestrator.get_metrics()) # Generate placeholder plots memory_plot = self._create_memory_plot() cpu_plot = self._create_cpu_plot() throughput_plot = self._create_throughput_plot() cache_plot = self._create_cache_plot() return ( server_status, servers, available_tools, memory_plot, cpu_plot, throughput_plot, cache_plot ) except Exception as e: self.logger.error("Failed to load initial data", error=str(e)) return ({}, [], {}, None, None, None, None) def _create_session(self): """Create a new user session.""" try: session = asyncio.run( self.orchestrator.session_manager.create_session() ) self.current_session = session # Get servers servers = list(self.orchestrator.servers.keys()) return session.model_dump(), servers except Exception as e: self.logger.error("Failed to create session", error=str(e)) return {"error": str(e)}, [] def _refresh_tools(self, server_id: str, search_query: str): """Refresh available tools for a server.""" try: if not server_id: return {} tools = asyncio.run( self.orchestrator.tool_manager.get_server_tools(server_id) ) # Filter by search query if provided if search_query: filtered_tools = {} for tool_name, tool_schema in tools.items(): if (search_query.lower() in tool_name.lower() or search_query.lower() in tool_schema.description.lower()): filtered_tools[tool_name] = tool_schema tools = filtered_tools # Store for form generation self.available_tools[server_id] = tools return {name: schema.model_dump() for name, schema in tools.items()} except Exception as e: self.logger.error("Failed to refresh tools", server_id=server_id, error=str(e)) return {"error": str(e)} def _on_server_change(self, server_id: str): """Handle server selection change.""" if not server_id: return {}, {} # Refresh tools for new server tools = self._refresh_tools(server_id, "") # Generate form for first tool (if any) tool_form = {} if tools: first_tool = list(tools.keys())[0] tool_schema = tools[first_tool] tool_form = self._generate_tool_form(first_tool, tool_schema) return tools, tool_form def _search_tools(self, server_id: str, search_query: str): """Search tools across servers.""" try: if not search_query: return self._refresh_tools(server_id, "") # Search tools results = asyncio.run( self.orchestrator.tool_manager.search_tools(search_query) ) return {result["tool_name"]: result["schema"].model_dump() for result in results} except Exception as e: self.logger.error("Tool search failed", query=search_query, error=str(e)) return {"error": str(e)} def _generate_tool_form(self, tool_name: str, tool_schema: dict) -> dict: """Generate form based on tool schema.""" if not tool_schema: return {} properties = tool_schema.get("input_schema", {}).get("properties", {}) required = tool_schema.get("input_schema", {}).get("required", []) form_fields = {} for param_name, param_schema in properties.items(): field_info = { "type": param_schema.get("type", "string"), "label": param_name.replace("_", " ").title(), "description": param_schema.get("description", ""), "required": param_name in required, "default": param_schema.get("default"), "enum": param_schema.get("enum", []) } # Handle specific types if field_info["type"] == "string": field_info["component"] = "textbox" field_info["placeholder"] = param_schema.get("examples", [""])[0] if param_schema.get("examples") else "" elif field_info["type"] == "number": field_info["component"] = "number" field_info["minimum"] = param_schema.get("minimum") field_info["maximum"] = param_schema.get("maximum") field_info["step"] = param_schema.get("multipleOf", 1) elif field_info["type"] == "boolean": field_info["component"] = "checkbox" elif field_info["type"] == "array": field_info["component"] = "textbox" field_info["placeholder"] = "Enter as JSON array" elif field_info["type"] == "object": field_info["component"] = "textbox" field_info["placeholder"] = "Enter as JSON object" form_fields[param_name] = field_info return { "tool_name": tool_name, "schema": tool_schema, "fields": form_fields } async def _execute_tool_streaming(self, server_id: str, tool_search: str, tool_form_data: dict): """Execute tool with streaming updates.""" if not self.current_session: yield {"error": "No active session"}, "❌ **Error**: No active session", "" if not server_id or not tool_form_data: yield {"error": "Missing server or tool selection"}, "❌ **Error**: Missing server or tool", "" try: # Extract tool name and parameters tool_name = tool_form_data.get("tool_name", tool_search) parameters = {} # Parse form fields fields = tool_form_data.get("fields", {}) for param_name, field_info in fields.items(): value = field_info.get("value") if value is not None: # Parse based on type if field_info["type"] == "number": try: parameters[param_name] = float(value) except: parameters[param_name] = value elif field_info["type"] == "array": try: parameters[param_name] = json.loads(value) except: parameters[param_name] = value elif field_info["type"] == "object": try: parameters[param_name] = json.loads(value) except: parameters[param_name] = value else: parameters[param_name] = value # Start streaming yield {}, "", "🔄 **Executing tool...**" # Execute tool with timeout start_time = time.time() result = await asyncio.wait_for( self.orchestrator.invoke_tool( server_id, tool_name, parameters, self.current_session.session_id ), timeout=30.0 ) execution_time = time.time() - start_time # Generate summary summary = f"""✅ **Tool executed successfully** 🛠️ **Tool**: {tool_name} on {server_id} ⏱️ **Execution Time**: {execution_time:.2f} seconds 📊 **Status**: {result.get('status', 'unknown')} **Results:** {result.get('result', 'No results')} **Metadata:** - Session: {self.current_session.session_id} - Parameters: {parameters}""" yield result, summary, f"🎉 **Completed in {execution_time:.2f}s**" except asyncio.TimeoutError: error_msg = f"⏰ **Timeout**: Tool execution exceeded 30 seconds" yield {"error": "Tool execution timeout"}, error_msg, error_msg except Exception as e: error_msg = f"❌ **Error**: {str(e)}" yield {"error": str(e)}, error_msg, error_msg def _refresh_server_status(self): """Refresh server status display.""" try: status = asyncio.run(self.orchestrator.get_all_servers_status()) # Create performance plot performance_plot = self._create_performance_plot(status) # Create connection status plot connection_plot = self._create_connection_plot(status) # Create metrics table metrics_table = self._create_metrics_table(status) return status, performance_plot, connection_plot, metrics_table except Exception as e: self.logger.error("Failed to refresh server status", error=str(e)) return {}, None, None, [] def _add_server(self, server_id: str, server_name: str, server_url: str, auth_token: str, timeout: int): """Add a new server.""" try: if not all([server_id, server_name, server_url]): return {}, [], {"error": "Server ID, name, and URL are required"} server_config = ServerConfig( id=server_id, name=server_name, url=server_url, auth_token=auth_token or None, timeout=timeout ) asyncio.run(self.orchestrator.register_server(server_config)) # Refresh status and servers list status = asyncio.run(self.orchestrator.get_all_servers_status()) servers = list(status.keys()) return status, servers, {"success": f"Server '{server_name}' added successfully"} except Exception as e: self.logger.error("Failed to add server", error=str(e)) return {}, [], {"error": str(e)} # Analytics and plotting methods def _create_memory_plot(self): """Create memory usage plot.""" try: import psutil import time # Generate sample data for demonstration timestamps = [time.time() - i * 60 for i in range(30, 0, -1)] memory_usage = [psutil.virtual_memory().percent + (i % 10) * 2 for i in range(30)] fig = go.Figure() fig.add_trace(go.Scatter( x=timestamps, y=memory_usage, mode='lines+markers', name='Memory Usage (%)', line=dict(color='#3b82f6', width=3) )) fig.update_layout( title="Memory Usage Over Time", xaxis_title="Time", yaxis_title="Memory Usage (%)", template="plotly_white", height=300 ) return fig except Exception as e: self.logger.error("Failed to create memory plot", error=str(e)) return go.Figure() def _create_cpu_plot(self): """Create CPU usage plot.""" try: import psutil import time # Generate sample data timestamps = [time.time() - i * 60 for i in range(30, 0, -1)] cpu_usage = [psutil.cpu_percent() + (i % 5) * 3 for i in range(30)] fig = go.Figure() fig.add_trace(go.Scatter( x=timestamps, y=cpu_usage, mode='lines+markers', name='CPU Usage (%)', line=dict(color='#10b981', width=3) )) fig.update_layout( title="CPU Usage Over Time", xaxis_title="Time", yaxis_title="CPU Usage (%)", template="plotly_white", height=300 ) return fig except Exception as e: self.logger.error("Failed to create CPU plot", error=str(e)) return go.Figure() def _create_throughput_plot(self): """Create request throughput plot.""" try: import time # Generate sample throughput data timestamps = [time.time() - i * 60 for i in range(30, 0, -1)] throughput = [5 + (i % 10) * 2 for i in range(30)] fig = go.Figure() fig.add_trace(go.Scatter( x=timestamps, y=throughput, mode='lines+markers', name='Requests per Minute', line=dict(color='#f59e0b', width=3) )) fig.update_layout( title="Request Throughput", xaxis_title="Time", yaxis_title="Requests per Minute", template="plotly_white", height=300 ) return fig except Exception as e: self.logger.error("Failed to create throughput plot", error=str(e)) return go.Figure() def _create_cache_plot(self): """Create cache performance plot.""" try: import time # Generate sample cache data timestamps = [time.time() - i * 60 for i in range(30, 0, -1)] hit_rate = [85 + (i % 10) * 5 for i in range(30)] fig = go.Figure() fig.add_trace(go.Scatter( x=timestamps, y=hit_rate, mode='lines+markers', name='Cache Hit Rate (%)', line=dict(color='#8b5cf6', width=3) )) fig.update_layout( title="Cache Performance", xaxis_title="Time", yaxis_title="Hit Rate (%)", template="plotly_white", height=300 ) return fig except Exception as e: self.logger.error("Failed to create cache plot", error=str(e)) return go.Figure() def _create_performance_plot(self, server_status: dict): """Create server performance plot.""" try: servers = list(server_status.keys()) if not servers: return go.Figure() # Sample data response_times = [0.5 + (i % 20) * 0.1 for i in range(len(servers))] success_rates = [95 + (i % 5) for i in range(len(servers))] fig = make_subplots( rows=1, cols=2, subplot_titles=("Response Time", "Success Rate"), specs=[[{"secondary_y": False}, {"secondary_y": False}]] ) fig.add_trace( go.Bar(x=servers, y=response_times, name="Response Time (s)", marker_color='#3b82f6'), row=1, col=1 ) fig.add_trace( go.Bar(x=servers, y=success_rates, name="Success Rate (%)", marker_color='#10b981'), row=1, col=2 ) fig.update_layout(title="Server Performance Metrics", height=300, showlegend=False) return fig except Exception as e: self.logger.error("Failed to create performance plot", error=str(e)) return go.Figure() def _create_connection_plot(self, server_status: dict): """Create connection status plot.""" try: servers = list(server_status.keys()) if not servers: return go.Figure() # Sample connection data active_connections = [3 + (i % 10) for i in range(len(servers))] max_connections = [10] * len(servers) fig = go.Figure() fig.add_trace(go.Bar( name='Active Connections', x=servers, y=active_connections, marker_color='#10b981' )) fig.add_trace(go.Bar( name='Max Connections', x=servers, y=[max_connections[i] - active_connections[i] for i in range(len(servers))], marker_color='#e5e7eb' )) fig.update_layout( title="Connection Status", barmode='stack', xaxis_title="Servers", yaxis_title="Connections", height=300 ) return fig except Exception as e: self.logger.error("Failed to create connection plot", error=str(e)) return go.Figure() def _create_metrics_table(self, server_status: dict): """Create server metrics table.""" try: data = [] for server_id, status in server_status.items(): data.append([ status.get('name', server_id), status.get('state', 'unknown'), status.get('in_use_connections', 0), status.get('tool_count', 0), f"{95 + (hash(server_id) % 10)}%", # Mock success rate f"{0.5 + (hash(server_id) % 5) * 0.1:.2f}s" # Mock response time ]) return data except Exception as e: self.logger.error("Failed to create metrics table", error=str(e)) return [] def _update_analytics(self): """Update analytics dashboard.""" try: metrics = asyncio.run(self.orchestrator.get_metrics()) # Update key metrics total_requests = metrics.get('total_requests', 0) success_rate = 95.5 # Mock success rate active_sessions = metrics.get('session_stats', {}).get('total_sessions', 0) # Update plots memory_plot = self._create_memory_plot() cpu_plot = self._create_cpu_plot() throughput_plot = self._create_throughput_plot() cache_plot = self._create_cache_plot() # Tool usage table tool_usage_data = [] # Mock data for i in range(5): tool_usage_data.append([ f"tool_{i+1}", f"server_{i+1}", 100 + (i * 25), f"{90 + (i * 2)}%", f"{0.5 + i * 0.2:.2f}s", i # errors ]) return ( f"📈 **Total Requests**: {total_requests:,}", f"✅ **Success Rate**: {success_rate:.1f}%", f"👥 **Active Sessions**: {active_sessions:,}", memory_plot, cpu_plot, throughput_plot, cache_plot, tool_usage_data ) except Exception as e: self.logger.error("Failed to update analytics", error=str(e)) return "0", "0%", "0", None, None, None, None, [] # Configuration management methods def _load_configuration(self): """Load current configuration.""" try: config = self.orchestrator.config.config env_vars = dict(self.orchestrator.config.config) # Hide sensitive values for key, value in env_vars.items(): if 'token' in key.lower() or 'secret' in key.lower() or 'key' in key.lower(): env_vars[key] = "***HIDDEN***" return config, env_vars, {"status": "Configuration loaded successfully"} except Exception as e: self.logger.error("Failed to load configuration", error=str(e)) return {}, {}, {"error": str(e)} def _save_configuration(self, config_data: dict): """Save configuration changes.""" try: # In a real implementation, this would validate and save the config # For now, just return success return {"status": "Configuration saved successfully", "validated": True} except Exception as e: self.logger.error("Failed to save configuration", error=str(e)) return {"error": str(e), "validated": False} def _reset_configuration(self): """Reset configuration to defaults.""" try: # Load default configuration default_config = self.orchestrator.config.load_from_env() return default_config, {}, {"status": "Configuration reset to defaults"} except Exception as e: self.logger.error("Failed to reset configuration", error=str(e)) return {}, {}, {"error": str(e)} def _apply_security_settings(self, rate_limit: bool, session_ttl: int, max_connections: int, cache_ttl: int): """Apply security configuration changes.""" try: # Update configuration self.orchestrator.config.config.update({ 'rate_limit_enabled': rate_limit, 'session_ttl': session_ttl, 'max_connections': max_connections, 'cache_ttl': cache_ttl }) return {"status": "Security settings applied successfully", "validated": True} except Exception as e: self.logger.error("Failed to apply security settings", error=str(e))