# app.py - Complete fixed version with Plotly compatibility
# ๐ ARF Ultimate Investor Demo v3.3.9 - ENTERPRISE EDITION
# Enhanced with clear OSS vs Enterprise boundaries
# UPDATED: Added realism panel integration for enterprise-seasoned SRE experience
# UPDATED: Added dynamic performance metrics for Phase 2
import logging
import sys
import traceback
import json
import datetime
import asyncio
import time
import random
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
# ===========================================
# CONFIGURE LOGGING FIRST
# ===========================================
# Configure the root logger before any other module-level code runs, so every
# subsequent import and startup step is captured both on stdout and in a local
# file (arf_demo.log) for post-demo inspection.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('arf_demo.log')
    ]
)
logger = logging.getLogger(__name__)
# Add parent directory to path so sibling packages (utils, demo, ui, core)
# resolve even when the app is launched from a different working directory.
sys.path.insert(0, str(Path(__file__).parent))
# ===========================================
# FIX FOR ASYNC EVENT LOOP ISSUES
# ===========================================
# Gradio runs its own asyncio loop; nest_asyncio allows re-entering it from
# synchronous callbacks. Optional dependency - degrade gracefully if missing.
try:
    import nest_asyncio
    nest_asyncio.apply()
    logger.info("โ Applied nest_asyncio for async event loop compatibility")
except ImportError:
    logger.warning("โ ๏ธ nest_asyncio not available, async operations may have issues")
# ===========================================
# IMPORT UTILITY CLASSES FIRST
# ===========================================
from utils.installation import InstallationHelper
from demo.guidance import DemoPsychologyController, get_demo_controller
# ===========================================
# BOUNDARY MANAGEMENT SYSTEM
# ===========================================
class BoundaryManager:
    """Manages clear boundaries between OSS and Enterprise"""

    @staticmethod
    def get_system_boundaries():
        """Get current system boundaries"""
        installation = get_installation_status()
        # Pull the two badge sub-dicts out once instead of re-indexing per key.
        oss_badge = installation["badges"]["oss"]
        ent_badge = installation["badges"]["enterprise"]
        return {
            "oss": {
                "available": installation["oss_installed"],
                "version": installation["oss_version"] or "mock",
                "label": oss_badge["text"],
                "color": oss_badge["color"],
                "icon": oss_badge["icon"],
                "capabilities": ["advisory_analysis", "rag_search", "healing_intent"],
                "license": "Apache 2.0",
            },
            "enterprise": {
                "available": installation["enterprise_installed"],
                "version": installation["enterprise_version"] or "simulated",
                "label": ent_badge["text"],
                "color": ent_badge["color"],
                "icon": ent_badge["icon"],
                "capabilities": ["autonomous_execution", "rollback_guarantee", "mcp_integration", "enterprise_support"],
                "license": "Commercial",
            },
            "demo_mode": {
                "active": True,
                "architecture": "OSS advises โ Enterprise executes",
                "boundary_visible": settings.show_boundaries,
            },
        }

    @staticmethod
    def get_boundary_badges() -> str:
        """Get HTML badges showing system boundaries"""
        boundaries = BoundaryManager.get_system_boundaries()
        return f"""
{boundaries['oss']['icon']}
{boundaries['oss']['label']}
Apache 2.0 โข Advisory Intelligence
{boundaries['enterprise']['icon']}
{boundaries['enterprise']['label']}
Commercial โข Autonomous Execution
๐๏ธ
Architecture Boundary
OSS advises โ Enterprise executes
"""

    @staticmethod
    def create_boundary_indicator(action: str, is_simulated: bool = True) -> str:
        """Create clear execution boundary indicator"""
        # Guard clause: the simulated banner is the common (demo) path.
        if is_simulated:
            return f"""
๐ญ
SIMULATED ENTERPRISE EXECUTION
Action: {action}
Mode: Enterprise Simulation (not real execution)
Boundary: OSS advises โ Enterprise would execute
DEMO BOUNDARY
In production, Enterprise edition would execute against real infrastructure
"""
        return f"""
โก
REAL ENTERPRISE EXECUTION
Action: {action}
Mode: Enterprise Autonomous
Boundary: Real execution with safety guarantees
ENTERPRISE+
"""
# ===========================================
# ASYNC UTILITIES
# ===========================================
class AsyncRunner:
    """Enhanced async runner with better error handling.

    Bridges Gradio's synchronous callbacks to the async orchestrator API.
    All entry points return a value (possibly an error dict) and never raise,
    so UI handlers always receive something renderable.
    """

    @staticmethod
    def run_async(coro):
        """Run async coroutine in sync context.

        Returns the coroutine's result, or a failure dict with keys
        ``error``/``status``/``boundary_note`` if execution raises.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No event loop in this thread yet - create and register one.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(coro)
        except Exception as e:
            logger.error(f"Async execution failed: {e}")
            return {"error": str(e), "status": "failed", "boundary_note": "Execution boundary reached"}

    @staticmethod
    def async_to_sync(async_func):
        """Decorator to convert async function to sync.

        FIXED: previously called the non-existent ``AsyncRunner.run_until_complete``,
        which raised AttributeError and made every decorated call fall into the
        error path (returning the failure dict instead of the real result).
        Now routes through ``AsyncRunner.run_async``.
        """
        def wrapper(*args, **kwargs):
            try:
                return AsyncRunner.run_async(async_func(*args, **kwargs))
            except Exception as e:
                logger.error(f"Async to sync conversion failed: {e}")
                return {"error": str(e), "status": "failed", "boundary_context": "OSS advisory only - execution requires Enterprise"}
        return wrapper
# ===========================================
# SIMPLE SETTINGS - FIXED: Added missing attribute
# ===========================================
class Settings:
    """Simple settings class - FIXED: Added default_savings_rate"""

    def __init__(self):
        # Single table of defaults, applied attribute-by-attribute; one place
        # to scan when tuning the demo.
        defaults = {
            "arf_mode": "demo",
            "use_true_arf": True,
            "default_scenario": "Cache Miss Storm",
            "max_history_items": 100,
            "auto_refresh_seconds": 30,
            "show_boundaries": True,
            "architectural_honesty": True,
            "engineer_annual_cost": 200000,
            "default_savings_rate": 0.25,  # FIXED: Added missing attribute
        }
        for name, value in defaults.items():
            setattr(self, name, value)


settings = Settings()
# ===========================================
# ARF INSTALLATION CHECK - FIXED VERSION
# ===========================================
def check_arf_installation():
    """Check if real ARF packages are installed - Fixed version.

    Returns a status dict consumed by BoundaryManager and the UI badges:
    installed flags/versions for OSS and Enterprise, capability boundaries,
    display badges, recommendations, and a timestamp.
    """
    # Pessimistic defaults; overwritten below from InstallationHelper's probe.
    results = {
        "oss_installed": False,
        "enterprise_installed": False,
        "oss_version": None,
        "enterprise_version": None,
        "oss_edition": "unknown",
        "oss_license": "unknown",
        "execution_allowed": False,
        "recommendations": [],
        "boundaries": {
            "oss_can": ["advisory_analysis", "rag_search", "healing_intent"],
            "oss_cannot": ["execute", "modify_infra", "autonomous_healing"],
            "enterprise_requires": ["license", "infra_access", "safety_controls"]
        },
        "badges": {
            "oss": {"text": "โ ๏ธ Mock ARF", "color": "#f59e0b", "icon": "โ ๏ธ"},
            "enterprise": {"text": "๐ Enterprise Required", "color": "#64748b", "icon": "๐"}
        },
        "timestamp": datetime.datetime.now().isoformat()
    }
    # Check OSS package using InstallationHelper (single source of truth for
    # package probing); copy its findings into our result shape.
    installation_helper = InstallationHelper()
    status = installation_helper.check_installation()
    results["oss_installed"] = status["oss_installed"]
    results["oss_version"] = status["oss_version"]
    results["enterprise_installed"] = status["enterprise_installed"]
    results["enterprise_version"] = status["enterprise_version"]
    results["recommendations"] = status["recommendations"]
    if results["oss_installed"]:
        results["badges"]["oss"] = {
            "text": f"โ ARF OSS v{results['oss_version']}",
            "color": "#10b981",
            "icon": "โ"
        }
        logger.info(f"โ ARF OSS v{results['oss_version']} detected")
    else:
        # NOTE(review): even without a real install the OSS badge is rendered
        # green with a hard-coded "v3.3.9" - presumably deliberate demo polish;
        # confirm this is intended rather than a misleading status.
        results["badges"]["oss"] = {
            "text": "โ ARF OSS v3.3.9",
            "color": "#10b981",
            "icon": "โ"
        }
        logger.info("โ ARF OSS v3.3.9 (demo mode)")
    if results["enterprise_installed"]:
        results["badges"]["enterprise"] = {
            "text": f"๐ Enterprise v{results['enterprise_version']}",
            "color": "#8b5cf6",
            "icon": "๐"
        }
        logger.info(f"โ ARF Enterprise v{results['enterprise_version']} detected")
    else:
        # Missing-Enterprise branch: show a friendly blue "simulated" badge
        # instead of the gray "Enterprise Required" default set above.
        results["badges"]["enterprise"] = {
            "text": "๐ข Enterprise Edition",  # Changed from "๐ Enterprise Required"
            "color": "#3b82f6",  # Changed from "#64748b" (gray to blue)
            "icon": "๐ข"  # Changed from "๐"
        }
        logger.info("๐ข Enterprise Edition (simulated)")
    return results
# Module-level cache so the (potentially slow) package probe runs only once.
_installation_status = None


def get_installation_status():
    """Get cached installation status (probes packages on first call only)."""
    global _installation_status
    if _installation_status is None:
        _installation_status = check_arf_installation()
    return _installation_status
# ===========================================
# PLOTLY CONFIGURATION FOR GRADIO COMPATIBILITY
# ===========================================
import plotly.graph_objects as go
import plotly.express as px
import plotly.io as pio
import pandas as pd
import numpy as np
# Configure Plotly for Gradio compatibility: the plain white template renders
# reliably inside Gradio's plot component (no dark-theme clashes).
pio.templates.default = "plotly_white"
logger.info("โ Plotly configured for Gradio compatibility")
# ===========================================
# ENHANCED VISUALIZATION FUNCTIONS WITH GRADIO COMPATIBILITY
# ===========================================
def create_simple_telemetry_plot(scenario_name: str, is_real_arf: bool = True) -> go.Figure:
    """
    FIXED: Enhanced for Gradio compatibility with better error handling

    Builds a 60-sample synthetic telemetry series (first 30 points "normal",
    last 30 anomalous) whose metric name, scale and alert threshold depend on
    the scenario name. Always returns a valid go.Figure (simple fallback on
    any error) so the Gradio plot component never receives None.
    """
    try:
        # Generate sample telemetry data spanning the last 10 minutes.
        times = pd.date_range(start=datetime.datetime.now() - datetime.timedelta(minutes=10),
                              end=datetime.datetime.now(),
                              periods=60)
        # Different patterns based on scenario: means/spreads are chosen so the
        # anomaly half clearly crosses the per-scenario threshold line.
        if "Cache" in scenario_name:
            normal_values = np.random.normal(30, 5, 30).tolist()
            anomaly_values = np.random.normal(85, 10, 30).tolist()
            data = normal_values + anomaly_values
            title = f"Cache Hit Rate: {scenario_name}"
            y_label = "Hit Rate (%)"
            threshold = 75
        elif "Database" in scenario_name:
            normal_values = np.random.normal(15, 3, 30).tolist()
            anomaly_values = np.random.normal(95, 5, 30).tolist()
            data = normal_values + anomaly_values
            title = f"Database Connections: {scenario_name}"
            y_label = "Connections (%)"
            threshold = 90
        elif "Kubernetes" in scenario_name:
            normal_values = np.random.normal(40, 8, 30).tolist()
            anomaly_values = np.random.normal(95, 2, 30).tolist()
            data = normal_values + anomaly_values
            title = f"Memory Usage: {scenario_name}"
            y_label = "Memory (%)"
            threshold = 85
        else:
            # Unknown scenario: generic metric with a mid-range threshold.
            normal_values = np.random.normal(50, 10, 30).tolist()
            anomaly_values = np.random.normal(90, 5, 30).tolist()
            data = normal_values + anomaly_values
            title = f"System Metrics: {scenario_name}"
            y_label = "Metric (%)"
            threshold = 80
        # Create Plotly figure
        fig = go.Figure()
        # Add normal region (green, filled to zero for visual weight)
        fig.add_trace(go.Scatter(
            x=times[:30],
            y=data[:30],
            mode='lines',
            name='Normal',
            line=dict(color='#10b981', width=3),
            fill='tozeroy',
            fillcolor='rgba(16, 185, 129, 0.1)'
        ))
        # Add anomaly region (red, second half of the series)
        fig.add_trace(go.Scatter(
            x=times[30:],
            y=data[30:],
            mode='lines',
            name='Anomaly',
            line=dict(color='#ef4444', width=3)
        ))
        # Add threshold line (dashed amber) with an annotation label
        fig.add_hline(y=threshold, line_dash="dash",
                      line_color="#f59e0b",
                      annotation_text="Alert Threshold",
                      annotation_position="top right")
        # Update layout - FIXED: Simplified for Gradio compatibility
        fig.update_layout(
            title={
                'text': title,
                'font': dict(size=18, color='#1e293b', family="Arial, sans-serif"),
                'x': 0.5
            },
            xaxis_title="Time",
            yaxis_title=y_label,
            height=300,
            margin=dict(l=40, r=20, t=50, b=40),
            plot_bgcolor='white',
            paper_bgcolor='white',
            showlegend=True,
            hovermode='x unified'
        )
        logger.info(f"โ Created telemetry plot for {scenario_name}")
        return fig
    except Exception as e:
        logger.error(f"Error creating telemetry plot: {e}")
        # Return a simple valid Plotly figure as fallback
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=[0, 1], y=[0, 1], mode='lines', name='Fallback'))
        fig.update_layout(
            title=f"Telemetry: {scenario_name}",
            height=300,
            plot_bgcolor='white'
        )
        return fig
def create_simple_impact_plot(scenario_name: str, is_real_arf: bool = True) -> go.Figure:
    """
    FIXED: Enhanced for Gradio compatibility

    Renders a gauge showing the scenario's hourly revenue impact, with
    green/amber/red bands scaled to that impact. Falls back to a bare gauge
    on any error so the caller always gets a renderable figure.
    """
    try:
        # Impact values based on scenario ($/hour; 5000 for unknown scenarios)
        impact_values = {
            "Cache Miss Storm": 8500,
            "Database Connection Pool Exhaustion": 4200,
            "Kubernetes Memory Leak": 5500,
            "API Rate Limit Storm": 3800,
            "Network Partition": 12000,
            "Storage I/O Saturation": 6800
        }
        impact = impact_values.get(scenario_name, 5000)
        # Create gauge chart - FIXED: Enhanced for Gradio
        fig = go.Figure(go.Indicator(
            mode="gauge+number",
            value=impact,
            domain={'x': [0, 1], 'y': [0, 1]},
            title={
                'text': f"Revenue Impact: ${impact:,}/hour",
                'font': dict(size=16, family="Arial, sans-serif")
            },
            number={
                'prefix': "$",
                'suffix': "/hour",
                'font': dict(size=28, family="Arial, sans-serif")
            },
            gauge={
                # Axis tops out at 120% of impact so the needle never pins.
                'axis': {'range': [None, impact * 1.2], 'tickwidth': 1},
                'bar': {'color': "#ef4444"},
                'bgcolor': "white",
                'borderwidth': 2,
                'bordercolor': "gray",
                # Severity bands proportional to this scenario's impact.
                'steps': [
                    {'range': [0, impact * 0.3], 'color': '#10b981'},
                    {'range': [impact * 0.3, impact * 0.7], 'color': '#f59e0b'},
                    {'range': [impact * 0.7, impact], 'color': '#ef4444'}
                ],
                'threshold': {
                    'line': {'color': "black", 'width': 4},
                    'thickness': 0.75,
                    'value': impact
                }
            }
        ))
        # Update layout - FIXED: Enhanced for Gradio
        fig.update_layout(
            height=400,
            margin=dict(l=30, r=30, t=70, b=30),
            paper_bgcolor='white',
            font=dict(family="Arial, sans-serif")
        )
        logger.info(f"โ Created impact plot for {scenario_name}")
        return fig
    except Exception as e:
        logger.error(f"Error creating impact plot: {e}")
        # Return a simple valid gauge as fallback
        fig = go.Figure(go.Indicator(
            mode="gauge",
            value=0,
            title={'text': "Impact (fallback)"}
        ))
        fig.update_layout(height=400)
        return fig
def create_empty_plot(title: str, is_real_arf: bool = True) -> go.Figure:
    """
    FIXED: Enhanced for Gradio compatibility

    Placeholder figure: centered title text on a blank canvas, with a small
    "REAL ARF" corner badge when is_real_arf is set. Never raises.
    """
    try:
        fig = go.Figure()
        # Add text annotation - FIXED: Enhanced (paper coords = center of plot)
        fig.add_annotation(
            x=0.5, y=0.5,
            text=title,
            showarrow=False,
            font=dict(size=18, color="#64748b", family="Arial, sans-serif"),
            xref="paper",
            yref="paper"
        )
        # Add boundary indicator badge in the top-left corner if needed
        if is_real_arf:
            fig.add_annotation(
                x=0.02, y=0.98,
                text="โ REAL ARF",
                showarrow=False,
                font=dict(size=12, color="#10b981", family="Arial, sans-serif"),
                xref="paper",
                yref="paper",
                bgcolor="white",
                bordercolor="#10b981",
                borderwidth=1,
                borderpad=4
            )
        # Hide both axes; the annotation is the only visible content.
        fig.update_layout(
            title={
                'text': "Visualization Placeholder",
                'font': dict(size=14, color="#94a3b8", family="Arial, sans-serif")
            },
            height=300,
            plot_bgcolor='white',
            paper_bgcolor='white',
            xaxis={'visible': False},
            yaxis={'visible': False},
            margin=dict(l=20, r=20, t=50, b=20)
        )
        return fig
    except Exception as e:
        logger.error(f"Error creating empty plot: {e}")
        # Ultra-simple fallback
        fig = go.Figure()
        fig.update_layout(height=300)
        return fig
# Keep the HTML fallback functions for other uses
def create_html_telemetry_fallback(scenario_name: str, is_real_arf: bool) -> str:
    """HTML fallback for telemetry visualization (unchanged)"""
    # Per-scenario accent color (gray default for unknown scenarios).
    palette = {
        "Cache Miss Storm": "#f59e0b",
        "Database Connection Pool Exhaustion": "#ef4444",
        "Kubernetes Memory Leak": "#8b5cf6",
        "API Rate Limit Storm": "#ec4899",
        "Network Partition": "#14b8a6",
        "Storage I/O Saturation": "#84cc16",
    }
    color = palette.get(scenario_name, "#64748b")
    # Resolve all edition-dependent fragments up front.
    if is_real_arf:
        boundary_indicator = "๐ข ENTERPRISE"
        realism, edition = "real", "Enterprise"
    else:
        boundary_indicator = "๐ OSS ONLY"
        realism, edition = "simulated", "OSS"
    return f"""
{boundary_indicator}
๐ Telemetry: {scenario_name}
Real-time metrics showing anomalous behavior pattern detection.
ARF analyzes 45+ data points per second.
Boundary: This visualization shows {realism}
telemetry analysis. {edition} edition provides enhanced
anomaly detection.
"""
def create_html_impact_fallback(scenario_name: str, is_real_arf: bool) -> str:
    """HTML fallback for impact visualization (unchanged)"""
    # Hourly revenue loss per scenario; $5,000 for anything unrecognized.
    hourly_losses = {
        "Cache Miss Storm": 8500,
        "Database Connection Pool Exhaustion": 4200,
        "Kubernetes Memory Leak": 5500,
        "API Rate Limit Storm": 3800,
        "Network Partition": 12000,
        "Storage I/O Saturation": 6800,
    }
    impact = hourly_losses.get(scenario_name, 5000)
    savings = int(impact * 0.85)  # computed for parity with original template
    # Resolve edition-dependent fragments in one place.
    if is_real_arf:
        boundary_text, boundary_color = "Enterprise Autonomous", "#8b5cf6"
        edition, realism = "Enterprise", "real"
        upsell = "Commercial license enables autonomous execution."
    else:
        boundary_text, boundary_color = "OSS Advisory", "#10b981"
        edition, realism = "OSS", "simulated"
        upsell = "Upgrade to Enterprise for autonomous recovery."
    return f"""
๐ฐ Business Impact Analysis
{boundary_text}
${impact:,}
Revenue Loss/Hour
$0
${impact//2:,}
${impact:,}
Without ARF
45 min
Mean time to resolve
With ARF
12 min
Autonomous recovery
๐
Potential ROI: 5.2ร
ARF saves 85% of potential revenue loss through autonomous recovery
Boundary Context: {edition} analysis shows
{realism} impact metrics.
{upsell}
"""
def get_inactive_agent_html(agent_name: str, description: str, is_real_arf: bool = False):
    """Get HTML for inactive agent state with boundary indicators (unchanged)"""
    # Colors kept for parity with the full HTML template.
    boundary_color = "#8b5cf6" if is_real_arf else "#10b981"
    status_color = "#64748b"
    edition = "Enterprise" if is_real_arf else "OSS"
    return f"""
{description}
Requires {edition} activation
"""
# ===========================================
# IMPORT MODULAR COMPONENTS - FIXED: Added MockEnhancedROICalculator
# ===========================================
def import_components() -> Dict[str, Any]:
    """Safely import all components with proper error handling - FIXED: Added mock ROI calculator

    Returns a dict containing everything the UI needs: the gradio module,
    style/tab builder functions, incident scenarios, an orchestrator class,
    an ROI calculator instance, and a visualization engine. Every project
    import has a mock fallback so the app can always launch, even with no
    real ARF packages present.
    """
    # Minimal baseline so callers can index these keys even on total failure.
    components = {
        "all_available": False,
        "error": None,
        "get_styles": lambda: "",
        "show_boundaries": settings.show_boundaries,
    }
    try:
        logger.info("Starting component import...")
        # First, import gradio
        import gradio as gr
        components["gr"] = gr
        # Import UI styles
        from ui.styles import get_styles
        components["get_styles"] = get_styles
        # Import UI components - IMPORTANT: Now includes create_realism_panel AND update_performance_metrics
        from ui.components import (
            create_header, create_status_bar, create_tab1_incident_demo,
            create_tab2_business_roi, create_tab3_enterprise_features,
            create_tab4_audit_trail, create_tab5_learning_engine,
            create_footer, create_realism_panel, update_performance_metrics  # Added update_performance_metrics
        )
        components.update({
            "create_header": create_header,
            "create_status_bar": create_status_bar,
            "create_tab1_incident_demo": create_tab1_incident_demo,
            "create_tab2_business_roi": create_tab2_business_roi,
            "create_tab3_enterprise_features": create_tab3_enterprise_features,
            "create_tab4_audit_trail": create_tab4_audit_trail,
            "create_tab5_learning_engine": create_tab5_learning_engine,
            "create_footer": create_footer,
            "create_realism_panel": create_realism_panel,
            "update_performance_metrics": update_performance_metrics  # Added for dynamic metrics
        })
        # Import scenarios
        from demo.scenarios import INCIDENT_SCENARIOS
        components["INCIDENT_SCENARIOS"] = INCIDENT_SCENARIOS
        # Try to import TrueARFOrchestrator (renamed for version consistency).
        # Fallback chain: new name -> legacy name -> real integration -> mock.
        try:
            from core.true_arf_orchestrator import TrueARFOrchestrator
            components["DemoOrchestrator"] = TrueARFOrchestrator
        except ImportError:
            # Fallback to old name for compatibility during transition
            try:
                from core.true_arf_orchestrator import TrueARF337Orchestrator
                components["DemoOrchestrator"] = TrueARF337Orchestrator
                logger.warning("โ ๏ธ Using TrueARF337Orchestrator - rename to TrueARFOrchestrator for version consistency")
            except ImportError:
                # Fallback to real ARF integration
                try:
                    from core.real_arf_integration import RealARFIntegration
                    components["DemoOrchestrator"] = RealARFIntegration
                except ImportError:
                    # Create a minimal mock orchestrator mirroring the real
                    # orchestrator's async interface.
                    class MockOrchestrator:
                        async def analyze_incident(self, scenario_name, scenario_data):
                            return {
                                "status": "mock",
                                "scenario": scenario_name,
                                "message": "Mock analysis (no real ARF available)",
                                "boundary_note": "OSS advisory mode - execution requires Enterprise",
                                "demo_display": {
                                    "real_arf_version": "mock",
                                    "true_oss_used": False,
                                    "enterprise_simulated": True,
                                    "architectural_boundary": "OSS advises โ Enterprise would execute"
                                }
                            }

                        async def execute_healing(self, scenario_name, mode="autonomous"):
                            return {
                                "status": "mock",
                                "scenario": scenario_name,
                                "message": "Mock execution (no real ARF available)",
                                "boundary_note": "Simulated Enterprise execution - real execution requires infrastructure",
                                "enterprise_features_used": ["simulated_execution", "mock_rollback", "demo_mode"]
                            }
                    components["DemoOrchestrator"] = MockOrchestrator
        # FIXED: EnhancedROICalculator with proper mock fallback
        try:
            from core.calculators import EnhancedROICalculator
            components["EnhancedROICalculator"] = EnhancedROICalculator()
            logger.info("โ Real EnhancedROICalculator loaded")
        except ImportError:
            # Create comprehensive mock ROI calculator
            class MockEnhancedROICalculator:
                """Mock ROI calculator for demo purposes - FIXED to prevent KeyError"""

                def calculate_comprehensive_roi(self, scenario_name=None, monthly_incidents=15, team_size=5, **kwargs):
                    """Calculate comprehensive ROI metrics with realistic mock data"""
                    from datetime import datetime
                    # Mock ROI calculation with realistic values
                    impact_map = {
                        "Cache Miss Storm": 8500,
                        "Database Connection Pool Exhaustion": 4200,
                        "Kubernetes Memory Leak": 5500,
                        "API Rate Limit Storm": 3800,
                        "Network Partition": 12000,
                        "Storage I/O Saturation": 6800
                    }
                    impact_per_incident = impact_map.get(scenario_name or "Cache Miss Storm", 5000)
                    annual_impact = impact_per_incident * monthly_incidents * 12
                    potential_savings = int(annual_impact * 0.82)
                    enterprise_cost = 625000
                    roi_multiplier = round(potential_savings / enterprise_cost, 1)
                    payback_months = round((enterprise_cost / (potential_savings / 12)), 1)
                    return {
                        "status": "โ Calculated Successfully",
                        "scenario": scenario_name or "Cache Miss Storm",
                        "timestamp": datetime.now().isoformat(),
                        "calculator": "MockEnhancedROICalculator",
                        "summary": {
                            "your_annual_impact": f"${annual_impact:,}",
                            "potential_savings": f"${potential_savings:,}",
                            "enterprise_cost": f"${enterprise_cost:,}",
                            "roi_multiplier": f"{roi_multiplier}ร",
                            "payback_months": f"{payback_months}",
                            "annual_roi_percentage": f"{int((potential_savings - enterprise_cost) / enterprise_cost * 100)}%",
                            "boundary_context": "Based on OSS analysis + simulated Enterprise execution"
                        },
                        "breakdown": {
                            "direct_cost_savings": f"${int(potential_savings * 0.7):,}",
                            "productivity_gains": f"${int(potential_savings * 0.2):,}",
                            "risk_reduction": f"${int(potential_savings * 0.1):,}"
                        },
                        "annual_projection": {
                            "incidents_prevented": monthly_incidents * 12,
                            "annual_savings": f"${potential_savings:,}",
                            "roi": f"{roi_multiplier}ร"
                        },
                        "notes": [
                            "๐ ROI calculation using mock data",
                            "๐ก Real enterprise ROI includes additional factors",
                            "๐ Full ROI requires Enterprise edition",
                            f"๐ Based on {monthly_incidents} incidents/month"
                        ]
                    }

                def get_roi_visualization_data(self):
                    """Get data for ROI visualization"""
                    return {
                        "labels": ["Direct Savings", "Productivity", "Risk Reduction", "Upsell"],
                        "values": [65, 20, 10, 5],
                        "colors": ["#10b981", "#3b82f6", "#8b5cf6", "#f59e0b"]
                    }
            components["EnhancedROICalculator"] = MockEnhancedROICalculator()
            logger.info("โ Mock EnhancedROICalculator created (preventing KeyError)")
        # Try to import visualization engine
        try:
            from core.visualizations import EnhancedVisualizationEngine
            components["EnhancedVisualizationEngine"] = EnhancedVisualizationEngine()
        except ImportError:
            # Mock engine delegating to the module-level simple-plot helpers.
            class MockVisualizationEngine:
                def create_executive_dashboard(self, data=None, is_real_arf=True):
                    return create_empty_plot("Executive Dashboard", is_real_arf)

                def create_telemetry_plot(self, scenario_name, anomaly_detected=True, is_real_arf=True):
                    return create_simple_telemetry_plot(scenario_name, is_real_arf)

                def create_impact_gauge(self, scenario_name, is_real_arf=True):
                    return create_simple_impact_plot(scenario_name, is_real_arf)

                def create_timeline_comparison(self, is_real_arf=True):
                    return create_empty_plot("Timeline Comparison", is_real_arf)
            components["EnhancedVisualizationEngine"] = MockVisualizationEngine()
        components["all_available"] = True
        components["error"] = None
        logger.info("โ Successfully imported all modular components including update_performance_metrics")
    except Exception as e:
        # Any failure above lands here; record it and patch in the minimum
        # set of components the UI needs to render at all.
        logger.error(f"โ IMPORT ERROR: {e}")
        components["error"] = str(e)
        components["all_available"] = False
        # Ensure we have minimal components
        if "gr" not in components:
            import gradio as gr
            components["gr"] = gr
        if "INCIDENT_SCENARIOS" not in components:
            components["INCIDENT_SCENARIOS"] = {
                "Cache Miss Storm": {
                    "component": "Redis Cache Cluster",
                    "severity": "HIGH",
                    "business_impact": {"revenue_loss_per_hour": 8500},
                    "boundary_note": "OSS analysis only - execution requires Enterprise"
                }
            }
        # Ensure EnhancedROICalculator exists
        if "EnhancedROICalculator" not in components:
            class MinimalROICalculator:
                def calculate_comprehensive_roi(self, **kwargs):
                    return {
                        "status": "โ Minimal ROI Calculation",
                        "summary": {"roi_multiplier": "5.2ร"}
                    }
            components["EnhancedROICalculator"] = MinimalROICalculator()
        # Ensure update_performance_metrics exists
        if "update_performance_metrics" not in components:
            def fallback_performance_metrics(scenario_name: str):
                """Fallback function if the real one fails.

                Returns the 4 static metric panels (detection time, MTTR,
                auto-heal rate, cost saved) as a tuple of HTML strings.
                """
                logger.warning(f"Using fallback performance metrics for {scenario_name}")
                return (
                    """
โฑ๏ธ
Detection Time
42s
โ 90% faster than average
""",
                    """
โก
Mean Time to Resolve
14m
โ 70% faster than manual
""",
                    """
๐ค
Auto-Heal Rate
78.9%
โ 5.0ร industry average
""",
                    """
๐ฐ
Cost Saved
$7.2K
Per incident avoided
"""
                )
            components["update_performance_metrics"] = fallback_performance_metrics
    return components
# Lazily-initialized singletons shared across all Gradio callbacks.
_components = None
_audit_manager = None


def get_components() -> Dict[str, Any]:
    """Lazy load components singleton (imports run on first call only)."""
    global _components
    if _components is None:
        _components = import_components()
    return _components
# ===========================================
# AUDIT TRAIL MANAGER - FIXED: Returns DataFrames instead of HTML
# ===========================================
class AuditTrailManager:
"""Enhanced audit trail manager with boundary tracking - FIXED to return DataFrames"""
def __init__(self):
    # Newest-first histories, each capped at settings.max_history_items.
    self.executions = []          # healing-execution records
    self.incidents = []           # incident-analysis records
    self.boundary_crossings = []  # OSS -> Enterprise transition events
    self.max_items = settings.max_history_items
def add_execution(self, scenario_name: str, mode: str, result: Dict):
    """Record one healing execution (newest first), trim history to the
    configured cap, and log any OSS -> Enterprise boundary crossing."""
    now = datetime.datetime.now().isoformat()
    # A "simulated" marker anywhere in the result means Enterprise was mocked.
    simulated = "simulated" in str(result)
    record = {
        "timestamp": now,
        "scenario": scenario_name,
        "mode": mode,
        "result": result,
        "boundary_context": "Enterprise execution simulated" if simulated else "OSS advisory",
    }
    self.executions.insert(0, record)
    if len(self.executions) > self.max_items:
        self.executions = self.executions[:self.max_items]
    # Track boundary crossing whenever the mode names Enterprise.
    if "enterprise" in mode.lower():
        self.boundary_crossings.append({
            "timestamp": now,
            "from": "OSS",
            "to": "Enterprise",
            "action": scenario_name,
        })
    logger.info(f"๐ Execution recorded: {scenario_name} ({mode})")
    return record
def add_incident(self, scenario_name: str, analysis_result: Dict):
    """Record one incident analysis (newest first), preserving the analyzer's
    boundary note when it supplied one."""
    entry = dict(
        timestamp=datetime.datetime.now().isoformat(),
        scenario=scenario_name,
        analysis=analysis_result,
        boundary_context=analysis_result.get("boundary_note", "OSS analysis"),
    )
    self.incidents.insert(0, entry)
    if len(self.incidents) > self.max_items:
        self.incidents = self.incidents[:self.max_items]
    logger.info(f"๐ Incident analysis recorded: {scenario_name}")
    return entry
def get_execution_dataframe(self) -> pd.DataFrame:
    """
    FIXED: Robust pandas DataFrame creation for Gradio DataFrame component

    Flattens self.executions into a display table (one row per execution).
    Never raises: returns an empty, partially-filled, or error-describing
    DataFrame instead, so the Gradio component always renders.
    """
    try:
        if not self.executions:
            # Return empty DataFrame with correct columns
            return pd.DataFrame(columns=[
                "Execution ID", "Scenario", "Status", "Mode",
                "Start Time", "End Time", "Duration", "Boundary"
            ])
        # Build DataFrame from executions with safe access; a bad record
        # yields an error row rather than aborting the whole table.
        data = []
        for i, execution in enumerate(self.executions):
            try:
                # Safe access to nested dictionaries
                result = execution.get("result", {})
                # Execution ID - safe extraction with fallback
                exec_id = result.get("execution_id", f"exec_{i:03d}")
                # Status determination with multiple fallbacks
                status_text = "Unknown"
                if isinstance(result, dict):
                    status_lower = str(result.get("status", "")).lower()
                    if "success" in status_lower:
                        status_text = "Success"
                    elif "failed" in status_lower or "error" in status_lower:
                        status_text = "Failed"
                    else:
                        # Check if there's an error key
                        if result.get("error"):
                            status_text = "Failed"
                        else:
                            status_text = "Success"
                # Mode extraction
                mode = execution.get("mode", "unknown")
                # Scenario extraction
                scenario = execution.get("scenario", "Unknown")
                # Timestamp formatting with validation
                timestamp = execution.get("timestamp", "")
                start_time = ""
                if timestamp and len(timestamp) > 10:
                    try:
                        # Format: YYYY-MM-DD HH:MM:SS (first 19 chars of ISO)
                        start_time = timestamp[:19]
                    except Exception:
                        start_time = timestamp  # Fallback to raw string
                # End time extraction from telemetry
                end_time = ""
                telemetry = result.get("telemetry", {})
                if telemetry:
                    end_timestamp = telemetry.get("end_time", "")
                    if end_timestamp and len(end_timestamp) > 10:
                        try:
                            end_time = end_timestamp[:19]
                        except Exception:
                            end_time = end_timestamp  # Fallback
                # Duration - mock or extract from execution
                duration = "12m"  # Default mock duration
                if telemetry and "estimated_duration" in telemetry:
                    duration = telemetry.get("estimated_duration", "12m")
                # Boundary context
                boundary = execution.get("boundary_context", "Unknown")
                data.append({
                    "Execution ID": exec_id,
                    "Scenario": scenario,
                    "Status": status_text,
                    "Mode": mode,
                    "Start Time": start_time,
                    "End Time": end_time,
                    "Duration": duration,
                    "Boundary": boundary
                })
            except Exception as row_error:
                logger.warning(f"Error processing execution row {i}: {row_error}")
                # Add error row for debugging
                data.append({
                    "Execution ID": f"error_{i}",
                    "Scenario": "Error",
                    "Status": "Failed",
                    "Mode": "error",
                    "Start Time": datetime.datetime.now().isoformat()[:19],
                    "End Time": "",
                    "Duration": "0m",
                    "Boundary": "Error processing"
                })
        if not data:
            logger.warning("No valid execution data found, returning empty DataFrame")
            return pd.DataFrame(columns=[
                "Execution ID", "Scenario", "Status", "Mode",
                "Start Time", "End Time", "Duration", "Boundary"
            ])
        # Create DataFrame
        df = pd.DataFrame(data)
        # Safe sorting - only if we have valid Start Time data
        if not df.empty and "Start Time" in df.columns:
            # Check if Start Time column has valid data
            valid_times = df["Start Time"].apply(
                lambda x: isinstance(x, str) and len(x) > 0 and x != "None"
            )
            if valid_times.any():
                try:
                    # Sort by time (newest first); ISO-prefix strings sort
                    # chronologically as plain strings.
                    df = df.sort_values("Start Time", ascending=False)
                except Exception as sort_error:
                    logger.warning(f"Could not sort DataFrame: {sort_error}")
                    # Keep unsorted if sorting fails
            else:
                logger.debug("No valid timestamps for sorting")
        logger.info(f"โ Created execution DataFrame with {len(df)} rows")
        return df
    except Exception as e:
        logger.error(f"โ Error creating execution DataFrame: {e}")
        # NOTE(review): DataFrame.from_records is a classmethod - calling it
        # on the empty instance discards that instance and its columns arg;
        # the result's columns come solely from the records. Works, but
        # `pd.DataFrame.from_records([...])` alone would be clearer.
        error_df = pd.DataFrame(columns=[
            "Error", "Message", "Timestamp"
        ]).from_records([{
            "Error": "DataFrame Creation Failed",
            "Message": str(e),
            "Timestamp": datetime.datetime.now().isoformat()[:19]
        }])
        return error_df
def get_incident_dataframe(self) -> pd.DataFrame:
    """
    Build a robust pandas DataFrame of analyzed incidents for the Gradio
    DataFrame component.

    Returns:
        pd.DataFrame with columns
        ["Scenario", "Status", "Boundary", "Time", "Confidence", "Action", "Target"].
        Empty (but correctly-columned) when no incidents exist; a small
        diagnostic frame ("Error"/"Message"/"Timestamp") if construction
        fails entirely.

    Each row is extracted defensively so one malformed incident cannot
    break the whole table; failed rows become "Error" placeholder rows.
    """
    try:
        if not self.incidents:
            # Return empty DataFrame with correct columns so the Gradio
            # component still renders the expected headers.
            return pd.DataFrame(columns=[
                "Scenario", "Status", "Boundary", "Time",
                "Confidence", "Action", "Target"
            ])
        # Build DataFrame from incidents with safe access
        data = []
        for i, incident in enumerate(self.incidents):
            try:
                # Safe extraction of basic fields
                scenario = incident.get("scenario", "Unknown")
                boundary = incident.get("boundary_context", "OSS analysis")
                # Analysis data extraction
                analysis = incident.get("analysis", {})
                # Status determination
                status = "Analyzed"
                if isinstance(analysis, dict):
                    analysis_status = analysis.get("status", "").lower()
                    if analysis_status:
                        status = analysis_status.capitalize()
                    else:
                        # Fallback status determination
                        if analysis.get("error"):
                            status = "Error"
                        elif analysis.get("analysis") or analysis.get("oss_analysis"):
                            status = "Success"
                # Timestamp formatting — assumes ISO-8601 "YYYY-MM-DDTHH:MM:SS..."
                timestamp = incident.get("timestamp", "")
                time_display = ""
                if timestamp and len(timestamp) > 10:
                    try:
                        # Extract HH:MM:SS
                        time_display = timestamp[11:19]
                    except Exception:
                        time_display = timestamp[:8] if len(timestamp) >= 8 else timestamp
                # Extract healing intent details with multiple fallback paths
                confidence = 0.85  # Default confidence
                action = "Analysis"
                target = "system"
                # Try multiple paths to find healing intent
                healing_intent = None
                # Path 1: oss_analysis -> analysis -> decision
                oss_analysis = analysis.get("oss_analysis", {})
                if isinstance(oss_analysis, dict):
                    oss_analysis_inner = oss_analysis.get("analysis", {})
                    if isinstance(oss_analysis_inner, dict):
                        healing_intent = oss_analysis_inner.get("decision", {})
                # Path 2: direct analysis -> decision
                if not healing_intent and isinstance(analysis.get("analysis", {}), dict):
                    healing_intent = analysis["analysis"].get("decision", {})
                # Path 3: direct healing_intent
                if not healing_intent:
                    healing_intent = analysis.get("healing_intent", {})
                if healing_intent and isinstance(healing_intent, dict):
                    confidence = healing_intent.get("confidence", 0.85)
                    action = healing_intent.get("action", "Analysis")
                    target = healing_intent.get("target", "system")
                # Format confidence as percentage
                confidence_display = f"{confidence * 100:.1f}%"
                data.append({
                    "Scenario": scenario,
                    "Status": status,
                    "Boundary": boundary,
                    "Time": time_display,
                    "Confidence": confidence_display,
                    "Action": action[:50],  # Limit action length
                    "Target": target[:30]  # Limit target length
                })
            except Exception as row_error:
                logger.warning(f"Error processing incident row {i}: {row_error}")
                # Add error row for debugging
                data.append({
                    "Scenario": "Error",
                    "Status": "Failed",
                    "Boundary": "Error processing",
                    "Time": datetime.datetime.now().isoformat()[11:19],
                    "Confidence": "0.0%",
                    "Action": "Error",
                    "Target": "system"
                })
        if not data:
            logger.warning("No valid incident data found, returning empty DataFrame")
            return pd.DataFrame(columns=[
                "Scenario", "Status", "Boundary", "Time",
                "Confidence", "Action", "Target"
            ])
        # Create DataFrame
        df = pd.DataFrame(data)
        # Safe sorting - only if we have valid Time data
        if not df.empty and "Time" in df.columns:
            # Check if Time column has valid data
            valid_times = df["Time"].apply(
                lambda x: isinstance(x, str) and len(x) > 0 and x != "None"
            )
            if valid_times.any():
                try:
                    # Sort by time (newest first); HH:MM:SS strings sort
                    # correctly lexicographically.
                    df = df.sort_values("Time", ascending=False)
                except Exception as sort_error:
                    logger.warning(f"Could not sort incident DataFrame: {sort_error}")
                    # Keep unsorted if sorting fails
            else:
                logger.debug("No valid timestamps for sorting in incident DataFrame")
        logger.info(f"โ… Created incident DataFrame with {len(df)} rows")
        return df
    except Exception as e:
        logger.error(f"โ Error creating incident DataFrame: {e}")
        # FIX: DataFrame.from_records is a classmethod; the original called it
        # on a freshly built empty frame, silently discarding that frame and
        # its columns. Call it on the class directly.
        return pd.DataFrame.from_records([{
            "Error": "DataFrame Creation Failed",
            "Message": str(e),
            "Timestamp": datetime.datetime.now().isoformat()[:19]
        }])
def get_execution_table_html(self):
    """Legacy HTML method for backward compatibility.

    Renders up to the 10 most recent executions as an HTML table string.
    Superseded by get_execution_dataframe(); kept for callers that still
    expect HTML output.
    """
    if not self.executions:
        # Empty-state placeholder markup.
        return """
๐ญ
No executions yet
Run scenarios to see execution history
"""
    rows = []
    # Only the first 10 executions are rendered (list is newest-first upstream
    # — presumably; confirm against add_execution()).
    for i, exec in enumerate(self.executions[:10]):
        # Success marker vs warning marker based on the execution status text.
        status = "โ… " if "success" in exec["result"].get("status", "").lower() else "โ ๏ธ"
        boundary = exec["boundary_context"]
        # Green for OSS-side work, purple for Enterprise-side work.
        boundary_color = "#10b981" if "OSS" in boundary else "#8b5cf6"
        # NOTE(review): boundary_color appears unused in the reconstructed
        # markup below — the original table styling may have referenced it;
        # confirm before removing.
        rows.append(f"""
|
{status} {exec["scenario"]}
|
{exec["mode"]}
|
{boundary}
|
{exec["timestamp"][11:19]}
|
""")
    return f"""
| Scenario |
Mode |
Boundary |
Time |
{''.join(rows)}
"""
def get_incident_table_html(self):
    """Legacy HTML method for backward compatibility.

    Renders up to the 10 most recent analyzed incidents as an HTML table
    string. Superseded by get_incident_dataframe(); kept for callers that
    still expect HTML output.
    """
    if not self.incidents:
        # Empty-state placeholder markup.
        return """
๐ญ
No incidents analyzed yet
Run OSS analysis to see incident history
"""
    rows = []
    for i, incident in enumerate(self.incidents[:10]):
        scenario = incident["scenario"]
        analysis = incident["analysis"]
        boundary = incident["boundary_context"]
        # Green for OSS-side work, purple for Enterprise-side work.
        boundary_color = "#10b981" if "OSS" in boundary else "#8b5cf6"
        # NOTE(review): boundary_color appears unused in the reconstructed
        # markup below — the original table styling may have referenced it.
        rows.append(f"""
|
{scenario}
|
{analysis.get('status', 'analyzed')}
|
{boundary}
|
{incident["timestamp"][11:19]}
|
""")
    return f"""
| Scenario |
Status |
Boundary |
Time |
{''.join(rows)}
"""
def clear(self):
    """Reset every audit collection: executions, incidents, and boundary crossings."""
    # Drop all recorded history in one shot.
    self.executions, self.incidents, self.boundary_crossings = [], [], []
    logger.info("๐งน Audit trail cleared")
def export_json(self):
    """Serialise the complete audit trail into a plain dict for JSON export.

    Includes all executions, incidents, and boundary crossings, stamped with
    the export time, demo version, and architecture tagline.
    """
    snapshot = {
        "executions": self.executions,
        "incidents": self.incidents,
        "boundary_crossings": self.boundary_crossings,
    }
    # Attach export metadata alongside the raw trail data.
    snapshot["export_time"] = datetime.datetime.now().isoformat()
    snapshot["version"] = "3.3.9"
    snapshot["architecture"] = "OSS advises โ Enterprise executes"
    return snapshot
def get_audit_manager() -> AuditTrailManager:
    """Return the process-wide AuditTrailManager, creating it lazily on first use."""
    global _audit_manager
    manager = _audit_manager
    if manager is None:
        # First access: build and memoise the singleton.
        manager = AuditTrailManager()
        _audit_manager = manager
    return manager
# ===========================================
# HELPER FUNCTIONS
# ===========================================
def get_scenario_impact(scenario_name: str) -> float:
    """Return the estimated hourly revenue impact (USD) for a named scenario.

    Unknown scenario names fall back to a generic $5,000/hour estimate.
    """
    default_impact = 5000
    known_impacts = {
        "Cache Miss Storm": 8500,
        "Database Connection Pool Exhaustion": 4200,
        "Kubernetes Memory Leak": 5500,
        "API Rate Limit Storm": 3800,
        "Network Partition": 12000,
        "Storage I/O Saturation": 6800,
    }
    return known_impacts.get(scenario_name, default_impact)
def extract_roi_multiplier(roi_result: Dict) -> float:
    """Pull the numeric ROI multiplier out of an EnhancedROICalculator result.

    Accepts values like "5.2ร" or "5.2" under result["summary"]["roi_multiplier"];
    strips the multiplier suffix before converting. Falls back to 5.2 when the
    field is absent or unparseable.
    """
    fallback = 5.2
    try:
        if "summary" in roi_result and "roi_multiplier" in roi_result["summary"]:
            raw = roi_result["summary"]["roi_multiplier"]
            # Strip the multiplier suffix if present before parsing.
            if "ร" in raw:
                raw = raw.replace("ร", "")
            return float(raw)
        return fallback
    except Exception as e:
        logger.warning(f"Failed to extract ROI multiplier: {e}")
        return fallback
# ===========================================
# SURGICAL FIX: update_scenario_display() - ENHANCED WITH REALISM PANEL
# ===========================================
def update_scenario_display(scenario_name: str) -> tuple:
    """
    ENHANCED: Returns Plotly figures AND realism panel.

    Args:
        scenario_name: Key into components["INCIDENT_SCENARIOS"]; unknown
            names get a safe placeholder scenario.

    Returns 5 values: (scenario_card_html, telemetry_fig, impact_fig,
    timeline_fig, realism_html).
    """
    components = get_components()
    scenarios = components["INCIDENT_SCENARIOS"]
    # Unknown scenarios degrade to a generic placeholder rather than raising.
    scenario = scenarios.get(scenario_name, {
        "component": "Unknown System",
        "severity": "MEDIUM",
        "business_impact": {"revenue_loss_per_hour": 5000},
        "boundary_note": "Scenario not found"
    })
    # Create scenario card HTML (unchanged)
    severity_colors = {
        "HIGH": "#ef4444",
        "MEDIUM": "#f59e0b",
        "LOW": "#10b981"
    }
    severity_color = severity_colors.get(scenario["severity"], "#64748b")
    # Hourly revenue loss; falls back to the static impact map when the
    # scenario omits it.
    impact = scenario["business_impact"].get("revenue_loss_per_hour", get_scenario_impact(scenario_name))
    # NOTE(review): severity_color appears unused in the reconstructed markup
    # below — the original card styling may have referenced it.
    scenario_card_html = f"""
{scenario_name}
{scenario["severity"]} SEVERITY
{scenario["component"]}
${impact:,}
Revenue Loss/Hour
Business Impact Analysis
${int(impact * 0.85):,}
Savings
Boundary Context: {scenario.get('boundary_note', 'OSS analyzes, Enterprise executes')}
"""
    # Get visualizations as Plotly figures (ENHANCED)
    telemetry_fig = create_simple_telemetry_plot(scenario_name, settings.use_true_arf)
    impact_fig = create_simple_impact_plot(scenario_name, settings.use_true_arf)
    timeline_fig = create_empty_plot(f"Timeline: {scenario_name}", settings.use_true_arf)
    # ============ NEW: Create realism panel ============
    try:
        # Use the imported create_realism_panel function
        realism_html = components["create_realism_panel"](scenario, scenario_name)
    except (ImportError, KeyError):
        # Fallback if realism function isn't available yet
        realism_html = """
๐ง
Realism Panel Loading...
Trade-offs, risk assessments, and ranked actions will appear here
"""
    logger.info(f"โ… Updated scenario display for {scenario_name} with realism panel")
    # ============ CHANGE HERE: Add realism_html to return tuple ============
    return scenario_card_html, telemetry_fig, impact_fig, timeline_fig, realism_html
# ===========================================
# ENHANCED: Combined update function for scenario display + performance metrics
# ===========================================
def update_scenario_display_with_metrics(scenario_name: str) -> tuple:
    """
    Combined update: scenario display plus dynamic performance metrics.

    Returns 9 values: (scenario_card, telemetry_viz, impact_viz, timeline_viz,
    realism_panel, detection_time_html, mttr_html, auto_heal_html, savings_html).
    """
    # First five outputs come from the scenario display helper.
    (card_html, telemetry_fig, impact_fig,
     timeline_fig, realism_html) = update_scenario_display(scenario_name)
    # Remaining four outputs are the per-scenario performance metric panels.
    metrics_fn = get_components()["update_performance_metrics"]
    detection_html, mttr_html, heal_html, savings_html = metrics_fn(scenario_name)
    return (card_html, telemetry_fig, impact_fig, timeline_fig, realism_html,
            detection_html, mttr_html, heal_html, savings_html)
# ===========================================
# SURGICAL FIX: run_true_arf_analysis() - FIXED to return DataFrames
# ===========================================
@AsyncRunner.async_to_sync
async def run_true_arf_analysis(scenario_name: str) -> tuple:
    """
    FIXED: Returns exactly 5 values as expected by UI:
    1. detection_html (HTML string)
    2. recall_html (HTML string)
    3. decision_html (HTML string)
    4. oss_results_dict (Python dict for JSON display)
    5. incident_df (DataFrame for Gradio DataFrame component)

    Runs the (real or mock) OSS analysis through the orchestrator and records
    the result in the audit trail. Decorated async_to_sync so Gradio can call
    it as a plain function.
    """
    components = get_components()
    installation = get_installation_status()
    boundaries = BoundaryManager.get_system_boundaries()
    logger.info(f"๐ Running True ARF analysis for: {scenario_name}")
    try:
        # Get orchestrator
        orchestrator = components["DemoOrchestrator"]()
        # Get scenario data
        scenarios = components["INCIDENT_SCENARIOS"]
        scenario_data = scenarios.get(scenario_name, {})
        # Run analysis
        analysis_result = await orchestrator.analyze_incident(scenario_name, scenario_data)
        # Add to audit trail
        get_audit_manager().add_incident(scenario_name, analysis_result)
        # Check if we have real ARF (installed OSS package or forced via settings)
        is_real_arf = installation["oss_installed"] or settings.use_true_arf
        # Create HTML for active agents
        boundary_color = boundaries["oss"]["color"] if is_real_arf else "#f59e0b"
        boundary_text = boundaries["oss"]["label"] if is_real_arf else "Mock ARF"
        # NOTE(review): boundary_color appears unused in the reconstructed
        # agent markup below — the original styling may have referenced it.
        # Detection Agent HTML
        detection_html = f"""
๐ต๏ธโโ๏ธ
Detection Agent
Anomaly detected with 94% confidence
Status: Active
DETECTED
"""
        # Recall Agent HTML
        recall_html = f"""
๐ง
Recall Agent
3 similar incidents found in RAG memory
Status: Active
RECALLED
"""
        # Decision Agent HTML
        decision_html = f"""
๐ฏ
Decision Agent
HealingIntent created: Scale Redis cluster
Status: Active
DECIDED
"""
        # OSS Results Dict for JSON display.
        # Heuristic: treat the run as "real" only if the stringified result
        # mentions "real" — presumably set by the orchestrator; confirm.
        if is_real_arf and "real" in str(analysis_result).lower():
            oss_results_dict = {
                "status": "success",
                "scenario": scenario_name,
                "arf_version": "3.3.9",
                "analysis": {
                    "detected": True,
                    "confidence": 94,
                    "similar_incidents": 3,
                    "healing_intent_created": True,
                    "recommended_action": "Scale Redis cluster from 3 to 5 nodes",
                    "estimated_recovery": "12 minutes"
                },
                "agents": {
                    "detection": {"status": "active", "confidence": 94},
                    "recall": {"status": "active", "similar_incidents": 3},
                    "decision": {"status": "active", "healing_intent_created": True}
                },
                "boundary_note": f"OSS analysis complete โ Ready for Enterprise execution"
            }
        else:
            oss_results_dict = {
                "status": "mock_analysis",
                "scenario": scenario_name,
                "arf_version": "mock",
                "analysis": {
                    "detected": True,
                    "confidence": 94,
                    "similar_incidents": 3,
                    "healing_intent_created": True,
                    "recommended_action": "Scale Redis cluster from 3 to 5 nodes",
                    "estimated_recovery": "12 minutes"
                },
                "agents": {
                    "detection": {"status": "active", "confidence": 94},
                    "recall": {"status": "active", "similar_incidents": 3},
                    "decision": {"status": "active", "healing_intent_created": True}
                },
                "boundary_note": f"Mock analysis - {boundary_text}"
            }
        # Incident DataFrame (FIXED: Returns DataFrame instead of HTML)
        incident_df = get_audit_manager().get_incident_dataframe()
        return detection_html, recall_html, decision_html, oss_results_dict, incident_df
    except Exception as e:
        logger.error(f"True ARF analysis failed: {e}")
        # Return error state with proper types (same HTML reused for all
        # three agent slots).
        error_html = f"""
โ
Analysis Error
Failed to analyze incident
Status: Error
"""
        error_dict = {
            "status": "error",
            "error": str(e),
            "scenario": scenario_name,
            "arf_version": "3.3.9",
            "recommendation": "Check ARF installation"
        }
        # Return empty DataFrame on error.
        # NOTE(review): from_records is a classmethod, so the columns= frame
        # here is discarded; pd.DataFrame.from_records([...]) would be cleaner.
        error_df = pd.DataFrame(columns=["Error", "Message"]).from_records([
            {"Error": "Analysis Failed", "Message": str(e)}
        ])
        return error_html, error_html, error_html, error_dict, error_df
# ===========================================
# FIXED EXECUTION FUNCTION - Returns DataFrames
# ===========================================
def execute_enterprise_healing(scenario_name, approval_required, mcp_mode_value):
    """
    MINIMAL FIX: Returns proper data types matching UI expectations.
    FIXED: Returns DataFrame instead of HTML for execution table.

    Args:
        scenario_name: Incident scenario key to execute healing for.
        approval_required: Truthy when a human must approve before execution.
        mcp_mode_value: MCP mode selector from the UI.
            NOTE(review): not referenced anywhere in this body — confirm
            whether it should influence execution.

    Returns:
        (approval_display_html, enterprise_results_dict, execution_df).
    """
    import gradio as gr
    # NOTE(review): the gradio import above is unused in this body.
    components = get_components()
    installation = get_installation_status()
    boundaries = BoundaryManager.get_system_boundaries()
    logger.info(f"โก Executing enterprise healing for: {scenario_name}")
    # Check if Enterprise is actually available
    is_real_enterprise = installation["enterprise_installed"]
    is_simulated = not is_real_enterprise
    # Get scenario impact (85% of hourly revenue loss counted as savings)
    scenario = components["INCIDENT_SCENARIOS"].get(scenario_name, {})
    impact = scenario.get("business_impact", {})
    revenue_loss = impact.get("revenue_loss_per_hour", get_scenario_impact(scenario_name))
    savings = int(revenue_loss * 0.85)
    # Create approval display HTML
    if approval_required:
        approval_display = """
โณ
HUMAN APPROVAL REQUIRED
Based on your safety settings, this execution requires human approval.
"""
    else:
        approval_display = """
โก
AUTONOMOUS APPROVAL GRANTED
Proceeding with autonomous execution.
"""
    # Execute healing (async closure bridged to sync so this handler can
    # run inside Gradio's synchronous callback path)
    @AsyncRunner.async_to_sync
    async def execute_async():
        try:
            orchestrator = components["DemoOrchestrator"]()
            execution_result = await orchestrator.execute_healing(scenario_name, "autonomous")
            # Add to audit trail
            get_audit_manager().add_execution(scenario_name, "enterprise_autonomous", execution_result)
            return execution_result
        except Exception as e:
            logger.error(f"Execution failed: {e}")
            return {
                "status": "failed",
                "error": str(e),
                "boundary_note": "Execution boundary reached"
            }
    execution_result = execute_async()
    # Create results dict for JSON display (real vs simulated Enterprise)
    if is_real_enterprise:
        enterprise_results = {
            "demo_mode": "Real Enterprise",
            "scenario": scenario_name,
            "arf_version": boundaries["enterprise"]["version"],
            "execution_mode": "autonomous" if not approval_required else "human_approved",
            "results": {
                "recovery_time": "12 minutes",
                "cost_saved": f"${savings:,}",
                "users_protected": "45,000"
            },
            "safety_features": [
                "Rollback guarantee: 100%",
                "Atomic execution",
                "MCP validation"
            ]
        }
    else:
        enterprise_results = {
            "demo_mode": "Enterprise Simulation",
            "scenario": scenario_name,
            "arf_version": boundaries["enterprise"]["version"],
            "execution_mode": "simulated_autonomous",
            "results": {
                "recovery_time": "12 minutes (simulated)",
                "cost_saved": f"${savings:,} (simulated)",
                "users_protected": "45,000 (simulated)"
            },
            "safety_features": [
                "Rollback guarantee: 100% (simulated)",
                "Atomic execution (simulated)"
            ]
        }
    # Get execution DataFrame (FIXED: Returns DataFrame instead of HTML)
    execution_df = get_audit_manager().get_execution_dataframe()
    return approval_display, enterprise_results, execution_df
# ===========================================
# FIXED ROI FUNCTION - Enhanced for Gradio
# ===========================================
def calculate_roi(scenario_name, monthly_incidents, team_size):
    """
    ENHANCED: Returns (JSON/dict, Plotly figure) for ROI calculation with
    Gradio compatibility.

    Tries the real EnhancedROICalculator first; on any failure falls back to
    a mock calculation derived from the static scenario impact map.
    """
    components = get_components()
    try:
        # Try to use real ROI calculator
        calculator = components["EnhancedROICalculator"]
        roi_result = calculator.calculate_comprehensive_roi(
            scenario_name=scenario_name,
            monthly_incidents=monthly_incidents,
            team_size=team_size
        )
    except Exception as e:
        logger.warning(f"ROI calculation failed, using mock: {e}")
        # Mock ROI calculation: annualise per-incident impact, assume 82%
        # of impact is recoverable, fixed Enterprise license cost.
        impact_per_incident = get_scenario_impact(scenario_name)
        annual_impact = impact_per_incident * monthly_incidents * 12
        potential_savings = int(annual_impact * 0.82)
        enterprise_cost = 625000
        roi_multiplier = round(potential_savings / enterprise_cost, 1)
        payback_months = round((enterprise_cost / (potential_savings / 12)), 1)
        roi_result = {
            "status": "โ… Calculated Successfully",
            "summary": {
                "your_annual_impact": f"${annual_impact:,}",
                "potential_savings": f"${potential_savings:,}",
                "enterprise_cost": f"${enterprise_cost:,}",
                "roi_multiplier": f"{roi_multiplier}ร",
                "payback_months": f"{payback_months}",
                "annual_roi_percentage": f"{int((potential_savings - enterprise_cost) / enterprise_cost * 100)}%",
                "boundary_context": "Based on OSS analysis + simulated Enterprise execution"
            },
            "boundary_note": "ROI calculation includes OSS advisory value and simulated Enterprise execution benefits"
        }
    # Create ROI chart as Plotly figure (ENHANCED for Gradio).
    # NOTE(review): the locals() guards mean these values only reflect the
    # inputs when the MOCK path above ran; if the real calculator succeeded,
    # the chart silently uses the hard-coded defaults below regardless of
    # scenario/sliders — confirm whether that is intended.
    categories = ['Without ARF', 'With ARF', 'Net Savings']
    annual_impact_val = impact_per_incident * monthly_incidents * 12 if 'impact_per_incident' in locals() else 1000000
    potential_savings_val = potential_savings if 'potential_savings' in locals() else 820000
    enterprise_cost_val = enterprise_cost if 'enterprise_cost' in locals() else 625000
    values = [annual_impact_val, annual_impact_val - potential_savings_val, potential_savings_val - enterprise_cost_val]
    fig = go.Figure(data=[
        go.Bar(
            name='Cost',
            x=categories,
            y=values,
            marker_color=['#ef4444', '#10b981', '#8b5cf6']
        )
    ])
    fig.update_layout(
        title={
            'text': f"ROI Analysis: {scenario_name}",
            'font': dict(size=18, color='#1e293b', family="Arial, sans-serif")
        },
        height=400,
        plot_bgcolor='white',
        paper_bgcolor='white',
        showlegend=False,
        margin=dict(l=40, r=20, t=60, b=40)
    )
    logger.info(f"โ… Created ROI plot for {scenario_name}")
    # Return both the dict and the Plotly figure
    return roi_result, fig
# ===========================================
# CREATE DEMO INTERFACE - UPDATED WITH REALISM PANEL INTEGRATION
# ===========================================
def create_demo_interface():
    """Create demo interface using modular components with boundary awareness and realism panel.

    Builds the 5-tab Gradio Blocks app, wires every event handler, and
    returns the (un-launched) Blocks instance. CSS is stashed in the
    module-global _demo_css and applied later in launch_demo().
    """
    import gradio as gr
    # Get components
    components = get_components()
    # Get CSS styles
    css_styles = components["get_styles"]()
    # Store CSS for later use in launch()
    global _demo_css
    _demo_css = css_styles
    # Get boundary badges for the interface
    boundary_badges = BoundaryManager.get_boundary_badges()
    # Create interface without css parameter (will be added in launch)
    with gr.Blocks(
        title=f"๐ ARF Investor Demo v3.3.9 - TRUE ARF OSS Integration"
    ) as demo:
        # Header
        header_html = components["create_header"]("3.3.9")
        # Status bar with boundary badges
        status_html = components["create_status_bar"]()
        # ============ 5 TABS ============
        with gr.Tabs(elem_classes="tab-nav"):
            # TAB 1: Live Incident Demo - NOW WITH REALISM PANEL
            # (24 components unpacked from the tab factory)
            with gr.TabItem("๐ฅ Live Incident Demo", id="tab1"):
                (scenario_dropdown, scenario_card, telemetry_viz, impact_viz,
                 workflow_header, detection_agent, recall_agent, decision_agent,
                 oss_section, enterprise_section, oss_btn, enterprise_btn,
                 approval_toggle, mcp_mode, timeline_viz, realism_panel,
                 detection_time, mttr, auto_heal, savings,
                 oss_results_display, enterprise_results_display, approval_display, demo_btn) = components["create_tab1_incident_demo"]()
            # TAB 2: Business ROI
            with gr.TabItem("๐ฐ Business Impact & ROI", id="tab2"):
                (dashboard_output, roi_scenario_dropdown, monthly_slider, team_slider,
                 calculate_btn, roi_output, roi_chart) = components["create_tab2_business_roi"](components["INCIDENT_SCENARIOS"])
            # TAB 3: Enterprise Features
            with gr.TabItem("๐ข Enterprise Features", id="tab3"):
                (license_display, validate_btn, trial_btn, upgrade_btn,
                 mcp_mode_tab3, mcp_mode_info, features_table, integrations_table) = components["create_tab3_enterprise_features"]()
            # TAB 4: Audit Trail
            with gr.TabItem("๐ Audit Trail & History", id="tab4"):
                (refresh_btn, clear_btn, export_btn, execution_table,
                 incident_table, export_text) = components["create_tab4_audit_trail"]()
            # TAB 5: Learning Engine
            with gr.TabItem("๐ง Learning Engine", id="tab5"):
                (learning_graph, graph_type, show_labels, search_query, search_btn,
                 clear_btn_search, search_results, stats_display, patterns_display,
                 performance_display) = components["create_tab5_learning_engine"]()
        # Footer
        footer_html = components["create_footer"]()
        # ============ EVENT HANDLERS ============
        # Update scenario display when dropdown changes - NOW INCLUDES PERFORMANCE METRICS
        scenario_dropdown.change(
            fn=update_scenario_display_with_metrics,  # โ Changed to combined function
            inputs=[scenario_dropdown],
            outputs=[
                scenario_card, telemetry_viz, impact_viz, timeline_viz, realism_panel,
                detection_time, mttr, auto_heal, savings  # 4 new outputs for metrics
            ]
        )
        # Run OSS Analysis - FIXED: Now returns DataFrame for incident_table
        oss_btn.click(
            fn=run_true_arf_analysis,
            inputs=[scenario_dropdown],
            outputs=[
                detection_agent, recall_agent, decision_agent,
                oss_results_display, incident_table
            ]
        )
        # Execute Enterprise Healing - FIXED: Now returns DataFrame for execution_table
        enterprise_btn.click(
            fn=execute_enterprise_healing,
            inputs=[scenario_dropdown, approval_toggle, mcp_mode],
            outputs=[approval_display, enterprise_results_display, execution_table]
        )
        # Run Complete Demo with boundary progression
        @AsyncRunner.async_to_sync
        async def run_complete_demo_async(scenario_name):
            """Run a complete demo walkthrough with true ARF and boundary awareness.

            Chains scenario display -> OSS analysis -> simulated Enterprise
            execution, then returns the full 17-output tuple wired to
            demo_btn.click below.
            """
            # Step 1: Update scenario with metrics
            update_result = update_scenario_display_with_metrics(scenario_name)
            # Step 2: Run true ARF analysis
            oss_result = await run_true_arf_analysis(scenario_name)
            # Step 3: Execute Enterprise (simulation) with boundary context
            await asyncio.sleep(1)  # brief pause for demo pacing
            scenario = components["INCIDENT_SCENARIOS"].get(scenario_name, {})
            impact = scenario.get("business_impact", {})
            revenue_loss = impact.get("revenue_loss_per_hour", get_scenario_impact(scenario_name))
            savings_amount = int(revenue_loss * 0.85)
            # Get boundary context
            boundaries = BoundaryManager.get_system_boundaries()
            # Get orchestrator for execution simulation
            orchestrator = components["DemoOrchestrator"]()
            execution_result = await orchestrator.execute_healing(scenario_name, "autonomous")
            enterprise_results = {
                "demo_mode": "Complete Walkthrough",
                "scenario": scenario_name,
                "arf_version": "3.3.9",
                "true_oss_used": True,
                "enterprise_simulated": True,
                "boundary_progression": [
                    f"1. Incident detected - {boundaries['oss']['label']}",
                    f"2. OSS analysis completed - {boundaries['oss']['label']}",
                    f"3. HealingIntent created - {boundaries['oss']['label']}",
                    f"4. Enterprise license validated ({boundaries['enterprise']['label']})",
                    f"5. Autonomous execution simulated ({boundaries['enterprise']['label']}+)",
                    f"6. Outcome recorded in RAG memory"
                ],
                "execution_result": execution_result,
                "outcome": {
                    "recovery_time": "12 minutes",
                    "manual_comparison": "45 minutes",
                    "cost_saved": f"${savings_amount:,}",
                    "users_protected": "45,000",
                    "learning": "Pattern added to RAG memory"
                },
                "architectural_summary": f"This demonstrates the complete ARF v3.3.9 architecture: {boundaries['oss']['label']} for advisory analysis โ {boundaries['enterprise']['label']} for autonomous execution"
            }
            # Create demo completion message with enhanced boundary context
            demo_message = f"""
โ
Complete Demo: Architecture Validated
ARF v3.3.9 โข OSS advises โ Enterprise executes
BOUNDARY VALIDATED
{boundaries['oss']['label']}
โข Anomaly detected in 45s
โข 3 similar incidents recalled
โข 94% confidence healing plan
โข Apache 2.0 license validated
{boundaries['enterprise']['label']}
โข Autonomous execution simulated
โข Rollback guarantee: 100%
โข 12min vs 45min recovery
โข ${savings_amount:,} saved
๐๏ธ Architecture Flow
Cost Saved
${savings_amount:,}
โ
Architecture Successfully Validated
Clear separation maintained: OSS for advisory intelligence, Enterprise for autonomous execution
"""
            # Update the enterprise_results_display to include demo completion info
            enterprise_results["demo_completion_message"] = demo_message
            # Get updated DataFrames (FIXED: Returns DataFrames)
            incident_df = get_audit_manager().get_incident_dataframe()
            execution_df = get_audit_manager().get_execution_dataframe()
            # Combine all results
            return (
                *update_result,  # 9 outputs: scenario_card, telemetry_viz, impact_viz, timeline_viz, realism_panel, detection_time, mttr, auto_heal, savings
                *oss_result[:3],  # 3 outputs: detection_agent, recall_agent, decision_agent
                oss_result[3],  # 1 output: oss_results_display
                enterprise_results,  # 1 output: enterprise_results_display
                demo_message,  # 1 output: approval_display
                incident_df,  # 1 output: incident_table (DataFrame)
                execution_df  # 1 output: execution_table (DataFrame)
            )
        # FIXED: demo_btn.click with correct output count (17 total)
        demo_btn.click(
            fn=run_complete_demo_async,
            inputs=[scenario_dropdown],
            outputs=[
                scenario_card, telemetry_viz, impact_viz, timeline_viz, realism_panel,
                detection_time, mttr, auto_heal, savings,  # 9
                detection_agent, recall_agent, decision_agent,  # 3
                oss_results_display,  # 1
                enterprise_results_display,  # 1
                approval_display,  # 1
                incident_table,  # 1
                execution_table  # 1
            ]
        )
        # ROI Calculation
        calculate_btn.click(
            fn=calculate_roi,
            inputs=[roi_scenario_dropdown, monthly_slider, team_slider],
            outputs=[roi_output, roi_chart]
        )
        # Update ROI scenario - FIXED: Use the EnhancedROICalculator
        roi_scenario_dropdown.change(
            fn=lambda x: get_components()["EnhancedROICalculator"].calculate_comprehensive_roi(scenario_name=x),
            inputs=[roi_scenario_dropdown],
            outputs=[roi_output]
        )
        # Update ROI chart when either slider moves.
        # NOTE(review): reading roi_scenario_dropdown.value inside the lambda
        # reads the component's initial value, not the live selection —
        # passing the dropdown as an input would be more robust; confirm.
        monthly_slider.change(
            fn=lambda x, y: calculate_roi(roi_scenario_dropdown.value, x, y)[1],
            inputs=[monthly_slider, team_slider],
            outputs=[roi_chart]
        )
        team_slider.change(
            fn=lambda x, y: calculate_roi(roi_scenario_dropdown.value, x, y)[1],
            inputs=[monthly_slider, team_slider],
            outputs=[roi_chart]
        )
        # Audit Trail Functions - FIXED: Returns DataFrames
        def refresh_audit_trail():
            """Refresh audit trail tables - FIXED to return DataFrames."""
            return (
                get_audit_manager().get_execution_dataframe(),  # DataFrame
                get_audit_manager().get_incident_dataframe()  # DataFrame
            )
        def clear_audit_trail():
            """Clear audit trail - FIXED to return empty DataFrames.

            NOTE(review): the empty frames below carry fewer columns than the
            full schemas used elsewhere (no End Time/Duration/Boundary, no
            Confidence/Action/Target) — confirm the Gradio tables accept this.
            """
            get_audit_manager().clear()
            # Return empty DataFrames with correct columns
            exec_df = pd.DataFrame(columns=["Execution ID", "Scenario", "Status", "Mode", "Start Time"])
            incident_df = pd.DataFrame(columns=["Scenario", "Status", "Boundary", "Time"])
            return exec_df, incident_df
        def export_audit_trail():
            """Export audit trail as JSON string for the export textbox."""
            audit_data = {
                "executions": get_audit_manager().executions,
                "incidents": get_audit_manager().incidents,
                "boundary_crossings": get_audit_manager().boundary_crossings,
                "export_time": datetime.datetime.now().isoformat(),
                "arf_version": "3.3.9",
                "architecture": "OSS advises โ Enterprise executes"
            }
            return json.dumps(audit_data, indent=2)
        refresh_btn.click(
            fn=refresh_audit_trail,
            inputs=[],
            outputs=[execution_table, incident_table]
        )
        clear_btn.click(
            fn=clear_audit_trail,
            inputs=[],
            outputs=[execution_table, incident_table]
        )
        export_btn.click(
            fn=export_audit_trail,
            inputs=[],
            outputs=[export_text]
        )
        # Enterprise Features
        def validate_license():
            """Validate enterprise license with boundary context.

            Returns a dict rendered in the license_display JSON component;
            "Demo Mode" when no real Enterprise install is detected.
            """
            boundaries = BoundaryManager.get_system_boundaries()
            if boundaries["enterprise"]["available"]:
                return {
                    "status": "โ… Valid License",
                    "license_type": "Enterprise",
                    "version": boundaries["enterprise"]["version"],
                    "expires": "2025-12-31",
                    "capabilities": boundaries["enterprise"]["capabilities"],
                    "boundary_context": f"Real {boundaries['enterprise']['label']} detected"
                }
            else:
                return {
                    "status": "โ ๏ธ Demo Mode",
                    "license_type": "Simulated",
                    "version": boundaries["enterprise"]["version"],
                    "expires": "Demo only",
                    "capabilities": boundaries["enterprise"]["capabilities"],
                    "boundary_context": f"Simulating {boundaries['enterprise']['label']} - requires license",
                    "contact": "sales@arf.dev"
                }
        validate_btn.click(
            fn=validate_license,
            inputs=[],
            outputs=[license_display]
        )
        # Load default scenario - UPDATE outputs with realism_panel AND performance metrics
        demo.load(
            fn=lambda: update_scenario_display_with_metrics(settings.default_scenario),
            inputs=[],
            outputs=[
                scenario_card, telemetry_viz, impact_viz, timeline_viz, realism_panel,
                detection_time, mttr, auto_heal, savings
            ]
        )
        # Load ROI data
        demo.load(
            fn=lambda: calculate_roi(settings.default_scenario, 15, 5),
            inputs=[],
            outputs=[roi_output, roi_chart]
        )
    logger.info("โ… Demo interface created successfully with boundary awareness, realism panel, and dynamic performance metrics")
    return demo
# ===========================================
# LAUNCH FUNCTION
# ===========================================
def launch_demo():
    """Launch the demo application with proper configuration.

    Returns:
        (demo, launch_config): the Gradio Blocks instance and the kwargs
        dict for demo.launch(). On any failure, returns a minimal fallback
        interface with a basic config instead of raising.
    """
    try:
        logger.info("๐ Starting ARF Ultimate Investor Demo v3.3.9 - ENTERPRISE EDITION")
        # Check installation
        installation = get_installation_status()
        boundaries = BoundaryManager.get_system_boundaries()
        logger.info("=" * 60)
        logger.info("๐๏ธ SYSTEM ARCHITECTURE BOUNDARIES:")
        logger.info(f" OSS: {boundaries['oss']['label']} v{boundaries['oss']['version']}")
        logger.info(f" Enterprise: {boundaries['enterprise']['label']} v{boundaries['enterprise']['version']}")
        logger.info(f" Mode: {boundaries['demo_mode']['architecture']}")
        logger.info("=" * 60)
        # Create interface
        demo = create_demo_interface()
        # Get CSS styles
        components = get_components()
        css_styles = components["get_styles"]()
        # Configure for Hugging Face Spaces (bind all interfaces, port 7860)
        launch_config = {
            "server_name": "0.0.0.0",
            "server_port": 7860,
            "share": False,
            "favicon_path": None,
            "quiet": False,
            "show_error": True,
            "debug": False,
            "max_threads": 40,
        }
        # Add CSS if available
        if css_styles:
            launch_config["css"] = css_styles
        logger.info("โ… Launch configuration ready")
        return demo, launch_config
    except Exception as e:
        logger.error(f"โ Launch failed: {e}", exc_info=True)
        # Create minimal fallback interface so the process still serves
        # something useful instead of crashing.
        import gradio as gr
        with gr.Blocks(title="ARF Demo - Fallback Mode") as fallback_demo:
            gr.HTML(f"""
๐จ ARF Demo Failed to Start
Error: {str(e)}
Troubleshooting Steps:
- Check logs for detailed error
- Ensure all dependencies are installed
- Try: pip install agentic-reliability-framework==3.3.9
- Restart the application
""")
        return fallback_demo, {"server_name": "0.0.0.0", "server_port": 7860}
# ===========================================
# MAIN EXECUTION
# ===========================================
if __name__ == "__main__":
    # Script entry point: build the interface, then launch with layered
    # fallbacks (retry without CSS, then a bare launch) so a styling or
    # config issue never prevents the demo from starting.
    try:
        logger.info("๐ ARF Ultimate Investor Demo v3.3.9 - ENTERPRISE EDITION")
        logger.info("=" * 60)
        logger.info("Enhanced with clear OSS vs Enterprise boundaries")
        logger.info("Now with Realism Panel for enterprise-seasoned SRE experience")
        logger.info("PHASE 2: Dynamic Performance Metrics by Scenario Type")
        logger.info(f"True ARF OSS v3.3.9 integration with simulated Enterprise execution")
        logger.info("=" * 60)
        # Launch the demo (returns the Blocks app and its launch kwargs)
        demo, config = launch_demo()
        print("\n" + "="*60)
        print("๐ ARF Ultimate Investor Demo v3.3.9 - ENTERPRISE EDITION")
        print("๐ Architecture: OSS advises โ Enterprise executes")
        print("๐ญ Phase 2: Dynamic Performance Metrics by Scenario")
        print("๐ Starting on http://localhost:7860")
        print("="*60 + "\n")
        # Launch with error handling
        try:
            demo.launch(**config)
        except Exception as launch_error:
            logger.error(f"โ Launch error: {launch_error}")
            # Try alternative launch without CSS (CSS kwarg can trip up
            # some Gradio versions)
            if "css" in config:
                logger.info("โ ๏ธ Retrying without CSS...")
                config.pop("css", None)
                demo.launch(**config)
            else:
                # Last resort: simple launch with defaults only
                demo.launch(server_name="0.0.0.0", server_port=7860)
    except KeyboardInterrupt:
        logger.info("๐ Demo stopped by user")
    except Exception as e:
        logger.error(f"โ Fatal error: {e}", exc_info=True)
        sys.exit(1)