# app.py - Complete fixed version
# ๐ ARF Ultimate Investor Demo v3.8.0 - ENTERPRISE EDITION
# ENHANCED VERSION WITH CLEAR BOUNDARIES AND RELIABLE VISUALIZATIONS
# Fixed to show clear OSS vs Enterprise boundaries with architectural honesty
import logging
import sys
import traceback
import json
import datetime
import asyncio
import time
import random
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
# ===========================================
# CONFIGURE LOGGING FIRST
# ===========================================
# Configure root logging before any other work so import-time messages are captured.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),   # mirror to console
        logging.FileHandler('arf_demo.log')  # persistent demo log file
    ]
)
logger = logging.getLogger(__name__)
# Add parent directory to path so the local packages (utils, demo, ui, core) resolve.
sys.path.insert(0, str(Path(__file__).parent))
# ===========================================
# FIX FOR ASYNC EVENT LOOP ISSUES
# ===========================================
# nest_asyncio permits re-entrant event loops, which is needed when the host
# framework (e.g. Gradio) already runs a loop while we call run_until_complete.
try:
    import nest_asyncio
    nest_asyncio.apply()
    logger.info("โ Applied nest_asyncio for async event loop compatibility")
except ImportError:
    # Best-effort: the demo still works, but nested loop use may fail.
    logger.warning("โ ๏ธ nest_asyncio not available, async operations may have issues")
# ===========================================
# IMPORT UTILITY CLASSES FIRST
# ===========================================
from utils.installation import InstallationHelper
from demo.guidance import DemoPsychologyController, get_demo_controller
# ===========================================
# BOUNDARY MANAGEMENT SYSTEM
# ===========================================
class BoundaryManager:
    """Manages clear boundaries between OSS and Enterprise.

    Pure presentation helper: converts the cached installation status into
    badge/indicator markup so the demo always shows which edition is acting.
    All methods are static; the class holds no state.
    """
    @staticmethod
    def get_system_boundaries():
        """Get current system boundaries.

        Returns:
            dict with "oss", "enterprise" and "demo_mode" sections describing
            availability, version, badge styling, capabilities and licensing.
        """
        installation = get_installation_status()  # cached probe, computed once per process
        return {
            "oss": {
                "available": installation["oss_installed"],
                # Fall back to "mock" when the OSS package is not installed.
                "version": installation["oss_version"] or "mock",
                "label": installation["badges"]["oss"]["text"],
                "color": installation["badges"]["oss"]["color"],
                "icon": installation["badges"]["oss"]["icon"],
                "capabilities": ["advisory_analysis", "rag_search", "healing_intent"],
                "license": "Apache 2.0"
            },
            "enterprise": {
                "available": installation["enterprise_installed"],
                # Fall back to "simulated" when Enterprise is not installed.
                "version": installation["enterprise_version"] or "simulated",
                "label": installation["badges"]["enterprise"]["text"],
                "color": installation["badges"]["enterprise"]["color"],
                "icon": installation["badges"]["enterprise"]["icon"],
                "capabilities": ["autonomous_execution", "rollback_guarantee", "mcp_integration", "enterprise_support"],
                "license": "Commercial"
            },
            "demo_mode": {
                "active": True,
                "architecture": "OSS advises โ Enterprise executes",
                "boundary_visible": settings.show_boundaries
            }
        }
    @staticmethod
    def get_boundary_badges() -> str:
        """Get HTML badges showing system boundaries (one badge per edition)."""
        boundaries = BoundaryManager.get_system_boundaries()
        return f"""
{boundaries['oss']['icon']}
{boundaries['oss']['label']}
Apache 2.0 โข Advisory Intelligence
{boundaries['enterprise']['icon']}
{boundaries['enterprise']['label']}
Commercial โข Autonomous Execution
๐๏ธ
Architecture Boundary
OSS advises โ Enterprise executes
"""
    @staticmethod
    def create_boundary_indicator(action: str, is_simulated: bool = True) -> str:
        """Create clear execution boundary indicator.

        is_simulated=True renders the demo "simulated Enterprise execution"
        panel; False renders the real Enterprise execution panel.
        """
        if is_simulated:
            return f"""
๐ญ
SIMULATED ENTERPRISE EXECUTION
Action: {action}
Mode: Enterprise Simulation (not real execution)
Boundary: OSS advises โ Enterprise would execute
DEMO BOUNDARY
In production, Enterprise edition would execute against real infrastructure
"""
        else:
            return f"""
โก
REAL ENTERPRISE EXECUTION
Action: {action}
Mode: Enterprise Autonomous
Boundary: Real execution with safety guarantees
ENTERPRISE+
"""
# ===========================================
# ASYNC UTILITIES
# ===========================================
class AsyncRunner:
    """Enhanced async runner with better error handling.

    Bridges async ARF calls into synchronous (Gradio callback) code. Failures
    are converted into structured result dicts instead of propagating, so the
    UI always has something to render.
    """

    @staticmethod
    def run_async(coro):
        """Run *coro* to completion on the current (or a fresh) event loop.

        Returns:
            The coroutine's result, or a dict with "error"/"status" keys if
            execution fails.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No event loop in this thread yet -- create and install one.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(coro)
        except Exception as e:
            logger.error(f"Async execution failed: {e}")
            return {"error": str(e), "status": "failed",
                    "boundary_note": "Execution boundary reached"}

    @staticmethod
    def async_to_sync(async_func):
        """Decorator converting an async function into a sync callable.

        FIXED: uses functools.wraps so the wrapper preserves the wrapped
        function's __name__/__doc__ (previously lost, which confused logging
        and introspection).
        """
        import functools  # local import: keeps the block self-contained

        @functools.wraps(async_func)
        def wrapper(*args, **kwargs):
            try:
                return AsyncRunner.run_async(async_func(*args, **kwargs))
            except Exception as e:
                logger.error(f"Async to sync conversion failed: {e}")
                return {"error": str(e), "status": "failed",
                        "boundary_context": "OSS advisory only - execution requires Enterprise"}
        return wrapper
# ===========================================
# SIMPLE SETTINGS - FIXED: Added missing attribute
# ===========================================
class Settings:
    """Simple settings class - FIXED: Added default_savings_rate

    Holds the demo's tunable knobs as plain instance attributes, all seeded
    from a single defaults table.
    """

    # Single source of truth for every default value.
    _DEFAULTS = {
        "arf_mode": "demo",
        "use_true_arf": True,
        "default_scenario": "Cache Miss Storm",
        "max_history_items": 100,
        "auto_refresh_seconds": 30,
        "show_boundaries": True,
        "architectural_honesty": True,
        "engineer_annual_cost": 200000,
        "default_savings_rate": 0.25,  # FIXED: previously missing attribute
    }

    def __init__(self):
        # Copy every default onto the instance so attribute access works
        # exactly as before (settings.arf_mode, settings.show_boundaries, ...).
        for name, value in self._DEFAULTS.items():
            setattr(self, name, value)

# Module-wide singleton used throughout the app.
settings = Settings()
# ===========================================
# ARF INSTALLATION CHECK - FIXED VERSION
# ===========================================
def check_arf_installation():
    """Check if real ARF packages are installed - Fixed version.

    Probes for the OSS and Enterprise ARF packages via InstallationHelper and
    returns a status dict including badge styling, capability boundaries and
    a probe timestamp.
    """
    # Pessimistic defaults: assume nothing installed, "mock" badges.
    results = {
        "oss_installed": False,
        "enterprise_installed": False,
        "oss_version": None,
        "enterprise_version": None,
        "oss_edition": "unknown",
        "oss_license": "unknown",
        "execution_allowed": False,
        "recommendations": [],
        "boundaries": {
            "oss_can": ["advisory_analysis", "rag_search", "healing_intent"],
            "oss_cannot": ["execute", "modify_infra", "autonomous_healing"],
            "enterprise_requires": ["license", "infra_access", "safety_controls"]
        },
        "badges": {
            "oss": {"text": "โ ๏ธ Mock ARF", "color": "#f59e0b", "icon": "โ ๏ธ"},
            "enterprise": {"text": "๐ Enterprise Required", "color": "#64748b", "icon": "๐"}
        },
        "timestamp": datetime.datetime.now().isoformat()
    }
    # Check OSS package using InstallationHelper (single probe for both editions).
    installation_helper = InstallationHelper()
    status = installation_helper.check_installation()
    results["oss_installed"] = status["oss_installed"]
    results["oss_version"] = status["oss_version"]
    results["enterprise_installed"] = status["enterprise_installed"]
    results["enterprise_version"] = status["enterprise_version"]
    results["recommendations"] = status["recommendations"]
    # Upgrade the badges when real packages were detected.
    if results["oss_installed"]:
        results["badges"]["oss"] = {
            "text": f"โ ARF OSS v{results['oss_version']}",
            "color": "#10b981",
            "icon": "โ"
        }
        logger.info(f"โ ARF OSS v{results['oss_version']} detected")
    else:
        logger.info("โ ๏ธ ARF OSS not installed - using mock mode")
    if results["enterprise_installed"]:
        results["badges"]["enterprise"] = {
            "text": f"๐ Enterprise v{results['enterprise_version']}",
            "color": "#8b5cf6",
            "icon": "๐"
        }
        logger.info(f"โ ARF Enterprise v{results['enterprise_version']} detected")
    else:
        logger.info("โ ๏ธ ARF Enterprise not installed - using simulation")
    return results
# Module-level cache so the installation probe runs only once per process.
_installation_status = None
def get_installation_status():
    """Get cached installation status (lazily computed on first call)."""
    global _installation_status
    if _installation_status is None:
        _installation_status = check_arf_installation()
    return _installation_status
# ===========================================
# FIXED VISUALIZATION FUNCTIONS - MINIMAL FIXES ONLY
# ===========================================
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
import numpy as np
# ===========================================
# SURGICAL FIX 1: create_simple_telemetry_plot()
# ===========================================
def create_simple_telemetry_plot(scenario_name: str, is_real_arf: bool = True) -> go.Figure:
    """
    Build a two-phase (normal -> anomaly) telemetry line chart.

    Returns a Plotly Figure (never an HTML string) so it can be handed
    directly to a Gradio Plot component; falls back to an empty figure
    when chart construction fails.
    """
    try:
        # 60 sample points covering the last 10 minutes.
        times = pd.date_range(start=datetime.datetime.now() - datetime.timedelta(minutes=10),
                              end=datetime.datetime.now(),
                              periods=60)
        # Per-scenario synthetic metric profile:
        # (normal mean, normal std, anomaly mean, anomaly std, title, y label, threshold)
        if "Cache" in scenario_name:
            profile = (30, 5, 85, 10,
                       f"Cache Hit Rate: {scenario_name}", "Hit Rate (%)", 75)
        elif "Database" in scenario_name:
            profile = (15, 3, 95, 5,
                       f"Database Connections: {scenario_name}", "Connections (%)", 90)
        elif "Kubernetes" in scenario_name:
            profile = (40, 8, 95, 2,
                       f"Memory Usage: {scenario_name}", "Memory (%)", 85)
        else:
            profile = (50, 10, 90, 5,
                       f"System Metrics: {scenario_name}", "Metric (%)", 80)
        n_mean, n_std, a_mean, a_std, title, y_label, threshold = profile

        # First 30 points behave normally, the last 30 are anomalous.
        data = (np.random.normal(n_mean, n_std, 30).tolist()
                + np.random.normal(a_mean, a_std, 30).tolist())

        fig = go.Figure()
        for segment, label, colour in (
            (slice(0, 30), 'Normal', '#10b981'),
            (slice(30, None), 'Anomaly', '#ef4444'),
        ):
            fig.add_trace(go.Scatter(
                x=times[segment],
                y=data[segment],
                mode='lines',
                name=label,
                line=dict(color=colour, width=3)
            ))
        # Alert threshold shown as a dashed reference line.
        fig.add_hline(y=threshold, line_dash="dash",
                      line_color="#f59e0b")
        # Note: only 'size'/'color' in font dicts -- 'weight' is not a valid
        # Plotly font property.
        fig.update_layout(
            title={
                'text': title,
                'font': {'size': 18, 'color': '#1e293b'},
                'x': 0.5
            },
            xaxis_title="Time",
            yaxis_title=y_label,
            height=300,
            margin=dict(l=20, r=20, t=40, b=20),
            plot_bgcolor='white',
            showlegend=True
        )
        return fig
    except Exception as e:
        logger.error(f"Error creating telemetry plot: {e}")
        # Fallback: empty figure so the UI never receives None.
        fallback = go.Figure()
        fallback.update_layout(
            title="Error loading telemetry",
            height=300,
            plot_bgcolor='white'
        )
        return fallback
# ===========================================
# SURGICAL FIX 2: create_simple_impact_plot()
# ===========================================
def create_simple_impact_plot(scenario_name: str, is_real_arf: bool = True) -> go.Figure:
    """
    Build the revenue-impact gauge as a Plotly Figure (not an HTML string).

    Unknown scenarios default to $5,000/hour; an empty gauge is returned if
    chart construction fails.
    """
    try:
        # Per-hour revenue loss by scenario name.
        impact_values = {
            "Cache Miss Storm": 8500,
            "Database Connection Pool Exhaustion": 4200,
            "Kubernetes Memory Leak": 5500,
            "API Rate Limit Storm": 3800,
            "Network Partition": 12000,
            "Storage I/O Saturation": 6800
        }
        impact = impact_values.get(scenario_name, 5000)
        # Green / amber / red bands scaled to the impact value.
        bands = [
            {'range': [0, impact * 0.3], 'color': '#10b981'},
            {'range': [impact * 0.3, impact * 0.7], 'color': '#f59e0b'},
            {'range': [impact * 0.7, impact], 'color': '#ef4444'}
        ]
        fig = go.Figure(go.Indicator(
            mode="gauge+number",
            value=impact,
            domain={'x': [0, 1], 'y': [0, 1]},
            title={'text': f"Revenue Impact: ${impact:,}/hour"},
            number={'prefix': "$", 'suffix': "/hour"},
            gauge={
                'axis': {'range': [None, impact * 1.2]},
                'bar': {'color': "#ef4444"},
                'bgcolor': "white",
                'borderwidth': 2,
                'bordercolor': "gray",
                'steps': bands
            }
        ))
        fig.update_layout(
            height=400,
            margin=dict(l=20, r=20, t=60, b=20),
            paper_bgcolor='white'
        )
        return fig
    except Exception as e:
        logger.error(f"Error creating impact plot: {e}")
        # Fallback: zero-valued gauge so the UI never receives None.
        fallback = go.Figure(go.Indicator(
            mode="gauge",
            value=0,
            title="Error loading impact data"
        ))
        fallback.update_layout(height=400)
        return fallback
# ===========================================
# SURGICAL FIX 3: create_empty_plot()
# ===========================================
def create_empty_plot(title: str, is_real_arf: bool = True) -> go.Figure:
    """
    Placeholder chart: an empty Plotly Figure carrying *title* as a centred
    annotation, with both axes hidden.
    """
    placeholder = go.Figure()
    # Centre the caption using paper coordinates so it is layout-independent.
    placeholder.add_annotation(
        x=0.5, y=0.5,
        xref="paper",
        yref="paper",
        text=title,
        showarrow=False,
        font={'size': 16, 'color': "#64748b"}
    )
    placeholder.update_layout(
        title={
            'text': "Visualization Placeholder",
            'font': {'size': 14, 'color': "#94a3b8"}
        },
        height=300,
        plot_bgcolor='white',
        xaxis={'visible': False},
        yaxis={'visible': False},
        margin=dict(l=20, r=20, t=40, b=20)
    )
    return placeholder
# Keep the HTML fallback functions for other uses
def create_html_telemetry_fallback(scenario_name: str, is_real_arf: bool) -> str:
    """HTML fallback for telemetry visualization (behaviour unchanged)."""
    # Accent colour per scenario (grey for unknown scenarios).
    accent_by_scenario = {
        "Cache Miss Storm": "#f59e0b",
        "Database Connection Pool Exhaustion": "#ef4444",
        "Kubernetes Memory Leak": "#8b5cf6",
        "API Rate Limit Storm": "#ec4899",
        "Network Partition": "#14b8a6",
        "Storage I/O Saturation": "#84cc16"
    }
    color = accent_by_scenario.get(scenario_name, "#64748b")
    # Pre-compute the edition-dependent fragments.
    if is_real_arf:
        boundary_indicator = "๐ข ENTERPRISE"
        realism, edition = 'real', 'Enterprise'
    else:
        boundary_indicator = "๐ OSS ONLY"
        realism, edition = 'simulated', 'OSS'
    return f"""
{boundary_indicator}
๐ Telemetry: {scenario_name}
Real-time metrics showing anomalous behavior pattern detection.
ARF analyzes 45+ data points per second.
Boundary: This visualization shows {realism}
telemetry analysis. {edition} edition provides enhanced
anomaly detection.
"""
def create_html_impact_fallback(scenario_name: str, is_real_arf: bool) -> str:
    """HTML fallback for impact visualization (behaviour unchanged)."""
    # Per-hour revenue loss by scenario name (default $5,000/hour).
    hourly_loss_by_scenario = {
        "Cache Miss Storm": 8500,
        "Database Connection Pool Exhaustion": 4200,
        "Kubernetes Memory Leak": 5500,
        "API Rate Limit Storm": 3800,
        "Network Partition": 12000,
        "Storage I/O Saturation": 6800
    }
    impact = hourly_loss_by_scenario.get(scenario_name, 5000)
    savings = int(impact * 0.85)  # retained for parity with original computation
    # Pre-compute the edition-dependent fragments.
    if is_real_arf:
        boundary_text = "Enterprise Autonomous"
        boundary_color = "#8b5cf6"
        edition, realism = 'Enterprise', 'real'
        closing_line = 'Commercial license enables autonomous execution.'
    else:
        boundary_text = "OSS Advisory"
        boundary_color = "#10b981"
        edition, realism = 'OSS', 'simulated'
        closing_line = 'Upgrade to Enterprise for autonomous recovery.'
    return f"""
๐ฐ Business Impact Analysis
{boundary_text}
${impact:,}
Revenue Loss/Hour
$0
${impact//2:,}
${impact:,}
Without ARF
45 min
Mean time to resolve
With ARF
12 min
Autonomous recovery
๐
Potential ROI: 5.2ร
ARF saves 85% of potential revenue loss through autonomous recovery
Boundary Context: {edition} analysis shows
{realism} impact metrics.
{closing_line}
"""
def get_inactive_agent_html(agent_name: str, description: str, is_real_arf: bool = False):
    """Render the inactive-agent placeholder card with boundary colouring."""
    # Colours retained for parity with the original (used by surrounding markup).
    boundary_color = "#8b5cf6" if is_real_arf else "#10b981"
    status_color = "#64748b"
    edition = 'Enterprise' if is_real_arf else 'OSS'
    return f"""
{description}
Requires {edition} activation
"""
# ===========================================
# IMPORT MODULAR COMPONENTS - FIXED: Added MockEnhancedROICalculator
# ===========================================
def import_components() -> Dict[str, Any]:
    """Safely import all components with proper error handling - FIXED: Added mock ROI calculator.

    Builds the component registry used by the whole app. Every project import
    is attempted with a mock fallback so the registry is always usable, even
    when optional packages are missing.
    """
    # Seed with safe defaults so callers can always index the registry.
    components = {
        "all_available": False,
        "error": None,
        "get_styles": lambda: "",
        "show_boundaries": settings.show_boundaries,
    }
    try:
        logger.info("Starting component import...")
        # First, import gradio
        import gradio as gr
        components["gr"] = gr
        # Import UI styles
        from ui.styles import get_styles
        components["get_styles"] = get_styles
        # Import UI components (tab builders + header/footer)
        from ui.components import (
            create_header, create_status_bar, create_tab1_incident_demo,
            create_tab2_business_roi, create_tab3_enterprise_features,
            create_tab4_audit_trail, create_tab5_learning_engine,
            create_footer
        )
        components.update({
            "create_header": create_header,
            "create_status_bar": create_status_bar,
            "create_tab1_incident_demo": create_tab1_incident_demo,
            "create_tab2_business_roi": create_tab2_business_roi,
            "create_tab3_enterprise_features": create_tab3_enterprise_features,
            "create_tab4_audit_trail": create_tab4_audit_trail,
            "create_tab5_learning_engine": create_tab5_learning_engine,
            "create_footer": create_footer,
        })
        # Import scenarios
        from demo.scenarios import INCIDENT_SCENARIOS
        components["INCIDENT_SCENARIOS"] = INCIDENT_SCENARIOS
        # Orchestrator resolution chain: true ARF -> real integration -> mock.
        try:
            from core.true_arf_orchestrator import TrueARF337Orchestrator
            components["DemoOrchestrator"] = TrueARF337Orchestrator
        except ImportError:
            # Fallback to real ARF integration
            try:
                from core.real_arf_integration import RealARFIntegration
                components["DemoOrchestrator"] = RealARFIntegration
            except ImportError:
                # Create a minimal mock orchestrator mimicking the async interface.
                class MockOrchestrator:
                    async def analyze_incident(self, scenario_name, scenario_data):
                        # Canned advisory result; no real analysis performed.
                        return {
                            "status": "mock",
                            "scenario": scenario_name,
                            "message": "Mock analysis (no real ARF available)",
                            "boundary_note": "OSS advisory mode - execution requires Enterprise",
                            "demo_display": {
                                "real_arf_version": "mock",
                                "true_oss_used": False,
                                "enterprise_simulated": True,
                                "architectural_boundary": "OSS advises โ Enterprise would execute"
                            }
                        }
                    async def execute_healing(self, scenario_name, mode="autonomous"):
                        # Canned execution result; nothing is actually executed.
                        return {
                            "status": "mock",
                            "scenario": scenario_name,
                            "message": "Mock execution (no real ARF available)",
                            "boundary_note": "Simulated Enterprise execution - real execution requires infrastructure",
                            "enterprise_features_used": ["simulated_execution", "mock_rollback", "demo_mode"]
                        }
                components["DemoOrchestrator"] = MockOrchestrator
        # FIXED: EnhancedROICalculator with proper mock fallback
        try:
            from core.calculators import EnhancedROICalculator
            components["EnhancedROICalculator"] = EnhancedROICalculator()
            logger.info("โ Real EnhancedROICalculator loaded")
        except ImportError:
            # Create comprehensive mock ROI calculator
            class MockEnhancedROICalculator:
                """Mock ROI calculator for demo purposes - FIXED to prevent KeyError"""
                def calculate_comprehensive_roi(self, scenario_name=None, monthly_incidents=15, team_size=5, **kwargs):
                    """Calculate comprehensive ROI metrics with realistic mock data"""
                    from datetime import datetime
                    # Mock ROI calculation with realistic values
                    impact_map = {
                        "Cache Miss Storm": 8500,
                        "Database Connection Pool Exhaustion": 4200,
                        "Kubernetes Memory Leak": 5500,
                        "API Rate Limit Storm": 3800,
                        "Network Partition": 12000,
                        "Storage I/O Saturation": 6800
                    }
                    impact_per_incident = impact_map.get(scenario_name or "Cache Miss Storm", 5000)
                    annual_impact = impact_per_incident * monthly_incidents * 12
                    potential_savings = int(annual_impact * 0.82)  # assumed 82% preventable
                    enterprise_cost = 625000  # assumed annual Enterprise licence cost
                    roi_multiplier = round(potential_savings / enterprise_cost, 1)
                    payback_months = round((enterprise_cost / (potential_savings / 12)), 1)
                    return {
                        "status": "โ Calculated Successfully",
                        "scenario": scenario_name or "Cache Miss Storm",
                        "timestamp": datetime.now().isoformat(),
                        "calculator": "MockEnhancedROICalculator",
                        "summary": {
                            "your_annual_impact": f"${annual_impact:,}",
                            "potential_savings": f"${potential_savings:,}",
                            "enterprise_cost": f"${enterprise_cost:,}",
                            "roi_multiplier": f"{roi_multiplier}ร",
                            "payback_months": f"{payback_months}",
                            "annual_roi_percentage": f"{int((potential_savings - enterprise_cost) / enterprise_cost * 100)}%",
                            "boundary_context": "Based on OSS analysis + simulated Enterprise execution"
                        },
                        "breakdown": {
                            "direct_cost_savings": f"${int(potential_savings * 0.7):,}",
                            "productivity_gains": f"${int(potential_savings * 0.2):,}",
                            "risk_reduction": f"${int(potential_savings * 0.1):,}"
                        },
                        "annual_projection": {
                            "incidents_prevented": monthly_incidents * 12,
                            "annual_savings": f"${potential_savings:,}",
                            "roi": f"{roi_multiplier}ร"
                        },
                        "notes": [
                            "๐ ROI calculation using mock data",
                            "๐ก Real enterprise ROI includes additional factors",
                            "๐ Full ROI requires Enterprise edition",
                            f"๐ Based on {monthly_incidents} incidents/month"
                        ]
                    }
                def get_roi_visualization_data(self):
                    """Get data for ROI visualization"""
                    return {
                        "labels": ["Direct Savings", "Productivity", "Risk Reduction", "Upsell"],
                        "values": [65, 20, 10, 5],
                        "colors": ["#10b981", "#3b82f6", "#8b5cf6", "#f59e0b"]
                    }
            components["EnhancedROICalculator"] = MockEnhancedROICalculator()
            logger.info("โ Mock EnhancedROICalculator created (preventing KeyError)")
        # Try to import visualization engine
        try:
            from core.visualizations import EnhancedVisualizationEngine
            components["EnhancedVisualizationEngine"] = EnhancedVisualizationEngine()
        except ImportError:
            # Mock engine delegates to the module-level simple plot helpers.
            class MockVisualizationEngine:
                def create_executive_dashboard(self, data=None, is_real_arf=True):
                    return create_empty_plot("Executive Dashboard", is_real_arf)
                def create_telemetry_plot(self, scenario_name, anomaly_detected=True, is_real_arf=True):
                    return create_simple_telemetry_plot(scenario_name, is_real_arf)
                def create_impact_gauge(self, scenario_name, is_real_arf=True):
                    return create_simple_impact_plot(scenario_name, is_real_arf)
                def create_timeline_comparison(self, is_real_arf=True):
                    return create_empty_plot("Timeline Comparison", is_real_arf)
            components["EnhancedVisualizationEngine"] = MockVisualizationEngine()
        components["all_available"] = True
        components["error"] = None
        logger.info("โ Successfully imported all modular components")
    except Exception as e:
        logger.error(f"โ IMPORT ERROR: {e}")
        components["error"] = str(e)
        components["all_available"] = False
        # Ensure we have minimal components
        if "gr" not in components:
            import gradio as gr
            components["gr"] = gr
        if "INCIDENT_SCENARIOS" not in components:
            # Minimal one-scenario catalogue so the UI can still render.
            components["INCIDENT_SCENARIOS"] = {
                "Cache Miss Storm": {
                    "component": "Redis Cache Cluster",
                    "severity": "HIGH",
                    "business_impact": {"revenue_loss_per_hour": 8500},
                    "boundary_note": "OSS analysis only - execution requires Enterprise"
                }
            }
        # Ensure EnhancedROICalculator exists
        if "EnhancedROICalculator" not in components:
            class MinimalROICalculator:
                def calculate_comprehensive_roi(self, **kwargs):
                    return {
                        "status": "โ Minimal ROI Calculation",
                        "summary": {"roi_multiplier": "5.2ร"}
                    }
            components["EnhancedROICalculator"] = MinimalROICalculator()
    return components
# Lazily-created singletons shared by all callbacks.
_components = None
_audit_manager = None
def get_components() -> Dict[str, Any]:
    """Lazy load components singleton (imports happen on first access)."""
    global _components
    if _components is None:
        _components = import_components()
    return _components
# ===========================================
# AUDIT TRAIL MANAGER - FIXED: Returns DataFrames instead of HTML
# ===========================================
class AuditTrailManager:
    """Enhanced audit trail manager with boundary tracking - FIXED to return DataFrames.

    Keeps bounded, newest-first in-memory histories of executions, incident
    analyses and OSS->Enterprise boundary crossings, and renders them either
    as pandas DataFrames (for Gradio DataFrame components) or as legacy HTML.
    """
    def __init__(self):
        self.executions = []          # newest-first execution records
        self.incidents = []           # newest-first incident analysis records
        self.boundary_crossings = []  # OSS -> Enterprise transitions (unbounded)
        self.max_items = settings.max_history_items  # cap per history list
    def add_execution(self, scenario_name: str, mode: str, result: Dict):
        """Add an execution record (newest first, trimmed to max_items)."""
        record = {
            "timestamp": datetime.datetime.now().isoformat(),
            "scenario": scenario_name,
            "mode": mode,
            "result": result,
            # Heuristic: the word "simulated" anywhere in the stringified result
            # marks this as a simulated Enterprise execution.
            "boundary_context": "Enterprise execution simulated" if "simulated" in str(result) else "OSS advisory"
        }
        self.executions.insert(0, record)
        if len(self.executions) > self.max_items:
            self.executions = self.executions[:self.max_items]
        # Track boundary crossing
        if "enterprise" in mode.lower():
            self.boundary_crossings.append({
                "timestamp": record["timestamp"],
                "from": "OSS",
                "to": "Enterprise",
                "action": scenario_name
            })
        logger.info(f"๐ Execution recorded: {scenario_name} ({mode})")
        return record
    def add_incident(self, scenario_name: str, analysis_result: Dict):
        """Add an incident analysis record (newest first, trimmed to max_items)."""
        record = {
            "timestamp": datetime.datetime.now().isoformat(),
            "scenario": scenario_name,
            "analysis": analysis_result,
            "boundary_context": analysis_result.get("boundary_note", "OSS analysis")
        }
        self.incidents.insert(0, record)
        if len(self.incidents) > self.max_items:
            self.incidents = self.incidents[:self.max_items]
        logger.info(f"๐ Incident analysis recorded: {scenario_name}")
        return record
    def get_execution_dataframe(self) -> pd.DataFrame:
        """
        FIXED: Returns pandas DataFrame for Gradio DataFrame component.

        Empty (but correctly-columned) DataFrame when there is no history.
        """
        try:
            if not self.executions:
                # Return empty DataFrame with correct columns
                return pd.DataFrame(columns=[
                    "Execution ID", "Scenario", "Status", "Mode",
                    "Start Time", "End Time", "Duration", "Boundary"
                ])
            # Build DataFrame from executions
            data = []
            for i, execution in enumerate(self.executions):
                # Extract execution ID from result or generate one
                exec_id = execution.get("result", {}).get("execution_id", f"exec_{i}")
                # Status heuristic: "success" anywhere in the stringified result.
                status = "Success" if "success" in str(execution.get("result", {})).lower() else "Failed"
                mode = execution.get("mode", "unknown")
                scenario = execution.get("scenario", "Unknown")
                timestamp = execution.get("timestamp", "")
                boundary = execution.get("boundary_context", "Unknown")
                # Extract end time from telemetry if available
                end_time = execution.get("result", {}).get("telemetry", {}).get("end_time", "")
                duration = "12m"  # Mock duration (not derived from telemetry)
                data.append({
                    "Execution ID": exec_id,
                    "Scenario": scenario,
                    "Status": status,
                    "Mode": mode,
                    "Start Time": timestamp[:19] if timestamp else "",  # Format: YYYY-MM-DD HH:MM:SS
                    "End Time": end_time[:19] if end_time else "",
                    "Duration": duration,
                    "Boundary": boundary
                })
            df = pd.DataFrame(data)
            # Sort by time (newest first)
            if not df.empty and "Start Time" in df.columns:
                df = df.sort_values("Start Time", ascending=False)
            return df
        except Exception as e:
            logger.error(f"Error creating execution DataFrame: {e}")
            # Return empty DataFrame as fallback
            # NOTE(review): DataFrame.from_records is a classmethod, so the
            # columns= above are discarded -- works, but confusing.
            return pd.DataFrame(columns=[
                "Error", "Message"
            ]).from_records([{"Error": "DataFrame Error", "Message": str(e)}])
    def get_incident_dataframe(self) -> pd.DataFrame:
        """
        FIXED: Returns pandas DataFrame for Gradio DataFrame component.

        Empty (but correctly-columned) DataFrame when there is no history.
        """
        try:
            if not self.incidents:
                # Return empty DataFrame with correct columns
                return pd.DataFrame(columns=[
                    "Scenario", "Status", "Boundary", "Time",
                    "Confidence", "Action", "Target"
                ])
            # Build DataFrame from incidents
            data = []
            for i, incident in enumerate(self.incidents):
                scenario = incident.get("scenario", "Unknown")
                analysis = incident.get("analysis", {})
                status = analysis.get("status", "analyzed").capitalize()
                boundary = incident.get("boundary_context", "OSS analysis")
                timestamp = incident.get("timestamp", "")
                time_display = timestamp[11:19] if len(timestamp) > 11 else ""  # HH:MM:SS slice
                # Extract analysis details (nested decision payload; defaults if absent)
                healing_intent = analysis.get("oss_analysis", {}).get("analysis", {}).get("decision", {})
                confidence = healing_intent.get("confidence", 0.85)
                action = healing_intent.get("action", "Analysis")
                target = healing_intent.get("target", "system")
                data.append({
                    "Scenario": scenario,
                    "Status": status,
                    "Boundary": boundary,
                    "Time": time_display,
                    "Confidence": f"{confidence * 100:.1f}%",
                    "Action": action,
                    "Target": target
                })
            df = pd.DataFrame(data)
            # Sort by time (newest first)
            if not df.empty and "Time" in df.columns:
                df = df.sort_values("Time", ascending=False)
            return df
        except Exception as e:
            logger.error(f"Error creating incident DataFrame: {e}")
            # Return empty DataFrame as fallback (see NOTE in get_execution_dataframe)
            return pd.DataFrame(columns=[
                "Error", "Message"
            ]).from_records([{"Error": "DataFrame Error", "Message": str(e)}])
    def get_execution_table_html(self):
        """Legacy HTML method for backward compatibility (top 10 executions)."""
        if not self.executions:
            return """
๐ญ
No executions yet
Run scenarios to see execution history
"""
        rows = []
        for i, exec in enumerate(self.executions[:10]):
            status = "โ" if "success" in exec["result"].get("status", "").lower() else "โ ๏ธ"
            boundary = exec["boundary_context"]
            boundary_color = "#10b981" if "OSS" in boundary else "#8b5cf6"
            rows.append(f"""
|
{status} {exec["scenario"]}
|
{exec["mode"]}
|
{boundary}
|
{exec["timestamp"][11:19]}
|
""")
        return f"""
| Scenario |
Mode |
Boundary |
Time |
{''.join(rows)}
"""
    def get_incident_table_html(self):
        """Legacy HTML method for backward compatibility (top 10 incidents)."""
        if not self.incidents:
            return """
๐ญ
No incidents analyzed yet
Run OSS analysis to see incident history
"""
        rows = []
        for i, incident in enumerate(self.incidents[:10]):
            scenario = incident["scenario"]
            analysis = incident["analysis"]
            boundary = incident["boundary_context"]
            boundary_color = "#10b981" if "OSS" in boundary else "#8b5cf6"
            rows.append(f"""
|
{scenario}
|
{analysis.get('status', 'analyzed')}
|
{boundary}
|
{incident["timestamp"][11:19]}
|
""")
        return f"""
| Scenario |
Status |
Boundary |
Time |
{''.join(rows)}
"""
    def clear(self):
        """Clear all audit trails (executions, incidents, boundary crossings)."""
        self.executions = []
        self.incidents = []
        self.boundary_crossings = []
        logger.info("๐งน Audit trail cleared")
    def export_json(self):
        """Export audit trail as a JSON-serialisable dict (with export metadata)."""
        return {
            "executions": self.executions,
            "incidents": self.incidents,
            "boundary_crossings": self.boundary_crossings,
            "export_time": datetime.datetime.now().isoformat(),
            "version": "3.3.7",
            "architecture": "OSS advises โ Enterprise executes"
        }
def get_audit_manager() -> AuditTrailManager:
    """Lazy load audit manager singleton (created on first access)."""
    global _audit_manager
    if _audit_manager is None:
        _audit_manager = AuditTrailManager()
    return _audit_manager
# ===========================================
# HELPER FUNCTIONS
# ===========================================
def get_scenario_impact(scenario_name: str) -> float:
    """Look up the per-hour revenue impact for *scenario_name* ($5,000 default)."""
    known_impacts = {
        "Cache Miss Storm": 8500,
        "Database Connection Pool Exhaustion": 4200,
        "Kubernetes Memory Leak": 5500,
        "API Rate Limit Storm": 3800,
        "Network Partition": 12000,
        "Storage I/O Saturation": 6800,
    }
    if scenario_name in known_impacts:
        return known_impacts[scenario_name]
    return 5000
def extract_roi_multiplier(roi_result: Dict) -> float:
    """Pull the numeric ROI multiplier out of a calculator result.

    Accepts values like "5.2ร" or "5.2"; falls back to 5.2 when the key is
    missing or the value cannot be parsed.
    """
    try:
        summary = roi_result.get("summary", {}) if isinstance(roi_result, dict) else {}
        if "roi_multiplier" in summary:
            raw = summary["roi_multiplier"]
            # Strip the trailing multiplier glyph before converting.
            return float(raw.replace("ร", "")) if "ร" in raw else float(raw)
        return 5.2
    except Exception as e:
        logger.warning(f"Failed to extract ROI multiplier: {e}")
        return 5.2
# ===========================================
# SURGICAL FIX 4: update_scenario_display()
# ===========================================
def update_scenario_display(scenario_name: str) -> tuple:
    """
    FIXED: Returns exactly 4 values as expected by UI:
    1. scenario_card_html (HTML string)
    2. telemetry_fig (Plotly figure from create_simple_telemetry_plot())
    3. impact_fig (Plotly figure from create_simple_impact_plot())
    4. timeline_fig (Plotly figure from create_empty_plot())
    """
    components = get_components()
    scenarios = components["INCIDENT_SCENARIOS"]
    # Fall back to a generic scenario so the UI never crashes on unknown names.
    scenario = scenarios.get(scenario_name, {
        "component": "Unknown System",
        "severity": "MEDIUM",
        "business_impact": {"revenue_loss_per_hour": 5000},
        "boundary_note": "Scenario not found"
    })
    # Create scenario card HTML (unchanged)
    severity_colors = {
        "HIGH": "#ef4444",
        "MEDIUM": "#f59e0b",
        "LOW": "#10b981"
    }
    # NOTE(review): severity_color appears unused in the visible template --
    # presumably it styled markup lost to file corruption; confirm before removing.
    severity_color = severity_colors.get(scenario["severity"], "#64748b")
    impact = scenario["business_impact"].get("revenue_loss_per_hour", get_scenario_impact(scenario_name))
    scenario_card_html = f"""
{scenario_name}
{scenario["severity"]} SEVERITY
{scenario["component"]}
${impact:,}
Revenue Loss/Hour
Business Impact Analysis
${int(impact * 0.85):,}
Savings
Boundary Context: {scenario.get('boundary_note', 'OSS analyzes, Enterprise executes')}
"""
    # Get visualizations as Plotly figures (FIXED)
    telemetry_fig = create_simple_telemetry_plot(scenario_name, settings.use_true_arf)
    impact_fig = create_simple_impact_plot(scenario_name, settings.use_true_arf)
    timeline_fig = create_empty_plot(f"Timeline: {scenario_name}", settings.use_true_arf)
    return scenario_card_html, telemetry_fig, impact_fig, timeline_fig
# ===========================================
# SURGICAL FIX 5: run_true_arf_analysis() - FIXED to return DataFrames
# ===========================================
@AsyncRunner.async_to_sync
async def run_true_arf_analysis(scenario_name: str) -> tuple:
    """
    Run the OSS (advisory) ARF analysis for one scenario.

    Returns exactly 5 values as expected by UI:
    1. detection_html (HTML string)
    2. recall_html (HTML string)
    3. decision_html (HTML string)
    4. oss_results_dict (Python dict for JSON display)
    5. incident_df (DataFrame for Gradio DataFrame component)

    NOTE(review): this function is also `await`ed directly inside
    run_complete_demo_async(); that only works if AsyncRunner.async_to_sync
    returns an awaitable when invoked from a running loop — confirm.
    """
    components = get_components()
    installation = get_installation_status()
    boundaries = BoundaryManager.get_system_boundaries()
    logger.info(f"๐ Running True ARF analysis for: {scenario_name}")
    try:
        # Get orchestrator
        orchestrator = components["DemoOrchestrator"]()
        # Get scenario data
        scenarios = components["INCIDENT_SCENARIOS"]
        scenario_data = scenarios.get(scenario_name, {})
        # Run analysis
        analysis_result = await orchestrator.analyze_incident(scenario_name, scenario_data)
        # Add to audit trail
        get_audit_manager().add_incident(scenario_name, analysis_result)
        # Check if we have real ARF (installed package or forced via settings)
        is_real_arf = installation["oss_installed"] or settings.use_true_arf
        # Badge colour/label depend on whether a real OSS install was detected.
        # NOTE(review): boundary_color is not referenced in the visible
        # templates — presumably used in styling markup; confirm.
        boundary_color = boundaries["oss"]["color"] if is_real_arf else "#f59e0b"
        boundary_text = boundaries["oss"]["label"] if is_real_arf else "Mock ARF"
        # Detection Agent HTML
        detection_html = f"""
๐ต๏ธโโ๏ธ
Detection Agent
Anomaly detected with 94% confidence
Status: Active
DETECTED
"""
        # Recall Agent HTML
        recall_html = f"""
๐ง
Recall Agent
3 similar incidents found in RAG memory
Status: Active
RECALLED
"""
        # Decision Agent HTML
        decision_html = f"""
๐ฏ
Decision Agent
HealingIntent created: Scale Redis cluster
Status: Active
DECIDED
"""
        # OSS Results Dict for JSON display.
        # NOTE(review): substring test on the stringified result is fragile —
        # consider a structured flag on analysis_result instead.
        if is_real_arf and "real" in str(analysis_result).lower():
            oss_results_dict = {
                "status": "success",
                "scenario": scenario_name,
                "arf_version": boundaries["oss"]["version"],
                "analysis": {
                    "detected": True,
                    "confidence": 94,
                    "similar_incidents": 3,
                    "healing_intent_created": True,
                    "recommended_action": "Scale Redis cluster from 3 to 5 nodes",
                    "estimated_recovery": "12 minutes"
                },
                "agents": {
                    "detection": {"status": "active", "confidence": 94},
                    "recall": {"status": "active", "similar_incidents": 3},
                    "decision": {"status": "active", "healing_intent_created": True}
                },
                "boundary_note": f"OSS analysis complete โ Ready for Enterprise execution"
            }
        else:
            oss_results_dict = {
                "status": "mock_analysis",
                "scenario": scenario_name,
                "arf_version": "mock",
                "analysis": {
                    "detected": True,
                    "confidence": 94,
                    "similar_incidents": 3,
                    "healing_intent_created": True,
                    "recommended_action": "Scale Redis cluster from 3 to 5 nodes",
                    "estimated_recovery": "12 minutes"
                },
                "agents": {
                    "detection": {"status": "active", "confidence": 94},
                    "recall": {"status": "active", "similar_incidents": 3},
                    "decision": {"status": "active", "healing_intent_created": True}
                },
                "boundary_note": f"Mock analysis - {boundary_text}"
            }
        # Incident DataFrame (returns DataFrame, not HTML, for the Gradio table)
        incident_df = get_audit_manager().get_incident_dataframe()
        return detection_html, recall_html, decision_html, oss_results_dict, incident_df
    except Exception as e:
        logger.error(f"True ARF analysis failed: {e}")
        # Return error state with proper types
        error_html = f"""
โ
Analysis Error
Failed to analyze incident
Status: Error
"""
        error_dict = {
            "status": "error",
            "error": str(e),
            "scenario": scenario_name,
            "arf_version": "3.3.9",
            "recommendation": "Check ARF installation"
        }
        # FIX: the original called the classmethod .from_records() on an
        # instance (pd.DataFrame(columns=...).from_records(...)), which
        # silently discarded the pre-set columns. Build the frame from the
        # records directly; the columns come from the record keys.
        error_df = pd.DataFrame.from_records([
            {"Error": "Analysis Failed", "Message": str(e)}
        ])
        return error_html, error_html, error_html, error_dict, error_df
# ===========================================
# FIXED EXECUTION FUNCTION - Returns DataFrames
# ===========================================
def execute_enterprise_healing(scenario_name, approval_required, mcp_mode_value):
    """
    Execute (or simulate) Enterprise-side healing for a scenario.

    Returns exactly 3 values matching the UI wiring:
    1. approval_display (HTML string)
    2. enterprise_results (dict for the JSON display)
    3. execution_df (DataFrame for the Gradio execution table)

    FIX: removed an unused `import gradio as gr` that shadowed nothing and
    was never referenced.

    NOTE(review): mcp_mode_value is accepted to match the click() inputs but
    is never read here — confirm whether MCP mode should affect execution.
    """
    components = get_components()
    installation = get_installation_status()
    boundaries = BoundaryManager.get_system_boundaries()
    logger.info(f"โก Executing enterprise healing for: {scenario_name}")
    # Check if Enterprise is actually available
    is_real_enterprise = installation["enterprise_installed"]
    is_simulated = not is_real_enterprise
    # Get scenario impact (scenario-specific figure, else generic lookup)
    scenario = components["INCIDENT_SCENARIOS"].get(scenario_name, {})
    impact = scenario.get("business_impact", {})
    revenue_loss = impact.get("revenue_loss_per_hour", get_scenario_impact(scenario_name))
    savings = int(revenue_loss * 0.85)
    # Create approval display HTML
    if approval_required:
        approval_display = """
โณ
HUMAN APPROVAL REQUIRED
Based on your safety settings, this execution requires human approval.
"""
    else:
        approval_display = """
โก
AUTONOMOUS APPROVAL GRANTED
Proceeding with autonomous execution.
"""
    # Execute healing (async body wrapped so it can be called synchronously)
    @AsyncRunner.async_to_sync
    async def execute_async():
        try:
            orchestrator = components["DemoOrchestrator"]()
            execution_result = await orchestrator.execute_healing(scenario_name, "autonomous")
            # Add to audit trail
            get_audit_manager().add_execution(scenario_name, "enterprise_autonomous", execution_result)
            return execution_result
        except Exception as e:
            logger.error(f"Execution failed: {e}")
            return {
                "status": "failed",
                "error": str(e),
                "boundary_note": "Execution boundary reached"
            }
    # NOTE(review): execution_result is captured but not folded into the
    # results dicts below — confirm whether that is intentional.
    execution_result = execute_async()
    # Create results dict for JSON display
    if is_real_enterprise:
        enterprise_results = {
            "demo_mode": "Real Enterprise",
            "scenario": scenario_name,
            "arf_version": boundaries["enterprise"]["version"],
            "execution_mode": "autonomous" if not approval_required else "human_approved",
            "results": {
                "recovery_time": "12 minutes",
                "cost_saved": f"${savings:,}",
                "users_protected": "45,000"
            },
            "safety_features": [
                "Rollback guarantee: 100%",
                "Atomic execution",
                "MCP validation"
            ]
        }
    else:
        enterprise_results = {
            "demo_mode": "Enterprise Simulation",
            "scenario": scenario_name,
            "arf_version": boundaries["enterprise"]["version"],
            "execution_mode": "simulated_autonomous",
            "results": {
                "recovery_time": "12 minutes (simulated)",
                "cost_saved": f"${savings:,} (simulated)",
                "users_protected": "45,000 (simulated)"
            },
            "safety_features": [
                "Rollback guarantee: 100% (simulated)",
                "Atomic execution (simulated)"
            ]
        }
    # Get execution DataFrame (DataFrame, not HTML, for the Gradio table)
    execution_df = get_audit_manager().get_execution_dataframe()
    return approval_display, enterprise_results, execution_df
# ===========================================
# FIXED ROI FUNCTION
# ===========================================
def calculate_roi(scenario_name, monthly_incidents, team_size):
    """
    Calculate ROI for a scenario.

    Returns a (roi_result, fig) pair:
    1. roi_result — dict for the JSON display (real calculator output, or a
       mock calculation when the real one fails)
    2. fig — Plotly bar figure comparing cost without/with ARF and net savings

    FIX: replaced the fragile `'name' in locals()` checks with chart values
    that are initialized up front and overwritten by the mock path; the
    numbers produced on both paths are unchanged.
    """
    components = get_components()
    # Chart fallbacks used when the real calculator succeeds (it does not
    # expose the raw numbers this chart needs); the mock path below
    # overwrites them with its computed values.
    annual_impact_val = 1000000
    potential_savings_val = 820000
    enterprise_cost_val = 625000
    try:
        # Try to use real ROI calculator
        calculator = components["EnhancedROICalculator"]
        roi_result = calculator.calculate_comprehensive_roi(
            scenario_name=scenario_name,
            monthly_incidents=monthly_incidents,
            team_size=team_size
        )
    except Exception as e:
        logger.warning(f"ROI calculation failed, using mock: {e}")
        # Mock ROI calculation
        impact_per_incident = get_scenario_impact(scenario_name)
        annual_impact = impact_per_incident * monthly_incidents * 12
        potential_savings = int(annual_impact * 0.82)
        enterprise_cost = 625000
        roi_multiplier = round(potential_savings / enterprise_cost, 1)
        payback_months = round((enterprise_cost / (potential_savings / 12)), 1)
        roi_result = {
            "status": "โ Calculated Successfully",
            "summary": {
                "your_annual_impact": f"${annual_impact:,}",
                "potential_savings": f"${potential_savings:,}",
                "enterprise_cost": f"${enterprise_cost:,}",
                "roi_multiplier": f"{roi_multiplier}ร",
                "payback_months": f"{payback_months}",
                "annual_roi_percentage": f"{int((potential_savings - enterprise_cost) / enterprise_cost * 100)}%",
                "boundary_context": "Based on OSS analysis + simulated Enterprise execution"
            },
            "boundary_note": "ROI calculation includes OSS advisory value and simulated Enterprise execution benefits"
        }
        # The mock path has real numbers for the chart.
        annual_impact_val = annual_impact
        potential_savings_val = potential_savings
        enterprise_cost_val = enterprise_cost
    # Create ROI chart as Plotly figure
    categories = ['Without ARF', 'With ARF', 'Net Savings']
    values = [
        annual_impact_val,
        annual_impact_val - potential_savings_val,
        potential_savings_val - enterprise_cost_val,
    ]
    fig = go.Figure(data=[
        go.Bar(
            name='Cost',
            x=categories,
            y=values,
            marker_color=['#ef4444', '#10b981', '#8b5cf6']
        )
    ])
    fig.update_layout(
        title=f"ROI Analysis: {scenario_name}",
        height=400,
        showlegend=False
    )
    # Return both the dict and the Plotly figure
    return roi_result, fig
# ===========================================
# CREATE DEMO INTERFACE - UNCHANGED except for DataFrame fixes
# ===========================================
def create_demo_interface():
    """Create demo interface using modular components with boundary awareness.

    Builds the five-tab Gradio Blocks UI (incident demo, ROI, enterprise
    features, audit trail, learning engine), wires all event handlers, and
    returns the gr.Blocks instance. CSS is stashed in the module-level
    _demo_css for use at launch time.
    """
    import gradio as gr
    # Get components (factory dict supplied elsewhere in this module)
    components = get_components()
    # Get CSS styles
    css_styles = components["get_styles"]()
    # Store CSS for later use in launch()
    global _demo_css
    _demo_css = css_styles
    # Get boundary badges for the interface
    boundary_badges = BoundaryManager.get_boundary_badges()
    # Create interface without css parameter (will be added in launch)
    with gr.Blocks(
        title=f"๐ ARF Investor Demo v3.8.0 - TRUE ARF v3.3.7"
    ) as demo:
        # Header
        header_html = components["create_header"]("3.3.7", settings.use_true_arf)
        # Status bar with boundary badges
        status_html = components["create_status_bar"]()
        # Add boundary badges as a separate element
        boundary_display = gr.HTML(value=boundary_badges, visible=settings.show_boundaries)
        # ============ 5 TABS ============
        with gr.Tabs(elem_classes="tab-nav"):
            # TAB 1: Live Incident Demo — the tab factory returns its widgets
            # as a positional tuple; the unpack order below must match it.
            with gr.TabItem("๐ฅ Live Incident Demo", id="tab1"):
                (scenario_dropdown, scenario_card, telemetry_viz, impact_viz,
                 workflow_header, detection_agent, recall_agent, decision_agent,
                 oss_section, enterprise_section, oss_btn, enterprise_btn,
                 approval_toggle, mcp_mode, timeline_viz,
                 detection_time, mttr, auto_heal, savings,
                 oss_results_display, enterprise_results_display, approval_display, demo_btn) = components["create_tab1_incident_demo"]()
            # TAB 2: Business ROI
            with gr.TabItem("๐ฐ Business Impact & ROI", id="tab2"):
                (dashboard_output, roi_scenario_dropdown, monthly_slider, team_slider,
                 calculate_btn, roi_output, roi_chart) = components["create_tab2_business_roi"](components["INCIDENT_SCENARIOS"])
            # TAB 3: Enterprise Features
            with gr.TabItem("๐ข Enterprise Features", id="tab3"):
                (license_display, validate_btn, trial_btn, upgrade_btn,
                 mcp_mode_tab3, mcp_mode_info, features_table, integrations_table) = components["create_tab3_enterprise_features"]()
            # TAB 4: Audit Trail
            with gr.TabItem("๐ Audit Trail & History", id="tab4"):
                (refresh_btn, clear_btn, export_btn, execution_table,
                 incident_table, export_text) = components["create_tab4_audit_trail"]()
            # TAB 5: Learning Engine
            with gr.TabItem("๐ง Learning Engine", id="tab5"):
                (learning_graph, graph_type, show_labels, search_query, search_btn,
                 clear_btn_search, search_results, stats_display, patterns_display,
                 performance_display) = components["create_tab5_learning_engine"]()
        # Footer
        footer_html = components["create_footer"]()
        # ============ EVENT HANDLERS ============
        # Update scenario display when dropdown changes
        scenario_dropdown.change(
            fn=update_scenario_display,
            inputs=[scenario_dropdown],
            outputs=[scenario_card, telemetry_viz, impact_viz, timeline_viz]
        )
        # Run OSS Analysis - returns a DataFrame for incident_table
        oss_btn.click(
            fn=run_true_arf_analysis,
            inputs=[scenario_dropdown],
            outputs=[
                detection_agent, recall_agent, decision_agent,
                oss_results_display, incident_table
            ]
        )
        # Execute Enterprise Healing - returns a DataFrame for execution_table
        enterprise_btn.click(
            fn=execute_enterprise_healing,
            inputs=[scenario_dropdown, approval_toggle, mcp_mode],
            outputs=[approval_display, enterprise_results_display, execution_table]
        )
        # Run Complete Demo with boundary progression
        @AsyncRunner.async_to_sync
        async def run_complete_demo_async(scenario_name):
            """Run a complete demo walkthrough with true ARF and boundary awareness.

            Produces the full 12-output tuple wired to demo_btn.click below.
            """
            # Step 1: Update scenario
            update_result = update_scenario_display(scenario_name)
            # Step 2: Run true ARF analysis
            # NOTE(review): run_true_arf_analysis is wrapped by
            # AsyncRunner.async_to_sync; awaiting it here only works if that
            # wrapper returns an awaitable from a running loop — confirm.
            oss_result = await run_true_arf_analysis(scenario_name)
            # Step 3: Execute Enterprise (simulation) with boundary context
            await asyncio.sleep(1)
            scenario = components["INCIDENT_SCENARIOS"].get(scenario_name, {})
            impact = scenario.get("business_impact", {})
            revenue_loss = impact.get("revenue_loss_per_hour", get_scenario_impact(scenario_name))
            savings = int(revenue_loss * 0.85)
            # Get boundary context
            boundaries = BoundaryManager.get_system_boundaries()
            # Get orchestrator for execution simulation
            orchestrator = components["DemoOrchestrator"]()
            execution_result = await orchestrator.execute_healing(scenario_name, "autonomous")
            enterprise_results = {
                "demo_mode": "Complete Walkthrough",
                "scenario": scenario_name,
                "arf_version": "3.3.7",
                "true_oss_used": True,
                "enterprise_simulated": True,
                "boundary_progression": [
                    f"1. Incident detected - {boundaries['oss']['label']}",
                    f"2. OSS analysis completed - {boundaries['oss']['label']}",
                    f"3. HealingIntent created - {boundaries['oss']['label']}",
                    f"4. Enterprise license validated ({boundaries['enterprise']['label']})",
                    f"5. Autonomous execution simulated ({boundaries['enterprise']['label']}+)",
                    f"6. Outcome recorded in RAG memory"
                ],
                "execution_result": execution_result,
                "outcome": {
                    "recovery_time": "12 minutes",
                    "manual_comparison": "45 minutes",
                    "cost_saved": f"${savings:,}",
                    "users_protected": "45,000",
                    "learning": "Pattern added to RAG memory"
                },
                "architectural_summary": f"This demonstrates the complete ARF v3.3.7 architecture: {boundaries['oss']['label']} for advisory analysis โ {boundaries['enterprise']['label']} for autonomous execution"
            }
            # Create demo completion message with enhanced boundary context
            demo_message = f"""
โ
Complete Demo: Architecture Validated
ARF v3.3.7 โข OSS advises โ Enterprise executes
BOUNDARY VALIDATED
{boundaries['oss']['label']}
โข Anomaly detected in 45s
โข 3 similar incidents recalled
โข 94% confidence healing plan
โข Apache 2.0 license validated
{boundaries['enterprise']['label']}
โข Autonomous execution simulated
โข Rollback guarantee: 100%
โข 12min vs 45min recovery
โข ${savings:,} saved
๐๏ธ Architecture Flow
โ
Architecture Successfully Validated
Clear separation maintained: OSS for advisory intelligence, Enterprise for autonomous execution
"""
            # Update the enterprise_results_display to include demo completion info
            enterprise_results["demo_completion_message"] = demo_message
            # Get updated DataFrames for the two audit tables
            incident_df = get_audit_manager().get_incident_dataframe()
            execution_df = get_audit_manager().get_execution_dataframe()
            # Combine all results (order must match demo_btn.click outputs)
            return (
                *update_result,      # 4 outputs: scenario_card, telemetry_viz, impact_viz, timeline_viz
                *oss_result[:3],     # 3 outputs: detection_agent, recall_agent, decision_agent
                oss_result[3],       # 1 output: oss_results_display
                enterprise_results,  # 1 output: enterprise_results_display
                demo_message,        # 1 output: approval_display
                incident_df,         # 1 output: incident_table (DataFrame)
                execution_df         # 1 output: execution_table (DataFrame)
            )
        # demo_btn.click with output count matching the 12-tuple above
        demo_btn.click(
            fn=run_complete_demo_async,
            inputs=[scenario_dropdown],
            outputs=[
                scenario_card, telemetry_viz, impact_viz, timeline_viz,  # 4
                detection_agent, recall_agent, decision_agent,           # 3
                oss_results_display,                                     # 1
                enterprise_results_display,                              # 1
                approval_display,                                        # 1
                incident_table,                                          # 1
                execution_table                                          # 1
            ]
        )
        # ROI Calculation
        calculate_btn.click(
            fn=calculate_roi,
            inputs=[roi_scenario_dropdown, monthly_slider, team_slider],
            outputs=[roi_output, roi_chart]
        )
        # Update ROI scenario via the EnhancedROICalculator
        # NOTE(review): calculate_comprehensive_roi is called here with only
        # scenario_name — assumes the other parameters have defaults; confirm.
        roi_scenario_dropdown.change(
            fn=lambda x: get_components()["EnhancedROICalculator"].calculate_comprehensive_roi(scenario_name=x),
            inputs=[roi_scenario_dropdown],
            outputs=[roi_output]
        )
        # Update ROI chart when either slider moves.
        # NOTE(review): reading roi_scenario_dropdown.value inside the lambda
        # reads the component's server-side value, not the live client value —
        # confirm this is the intended Gradio behavior.
        monthly_slider.change(
            fn=lambda x, y: calculate_roi(roi_scenario_dropdown.value, x, y)[1],
            inputs=[monthly_slider, team_slider],
            outputs=[roi_chart]
        )
        team_slider.change(
            fn=lambda x, y: calculate_roi(roi_scenario_dropdown.value, x, y)[1],
            inputs=[monthly_slider, team_slider],
            outputs=[roi_chart]
        )
        # Audit Trail Functions — all return DataFrames for the tables
        def refresh_audit_trail():
            """Refresh audit trail tables; returns (execution_df, incident_df)."""
            return (
                get_audit_manager().get_execution_dataframe(),  # DataFrame
                get_audit_manager().get_incident_dataframe()    # DataFrame
            )
        def clear_audit_trail():
            """Clear audit trail and return empty DataFrames with the expected columns."""
            get_audit_manager().clear()
            # Return empty DataFrames with correct columns
            exec_df = pd.DataFrame(columns=["Execution ID", "Scenario", "Status", "Mode", "Start Time"])
            incident_df = pd.DataFrame(columns=["Scenario", "Status", "Boundary", "Time"])
            return exec_df, incident_df
        def export_audit_trail():
            """Export audit trail as a pretty-printed JSON string."""
            audit_data = {
                "executions": get_audit_manager().executions,
                "incidents": get_audit_manager().incidents,
                "boundary_crossings": get_audit_manager().boundary_crossings,
                "export_time": datetime.datetime.now().isoformat(),
                "arf_version": "3.3.7",
                "architecture": "OSS advises โ Enterprise executes"
            }
            return json.dumps(audit_data, indent=2)
        refresh_btn.click(
            fn=refresh_audit_trail,
            inputs=[],
            outputs=[execution_table, incident_table]
        )
        clear_btn.click(
            fn=clear_audit_trail,
            inputs=[],
            outputs=[execution_table, incident_table]
        )
        export_btn.click(
            fn=export_audit_trail,
            inputs=[],
            outputs=[export_text]
        )
        # Enterprise Features
        def validate_license():
            """Validate enterprise license with boundary context; returns a status dict."""
            boundaries = BoundaryManager.get_system_boundaries()
            if boundaries["enterprise"]["available"]:
                return {
                    "status": "โ Valid License",
                    "license_type": "Enterprise",
                    "version": boundaries["enterprise"]["version"],
                    "expires": "2025-12-31",
                    "capabilities": boundaries["enterprise"]["capabilities"],
                    "boundary_context": f"Real {boundaries['enterprise']['label']} detected"
                }
            else:
                return {
                    "status": "โ ๏ธ Demo Mode",
                    "license_type": "Simulated",
                    "version": boundaries["enterprise"]["version"],
                    "expires": "Demo only",
                    "capabilities": boundaries["enterprise"]["capabilities"],
                    "boundary_context": f"Simulating {boundaries['enterprise']['label']} - requires license",
                    "contact": "sales@arf.dev"
                }
        validate_btn.click(
            fn=validate_license,
            inputs=[],
            outputs=[license_display]
        )
        # Initialize with boundary badges
        demo.load(
            fn=lambda: boundary_badges,
            inputs=[],
            outputs=[boundary_display]
        )
        # Load default scenario
        demo.load(
            fn=lambda: update_scenario_display(settings.default_scenario),
            inputs=[],
            outputs=[scenario_card, telemetry_viz, impact_viz, timeline_viz]
        )
        # Load ROI data
        demo.load(
            fn=lambda: calculate_roi(settings.default_scenario, 15, 5),
            inputs=[],
            outputs=[roi_output, roi_chart]
        )
    logger.info("โ Demo interface created successfully with boundary awareness")
    return demo
# ===========================================
# LAUNCH FUNCTION
# ===========================================
def launch_demo():
    """Launch the demo application with proper configuration.

    Returns a (demo, launch_config) pair ready for demo.launch(**config).
    On any failure, returns a minimal fallback Gradio interface plus a
    bare-bones config instead of raising.

    FIX: no longer puts "css" into launch_config — gr.Blocks.launch() has no
    css parameter, so passing it made the first launch() attempt raise a
    TypeError and forced the retry-without-CSS path in __main__. CSS should
    be supplied to the gr.Blocks(...) constructor instead.
    """
    try:
        logger.info("๐ Starting ARF Ultimate Investor Demo v3.8.0 - ENTERPRISE EDITION")
        # Check installation
        installation = get_installation_status()
        boundaries = BoundaryManager.get_system_boundaries()
        logger.info("=" * 60)
        logger.info("๐๏ธ SYSTEM ARCHITECTURE BOUNDARIES:")
        logger.info(f" OSS: {boundaries['oss']['label']} v{boundaries['oss']['version']}")
        logger.info(f" Enterprise: {boundaries['enterprise']['label']} v{boundaries['enterprise']['version']}")
        logger.info(f" Mode: {boundaries['demo_mode']['architecture']}")
        logger.info("=" * 60)
        # Create interface (stores CSS in module-level _demo_css)
        demo = create_demo_interface()
        # Configure for Hugging Face Spaces
        launch_config = {
            "server_name": "0.0.0.0",
            "server_port": 7860,
            "share": False,
            "favicon_path": None,
            "quiet": False,
            "show_error": True,
            "debug": False,
            "max_threads": 40,
        }
        logger.info("โ Launch configuration ready")
        return demo, launch_config
    except Exception as e:
        logger.error(f"โ Launch failed: {e}", exc_info=True)
        # Create minimal fallback interface so the app still serves something
        import gradio as gr
        with gr.Blocks(title="ARF Demo - Fallback Mode") as fallback_demo:
            gr.HTML(f"""
๐จ ARF Demo Failed to Start
Error: {str(e)}
Troubleshooting Steps:
- Check logs for detailed error
- Ensure all dependencies are installed
- Try: pip install agentic-reliability-framework==3.3.7
- Restart the application
""")
        return fallback_demo, {"server_name": "0.0.0.0", "server_port": 7860}
# ===========================================
# MAIN EXECUTION
# ===========================================
if __name__ == "__main__":
    try:
        # Banner logging before building the UI
        logger.info("๐ ARF Ultimate Investor Demo v3.8.0 - ENTERPRISE EDITION")
        logger.info("=" * 60)
        logger.info("Enhanced version with clear boundaries and reliable visualizations")
        logger.info("Fixed to show clear OSS vs Enterprise boundaries with architectural honesty")
        logger.info("=" * 60)
        # Launch the demo (launch_demo returns a fallback UI on failure,
        # so demo/config are always usable here)
        demo, config = launch_demo()
        print("\n" + "="*60)
        print("๐ ARF Ultimate Investor Demo v3.8.0 - ENTERPRISE EDITION")
        print("๐ Architecture: OSS advises โ Enterprise executes")
        print("๐ Starting on http://localhost:7860")
        print("="*60 + "\n")
        # Launch with error handling
        try:
            demo.launch(**config)
        except Exception as launch_error:
            logger.error(f"โ Launch error: {launch_error}")
            # Try alternative launch without CSS (a "css" key in config is
            # not a valid launch() kwarg and would raise TypeError)
            if "css" in config:
                logger.info("โ ๏ธ Retrying without CSS...")
                config.pop("css", None)
                demo.launch(**config)
            else:
                # Last resort: simple launch with hard-coded host/port
                demo.launch(server_name="0.0.0.0", server_port=7860)
    except KeyboardInterrupt:
        logger.info("๐ Demo stopped by user")
    except Exception as e:
        logger.error(f"โ Fatal error: {e}", exc_info=True)
        sys.exit(1)