"""
🚀 ARF ULTIMATE INVESTOR DEMO v3.3.8
Enhanced with professional visualizations, export features, and data persistence
FIXED VERSION: All visualization errors resolved
"""
import asyncio
import datetime
import json
import logging
import time
import uuid
import random
import base64
import io
from typing import Dict, Any, List, Optional, Tuple
from collections import defaultdict, deque
import hashlib
import gradio as gr
import numpy as np
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
from plotly.subplots import make_subplots
# Import OSS components
try:
from agentic_reliability_framework.arf_core.models.healing_intent import (
HealingIntent,
create_rollback_intent,
create_restart_intent,
create_scale_out_intent,
)
from agentic_reliability_framework.arf_core.engine.simple_mcp_client import OSSMCPClient
OSS_AVAILABLE = True
except ImportError:
OSS_AVAILABLE = False
logger = logging.getLogger(__name__)
logger.warning("OSS package not available")
# ============================================================================
# BUSINESS IMPACT CALCULATIONS (Based on business.py)
# ============================================================================
class BusinessImpactCalculator:
"""Enterprise-scale business impact calculation"""
def __init__(self):
# Enterprise-scale constants
self.BASE_REVENUE_PER_MINUTE = 5000.0 # $5K/min for enterprise
self.BASE_USERS = 10000 # 10K active users
def calculate_impact(self, scenario: Dict[str, Any]) -> Dict[str, Any]:
"""Calculate business impact for demo scenarios"""
revenue_at_risk = scenario.get("revenue_at_risk", 0)
users_impacted = scenario.get("users_impacted", 0)
if revenue_at_risk > 1000000:
severity = "🚨 CRITICAL"
impact_color = "#ff4444"
elif revenue_at_risk > 500000:
severity = "⚠️ HIGH"
impact_color = "#ffaa00"
elif revenue_at_risk > 100000:
severity = "📈 MEDIUM"
impact_color = "#ffdd00"
else:
severity = "✅ LOW"
impact_color = "#44ff44"
return {
"revenue_at_risk": f"${revenue_at_risk:,.0f}",
"users_impacted": f"{users_impacted:,}",
"severity": severity,
"impact_color": impact_color,
"time_to_resolution": f"{scenario.get('time_to_resolve', 2.3):.1f} min",
"auto_heal_possible": scenario.get("auto_heal_possible", True),
}
# ============================================================================
# RAG GRAPH VISUALIZATION (Based on v3_reliability.py)
# ============================================================================
class RAGGraphVisualizer:
"""Visualize RAG graph memory growth"""
def __init__(self):
self.incidents = []
self.outcomes = []
self.edges = []
def add_incident(self, component: str, severity: str):
"""Add an incident to the graph"""
incident_id = f"inc_{len(self.incidents)}"
self.incidents.append({
"id": incident_id,
"component": component,
"severity": severity,
"timestamp": time.time(),
})
return incident_id
def add_outcome(self, incident_id: str, success: bool, action: str):
"""Add an outcome to the graph"""
outcome_id = f"out_{len(self.outcomes)}"
self.outcomes.append({
"id": outcome_id,
"incident_id": incident_id,
"success": success,
"action": action,
"timestamp": time.time(),
})
# Add edge
self.edges.append({
"source": incident_id,
"target": outcome_id,
"type": "resolved" if success else "failed",
})
return outcome_id
def get_graph_figure(self):
"""Create Plotly figure of RAG graph"""
if not self.incidents:
# Return empty figure with message
fig = go.Figure()
fig.update_layout(
title="🧠 RAG Graph Memory - Learning from Incidents",
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
plot_bgcolor="white",
height=500,
annotations=[dict(
text="No incidents recorded yet. Try a scenario!",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False,
font=dict(size=16, color="gray")
)]
)
return fig
# Prepare node data
nodes = []
node_colors = []
node_sizes = []
# Add incident nodes
for inc in self.incidents:
nodes.append({
"x": random.random(),
"y": random.random(),
"label": f"{inc['component']}\n{inc['severity']}",
"id": inc["id"],
"type": "incident",
})
node_colors.append("#ff6b6b" if inc["severity"] == "critical" else "#ffa726")
node_sizes.append(30)
# Add outcome nodes
for out in self.outcomes:
nodes.append({
"x": random.random() + 0.5, # Shift right
"y": random.random(),
"label": f"{out['action']}\n{'✅' if out['success'] else '❌'}",
"id": out["id"],
"type": "outcome",
})
node_colors.append("#4caf50" if out["success"] else "#f44336")
node_sizes.append(20)
# Create figure
fig = go.Figure()
# Add edges
for edge in self.edges:
source = next((n for n in nodes if n["id"] == edge["source"]), None)
target = next((n for n in nodes if n["id"] == edge["target"]), None)
if source and target:
fig.add_trace(go.Scatter(
x=[source["x"], target["x"]],
y=[source["y"], target["y"]],
mode="lines",
line=dict(
color="#888888",
width=2,
dash="dash" if edge["type"] == "failed" else "solid"
),
hoverinfo="none",
showlegend=False,
))
# Add nodes
fig.add_trace(go.Scatter(
x=[n["x"] for n in nodes],
y=[n["y"] for n in nodes],
mode="markers+text",
marker=dict(
size=node_sizes,
color=node_colors,
line=dict(color="white", width=2)
),
text=[n["label"] for n in nodes],
textposition="top center",
hovertext=[f"Type: {n['type']}" for n in nodes],
hoverinfo="text",
showlegend=False,
))
# Update layout
fig.update_layout(
title="🧠 RAG Graph Memory - Learning from Incidents",
showlegend=False,
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
plot_bgcolor="white",
height=500,
)
return fig
def get_stats(self):
"""Get graph statistics"""
successful_outcomes = sum(1 for o in self.outcomes if o["success"])
return {
"incident_nodes": len(self.incidents),
"outcome_nodes": len(self.outcomes),
"edges": len(self.edges),
"success_rate": f"{(successful_outcomes / len(self.outcomes) * 100):.1f}%" if self.outcomes else "0%",
"patterns_learned": len(self.outcomes) // 3, # Rough estimate
}
# ============================================================================
# PREDICTIVE ANALYTICS (Based on predictive.py)
# ============================================================================
class PredictiveVisualizer:
"""Visualize predictive analytics"""
def __init__(self):
self.predictions = []
def add_prediction(self, metric: str, current_value: float, predicted_value: float,
time_to_threshold: Optional[float] = None):
"""Add a prediction"""
self.predictions.append({
"metric": metric,
"current": current_value,
"predicted": predicted_value,
"time_to_threshold": time_to_threshold,
"timestamp": time.time(),
"predicted_at": datetime.datetime.now().strftime("%H:%M:%S"),
})
def get_predictive_timeline(self):
"""Create predictive timeline visualization"""
if not self.predictions:
# Return empty figure with message
fig = go.Figure()
fig.update_layout(
title="🔮 Predictive Analytics Timeline",
xaxis_title="Time",
yaxis_title="Metric Value",
height=400,
plot_bgcolor="white",
annotations=[dict(
text="No predictions yet. Try a scenario!",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False,
font=dict(size=14, color="gray")
)]
)
return fig
# Create timeline data - ensure we have valid data
valid_predictions = []
for p in self.predictions[-10:]: # Last 10 predictions
if isinstance(p.get("current"), (int, float)) and isinstance(p.get("predicted"), (int, float)):
valid_predictions.append(p)
if not valid_predictions:
# Return empty figure
fig = go.Figure()
fig.update_layout(
title="🔮 Predictive Analytics Timeline",
height=400,
annotations=[dict(
text="Waiting for prediction data...",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False
)]
)
return fig
df = pd.DataFrame(valid_predictions)
fig = go.Figure()
# Add current values
fig.add_trace(go.Scatter(
x=df["predicted_at"],
y=df["current"],
mode="lines+markers",
name="Current",
line=dict(color="#4caf50", width=3),
marker=dict(size=10),
))
# Add predicted values
fig.add_trace(go.Scatter(
x=df["predicted_at"],
y=df["predicted"],
mode="lines+markers",
name="Predicted",
line=dict(color="#ff9800", width=2, dash="dash"),
marker=dict(size=8),
))
# Add threshold warning if applicable
for i, row in df.iterrows():
if row["time_to_threshold"] and isinstance(row["time_to_threshold"], (int, float)) and row["time_to_threshold"] < 30:
fig.add_annotation(
x=row["predicted_at"],
y=row["predicted"],
text=f"⚠️ {row['time_to_threshold']:.0f} min",
showarrow=True,
arrowhead=2,
arrowsize=1,
arrowwidth=2,
arrowcolor="#ff4444",
font=dict(color="#ff4444", size=10),
)
# Update layout
fig.update_layout(
title="🔮 Predictive Analytics Timeline",
xaxis_title="Time",
yaxis_title="Metric Value",
hovermode="x unified",
plot_bgcolor="white",
height=400,
)
return fig
# ============================================================================
# ENTERPRISE MOCK SERVER (Based on enterprise code structure)
# ============================================================================
class MockEnterpriseServer:
"""Mock enterprise server showing full capabilities"""
def __init__(self, license_key: str):
self.license_key = license_key
self.license_tier = self._get_license_tier(license_key)
self.audit_trail = []
self.learning_engine_active = True
self.execution_stats = {
"total_executions": 0,
"successful_executions": 0,
"autonomous_executions": 0,
"approval_workflows": 0,
"revenue_protected": 0.0,
}
def _get_license_tier(self, license_key: str) -> str:
"""Determine license tier from key"""
if "ENTERPRISE" in license_key:
return "Enterprise"
elif "PROFESSIONAL" in license_key:
return "Professional"
elif "TRIAL" in license_key:
return "Trial"
return "Starter"
async def execute_healing(self, healing_intent: Dict[str, Any], mode: str = "autonomous") -> Dict[str, Any]:
"""Mock enterprise execution"""
execution_id = f"exec_{uuid.uuid4().hex[:16]}"
start_time = time.time()
# Simulate execution time
await asyncio.sleep(random.uniform(0.5, 2.0))
# Determine success based on confidence
confidence = healing_intent.get("confidence", 0.85)
success = random.random() < confidence
# Calculate simulated impact
revenue_protected = random.randint(50000, 500000)
# Update stats
self.execution_stats["total_executions"] += 1
if success:
self.execution_stats["successful_executions"] += 1
self.execution_stats["revenue_protected"] += revenue_protected
if mode == "autonomous":
self.execution_stats["autonomous_executions"] += 1
elif mode == "approval":
self.execution_stats["approval_workflows"] += 1
# Record audit
audit_entry = {
"audit_id": f"audit_{uuid.uuid4().hex[:8]}",
"timestamp": datetime.datetime.now().isoformat(),
"action": healing_intent["action"],
"component": healing_intent["component"],
"mode": mode,
"success": success,
"revenue_protected": revenue_protected,
"execution_time": time.time() - start_time,
"license_tier": self.license_tier,
}
self.audit_trail.append(audit_entry)
return {
"execution_id": execution_id,
"success": success,
"message": f"✅ Successfully executed {healing_intent['action']} on {healing_intent['component']}" if success
else f"⚠️ Execution partially failed for {healing_intent['action']}",
"revenue_protected": revenue_protected,
"execution_time": time.time() - start_time,
"mode": mode,
"license_tier": self.license_tier,
"audit_id": audit_entry["audit_id"],
"learning_recorded": self.learning_engine_active and success,
}
def generate_compliance_report(self, standard: str = "SOC2") -> Dict[str, Any]:
"""Generate mock compliance report"""
return {
"report_id": f"compliance_{uuid.uuid4().hex[:8]}",
"standard": standard,
"generated_at": datetime.datetime.now().isoformat(),
"period": "last_30_days",
"findings": {
"audit_trail_complete": True,
"access_controls_enforced": True,
"data_encrypted": True,
"incident_response_documented": True,
"sla_compliance": "99.95%",
},
"summary": f"✅ {standard} compliance requirements fully met",
"estimated_audit_cost_savings": "$150,000",
}
# ============================================================================
# LIVE DASHBOARD
# ============================================================================
class LiveDashboard:
"""Live executive dashboard"""
def __init__(self):
self.total_revenue_protected = 0.0
self.total_incidents = 0
self.auto_healed = 0
self.engineer_hours_saved = 0
self.start_time = time.time()
def add_execution_result(self, revenue_protected: float, auto_healed: bool = True):
"""Add execution result to dashboard"""
self.total_revenue_protected += revenue_protected
self.total_incidents += 1
if auto_healed:
self.auto_healed += 1
self.engineer_hours_saved += 2.5 # 2.5 hours saved per auto-healed incident
def get_dashboard_data(self):
"""Get current dashboard data"""
uptime_hours = (time.time() - self.start_time) / 3600
return {
"revenue_protected": f"${self.total_revenue_protected:,.0f}",
"total_incidents": self.total_incidents,
"auto_healed": self.auto_healed,
"auto_heal_rate": f"{(self.auto_healed / self.total_incidents * 100):.1f}%" if self.total_incidents > 0 else "0%",
"engineer_hours_saved": f"{self.engineer_hours_saved:.0f} hours",
"avg_mttr": "2.3 minutes",
"industry_mttr": "45 minutes",
"improvement": "94% faster",
"uptime": f"{uptime_hours:.1f} hours",
"roi": "5.2×",
}
# ============================================================================
# ENHANCED VISUALIZATION ENGINE - FIXED VERSION
# ============================================================================
class EnhancedVisualizationEngine:
"""Enhanced visualization engine with animations and interactivity - FIXED"""
@staticmethod
def create_animated_radar_chart(metrics: Dict[str, float], title: str = "Performance Radar"):
"""Create animated radar chart with smooth transitions"""
try:
# Filter out non-numeric values
numeric_metrics = {}
for key, value in metrics.items():
if isinstance(value, (int, float)):
numeric_metrics[key] = value
if not numeric_metrics:
# Return empty radar chart
fig = go.Figure()
fig.update_layout(
title=title,
polar=dict(
radialaxis=dict(visible=True, range=[0, 100])
),
height=400,
annotations=[dict(
text="No numeric metrics available",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False,
font=dict(size=14, color="gray")
)]
)
return fig
categories = list(numeric_metrics.keys())
values = list(numeric_metrics.values())
# Ensure we have at least 3 categories for radar chart
if len(categories) < 3:
# Duplicate categories to make radar work
while len(categories) < 3:
categories.append(f"Metric_{len(categories)}")
values.append(0)
# Create radar chart
fig = go.Figure()
fig.add_trace(go.Scatterpolar(
r=values,
theta=categories,
fill='toself',
name='Current',
line_color='#4CAF50',
opacity=0.8
))
# Add ideal baseline (for comparison)
baseline_values = [max(values) * 0.8] * len(values) if values else [0] * len(categories)
fig.add_trace(go.Scatterpolar(
r=baseline_values,
theta=categories,
fill='toself',
name='Ideal Baseline',
line_color='#2196F3',
opacity=0.3
))
fig.update_layout(
polar=dict(
radialaxis=dict(
visible=True,
range=[0, max(values) * 1.2 if values else 100]
)),
showlegend=True,
title=title,
height=400,
animations=[{
'frame': {'duration': 500, 'redraw': True},
'transition': {'duration': 300, 'easing': 'cubic-in-out'},
}]
)
return fig
except Exception as e:
logging.error(f"Error creating radar chart: {e}")
# Return empty figure
fig = go.Figure()
fig.update_layout(
title=title,
height=400,
annotations=[dict(
text=f"Error loading radar chart",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False,
font=dict(size=14, color="red")
)]
)
return fig
@staticmethod
def create_heatmap_timeline(scenarios: List[Dict[str, Any]]):
"""Create heatmap timeline of incidents - FIXED"""
try:
if not scenarios:
# Return empty heatmap
fig = go.Figure()
fig.update_layout(
title="🔥 Incident Heatmap Timeline",
height=400,
annotations=[dict(
text="No scenario data available",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False,
font=dict(size=14, color="gray")
)]
)
return fig
# Prepare data
severity_map = {"critical": 3, "high": 2, "medium": 1, "low": 0}
data = []
for i, scenario in enumerate(scenarios):
impact = scenario.get("business_impact", {})
# Safely get severity value
revenue_risk = impact.get("revenue_at_risk", 0)
if not isinstance(revenue_risk, (int, float)):
revenue_risk = 0
severity_val = severity_map.get(
"critical" if revenue_risk > 1000000 else
"high" if revenue_risk > 500000 else
"medium" if revenue_risk > 100000 else "low",
0
)
description = scenario.get("description", "Unknown")
if len(description) > 30:
description = description[:27] + "..."
data.append({
"Scenario": description,
"Revenue Risk": revenue_risk,
"Users Impacted": impact.get("users_impacted", 0),
"Severity": severity_val,
"Time to Resolve": impact.get("time_to_resolve", 0),
})
df = pd.DataFrame(data)
# Ensure we have numeric data for heatmap
numeric_columns = ['Revenue Risk', 'Users Impacted', 'Severity', 'Time to Resolve']
for col in numeric_columns:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
# Create heatmap
fig = go.Figure(data=go.Heatmap(
z=df[numeric_columns].values.T,
x=df['Scenario'],
y=numeric_columns,
colorscale='RdYlGn_r', # Red to Green (reversed for severity)
showscale=True,
hoverongaps=False,
hovertemplate='%{x}
%{y}: %{z}
Score: %{y:.1f}
Generated: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
| Metric | Without ARF | With ARF | Improvement |
|---|---|---|---|
| {comp[0]} | {comp[1]} | {comp[2]} | {comp[3]} |
Experience the full spectrum: OSS (Free) ↔ Enterprise (Paid)
🚀 v3.3.8 with enhanced visualizations
📊 Professional analytics & export features
Capability Comparison:
| Capability | OSS Edition | Enterprise Edition |
|---|---|---|
| Execution | ❌ Advisory only | ✅ Autonomous + Approval |
| Learning | ❌ No learning | ✅ Continuous learning engine |
| Compliance | ❌ No audit trails | ✅ SOC2/GDPR/HIPAA compliant |
| Storage | ⚠️ In-memory only | ✅ Persistent (Neo4j + PostgreSQL) |
| Support | ❌ Community | ✅ 24/7 Enterprise support |
| ROI | ❌ None | ✅ 5.2× average first year ROI |
📧 Email: enterprise@petterjuan.com
🌐 Website: https://arf.dev
📚 Documentation: https://docs.arf.dev
💻 GitHub: petterjuan/agentic-reliability-framework
🚀 ARF Ultimate Investor Demo v3.3.8 | Enhanced with Professional Analytics & Export Features
Built with ❤️ using Gradio & Plotly