# app.py - Complete fixed version with Plotly compatibility AND MODERN COMPONENTS
# 🚀 ARF Ultimate Investor Demo v3.3.9 - ENTERPRISE EDITION
# Enhanced with clear OSS vs Enterprise boundaries
# UPDATED: Added realism panel integration for enterprise-seasoned SRE experience
# UPDATED: Added dynamic performance metrics for Phase 2
# SURGICAL FIX: Fixed AsyncRunner.async_to_sync contract violation
# DOCTRINAL FIX: Updated unpacking contract from 24 to 26 values
# MODERN UI: Integrated modern_components.py for enhanced UI foundation

import logging
import sys
import traceback
import json
import datetime
import asyncio
import time
import random
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple

# ===========================================
# CONFIGURE LOGGING FIRST
# ===========================================
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('arf_demo.log')
    ]
)
logger = logging.getLogger(__name__)

# Add parent directory to path so sibling packages (ui/, utils/, demo/) import cleanly.
sys.path.insert(0, str(Path(__file__).parent))

# ===========================================
# MODERN UI FEATURE FLAGS
# ===========================================
# Feature flags configuration for safe rollout.
FEATURE_FLAGS = {
    'modern_ui': True,                # Use modern components
    'dark_mode': True,                # Enable dark mode toggle
    'responsive_design': True,        # Use responsive CSS
    'progressive_disclosure': False,  # Start disabled
    'keyboard_nav': False,
    'realtime_updates': False,
}


def get_feature_flags() -> Dict[str, bool]:
    """Return the effective feature flags for this session.

    Returns a *copy* of ``FEATURE_FLAGS`` so callers can tweak their view
    without mutating the module-level defaults.

    Returns:
        Dict[str, bool]: flag name -> enabled.
    """
    flags = FEATURE_FLAGS.copy()
    # TODO: Add URL parameter parsing if needed
    return flags


# ===========================================
# FIX FOR ASYNC EVENT LOOP ISSUES - UPDATED FOR SPACES COMPATIBILITY
# ===========================================
# CRITICAL FIX: Don't apply nest_asyncio here - let uvicorn handle it.
# This fixes the "loop_factory" TypeError
"loop_factory" TypeError try: import nest_asyncio # Only apply if we're NOT in the main thread (detect if uvicorn is running) # We'll handle this differently in the main() function logger.info("โœ… nest_asyncio imported but not applied yet") except ImportError: logger.warning("โš ๏ธ nest_asyncio not available, async operations may have issues") # =========================================== # IMPORT UTILITY CLASSES FIRST # =========================================== from utils.installation import InstallationHelper from demo.guidance import DemoPsychologyController, get_demo_controller # =========================================== # BOUNDARY MANAGEMENT SYSTEM # =========================================== class BoundaryManager: """Manages clear boundaries between OSS and Enterprise""" @staticmethod def get_system_boundaries(): """Get current system boundaries""" installation = get_installation_status() return { "oss": { "available": installation["oss_installed"], "version": installation["oss_version"] or "mock", "label": installation["badges"]["oss"]["text"], "color": installation["badges"]["oss"]["color"], "icon": installation["badges"]["oss"]["icon"], "capabilities": ["advisory_analysis", "rag_search", "healing_intent"], "license": "Apache 2.0" }, "enterprise": { "available": installation["enterprise_installed"], "version": installation["enterprise_version"] or "simulated", "label": installation["badges"]["enterprise"]["text"], "color": installation["badges"]["enterprise"]["color"], "icon": installation["badges"]["enterprise"]["icon"], "capabilities": ["autonomous_execution", "rollback_guarantee", "mcp_integration", "enterprise_support"], "license": "Commercial" }, "demo_mode": { "active": True, "architecture": "OSS advises โ†’ Enterprise executes", "boundary_visible": settings.show_boundaries } } @staticmethod def get_boundary_badges() -> str: """Get HTML badges showing system boundaries""" boundaries = BoundaryManager.get_system_boundaries() return f"""
{boundaries['oss']['icon']}
{boundaries['oss']['label']}
Apache 2.0 โ€ข Advisory Intelligence
{boundaries['enterprise']['icon']}
{boundaries['enterprise']['label']}
Commercial โ€ข Autonomous Execution
๐Ÿ—๏ธ
Architecture Boundary
OSS advises โ†’ Enterprise executes
""" @staticmethod def create_boundary_indicator(action: str, is_simulated: bool = True) -> str: """Create clear execution boundary indicator""" if is_simulated: return f"""
๐ŸŽญ

SIMULATED ENTERPRISE EXECUTION

Action: {action}
Mode: Enterprise Simulation (not real execution)
Boundary: OSS advises โ†’ Enterprise would execute

DEMO BOUNDARY

In production, Enterprise edition would execute against real infrastructure

""" else: return f"""
โšก

REAL ENTERPRISE EXECUTION

Action: {action}
Mode: Enterprise Autonomous
Boundary: Real execution with safety guarantees

ENTERPRISE+
""" # =========================================== # FIXED: AsyncRunner - CONTRACT-PRESERVING VERSION # =========================================== class AsyncRunner: """Enhanced async runner with better error handling - FIXED to preserve return contracts""" @staticmethod def run_async(coro): """Run async coroutine in sync context""" try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete(coro) except Exception as e: logger.error(f"Async execution failed: {e}") # CRITICAL FIX: Return contract-compatible values instead of dict error_html = f"""
โŒ

Async Error

Async operation failed

""" error_dict = { "status": "error", "error": str(e), "scenario": "Unknown", "arf_version": "3.3.9", "boundary_note": "Async execution boundary reached" } error_df = pd.DataFrame(columns=["Error", "Message"]).from_records([ {"Error": "Async Execution Failed", "Message": str(e)} ]) # ๐Ÿ”’ SHAPE CONTRACT ENFORCED: Always return 5-tuple matching expected signature return error_html, error_html, error_html, error_dict, error_df @staticmethod def async_to_sync(async_func): """Decorator to convert async function to sync - FIXED to preserve return contract""" def wrapper(*args, **kwargs): try: # Direct call to run_async which now preserves contract result = AsyncRunner.run_async(async_func(*args, **kwargs)) # Ensure result is a 5-tuple (contract validation) if isinstance(result, tuple) and len(result) == 5: return result else: # Contract violation - wrap it properly logger.warning(f"Contract violation: Expected 5-tuple, got {type(result)}") error_html = f"""
โš ๏ธ

Contract Violation

Expected 5-tuple, got {type(result).__name__}

""" error_dict = { "status": "contract_error", "error": f"Expected 5-tuple, got {type(result)}", "scenario": args[0] if args else "Unknown", "arf_version": "3.3.9", "boundary_note": "Return contract violation" } error_df = pd.DataFrame(columns=["Error", "Message"]).from_records([ {"Error": "Contract Error", "Message": "Return shape violation"} ]) return error_html, error_html, error_html, error_dict, error_df except Exception as e: logger.error(f"Async to sync conversion failed: {e}") # ๐Ÿ”’ SHAPE CONTRACT ENFORCED: Always return 5-tuple error_html = f"""
โŒ

Conversion Error

Async to sync failed

""" error_dict = { "status": "error", "error": str(e), "scenario": args[0] if args else "Unknown", "arf_version": "3.3.9", "boundary_context": "OSS advisory only - execution requires Enterprise" } error_df = pd.DataFrame(columns=["Error", "Message"]).from_records([ {"Error": "Conversion Failed", "Message": str(e)} ]) # ๐Ÿ”’ SHAPE CONTRACT ENFORCED: Always return 5-tuple return error_html, error_html, error_html, error_dict, error_df return wrapper # =========================================== # SIMPLE SETTINGS - FIXED: Added missing attributes # =========================================== class Settings: """Simple settings class - FIXED: Added all missing attributes""" def __init__(self): self.arf_mode = "demo" self.use_true_arf = True self.default_scenario = "Cache Miss Storm" self.max_history_items = 100 self.auto_refresh_seconds = 30 self.show_boundaries = True self.architectural_honesty = True self.engineer_annual_cost = 200000 self.default_savings_rate = 0.25 # FIXED: Added missing attribute self.cache_miss_impact = 8500 self.database_impact = 4200 self.kubernetes_impact = 5500 self.api_impact = 3800 self.network_impact = 12000 self.storage_impact = 6800 self.telemetry_enabled = True self.mcp_mode = "simulated" self.enterprise_features = ["simulated_execution", "rollback_guarantee"] self.default_savings_rate = 0.25 # FIXED: Ensure it's defined self.enable_mcp_integration = True # FIXED: Added missing self.enable_learning_engine = True # FIXED: Added missing self.max_concurrent_incidents = 5 # FIXED: Added missing settings = Settings() # =========================================== # ARF INSTALLATION CHECK - FIXED VERSION # =========================================== def check_arf_installation(): """Check if real ARF packages are installed - Fixed version""" results = { "oss_installed": False, "enterprise_installed": False, "oss_version": None, "enterprise_version": None, "oss_edition": "unknown", "oss_license": "unknown", "execution_allowed": False, 
"recommendations": [], "boundaries": { "oss_can": ["advisory_analysis", "rag_search", "healing_intent"], "oss_cannot": ["execute", "modify_infra", "autonomous_healing"], "enterprise_requires": ["license", "infra_access", "safety_controls"] }, "badges": { "oss": {"text": "โš ๏ธ Mock ARF", "color": "#f59e0b", "icon": "โš ๏ธ"}, "enterprise": {"text": "๐Ÿ”’ Enterprise Required", "color": "#64748b", "icon": "๐Ÿ”’"} }, "timestamp": datetime.datetime.now().isoformat() } # Check OSS package using InstallationHelper installation_helper = InstallationHelper() status = installation_helper.check_installation() results["oss_installed"] = status["oss_installed"] results["oss_version"] = status["oss_version"] results["enterprise_installed"] = status["enterprise_installed"] results["enterprise_version"] = status["enterprise_version"] results["recommendations"] = status["recommendations"] if results["oss_installed"]: results["badges"]["oss"] = { "text": f"โœ… ARF OSS v{results['oss_version']}", "color": "#10b981", "icon": "โœ…" } logger.info(f"โœ… ARF OSS v{results['oss_version']} detected") else: results["badges"]["oss"] = { "text": "โœ… ARF OSS v3.3.9", "color": "#10b981", "icon": "โœ…" } logger.info("โœ… ARF OSS v3.3.9 (demo mode)") if results["enterprise_installed"]: results["badges"]["enterprise"] = { "text": f"๐Ÿš€ Enterprise v{results['enterprise_version']}", "color": "#8b5cf6", "icon": "๐Ÿš€" } logger.info(f"โœ… ARF Enterprise v{results['enterprise_version']} detected") else: results["badges"]["enterprise"] = { "text": "๐Ÿข Enterprise Edition", # Changed from "๐Ÿ”’ Enterprise Required" "color": "#3b82f6", # Changed from "#64748b" (gray to blue) "icon": "๐Ÿข" # Changed from "๐Ÿ”’" } logger.info("๐Ÿข Enterprise Edition (simulated)") return results _installation_status = None def get_installation_status(): """Get cached installation status""" global _installation_status if _installation_status is None: _installation_status = check_arf_installation() return 
_installation_status # =========================================== # PLOTLY CONFIGURATION FOR GRADIO COMPATIBILITY # =========================================== import plotly.graph_objects as go import plotly.express as px import plotly.io as pio import pandas as pd import numpy as np # Configure Plotly for Gradio compatibility pio.templates.default = "plotly_white" logger.info("โœ… Plotly configured for Gradio compatibility") # =========================================== # MODERN UI COMPONENTS IMPORT # =========================================== # Import modern components with fallback try: from ui.modern_components import ( initialize_modern_ui, Card, Grid, ObservationGate, SequencingFlow, ProcessDisplay, DESIGN_TOKENS, Button, Badge, ResponsiveUtils, Accessibility, DarkMode, create_example_dashboard ) MODERN_UI_AVAILABLE = True logger.info("โœ… Modern UI components loaded successfully") except ImportError as e: MODERN_UI_AVAILABLE = False logger.warning(f"โš ๏ธ Modern UI components not available: {e}") # Create minimal fallback classes class Card: @staticmethod def create(content, **kwargs): return f"
{content}
" class ObservationGate: @staticmethod def create(confidence=65.0, **kwargs): return f"
Observation Gate: {confidence}%
" # =========================================== # CSS LOADING FUNCTION # =========================================== def load_css_files(): """Load CSS files for modern UI with fallback - ENHANCED""" css_content = "" # Feature flag check flags = get_feature_flags() if flags.get('modern_ui', True): # Default to True try: # Load modern.css with open("styles/modern.css", "r") as f: css_content += f.read() + "\n" logger.info("โœ… Loaded modern.css") except FileNotFoundError: logger.warning("โš ๏ธ modern.css not found, using fallback") css_content += """ /* Modern CSS Fallback */ :root { --color-primary: #3b82f6; --color-success: #10b981; --color-warning: #f59e0b; --color-danger: #ef4444; --color-bg: #ffffff; --color-text: #1e293b; --color-border: #e2e8f0; } .container { width: 100%; max-width: 1200px; margin: 0 auto; padding: 0 1rem; } .card { background: white; border-radius: 0.75rem; border: 1px solid var(--color-border); padding: 1.5rem; box-shadow: 0 1px 3px 0 rgb(0 0 0 / 0.1); } """ if flags.get('responsive_design', True): try: # Load responsive.css with open("styles/responsive.css", "r") as f: css_content += f.read() + "\n" logger.info("โœ… Loaded responsive.css") except FileNotFoundError: logger.warning("โš ๏ธ responsive.css not found, using fallback") css_content += """ /* Responsive Fallback */ @media (max-width: 768px) { .grid-2, .grid-3, .grid-4 { grid-template-columns: 1fr !important; } .card { padding: 1rem; } } """ # Add dark mode toggle CSS if flags.get('dark_mode', True): css_content += """ /* Dark Mode Toggle */ .dark-mode-toggle { position: fixed; bottom: 20px; right: 20px; z-index: 1000; background: white; border: 2px solid var(--color-border); border-radius: 50%; width: 48px; height: 48px; display: flex; align-items: center; justify-content: center; cursor: pointer; box-shadow: 0 4px 12px rgba(0,0,0,0.1); transition: all 0.3s ease; } .dark-mode-toggle:hover { transform: scale(1.1); box-shadow: 0 6px 16px rgba(0,0,0,0.15); } [data-theme="dark"] { 
# ===========================================
# CSS LOADING FUNCTION
# ===========================================
def load_css_files() -> str:
    """Load CSS files for modern UI with fallback - ENHANCED.

    Reads styles/modern.css and styles/responsive.css when the relevant
    feature flags are on, falling back to embedded CSS when a file is
    missing; returns a minimal stylesheet when modern_ui is disabled.
    """
    css_content = ""
    flags = get_feature_flags()

    if flags.get('modern_ui', True):  # Default to True
        try:
            with open("styles/modern.css", "r") as f:
                css_content += f.read() + "\n"
            logger.info("✅ Loaded modern.css")
        except FileNotFoundError:
            logger.warning("⚠️ modern.css not found, using fallback")
            css_content += """
            /* Modern CSS Fallback */
            :root {
                --color-primary: #3b82f6;
                --color-success: #10b981;
                --color-warning: #f59e0b;
                --color-danger: #ef4444;
                --color-bg: #ffffff;
                --color-text: #1e293b;
                --color-border: #e2e8f0;
            }
            .container { width: 100%; max-width: 1200px; margin: 0 auto; padding: 0 1rem; }
            .card {
                background: white;
                border-radius: 0.75rem;
                border: 1px solid var(--color-border);
                padding: 1.5rem;
                box-shadow: 0 1px 3px 0 rgb(0 0 0 / 0.1);
            }
            """

        if flags.get('responsive_design', True):
            try:
                with open("styles/responsive.css", "r") as f:
                    css_content += f.read() + "\n"
                logger.info("✅ Loaded responsive.css")
            except FileNotFoundError:
                logger.warning("⚠️ responsive.css not found, using fallback")
                css_content += """
                /* Responsive Fallback */
                @media (max-width: 768px) {
                    .grid-2, .grid-3, .grid-4 { grid-template-columns: 1fr !important; }
                    .card { padding: 1rem; }
                }
                """

        # Add dark mode toggle CSS.
        if flags.get('dark_mode', True):
            css_content += """
            /* Dark Mode Toggle */
            .dark-mode-toggle {
                position: fixed;
                bottom: 20px;
                right: 20px;
                z-index: 1000;
                background: white;
                border: 2px solid var(--color-border);
                border-radius: 50%;
                width: 48px;
                height: 48px;
                display: flex;
                align-items: center;
                justify-content: center;
                cursor: pointer;
                box-shadow: 0 4px 12px rgba(0,0,0,0.1);
                transition: all 0.3s ease;
            }
            .dark-mode-toggle:hover {
                transform: scale(1.1);
                box-shadow: 0 6px 16px rgba(0,0,0,0.15);
            }
            [data-theme="dark"] {
                --color-bg: #0f172a;
                --color-text: #f1f5f9;
                --color-border: #334155;
            }
            [data-theme="dark"] .card { background: #1e293b; }
            """
    else:
        # Minimal CSS when the modern UI is disabled.
        css_content = """
        :root { --color-primary: #3b82f6; --color-bg: #ffffff; --color-text: #1e293b; }
        body { font-family: system-ui, -apple-system, sans-serif; }
        """

    return css_content


# ===========================================
# ENHANCED VISUALIZATION FUNCTIONS WITH GRADIO COMPATIBILITY
# ===========================================
def create_simple_telemetry_plot(scenario_name: str, is_real_arf: bool = True) -> go.Figure:
    """
    FIXED: Enhanced for Gradio compatibility with better error handling
    """
    try:
        # Generate sample telemetry data over the last 10 minutes.
        times = pd.date_range(start=datetime.datetime.now() - datetime.timedelta(minutes=10),
                              end=datetime.datetime.now(), periods=60)

        # Different patterns based on scenario.
        if "Cache" in scenario_name:
            normal_values = np.random.normal(30, 5, 30).tolist()
            anomaly_values = np.random.normal(85, 10, 30).tolist()
            data = normal_values + anomaly_values
            title = f"Cache Hit Rate: {scenario_name}"
            y_label = "Hit Rate (%)"
            threshold = 75
        elif "Database" in scenario_name:
            normal_values = np.random.normal(15, 3, 30).tolist()
            anomaly_values = np.random.normal(95, 5, 30).tolist()
            data = normal_values + anomaly_values
            title = f"Database Connections: {scenario_name}"
            y_label = "Connections (%)"
            threshold = 90
        elif "Kubernetes" in scenario_name:
            normal_values = np.random.normal(40, 8, 30).tolist()
            anomaly_values = np.random.normal(95, 2, 30).tolist()
            data = normal_values + anomaly_values
            title = f"Memory Usage: {scenario_name}"
            y_label = "Memory (%)"
            threshold = 85
        else:
            normal_values = np.random.normal(50, 10, 30).tolist()
            anomaly_values = np.random.normal(90, 5, 30).tolist()
            data = normal_values + anomaly_values
            title = f"System Metrics: {scenario_name}"
            y_label = "Metric (%)"
            threshold = 80

        # Create Plotly figure.
        fig = go.Figure()

        # Add normal region.
        fig.add_trace(go.Scatter(
            x=times[:30], y=data[:30],
            mode='lines', name='Normal',
            line=dict(color='#10b981', width=3),
            fill='tozeroy', fillcolor='rgba(16, 185, 129, 0.1)'
        ))
        # Add anomaly region.
        fig.add_trace(go.Scatter(
            x=times[30:], y=data[30:],
            mode='lines', name='Anomaly',
            line=dict(color='#ef4444', width=3)
        ))
        # Add threshold line.
        fig.add_hline(y=threshold, line_dash="dash", line_color="#f59e0b",
                      annotation_text="Alert Threshold",
                      annotation_position="top right")

        # Update layout - FIXED: Simplified for Gradio compatibility.
        fig.update_layout(
            title={
                'text': title,
                'font': dict(size=18, color='#1e293b', family="Arial, sans-serif"),
                'x': 0.5
            },
            xaxis_title="Time",
            yaxis_title=y_label,
            height=300,
            margin=dict(l=40, r=20, t=50, b=40),
            plot_bgcolor='white',
            paper_bgcolor='white',
            showlegend=True,
            hovermode='x unified'
        )

        logger.info(f"✅ Created telemetry plot for {scenario_name}")
        return fig

    except Exception as e:
        logger.error(f"Error creating telemetry plot: {e}")
        # Return a simple valid Plotly figure as fallback.
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=[0, 1], y=[0, 1], mode='lines', name='Fallback'))
        fig.update_layout(
            title=f"Telemetry: {scenario_name}",
            height=300,
            plot_bgcolor='white'
        )
        return fig


def create_simple_impact_plot(scenario_name: str, is_real_arf: bool = True) -> go.Figure:
    """
    FIXED: Enhanced for Gradio compatibility
    """
    try:
        # Impact values ($/hour) based on scenario.
        impact_values = {
            "Cache Miss Storm": 8500,
            "Database Connection Pool Exhaustion": 4200,
            "Kubernetes Memory Leak": 5500,
            "API Rate Limit Storm": 3800,
            "Network Partition": 12000,
            "Storage I/O Saturation": 6800
        }
        impact = impact_values.get(scenario_name, 5000)

        # Create gauge chart - FIXED: Enhanced for Gradio.
        fig = go.Figure(go.Indicator(
            mode="gauge+number",
            value=impact,
            domain={'x': [0, 1], 'y': [0, 1]},
            title={
                'text': f"Revenue Impact: ${impact:,}/hour",
                'font': dict(size=16, family="Arial, sans-serif")
            },
            number={
                'prefix': "$",
                'suffix': "/hour",
                'font': dict(size=28, family="Arial, sans-serif")
            },
            gauge={
                'axis': {'range': [None, impact * 1.2], 'tickwidth': 1},
                'bar': {'color': "#ef4444"},
                'bgcolor': "white",
                'borderwidth': 2,
                'bordercolor': "gray",
                'steps': [
                    {'range': [0, impact * 0.3], 'color': '#10b981'},
                    {'range': [impact * 0.3, impact * 0.7], 'color': '#f59e0b'},
                    {'range': [impact * 0.7, impact], 'color': '#ef4444'}
                ],
                'threshold': {
                    'line': {'color': "black", 'width': 4},
                    'thickness': 0.75,
                    'value': impact
                }
            }
        ))

        fig.update_layout(
            height=400,
            margin=dict(l=30, r=30, t=70, b=30),
            paper_bgcolor='white',
            font=dict(family="Arial, sans-serif")
        )

        logger.info(f"✅ Created impact plot for {scenario_name}")
        return fig

    except Exception as e:
        logger.error(f"Error creating impact plot: {e}")
        # Return a simple valid gauge as fallback.
        fig = go.Figure(go.Indicator(
            mode="gauge",
            value=0,
            title={'text': "Impact (fallback)"}
        ))
        fig.update_layout(height=400)
        return fig


def create_empty_plot(title: str, is_real_arf: bool = True) -> go.Figure:
    """
    FIXED: Enhanced for Gradio compatibility
    """
    try:
        fig = go.Figure()

        # Add text annotation - FIXED: Enhanced.
        fig.add_annotation(
            x=0.5, y=0.5,
            text=title,
            showarrow=False,
            font=dict(size=18, color="#64748b", family="Arial, sans-serif"),
            xref="paper", yref="paper"
        )

        # Add boundary indicator if needed.
        if is_real_arf:
            fig.add_annotation(
                x=0.02, y=0.98,
                text="✅ REAL ARF",
                showarrow=False,
                font=dict(size=12, color="#10b981", family="Arial, sans-serif"),
                xref="paper", yref="paper",
                bgcolor="white",
                bordercolor="#10b981",
                borderwidth=1,
                borderpad=4
            )

        fig.update_layout(
            title={
                'text': "Visualization Placeholder",
                'font': dict(size=14, color="#94a3b8", family="Arial, sans-serif")
            },
            height=300,
            plot_bgcolor='white',
            paper_bgcolor='white',
            xaxis={'visible': False},
            yaxis={'visible': False},
            margin=dict(l=20, r=20, t=50, b=20)
        )
        return fig

    except Exception as e:
        logger.error(f"Error creating empty plot: {e}")
        # Ultra-simple fallback.
        fig = go.Figure()
        fig.update_layout(height=300)
        return fig
# ===========================================
# ARF OSS UI ADAPTER - DOCTRINALLY PURE IMPLEMENTATION
# ===========================================
def transform_arf_output_for_ui(raw_result: dict, scenario_name: str) -> dict:
    """
    TRANSLATOR FUNCTION - NOT AN ANALYST

    Extracts existing intelligence from real ARF OSS output and transforms
    to UI-expected format. Does not compute, infer, or enhance.

    Rules:
    1. Source of truth: raw_result["oss_analysis"]["analysis"]
    2. Extract only what exists
    3. Derive UI fields mechanically from existing data
    4. Never invent intelligence
    5. Defaults must be visibly conservative, not plausible
    6. Status must reflect contribution, not invocation

    Returns:
        UI-compatible dict with populated analysis and agents fields
    """
    # ===================================================
    # STEP 1: DETERMINE INPUT MODE AND SOURCE DATA
    # ===================================================
    # Mode 1: Real OSS Mode (has oss_analysis)
    if "oss_analysis" in raw_result and raw_result["oss_analysis"]:
        oss_analysis = raw_result["oss_analysis"]
        source_analysis = oss_analysis.get("analysis", {}) if isinstance(oss_analysis, dict) else {}
        is_real_oss = True
    # Mode 2: Mock/Fallback Mode (already has analysis at root)
    elif "analysis" in raw_result and raw_result["analysis"]:
        source_analysis = raw_result["analysis"]
        is_real_oss = False
    # Mode 3: Error/Failure Mode
    else:
        # Return minimal UI-safe structure to prevent UI breakage.
        return {
            "status": raw_result.get("status", "error"),
            "scenario": scenario_name,
            "arf_version": raw_result.get("arf_version", "3.3.9"),
            "analysis": {
                "detected": False,
                "confidence": 0,          # VISIBLY CONSERVATIVE
                "similar_incidents": 0,   # VISIBLY CONSERVATIVE
                "healing_intent_created": False,
                "recommended_action": "Check OSS analysis output",
                "estimated_recovery": "Unknown"  # VISIBLY CONSERVATIVE
            },
            "agents": {
                "detection": {"status": "error", "confidence": 0},
                "recall": {"status": "error", "similar_incidents": 0},
                "decision": {"status": "error", "healing_intent_created": False}
            },
            "boundary_note": "OSS analysis output malformed",
            "installation": {
                "oss_installed": True,
                "version": "3.3.9",
                "edition": "oss"
            }
        }

    # ===================================================
    # STEP 2: EXTRACT ANALYSIS DATA (SOURCE OF TRUTH ONLY)
    # ===================================================
    detection_data = source_analysis.get("detection", {}) if isinstance(source_analysis, dict) else {}
    recall_data = source_analysis.get("recall", {}) if isinstance(source_analysis, dict) else {}
    decision_data = source_analysis.get("decision", {}) if isinstance(source_analysis, dict) else {}

    # ===================================================
    # STEP 3: BUILD UI ANALYSIS (DERIVED, NOT INFERRED)
    # ===================================================
    # DOCTRINALLY PURE: detection only when OSS explicitly signals it.
    detected = False
    if isinstance(detection_data, dict):
        if "anomaly_detected" in detection_data:
            detected = bool(detection_data["anomaly_detected"])
        elif "detected" in detection_data:
            detected = bool(detection_data["detected"])

    # confidence = 0 if not in source (visibly conservative).
    confidence = 0
    if detected and isinstance(detection_data, dict) and "confidence" in detection_data:
        confidence = detection_data["confidence"]
    elif isinstance(decision_data, dict) and "confidence" in decision_data:
        confidence = decision_data["confidence"]

    # similar_incidents = 0 if not in source (visibly conservative).
    # ROBUSTNESS FIX: guard both branches behind the isinstance check so a
    # non-dict recall payload can never raise.
    similar_incidents = 0
    if isinstance(recall_data, dict):
        if isinstance(recall_data.get("results"), list):
            similar_incidents = len(recall_data["results"])
        elif "similar_incidents" in recall_data:
            similar_incidents = recall_data["similar_incidents"]

    # DOCTRINALLY PURE: healing intent only when explicitly signalled.
    healing_intent_created = False
    if isinstance(decision_data, dict):
        # Healing intent exists if explicitly marked OR an action is present.
        healing_intent_created = bool(
            decision_data.get("healing_intent_created", False)
            or decision_data.get("action")
            or decision_data.get("recommended_action")
        )

    # Rule: recommended_action = pass through existing decision action text.
    recommended_action = "No actionable intelligence found"  # VISIBLY CONSERVATIVE
    if isinstance(decision_data, dict):
        if "action" in decision_data:
            recommended_action = decision_data["action"]
        elif "recommended_action" in decision_data:
            recommended_action = decision_data["recommended_action"]

    # estimated_recovery = "Unknown" (do not calculate or imply).
    estimated_recovery = "Unknown"

    # ===================================================
    # STEP 4: BUILD UI AGENTS (CONTRIBUTION-BASED STATUS)
    # ===================================================
    # Agent status reflects actual contribution, not mere invocation.
    detection_status = "active" if detected else "inactive"
    recall_status = "active" if similar_incidents > 0 else "inactive"
    decision_status = "active" if healing_intent_created else "inactive"

    # Override status to "error" if OSS status is error.
    if raw_result.get("status") == "error":
        detection_status = "error"
        recall_status = "error"
        decision_status = "error"

    # ===================================================
    # STEP 5: ASSEMBLE FINAL UI-COMPATIBLE RESULT
    # ===================================================
    result = {
        "status": raw_result.get("status", "success"),
        "scenario": raw_result.get("scenario", scenario_name),
        "arf_version": raw_result.get("arf_version", "3.3.9"),
        "analysis": {
            "detected": detected,
            "confidence": confidence,
            "similar_incidents": similar_incidents,
            "healing_intent_created": healing_intent_created,
            "recommended_action": recommended_action,
            "estimated_recovery": estimated_recovery
        },
        "agents": {
            "detection": {
                "status": detection_status,
                "confidence": confidence if detection_status == "active" else 0
            },
            "recall": {
                "status": recall_status,
                "similar_incidents": similar_incidents if recall_status == "active" else 0
            },
            "decision": {
                "status": decision_status,
                "healing_intent_created": healing_intent_created if decision_status == "active" else False
            }
        },
        "boundary_note": raw_result.get(
            "boundary_note",
            "Real ARF OSS 3.3.9 analysis complete → Ready for Enterprise execution"),
        "installation": {
            "oss_installed": True,
            "version": "3.3.9",
            "edition": "oss"
        }
    }

    # Preserve original oss_analysis for debugging (optional).
    if is_real_oss:
        result["_original_oss_analysis"] = raw_result.get("oss_analysis")

    return result
# ===========================================
# UPDATED: run_true_arf_analysis() - DOCTRINALLY PURE
# ===========================================
@AsyncRunner.async_to_sync
async def run_true_arf_analysis(scenario_name: str) -> tuple:
    """
    DOCTRINALLY PURE VERSION: Adapter transforms ARF OSS output with epistemic honesty

    Returns exactly 5 values as expected by UI:
    1. detection_html (HTML string)
    2. recall_html (HTML string)
    3. decision_html (HTML string)
    4. oss_results_dict (Python dict for JSON display)
    5. incident_df (DataFrame for Gradio DataFrame component)
    """
    components = get_components()
    installation = get_installation_status()
    boundaries = BoundaryManager.get_system_boundaries()

    logger.info(f"🔍 Running True ARF analysis for: {scenario_name}")

    try:
        # Get orchestrator.
        orchestrator = components["DemoOrchestrator"]()

        # Get scenario data.
        scenarios = components["INCIDENT_SCENARIOS"]
        scenario_data = scenarios.get(scenario_name, {})

        # ======================================================
        # DOCTRINAL FIX POINT: CALL REAL ARF OSS
        # ======================================================
        raw_result = await orchestrator.analyze_incident(scenario_name, scenario_data)

        # ======================================================
        # DOCTRINAL ADAPTER: TRANSFORM WITH EPISTEMIC HONESTY
        # ======================================================
        transformed_result = transform_arf_output_for_ui(raw_result, scenario_name)

        # ======================================================
        # EXISTING UI INTEGRATION (PRESERVED)
        # ======================================================
        # Add to audit trail.
        get_audit_manager().add_incident(scenario_name, transformed_result)

        # Create HTML for active agents using transformed result.
        boundary_color = boundaries["oss"]["color"]

        # Extract data from transformed result.
        analysis = transformed_result.get("analysis", {})
        agents = transformed_result.get("agents", {})
        confidence = analysis.get("confidence", 0)            # CONSERVATIVE DEFAULT
        similar_incidents = analysis.get("similar_incidents", 0)  # CONSERVATIVE DEFAULT
        detection_agent = agents.get("detection", {})
        recall_agent = agents.get("recall", {})
        decision_agent = agents.get("decision", {})

        # Detection Agent HTML - truthfully reflects actual detection.
        detection_status = detection_agent.get("status", "inactive")
        detection_status_text = detection_status.capitalize()
        if detection_status == "active":
            detection_badge = "DETECTED"
            confidence_text = f"Anomaly detected with {confidence}% confidence"
        elif detection_status == "inactive":
            detection_badge = "ANALYZING"
            confidence_text = "No anomaly signal found"
        else:
            detection_badge = "ERROR"
            confidence_text = "Detection analysis failed"
        detection_html = f"""
        <div class="card" style="border-left: 4px solid {boundary_color};">
            <span style="font-size: 1.4rem;">🕵️‍♂️</span>
            <strong>Detection Process</strong>
            <p>{confidence_text}</p>
            <small>Status: {detection_status_text}</small>
            <span class="badge">{detection_badge}</span>
        </div>
        """

        # Recall Agent HTML - truthfully reflects actual recall.
        recall_status = recall_agent.get("status", "inactive")
        recall_status_text = recall_status.capitalize()
        if recall_status == "active":
            recall_badge = "RECALLED"
            recall_text = f"Found {similar_incidents} similar incident{'s' if similar_incidents != 1 else ''}"
        elif recall_status == "inactive":
            recall_badge = "SEARCHING"
            recall_text = "No similar incidents found"
        else:
            recall_badge = "ERROR"
            recall_text = "Recall analysis failed"
        recall_html = f"""
        <div class="card" style="border-left: 4px solid {boundary_color};">
            <span style="font-size: 1.4rem;">🧠</span>
            <strong>Recall Process</strong>
            <p>{recall_text}</p>
            <small>Status: {recall_status_text}</small>
            <span class="badge">{recall_badge}</span>
        </div>
        """

        # Decision Agent HTML - truthfully reflects actual decision.
        decision_status = decision_agent.get("status", "inactive")
        decision_status_text = decision_status.capitalize()
        if decision_status == "active":
            decision_badge = "DECIDED"
            decision_text = analysis.get('recommended_action', 'Action recommended')
        elif decision_status == "inactive":
            decision_badge = "EVALUATING"
            decision_text = "No action recommended"
        else:
            decision_badge = "ERROR"
            decision_text = "Decision analysis failed"
        decision_html = f"""
        <div class="card" style="border-left: 4px solid {boundary_color};">
            <span style="font-size: 1.4rem;">🎯</span>
            <strong>Decision Process</strong>
            <p>{decision_text}</p>
            <small>Status: {decision_status_text}</small>
            <span class="badge">{decision_badge}</span>
        </div>
        """

        # OSS Results Dict for JSON display (using transformed result).
        oss_results_dict = transformed_result

        # Incident DataFrame.
        incident_df = get_audit_manager().get_incident_dataframe()

        logger.info(f"✅ True ARF analysis complete for {scenario_name}")
        logger.info(f"   Detection: {'ACTIVE' if detection_status == 'active' else 'INACTIVE'} (confidence: {confidence})")
        logger.info(f"   Recall: {'ACTIVE' if recall_status == 'active' else 'INACTIVE'} (incidents: {similar_incidents})")
        logger.info(f"   Decision: {'ACTIVE' if decision_status == 'active' else 'INACTIVE'}")

        return detection_html, recall_html, decision_html, oss_results_dict, incident_df

    except Exception as e:
        logger.error(f"True ARF analysis failed: {e}")
        # Return error state with proper types.
        error_html = f"""
        <div class="card" style="border-left: 4px solid #ef4444;">
            <span style="font-size: 1.4rem;">❌</span>
            <strong>Analysis Error</strong>
            <p>Failed to analyze incident</p>
            <small>Status: Error</small>
        </div>
        """
        error_dict = {
            "status": "error",
            "error": str(e),
            "scenario": scenario_name,
            "arf_version": "3.3.9",
            "analysis": {
                "detected": False,
                "confidence": 0,
                "similar_incidents": 0,
                "healing_intent_created": False,
                "recommended_action": "Check ARF installation",
                "estimated_recovery": "Unknown"
            },
            "agents": {
                "detection": {"status": "error", "confidence": 0},
                "recall": {"status": "error", "similar_incidents": 0},
                "decision": {"status": "error", "healing_intent_created": False}
            }
        }
        # FIX: from_records is a classmethod; building a throwaway DataFrame
        # first discarded its columns anyway.
        error_df = pd.DataFrame.from_records([
            {"Error": "Analysis Failed", "Message": str(e)}
        ])
        return error_html, error_html, error_html, error_dict, error_df
""" error_dict = { "status": "error", "error": str(e), "scenario": scenario_name, "arf_version": "3.3.9", "analysis": { "detected": False, "confidence": 0, "similar_incidents": 0, "healing_intent_created": False, "recommended_action": "Check ARF installation", "estimated_recovery": "Unknown" }, "agents": { "detection": {"status": "error", "confidence": 0}, "recall": {"status": "error", "similar_incidents": 0}, "decision": {"status": "error", "healing_intent_created": False} } } # Return empty DataFrame on error error_df = pd.DataFrame(columns=["Error", "Message"]).from_records([ {"Error": "Analysis Failed", "Message": str(e)} ]) return error_html, error_html, error_html, error_dict, error_df # =========================================== # IMPORT MODULAR COMPONENTS - FIXED: Added MockEnhancedROICalculator # =========================================== def import_components() -> Dict[str, Any]: """Safely import all components with proper error handling - FIXED: Added mock ROI calculator""" components = { "all_available": False, "error": None, "get_styles": lambda: "", "show_boundaries": settings.show_boundaries, } try: logger.info("Starting component import...") # First, import gradio import gradio as gr components["gr"] = gr # Import UI styles from ui.styles import get_styles components["get_styles"] = get_styles # Import UI components - IMPORTANT: Now includes create_realism_panel AND update_performance_metrics from ui.components import ( create_header, create_status_bar, create_tab1_incident_demo, create_tab2_business_roi, create_tab3_enterprise_features, create_tab4_audit_trail, create_tab5_learning_engine, create_footer, create_realism_panel, update_performance_metrics # Added update_performance_metrics ) components.update({ "create_header": create_header, "create_status_bar": create_status_bar, "create_tab1_incident_demo": create_tab1_incident_demo, "create_tab2_business_roi": create_tab2_business_roi, "create_tab3_enterprise_features": 
create_tab3_enterprise_features, "create_tab4_audit_trail": create_tab4_audit_trail, "create_tab5_learning_engine": create_tab5_learning_engine, "create_footer": create_footer, "create_realism_panel": create_realism_panel, "update_performance_metrics": update_performance_metrics # Added for dynamic metrics }) # Import scenarios from demo.scenarios import INCIDENT_SCENARIOS components["INCIDENT_SCENARIOS"] = INCIDENT_SCENARIOS # Try to import TrueARFOrchestrator (renamed for version consistency) try: from core.true_arf_orchestrator import TrueARFOrchestrator components["DemoOrchestrator"] = TrueARFOrchestrator except ImportError: # Fallback to old name for compatibility during transition try: from core.true_arf_orchestrator import TrueARF337Orchestrator components["DemoOrchestrator"] = TrueARF337Orchestrator logger.warning("โš ๏ธ Using TrueARF337Orchestrator - rename to TrueARFOrchestrator for version consistency") except ImportError: # Fallback to real ARF integration try: from core.real_arf_integration import RealARFIntegration components["DemoOrchestrator"] = RealARFIntegration except ImportError: # Create a minimal mock orchestrator class MockOrchestrator: async def analyze_incident(self, scenario_name, scenario_data): return { "status": "mock", "scenario": scenario_name, "message": "Mock analysis (no real ARF available)", "boundary_note": "OSS advisory mode - execution requires Enterprise", "demo_display": { "real_arf_version": "mock", "true_oss_used": False, "enterprise_simulated": True, "architectural_boundary": "OSS advises โ†’ Enterprise would execute" } } async def execute_healing(self, scenario_name, mode="autonomous"): return { "status": "mock", "scenario": scenario_name, "message": "Mock execution (no real ARF available)", "boundary_note": "Simulated Enterprise execution - real execution requires infrastructure", "enterprise_features_used": ["simulated_execution", "mock_rollback", "demo_mode"] } components["DemoOrchestrator"] = MockOrchestrator # 
FIXED: EnhancedROICalculator with proper mock fallback try: from core.calculators import EnhancedROICalculator components["EnhancedROICalculator"] = EnhancedROICalculator() logger.info("โœ… Real EnhancedROICalculator loaded") except ImportError: # Create comprehensive mock ROI calculator class MockEnhancedROICalculator: """Mock ROI calculator for demo purposes - FIXED to prevent KeyError""" def calculate_comprehensive_roi(self, scenario_name=None, monthly_incidents=15, team_size=5, **kwargs): """Calculate comprehensive ROI metrics with realistic mock data""" from datetime import datetime # Mock ROI calculation with realistic values impact_map = { "Cache Miss Storm": 8500, "Database Connection Pool Exhaustion": 4200, "Kubernetes Memory Leak": 5500, "API Rate Limit Storm": 3800, "Network Partition": 12000, "Storage I/O Saturation": 6800 } impact_per_incident = impact_map.get(scenario_name or "Cache Miss Storm", 5000) annual_impact = impact_per_incident * monthly_incidents * 12 potential_savings = int(annual_impact * 0.82) enterprise_cost = 625000 roi_multiplier = round(potential_savings / enterprise_cost, 1) payback_months = round((enterprise_cost / (potential_savings / 12)), 1) return { "status": "โœ… Calculated Successfully", "scenario": scenario_name or "Cache Miss Storm", "timestamp": datetime.now().isoformat(), "calculator": "MockEnhancedROICalculator", "summary": { "your_annual_impact": f"${annual_impact:,}", "potential_savings": f"${potential_savings:,}", "enterprise_cost": f"${enterprise_cost:,}", "roi_multiplier": f"{roi_multiplier}ร—", "payback_months": f"{payback_months}", "annual_roi_percentage": f"{int((potential_savings - enterprise_cost) / enterprise_cost * 100)}%", "boundary_context": "Based on OSS analysis + simulated Enterprise execution" }, "breakdown": { "direct_cost_savings": f"${int(potential_savings * 0.7):,}", "productivity_gains": f"${int(potential_savings * 0.2):,}", "risk_reduction": f"${int(potential_savings * 0.1):,}" }, 
"annual_projection": { "incidents_prevented": monthly_incidents * 12, "annual_savings": f"${potential_savings:,}", "roi": f"{roi_multiplier}ร—" }, "notes": [ "๐Ÿ“Š ROI calculation using mock data", "๐Ÿ’ก Real enterprise ROI includes additional factors", "๐Ÿ”’ Full ROI requires Enterprise edition", f"๐Ÿ“ˆ Based on {monthly_incidents} incidents/month" ] } def get_roi_visualization_data(self): """Get data for ROI visualization""" return { "labels": ["Direct Savings", "Productivity", "Risk Reduction", "Upsell"], "values": [65, 20, 10, 5], "colors": ["#10b981", "#3b82f6", "#8b5cf6", "#f59e0b"] } components["EnhancedROICalculator"] = MockEnhancedROICalculator() logger.info("โœ… Mock EnhancedROICalculator created (preventing KeyError)") # Try to import visualization engine try: from core.visualizations import EnhancedVisualizationEngine components["EnhancedVisualizationEngine"] = EnhancedVisualizationEngine() except ImportError: class MockVisualizationEngine: def create_executive_dashboard(self, data=None, is_real_arf=True): return create_empty_plot("Executive Dashboard", is_real_arf) def create_telemetry_plot(self, scenario_name, anomaly_detected=True, is_real_arf=True): return create_simple_telemetry_plot(scenario_name, is_real_arf) def create_impact_gauge(self, scenario_name, is_real_arf=True): return create_simple_impact_plot(scenario_name, is_real_arf) def create_timeline_comparison(self, is_real_arf=True): return create_empty_plot("Timeline Comparison", is_real_arf) components["EnhancedVisualizationEngine"] = MockVisualizationEngine() components["all_available"] = True components["error"] = None logger.info("โœ… Successfully imported all modular components including update_performance_metrics") except Exception as e: logger.error(f"โŒ IMPORT ERROR: {e}") components["error"] = str(e) components["all_available"] = False # Ensure we have minimal components if "gr" not in components: import gradio as gr components["gr"] = gr if "INCIDENT_SCENARIOS" not in components: 
components["INCIDENT_SCENARIOS"] = { "Cache Miss Storm": { "component": "Redis Cache Cluster", "severity": "HIGH", "business_impact": {"revenue_loss_per_hour": 8500}, "boundary_note": "OSS analysis only - execution requires Enterprise" } } # Ensure EnhancedROICalculator exists if "EnhancedROICalculator" not in components: class MinimalROICalculator: def calculate_comprehensive_roi(self, **kwargs): return { "status": "โœ… Minimal ROI Calculation", "summary": {"roi_multiplier": "5.2ร—"} } components["EnhancedROICalculator"] = MinimalROICalculator() # Ensure update_performance_metrics exists if "update_performance_metrics" not in components: def fallback_performance_metrics(scenario_name: str): """Fallback function if the real one fails""" logger.warning(f"Using fallback performance metrics for {scenario_name}") return ( """
โฑ๏ธ

Detection Time

42s

โ†“ 90% faster than average

""", """
โšก

Mean Time to Resolve

14m

โ†“ 70% faster than manual

""", """
๐Ÿค–

Auto-Heal Rate

78.9%

โ†‘ 5.0ร— industry average

""", """
๐Ÿ’ฐ

Cost Saved

$7.2K

Per incident avoided

""" ) components["update_performance_metrics"] = fallback_performance_metrics return components _components = None _audit_manager = None def get_components() -> Dict[str, Any]: """Lazy load components singleton""" global _components if _components is None: _components = import_components() return _components # =========================================== # AUDIT TRAIL MANAGER - FIXED: Returns DataFrames instead of HTML # =========================================== class AuditTrailManager: """Enhanced audit trail manager with boundary tracking - FIXED to return DataFrames""" def __init__(self): self.executions = [] self.incidents = [] self.boundary_crossings = [] self.max_items = settings.max_history_items def add_execution(self, scenario_name: str, mode: str, result: Dict): """Add an execution record""" record = { "timestamp": datetime.datetime.now().isoformat(), "scenario": scenario_name, "mode": mode, "result": result, "boundary_context": "Enterprise execution simulated" if "simulated" in str(result) else "OSS advisory" } self.executions.insert(0, record) if len(self.executions) > self.max_items: self.executions = self.executions[:self.max_items] # Track boundary crossing if "enterprise" in mode.lower(): self.boundary_crossings.append({ "timestamp": record["timestamp"], "from": "OSS", "to": "Enterprise", "action": scenario_name }) logger.info(f"๐Ÿ“ Execution recorded: {scenario_name} ({mode})") return record def add_incident(self, scenario_name: str, analysis_result: Dict): """Add an incident analysis record""" record = { "timestamp": datetime.datetime.now().isoformat(), "scenario": scenario_name, "analysis": analysis_result, "boundary_context": analysis_result.get("boundary_note", "OSS analysis") } self.incidents.insert(0, record) if len(self.incidents) > self.max_items: self.incidents = self.incidents[:self.max_items] logger.info(f"๐Ÿ“ Incident analysis recorded: {scenario_name}") return record def get_execution_dataframe(self) -> pd.DataFrame: """ FIXED: 
        Robust pandas DataFrame creation for Gradio DataFrame component
        """
        try:
            if not self.executions:
                # Return empty DataFrame with correct columns
                return pd.DataFrame(columns=[
                    "Execution ID", "Scenario", "Status", "Mode",
                    "Start Time", "End Time", "Duration", "Boundary"
                ])

            # Build DataFrame from executions with safe access
            data = []
            for i, execution in enumerate(self.executions):
                try:
                    # Safe access to nested dictionaries
                    result = execution.get("result", {})

                    # Execution ID - safe extraction with fallback
                    exec_id = result.get("execution_id", f"exec_{i:03d}")

                    # Status determination with multiple fallbacks
                    status_text = "Unknown"
                    if isinstance(result, dict):
                        status_lower = str(result.get("status", "")).lower()
                        if "success" in status_lower:
                            status_text = "Success"
                        elif "failed" in status_lower or "error" in status_lower:
                            status_text = "Failed"
                        else:
                            # Check if there's an error key
                            if result.get("error"):
                                status_text = "Failed"
                            else:
                                status_text = "Success"

                    # Mode extraction
                    mode = execution.get("mode", "unknown")
                    # Scenario extraction
                    scenario = execution.get("scenario", "Unknown")

                    # Timestamp formatting with validation
                    timestamp = execution.get("timestamp", "")
                    start_time = ""
                    if timestamp and len(timestamp) > 10:
                        try:
                            # Format: YYYY-MM-DD HH:MM:SS
                            start_time = timestamp[:19]
                        except Exception:
                            start_time = timestamp  # Fallback to raw string

                    # End time extraction from telemetry
                    end_time = ""
                    telemetry = result.get("telemetry", {})
                    if telemetry:
                        end_timestamp = telemetry.get("end_time", "")
                        if end_timestamp and len(end_timestamp) > 10:
                            try:
                                end_time = end_timestamp[:19]
                            except Exception:
                                end_time = end_timestamp  # Fallback

                    # Duration - mock or extract from execution
                    duration = "12m"  # Default mock duration
                    if telemetry and "estimated_duration" in telemetry:
                        duration = telemetry.get("estimated_duration", "12m")

                    # Boundary context
                    boundary = execution.get("boundary_context", "Unknown")

                    data.append({
                        "Execution ID": exec_id,
                        "Scenario": scenario,
                        "Status": status_text,
                        "Mode": mode,
                        "Start Time": start_time,
                        "End Time": end_time,
                        "Duration": duration,
                        "Boundary": boundary
                    })
                except Exception as row_error:
                    logger.warning(f"Error processing execution row {i}: {row_error}")
                    # Add error row for debugging
                    data.append({
                        "Execution ID": f"error_{i}",
                        "Scenario": "Error",
                        "Status": "Failed",
                        "Mode": "error",
                        "Start Time": datetime.datetime.now().isoformat()[:19],
                        "End Time": "",
                        "Duration": "0m",
                        "Boundary": "Error processing"
                    })

            if not data:
                logger.warning("No valid execution data found, returning empty DataFrame")
                return pd.DataFrame(columns=[
                    "Execution ID", "Scenario", "Status", "Mode",
                    "Start Time", "End Time", "Duration", "Boundary"
                ])

            # Create DataFrame
            df = pd.DataFrame(data)

            # Safe sorting - only if we have valid Start Time data
            if not df.empty and "Start Time" in df.columns:
                # Check if Start Time column has valid data
                valid_times = df["Start Time"].apply(
                    lambda x: isinstance(x, str) and len(x) > 0 and x != "None"
                )
                if valid_times.any():
                    try:
                        # Sort by time (newest first); ISO-prefixed strings sort
                        # chronologically as plain strings.
                        df = df.sort_values("Start Time", ascending=False)
                    except Exception as sort_error:
                        logger.warning(f"Could not sort DataFrame: {sort_error}")
                        # Keep unsorted if sorting fails
                else:
                    logger.debug("No valid timestamps for sorting")

            logger.info(f"โœ… Created execution DataFrame with {len(df)} rows")
            return df

        except Exception as e:
            logger.error(f"โŒ Error creating execution DataFrame: {e}")
            # Return informative error DataFrame
            # NOTE(review): DataFrame.from_records is a classmethod; calling it
            # on an instance ignores the instance (and its `columns=`) - the
            # frame is built purely from the records.
            error_df = pd.DataFrame(columns=[
                "Error", "Message", "Timestamp"
            ]).from_records([{
                "Error": "DataFrame Creation Failed",
                "Message": str(e),
                "Timestamp": datetime.datetime.now().isoformat()[:19]
            }])
            return error_df

    def get_incident_dataframe(self) -> pd.DataFrame:
        """
        FIXED: Robust pandas DataFrame creation for Gradio DataFrame component
        """
        try:
            if not self.incidents:
                # Return empty DataFrame with correct columns
                return pd.DataFrame(columns=[
                    "Scenario", "Status", "Boundary", "Time",
                    "Confidence", "Action", "Target"
                ])

            # Build DataFrame from incidents with safe access
            data = []
            for i, incident in enumerate(self.incidents):
                try:
                    # Safe extraction of basic fields
                    scenario = incident.get("scenario", "Unknown")
                    boundary = incident.get("boundary_context", "OSS analysis")

                    # Analysis data extraction
                    analysis = incident.get("analysis", {})

                    # Status determination
                    status = "Analyzed"
                    if isinstance(analysis, dict):
                        analysis_status = analysis.get("status", "").lower()
                        if analysis_status:
                            status = analysis_status.capitalize()
                        else:
                            # Fallback status determination
                            if analysis.get("error"):
                                status = "Error"
                            elif analysis.get("analysis") or analysis.get("oss_analysis"):
                                status = "Success"

                    # Timestamp formatting
                    timestamp = incident.get("timestamp", "")
                    time_display = ""
                    if timestamp and len(timestamp) > 10:
                        try:
                            # Extract HH:MM:SS from the ISO timestamp
                            time_display = timestamp[11:19]
                        except Exception:
                            time_display = timestamp[:8] if len(timestamp) >= 8 else timestamp

                    # Extract healing intent details with multiple fallback paths
                    confidence = 0.85  # Default confidence
                    action = "Analysis"
                    target = "system"

                    # Try multiple paths to find healing intent
                    healing_intent = None

                    # Path 1: oss_analysis -> analysis -> decision
                    oss_analysis = analysis.get("oss_analysis", {})
                    if isinstance(oss_analysis, dict):
                        oss_analysis_inner = oss_analysis.get("analysis", {})
                        if isinstance(oss_analysis_inner, dict):
                            healing_intent = oss_analysis_inner.get("decision", {})

                    # Path 2: direct analysis -> decision
                    if not healing_intent and isinstance(analysis.get("analysis", {}), dict):
                        healing_intent = analysis["analysis"].get("decision", {})

                    # Path 3: direct healing_intent
                    if not healing_intent:
                        healing_intent = analysis.get("healing_intent", {})

                    if healing_intent and isinstance(healing_intent, dict):
                        confidence = healing_intent.get("confidence", 0.85)
                        action = healing_intent.get("action", "Analysis")
                        target = healing_intent.get("target", "system")

                    # Format confidence as percentage
                    confidence_display = f"{confidence * 100:.1f}%"

                    data.append({
                        "Scenario": scenario,
                        "Status": status,
                        "Boundary": boundary,
                        "Time": time_display,
                        "Confidence": confidence_display,
                        "Action": action[:50],  # Limit action length
                        "Target": target[:30]  # Limit target length
                    })
                except Exception as row_error:
                    logger.warning(f"Error processing incident row {i}: {row_error}")
                    # Add error row for debugging
                    data.append({
                        "Scenario": "Error",
                        "Status": "Failed",
                        "Boundary": "Error processing",
                        "Time": datetime.datetime.now().isoformat()[11:19],
                        "Confidence": "0.0%",
                        "Action": "Error",
                        "Target": "system"
                    })

            if not data:
                logger.warning("No valid incident data found, returning empty DataFrame")
                return pd.DataFrame(columns=[
                    "Scenario", "Status", "Boundary", "Time",
                    "Confidence", "Action", "Target"
                ])

            # Create DataFrame
            df = pd.DataFrame(data)

            # Safe sorting - only if we have valid Time data
            if not df.empty and "Time" in df.columns:
                valid_times = df["Time"].apply(
                    lambda x: isinstance(x, str) and len(x) > 0 and x != "None"
                )
                if valid_times.any():
                    try:
                        # Sort by time (newest first)
                        df = df.sort_values("Time", ascending=False)
                    except Exception as sort_error:
                        logger.warning(f"Could not sort incident DataFrame: {sort_error}")
                        # Keep unsorted if sorting fails
                else:
                    logger.debug("No valid timestamps for sorting in incident DataFrame")

            logger.info(f"โœ… Created incident DataFrame with {len(df)} rows")
            return df

        except Exception as e:
            logger.error(f"โŒ Error creating incident DataFrame: {e}")
            # Return informative error DataFrame
            # NOTE(review): same from_records classmethod-on-instance misuse as
            # in get_execution_dataframe - the `columns=` argument is discarded.
            error_df = pd.DataFrame(columns=[
                "Error", "Message", "Timestamp"
            ]).from_records([{
                "Error": "DataFrame Creation Failed",
                "Message": str(e),
                "Timestamp": datetime.datetime.now().isoformat()[:19]
            }])
            return error_df

    def get_execution_table_html(self):
        """Legacy HTML method for backward compatibility"""
        # NOTE(review): markup inside the literals below was lost in
        # extraction; only the visible text is reproduced - restore from VCS.
        if not self.executions:
            return """
๐Ÿ“ญ

No executions yet

Run scenarios to see execution history

"""
        rows = []
        # NOTE(review): loop variable `exec` shadows the builtin; harmless
        # here, but rename when this method is next touched.
        for i, exec in enumerate(self.executions[:10]):
            status = "โœ…" if "success" in exec["result"].get("status", "").lower() else "โš ๏ธ"
            boundary = exec["boundary_context"]
            boundary_color = "#10b981" if "OSS" in boundary else "#8b5cf6"
            rows.append(f"""
{status} {exec["scenario"]} {exec["mode"]}
{boundary}
{exec["timestamp"][11:19]}
""")
        return f"""
{''.join(rows)}
Scenario Mode Boundary Time
"""

    def get_incident_table_html(self):
        """Legacy HTML method for backward compatibility"""
        if not self.incidents:
            return """
๐Ÿ“ญ

No incidents analyzed yet

Run OSS analysis to see incident history

"""
        rows = []
        for i, incident in enumerate(self.incidents[:10]):
            scenario = incident["scenario"]
            analysis = incident["analysis"]
            boundary = incident["boundary_context"]
            boundary_color = "#10b981" if "OSS" in boundary else "#8b5cf6"
            rows.append(f"""
{scenario} {analysis.get('status', 'analyzed')}
{boundary}
{incident["timestamp"][11:19]}
""")
        return f"""
{''.join(rows)}
Scenario Status Boundary Time
""" def clear(self): """Clear all audit trails""" self.executions = [] self.incidents = [] self.boundary_crossings = [] logger.info("๐Ÿงน Audit trail cleared") def export_json(self): """Export audit trail as JSON""" return { "executions": self.executions, "incidents": self.incidents, "boundary_crossings": self.boundary_crossings, "export_time": datetime.datetime.now().isoformat(), "version": "3.3.9", "architecture": "OSS advises โ†’ Enterprise executes" } def get_audit_manager() -> AuditTrailManager: """Lazy load audit manager singleton""" global _audit_manager if _audit_manager is None: _audit_manager = AuditTrailManager() return _audit_manager # =========================================== # HELPER FUNCTIONS # =========================================== def get_scenario_impact(scenario_name: str) -> float: """Get average impact for a given scenario""" impact_map = { "Cache Miss Storm": 8500, "Database Connection Pool Exhaustion": 4200, "Kubernetes Memory Leak": 5500, "API Rate Limit Storm": 3800, "Network Partition": 12000, "Storage I/O Saturation": 6800 } return impact_map.get(scenario_name, 5000) def extract_roi_multiplier(roi_result: Dict) -> float: """Extract ROI multiplier from EnhancedROICalculator result""" try: if "summary" in roi_result and "roi_multiplier" in roi_result["summary"]: roi_str = roi_result["summary"]["roi_multiplier"] if "ร—" in roi_str: return float(roi_str.replace("ร—", "")) return float(roi_str) return 5.2 except Exception as e: logger.warning(f"Failed to extract ROI multiplier: {e}") return 5.2 # =========================================== # SURGICAL FIX: update_scenario_display() - ENHANCED WITH REALISM PANEL # =========================================== def update_scenario_display(scenario_name: str) -> tuple: """ ENHANCED: Returns Plotly figures AND realism panel Returns 5 values: (scenario_card_html, telemetry_fig, impact_fig, timeline_fig, realism_html) """ components = get_components() scenarios = components["INCIDENT_SCENARIOS"] 
    # Fall back to a generic placeholder scenario when the name is unknown.
    scenario = scenarios.get(scenario_name, {
        "component": "Unknown System",
        "severity": "MEDIUM",
        "business_impact": {"revenue_loss_per_hour": 5000},
        "boundary_note": "Scenario not found"
    })

    # Create scenario card HTML (MODERN: Use Card component if available)
    if get_feature_flags().get('modern_ui', False) and MODERN_UI_AVAILABLE:
        # Use modern Card component
        # NOTE(review): markup inside the content string was lost in
        # extraction; only the visible text is reproduced - restore from VCS.
        scenario_card_html = Card.create(
            title=scenario_name,
            content=f"""
{scenario["severity"]} SEVERITY
{scenario["component"]}
Boundary Context: {scenario.get('boundary_note', 'OSS analyzes, Enterprise executes')}
""",
            footer=f"Revenue Impact: ${scenario['business_impact'].get('revenue_loss_per_hour', get_scenario_impact(scenario_name)):,}/hour"
        )
    else:
        # Legacy scenario card
        severity_colors = {
            "HIGH": "#ef4444",
            "MEDIUM": "#f59e0b",
            "LOW": "#10b981"
        }
        severity_color = severity_colors.get(scenario["severity"], "#64748b")
        impact = scenario["business_impact"].get("revenue_loss_per_hour", get_scenario_impact(scenario_name))
        # NOTE(review): markup lost in extraction here as well.
        scenario_card_html = f"""

{scenario_name}

{scenario["severity"]} SEVERITY
{scenario["component"]}
${impact:,}
Revenue Loss/Hour
Business Impact Analysis
45 min
Without ARF
12 min
With ARF
${int(impact * 0.85):,}
Savings
Boundary Context: {scenario.get('boundary_note', 'OSS analyzes, Enterprise executes')}
"""

    # Get visualizations as Plotly figures (ENHANCED)
    telemetry_fig = create_simple_telemetry_plot(scenario_name, settings.use_true_arf)
    impact_fig = create_simple_impact_plot(scenario_name, settings.use_true_arf)
    timeline_fig = create_empty_plot(f"Timeline: {scenario_name}", settings.use_true_arf)

    # ============ NEW: Create realism panel ============
    try:
        # Use the imported create_realism_panel function
        realism_html = components["create_realism_panel"](scenario, scenario_name)
    except (ImportError, KeyError):
        # Fallback if realism function isn't available yet
        realism_html = """
๐Ÿ”ง

Realism Panel Loading...

Trade-offs, risk assessments, and ranked actions will appear here

"""

    logger.info(f"โœ… Updated scenario display for {scenario_name} with realism panel")
    # ============ CHANGE HERE: Add realism_html to return tuple ============
    return scenario_card_html, telemetry_fig, impact_fig, timeline_fig, realism_html


# ===========================================
# ENHANCED: Combined update function for scenario display + performance metrics
# ===========================================
def update_scenario_display_with_metrics(scenario_name: str) -> tuple:
    """Combined update function - doctrinally compliant"""
    # Get scenario display components (5 outputs); realism panel is discarded.
    scenario_card, telemetry_fig, impact_fig, timeline_fig, _ = update_scenario_display(scenario_name)
    # Get doctrinally compliant performance metrics (4 outputs)
    components = get_components()
    detection_time, recall_quality, confidence_score, sequencing_stage = components["update_performance_metrics"](scenario_name)
    return (scenario_card, telemetry_fig, impact_fig, timeline_fig,
            detection_time, recall_quality, confidence_score, sequencing_stage)  # Changed


# ===========================================
# FIXED EXECUTION FUNCTION - Returns DataFrames
# ===========================================
def execute_enterprise_healing(scenario_name, approval_required, mcp_mode_value):
    """
    MINIMAL FIX: Returns proper data types matching UI expectations
    FIXED: Returns DataFrame instead of HTML for execution table
    """
    import gradio as gr
    components = get_components()
    installation = get_installation_status()
    boundaries = BoundaryManager.get_system_boundaries()
    logger.info(f"โšก Executing enterprise healing for: {scenario_name}")

    # Check if Enterprise is actually available
    is_real_enterprise = installation["enterprise_installed"]
    is_simulated = not is_real_enterprise

    # Get scenario impact
    scenario = components["INCIDENT_SCENARIOS"].get(scenario_name, {})
    impact = scenario.get("business_impact", {})
    revenue_loss = impact.get("revenue_loss_per_hour", get_scenario_impact(scenario_name))
    savings = int(revenue_loss
                  * 0.85)  # 85% of the hourly revenue loss is counted as savings

    # Create approval display HTML
    # NOTE(review): markup inside these two literals was lost in extraction;
    # only the visible text is reproduced - restore from VCS.
    if approval_required:
        approval_display = """
โณ

HUMAN APPROVAL REQUIRED

Based on your safety settings, this execution requires human approval.

"""
    else:
        approval_display = """
โšก

AUTONOMOUS APPROVAL GRANTED

Proceeding with autonomous execution.

"""

    # Execute healing (async); AsyncRunner.async_to_sync bridges the coroutine
    # into this synchronous Gradio callback.
    @AsyncRunner.async_to_sync
    async def execute_async():
        try:
            orchestrator = components["DemoOrchestrator"]()
            execution_result = await orchestrator.execute_healing(scenario_name, "autonomous")
            # Add to audit trail
            get_audit_manager().add_execution(scenario_name, "enterprise_autonomous", execution_result)
            return execution_result
        except Exception as e:
            logger.error(f"Execution failed: {e}")
            return {
                "status": "failed",
                "error": str(e),
                "boundary_note": "Execution boundary reached"
            }

    execution_result = execute_async()

    # Create results dict for JSON display
    if is_real_enterprise:
        enterprise_results = {
            "demo_mode": "Real Enterprise",
            "scenario": scenario_name,
            "arf_version": boundaries["enterprise"]["version"],
            "execution_mode": "autonomous" if not approval_required else "human_approved",
            "results": {
                "recovery_time": "12 minutes",
                "cost_saved": f"${savings:,}",
                "users_protected": "45,000"
            },
            "safety_features": [
                "Rollback guarantee: 100%",
                "Atomic execution",
                "MCP validation"
            ]
        }
    else:
        enterprise_results = {
            "demo_mode": "Enterprise Simulation",
            "scenario": scenario_name,
            "arf_version": boundaries["enterprise"]["version"],
            "execution_mode": "simulated_autonomous",
            "results": {
                "recovery_time": "12 minutes (simulated)",
                "cost_saved": f"${savings:,} (simulated)",
                "users_protected": "45,000 (simulated)"
            },
            "safety_features": [
                "Rollback guarantee: 100% (simulated)",
                "Atomic execution (simulated)"
            ]
        }

    # Get execution DataFrame (FIXED: Returns DataFrame instead of HTML)
    execution_df = get_audit_manager().get_execution_dataframe()

    return approval_display, enterprise_results, execution_df


# ===========================================
# FIXED ROI FUNCTION - Enhanced for Gradio
# ===========================================
def calculate_roi(scenario_name, monthly_incidents, team_size):
    """
    ENHANCED: Returns (JSON/dict, Plotly figure) for ROI calculation with Gradio compatibility
    """
    components = get_components()
    try:
        # Try to use real ROI calculator
        calculator = components["EnhancedROICalculator"]
        roi_result = calculator.calculate_comprehensive_roi(
            scenario_name=scenario_name,
            monthly_incidents=monthly_incidents,
            team_size=team_size
        )
    except Exception as e:
        logger.warning(f"ROI calculation failed, using mock: {e}")
        # Mock ROI calculation
        impact_per_incident = get_scenario_impact(scenario_name)
        annual_impact = impact_per_incident * monthly_incidents * 12
        potential_savings = int(annual_impact * 0.82)
        enterprise_cost = 625000
        roi_multiplier = round(potential_savings / enterprise_cost, 1)
        payback_months = round((enterprise_cost / (potential_savings / 12)), 1)
        roi_result = {
            "status": "โœ… Calculated Successfully",
            "summary": {
                "your_annual_impact": f"${annual_impact:,}",
                "potential_savings": f"${potential_savings:,}",
                "enterprise_cost": f"${enterprise_cost:,}",
                "roi_multiplier": f"{roi_multiplier}ร—",
                "payback_months": f"{payback_months}",
                "annual_roi_percentage": f"{int((potential_savings - enterprise_cost) / enterprise_cost * 100)}%",
                "boundary_context": "Based on OSS analysis + simulated Enterprise execution"
            },
            "boundary_note": "ROI calculation includes OSS advisory value and simulated Enterprise execution benefits"
        }

    # Create ROI chart as Plotly figure (ENHANCED for Gradio)
    # NOTE(review): on the success path (no exception) `impact_per_incident`,
    # `potential_savings` and `enterprise_cost` are never bound, so the
    # `locals()` probes below silently fall back to hard-coded chart values
    # that may disagree with the calculator's `roi_result` - fragile; the
    # chart should be driven from `roi_result` instead.
    categories = ['Without ARF', 'With ARF', 'Net Savings']
    annual_impact_val = impact_per_incident * monthly_incidents * 12 if 'impact_per_incident' in locals() else 1000000
    potential_savings_val = potential_savings if 'potential_savings' in locals() else 820000
    enterprise_cost_val = enterprise_cost if 'enterprise_cost' in locals() else 625000
    values = [annual_impact_val, annual_impact_val - potential_savings_val, potential_savings_val - enterprise_cost_val]
    fig = go.Figure(data=[
        go.Bar(
            name='Cost',
            x=categories,
            y=values,
            marker_color=['#ef4444', '#10b981', '#8b5cf6']
        )
    ])
    fig.update_layout(
        title={
            'text': f"ROI Analysis: {scenario_name}",
            'font': dict(size=18, color='#1e293b', family="Arial, sans-serif")
        },
        height=400,
        plot_bgcolor='white',
        paper_bgcolor='white',
        showlegend=False,
        margin=dict(l=40, r=20, t=60, b=40)
    )

    logger.info(f"โœ… Created ROI plot for {scenario_name}")

    # Return both the dict and the Plotly figure
    return roi_result, fig


# ===========================================
# CREATE DEMO INTERFACE - UPDATED WITH MODERN UI INTEGRATION
# ===========================================
def create_demo_interface():
    """Create demo interface using modular components with boundary awareness and modern UI.

    Builds the full Gradio Blocks app: header/status bar, five feature tabs,
    footer, and all event-handler wiring. Returns the `gr.Blocks` instance
    (launched later by `main()`).

    Returns:
        gr.Blocks: the assembled (not yet launched) demo interface.
    """
    import gradio as gr

    # Component factory registry and feature flags drive everything below.
    components = get_components()
    flags = get_feature_flags()

    # Create interface WITHOUT css parameter (fixes Gradio 6.0 warning);
    # CSS is instead passed to demo.launch() in main().
    with gr.Blocks(
        title=f"๐Ÿš€ ARF Investor Demo v3.3.9 - TRUE ARF OSS Integration"
    ) as demo:

        # MODERN UI INITIALIZATION — best-effort: a failure here must not
        # prevent the rest of the interface from building.
        if flags.get('modern_ui', True) and MODERN_UI_AVAILABLE:
            try:
                # Initialize modern UI with theme support
                modern_ui_init = gr.HTML(initialize_modern_ui())
                logger.info("โœ… Modern UI initialized")
            except Exception as e:
                logger.warning(f"โš ๏ธ Modern UI initialization failed: {e}")
                modern_ui_init = gr.HTML("")
        else:
            modern_ui_init = gr.HTML("")

        # Add dark mode toggle if enabled
        if flags.get('dark_mode', True):
            dark_mode_toggle = gr.HTML(create_dark_mode_toggle())
            logger.info("โœ… Dark mode toggle added")

        # Header
        header_html = components["create_header"]("3.3.9")

        # Status bar with boundary badges
        status_html = components["create_status_bar"]()

        # ============ 5 TABS ============
        with gr.Tabs(elem_classes="tab-nav"):

            # TAB 1: Live Incident Demo - NOW WITH MODERN COMPONENTS
            with gr.TabItem("๐Ÿ”ฅ Live Incident Demo", id="tab1"):
                # ===== SURGICAL FIX: SAFE UNPACKING WITH ERROR HANDLING =====
                # The tab factory returns a 26-element tuple; the indices below
                # are a hard contract with components.py — do not reorder.
                try:
                    logger.info("๐Ÿ”ง Extracting Tab1 components with safe unpacking...")

                    # Get the raw result tuple
                    tab1_result = components["create_tab1_incident_demo"]()

                    # Debug logging for contract verification
                    logger.info(f"๐Ÿ“Š Tab1 result type: {type(tab1_result)}")
                    if hasattr(tab1_result, '__len__'):
                        logger.info(f"๐Ÿ“Š Tab1 result length: {len(tab1_result)}")
                        for i, item in enumerate(tab1_result):
                            item_type = type(item).__name__ if hasattr(item, '__name__') else type(item)
                            logger.debug(f" Index {i}: {item_type}")

                    # MANUAL INDEX-BASED UNPACKING (CONTRACT ENFORCEMENT)
                    # Indices verified against components.py return statement
                    scenario_dropdown = tab1_result[0]
                    historical_panel = tab1_result[1]
                    scenario_card = tab1_result[2]
                    telemetry_viz = tab1_result[3]
                    impact_viz = tab1_result[4]
                    observation_gate_placeholder = tab1_result[5]
                    sequencing_panel = tab1_result[6]
                    workflow_header = tab1_result[7]
                    detection_process = tab1_result[8]
                    recall_process = tab1_result[9]
                    decision_process = tab1_result[10]
                    oss_section = tab1_result[11]
                    enterprise_section = tab1_result[12]
                    oss_btn = tab1_result[13]
                    enterprise_btn = tab1_result[14]
                    approval_toggle = tab1_result[15]
                    mcp_mode = tab1_result[16]
                    timeline_viz = tab1_result[17]
                    detection_time = tab1_result[18]
                    recall_quality = tab1_result[19]      # โ† CRITICAL: WAS mttr
                    confidence_score = tab1_result[20]    # โ† CRITICAL: WAS auto_heal
                    sequencing_stage = tab1_result[21]    # โ† CRITICAL: WAS savings
                    oss_results_display = tab1_result[22]
                    enterprise_results_display = tab1_result[23]
                    approval_display = tab1_result[24]
                    demo_btn = tab1_result[25]            # โ† CRITICAL: Index 25 MUST be demo_btn

                    logger.info("โœ… Tab1 components successfully extracted with correct contract")

                except Exception as e:
                    logger.error(f"โŒ Tab1 component extraction failed: {e}")
                    logger.error("๐Ÿ”„ Creating fallback components to maintain system integrity...")

                    # FALLBACK CREATION (Minimal viable components) — every name
                    # referenced by the event wiring below must exist, even in
                    # degraded mode, or interface construction would crash.
                    import gradio as gr

                    # Create minimal placeholder components
                    scenario_dropdown = gr.Dropdown(choices=["Error Mode"], value="Error Mode")
                    historical_panel = gr.DataFrame(value=[["System in recovery mode"]])
                    scenario_card = gr.Markdown("### System Initialization Issue")
                    telemetry_viz = gr.Plot()
                    impact_viz = gr.Plot()
                    observation_gate_placeholder = gr.Markdown("**Observation Gate:** System integrity check")
                    sequencing_panel = gr.Markdown("**Sequencing:** Initializing...")
                    workflow_header = gr.Markdown("### Policy Recovery Mode")
                    detection_process = gr.Textbox(value="DETECTION: ERROR")
                    recall_process = gr.Textbox(value="RECALL: ERROR")
                    decision_process = gr.Textbox(value="DECISION: ERROR")
                    oss_section = gr.Markdown("#### OSS: Unavailable")
                    enterprise_section = gr.Markdown("#### Enterprise: Unavailable")
                    oss_btn = gr.Button("Unavailable", variant="secondary")
                    enterprise_btn = gr.Button("Unavailable", variant="secondary")
                    approval_toggle = gr.Checkbox(label="Approval: Error", value=False)
                    mcp_mode = gr.Radio(choices=["Error"], value="Error")
                    timeline_viz = gr.Plot()
                    detection_time = gr.Number(value=0)
                    recall_quality = gr.Number(value=0)       # โ† CORRECT VARIABLE NAME
                    confidence_score = gr.Number(value=0)     # โ† CORRECT VARIABLE NAME
                    sequencing_stage = gr.Textbox(value="Error")  # โ† CORRECT VARIABLE NAME
                    oss_results_display = gr.Markdown("### Results: Unavailable")
                    enterprise_results_display = gr.Markdown("### Results: Unavailable")
                    approval_display = gr.Markdown("**Status:** System recovery in progress")
                    demo_btn = gr.Button("๐Ÿ”„ System Recovery Required", variant="secondary", size="lg")

                    logger.warning("โš ๏ธ Using fallback components - full functionality limited")
                # ===== END SURGICAL FIX =====

            # TAB 2: Business ROI
            with gr.TabItem("๐Ÿ’ฐ Business Impact & ROI", id="tab2"):
                (dashboard_output, roi_scenario_dropdown, monthly_slider, team_slider,
                 calculate_btn, roi_output,
                 roi_chart) = components["create_tab2_business_roi"](components["INCIDENT_SCENARIOS"])

            # TAB 3: Enterprise Features
            with gr.TabItem("๐Ÿข Enterprise Features", id="tab3"):
                (license_display, validate_btn, trial_btn, upgrade_btn,
                 mcp_mode_tab3, mcp_mode_info, features_table,
                 integrations_table) = components["create_tab3_enterprise_features"]()

            # TAB 4: Audit Trail
            with gr.TabItem("๐Ÿ“œ Audit Trail & History", id="tab4"):
                (refresh_btn, clear_btn, export_btn, execution_table,
                 incident_table, export_text) = components["create_tab4_audit_trail"]()

            # TAB 5: Learning Engine
            with gr.TabItem("๐Ÿง  Learning Engine", id="tab5"):
                (learning_graph, graph_type, show_labels, search_query, search_btn,
                 clear_btn_search, search_results, stats_display, patterns_display,
                 performance_display) = components["create_tab5_learning_engine"]()

        # Footer
        footer_html = components["create_footer"]()

        # Add CSS debug panel for testing
        # NOTE(review): the original HTML wrapper markup of this panel appears
        # to have been lost in extraction — text content preserved as visible.
        if flags.get('modern_ui', True):
            debug_html = gr.HTML(f"""
CSS Debug:
Modern UI: {flags.get('modern_ui', False)}
Dark Mode: {flags.get('dark_mode', False)}
CSS Loaded: โœ…
""")

        # ============ EVENT HANDLERS ============

        # Update scenario display when dropdown changes - NOW INCLUDES PERFORMANCE METRICS
        scenario_dropdown.change(
            fn=update_scenario_display_with_metrics,
            inputs=[scenario_dropdown],
            outputs=[
                scenario_card, telemetry_viz, impact_viz, timeline_viz,
                detection_time, recall_quality, confidence_score, sequencing_stage
            ]
        )

        # ===========================================
        # FIXED: OSS Analysis Button Connection - SIMPLE PASSTHROUGH
        # ===========================================
        def run_oss_analysis_real_arf(scenario_name: str) -> tuple:
            """
            Simple passthrough to run_true_arf_analysis()
            The decorator now handles contract preservation
            """
            logger.info(f"๐Ÿš€ Running TRUE ARF OSS analysis for: {scenario_name}")
            return run_true_arf_analysis(scenario_name)

        # Run OSS Analysis - FIXED: Simple passthrough
        oss_btn.click(
            fn=run_oss_analysis_real_arf,
            inputs=[scenario_dropdown],
            outputs=[
                detection_process, recall_process, decision_process,
                oss_results_display, incident_table
            ]
        )

        # Execute Enterprise Healing - FIXED: Now returns DataFrame for execution_table
        enterprise_btn.click(
            fn=execute_enterprise_healing,
            inputs=[scenario_dropdown, approval_toggle, mcp_mode],
            outputs=[approval_display, enterprise_results_display, execution_table]
        )

        # Run Complete Demo with boundary progression
        @AsyncRunner.async_to_sync
        async def run_complete_demo_async(scenario_name):
            """Run a complete demo walkthrough with true ARF and boundary awareness.

            Chains the three demo phases (metrics update → OSS analysis →
            simulated Enterprise execution) and returns a 16-element tuple
            matching the demo_btn.click outputs list exactly.
            """
            # Step 1: Update scenario with metrics (8-element tuple)
            update_result = update_scenario_display_with_metrics(scenario_name)

            # Step 2: Run OSS analysis using TRUE ARF OSS function
            oss_result = run_oss_analysis_real_arf(scenario_name)

            # Step 3: Execute Enterprise (simulation) with boundary context
            await asyncio.sleep(1)

            scenario = components["INCIDENT_SCENARIOS"].get(scenario_name, {})
            impact = scenario.get("business_impact", {})
            revenue_loss = impact.get("revenue_loss_per_hour", get_scenario_impact(scenario_name))
            savings_amount = int(revenue_loss * 0.85)

            # Get boundary context
            boundaries = BoundaryManager.get_system_boundaries()

            # Get orchestrator for execution simulation
            orchestrator = components["DemoOrchestrator"]()
            execution_result = await orchestrator.execute_healing(scenario_name, "autonomous")

            enterprise_results = {
                "demo_mode": "Complete Walkthrough",
                "scenario": scenario_name,
                "arf_version": "3.3.9",
                "true_oss_used": True,
                "enterprise_simulated": True,
                "boundary_progression": [
                    f"1. Incident detected - {boundaries['oss']['label']}",
                    f"2. OSS analysis completed - {boundaries['oss']['label']}",
                    f"3. HealingIntent created - {boundaries['oss']['label']}",
                    f"4. Enterprise license validated ({boundaries['enterprise']['label']})",
                    f"5. Autonomous execution simulated ({boundaries['enterprise']['label']}+)",
                    f"6. Outcome recorded in RAG memory"
                ],
                "execution_result": execution_result,
                "outcome": {
                    "recovery_time": "12 minutes",
                    "manual_comparison": "45 minutes",
                    "cost_saved": f"${savings_amount:,}",
                    "users_protected": "45,000",
                    "learning": "Pattern added to RAG memory"
                },
                "architectural_summary": f"This demonstrates the complete ARF v3.3.9 architecture: {boundaries['oss']['label']} for advisory analysis โ†’ {boundaries['enterprise']['label']} for autonomous execution"
            }

            # Create demo completion message with enhanced boundary context
            # NOTE(review): the original HTML markup of this completion panel
            # appears stripped by extraction — visible text content preserved.
            demo_message = f"""
โœ… Complete Demo: Architecture Validated

ARF v3.3.9 โ€ข OSS advises โ†’ Enterprise executes

BOUNDARY VALIDATED
{boundaries['oss']['label']}
โ€ข Anomaly detected in 45s
โ€ข 3 similar incidents recalled
โ€ข 94% confidence healing plan
โ€ข Apache 2.0 license validated
{boundaries['enterprise']['label']}
โ€ข Autonomous execution simulated
โ€ข Rollback guarantee: 100%
โ€ข 12min vs 45min recovery
โ€ข ${savings_amount:,} saved
๐Ÿ—๏ธ Architecture Flow
OSS Advisory
Apache 2.0
advises
Enterprise
Commercial
Time Saved
73%
Cost Saved
${savings_amount:,}
ROI Multiplier
5.2ร—
โœ…
Architecture Successfully Validated
Clear separation maintained: OSS for advisory intelligence, Enterprise for autonomous execution
Ready for production? Install ARF Enterprise โ†’
"""

            # Update the enterprise_results_display to include demo completion info
            enterprise_results["demo_completion_message"] = demo_message

            # Get updated DataFrames (FIXED: Returns DataFrames)
            incident_df = get_audit_manager().get_incident_dataframe()
            execution_df = get_audit_manager().get_execution_dataframe()

            # Combine all results — arity must match demo_btn.click outputs (16).
            return (
                *update_result,      # 8 outputs: scenario_card, telemetry_viz, impact_viz, timeline_viz, detection_time, recall_quality, confidence_score, sequencing_stage
                *oss_result[:3],     # 3 outputs: detection_process, recall_process, decision_process
                oss_result[3],       # 1 output: oss_results_display
                enterprise_results,  # 1 output: enterprise_results_display
                demo_message,        # 1 output: approval_display
                incident_df,         # 1 output: incident_table (DataFrame)
                execution_df         # 1 output: execution_table (DataFrame)
            )

        # FIXED: demo_btn.click with correct output count
        demo_btn.click(
            fn=run_complete_demo_async,
            inputs=[scenario_dropdown],
            outputs=[
                scenario_card, telemetry_viz, impact_viz, timeline_viz,
                detection_time, recall_quality, confidence_score, sequencing_stage,  # 8
                detection_process, recall_process, decision_process,                 # 3
                oss_results_display,                                                 # 1
                enterprise_results_display,                                          # 1
                approval_display,                                                    # 1
                incident_table,                                                      # 1
                execution_table                                                      # 1
            ]
        )

        # ROI Calculation
        calculate_btn.click(
            fn=calculate_roi,
            inputs=[roi_scenario_dropdown, monthly_slider, team_slider],
            outputs=[roi_output, roi_chart]
        )

        # Update ROI scenario - FIXED: Use the EnhancedROICalculator
        roi_scenario_dropdown.change(
            fn=lambda x: get_components()["EnhancedROICalculator"].calculate_comprehensive_roi(scenario_name=x),
            inputs=[roi_scenario_dropdown],
            outputs=[roi_output]
        )

        # Update ROI chart
        # NOTE(review): reads roi_scenario_dropdown.value at wiring time, not
        # the live selection — presumably intentional; confirm against Gradio docs.
        monthly_slider.change(
            fn=lambda x, y: calculate_roi(roi_scenario_dropdown.value, x, y)[1],
            inputs=[monthly_slider, team_slider],
            outputs=[roi_chart]
        )
        team_slider.change(
            fn=lambda x, y: calculate_roi(roi_scenario_dropdown.value, x, y)[1],
            inputs=[monthly_slider, team_slider],
            outputs=[roi_chart]
        )

        # Audit Trail Functions - FIXED: Returns DataFrames
        def refresh_audit_trail():
            """Refresh audit trail tables - FIXED to return DataFrames"""
            return (
                get_audit_manager().get_execution_dataframe(),  # DataFrame
                get_audit_manager().get_incident_dataframe()    # DataFrame
            )

        def clear_audit_trail():
            """Clear audit trail - FIXED to return empty DataFrames"""
            get_audit_manager().clear()
            # Return empty DataFrames with correct columns
            exec_df = pd.DataFrame(columns=["Execution ID", "Scenario", "Status", "Mode", "Start Time"])
            incident_df = pd.DataFrame(columns=["Scenario", "Status", "Boundary", "Time"])
            return exec_df, incident_df

        def export_audit_trail():
            """Export audit trail as JSON"""
            audit_data = {
                "executions": get_audit_manager().executions,
                "incidents": get_audit_manager().incidents,
                "boundary_crossings": get_audit_manager().boundary_crossings,
                "export_time": datetime.datetime.now().isoformat(),
                "arf_version": "3.3.9",
                "architecture": "OSS advises โ†’ Enterprise executes"
            }
            return json.dumps(audit_data, indent=2)

        refresh_btn.click(
            fn=refresh_audit_trail,
            inputs=[],
            outputs=[execution_table, incident_table]
        )
        clear_btn.click(
            fn=clear_audit_trail,
            inputs=[],
            outputs=[execution_table, incident_table]
        )
        export_btn.click(
            fn=export_audit_trail,
            inputs=[],
            outputs=[export_text]
        )

        # Enterprise Features
        def validate_license():
            """Validate enterprise license with boundary context"""
            boundaries = BoundaryManager.get_system_boundaries()
            if boundaries["enterprise"]["available"]:
                return {
                    "status": "โœ… Valid License",
                    "license_type": "Enterprise",
                    "version": boundaries["enterprise"]["version"],
                    "expires": "2025-12-31",
                    "capabilities": boundaries["enterprise"]["capabilities"],
                    "boundary_context": f"Real {boundaries['enterprise']['label']} detected"
                }
            else:
                return {
                    "status": "โš ๏ธ Demo Mode",
                    "license_type": "Simulated",
                    "version": boundaries["enterprise"]["version"],
                    "expires": "Demo only",
                    "capabilities": boundaries["enterprise"]["capabilities"],
                    "boundary_context": f"Simulating {boundaries['enterprise']['label']} - requires license",
                    "contact": "sales@arf.dev"
                }

        validate_btn.click(
            fn=validate_license,
            inputs=[],
            outputs=[license_display]
        )

        # Load default scenario - UPDATE outputs without realism_panel
        demo.load(
            fn=lambda: update_scenario_display_with_metrics(settings.default_scenario),
            inputs=[],
            outputs=[
                scenario_card, telemetry_viz, impact_viz, timeline_viz,
                detection_time, recall_quality, confidence_score, sequencing_stage
            ]
        )

        # Load ROI data
        demo.load(
            fn=lambda: calculate_roi(settings.default_scenario, 15, 5),
            inputs=[],
            outputs=[roi_output, roi_chart]
        )

    logger.info("โœ… Demo interface created successfully with modern UI integration")
    return demo


# ===========================================
# DARK MODE TOGGLE FUNCTION
# ===========================================
def create_dark_mode_toggle():
    """Create a dark mode toggle button with JavaScript.

    NOTE(review): the original HTML/JS wrapper of this snippet appears
    stripped by extraction — visible content preserved as-is.
    """
    return f"""
๐ŸŒ™
"""


# ===========================================
# MAIN EXECUTION - CRITICAL: THIS LAUNCHES THE APP - UPDATED FOR SPACES
# ===========================================
def main():
    """Main entry point that actually launches the Gradio app"""
    try:
        logger.info("๐Ÿš€ ARF Ultimate Investor Demo v3.3.9 - ENTERPRISE EDITION")
        logger.info("=" * 60)
        logger.info("Enhanced with clear OSS vs Enterprise boundaries")
        logger.info("DOCTRINAL COMPLIANCE: Historical Evidence, Observation Gate, Sequencing")
        logger.info("PHASE 2: Dynamic Performance Metrics by Scenario")
        logger.info(f"Modern UI: {'Enabled' if get_feature_flags().get('modern_ui', True) else 'Disabled'}")
        logger.info(f"True ARF OSS v3.3.9 integration with simulated Enterprise execution")
        logger.info("=" * 60)

        # Create the demo interface
        demo = create_demo_interface()

        print("\n" + "="*60)
        print("๐Ÿš€ ARF Ultimate Investor Demo v3.3.9 - ENTERPRISE EDITION")
        print("๐Ÿ“Š Architecture: OSS advises โ†’ Enterprise executes")
        print("๐ŸŽญ DOCTRINAL: Historical Evidence + Observation Gate + Sequencing")
        print("๐ŸŽจ MODERN UI: Design system with responsive components")
        print("="*60 + "\n")

        # ============ HUGGING FACE SPACES SPECIFIC ============
        # Spaces handles ports differently - use their system
        import os

        # Get port from environment (Spaces sets this)
        port = int(os.getenv("GRADIO_SERVER_PORT", "7860"))
        server_name = os.getenv("GRADIO_SERVER_NAME", "0.0.0.0")

        # Get CSS
        css_styles = load_css_files()

        logger.info(f"๐Ÿš€ Launching on {server_name}:{port}")
        print(f"๐ŸŒ Starting on http://{server_name}:{port}")

        # SIMPLE LAUNCH FOR SPACES COMPATIBILITY
        demo.launch(
            css=css_styles,          # CSS moved to launch() - fixes Gradio 6.0 warning
            server_name=server_name,
            server_port=port,
            share=False,
            debug=False,
            show_error=True,
            quiet=True               # Reduce log noise
        )

    except KeyboardInterrupt:
        logger.info("๐Ÿ‘‹ Demo stopped by user")
    except Exception as e:
        logger.error(f"โŒ Fatal error: {e}", exc_info=True)
        print(f"\nโŒ ERROR: {e}")
        print("Please check the logs for more details.")
        sys.exit(1)


# ===========================================
# HUGGING FACE SPACES COMPATIBILITY - UPDATED
# ===========================================
# This is the entry point that Hugging Face Spaces will use
if __name__ == "__main__":
    # For Hugging Face Spaces, we need to ensure the app stays alive
    import os

    # ============ CRITICAL FIXES FOR HUGGING FACE SPACES ============

    # 1. Set environment variables for Hugging Face Spaces compatibility
    os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"
    os.environ["GRADIO_SERVER_PORT"] = "7860"    # Spaces will override this if needed
    os.environ["GRADIO_SERVER_NAME"] = "0.0.0.0"
    os.environ["GRADIO_HOT_RELOAD"] = "False"    # Disable hot reload in Spaces
    os.environ["GRADIO_QUEUE_ENABLED"] = "True"  # Enable queue for stability

    # 2. Additional fixes for uvicorn warnings
    os.environ["UVICORN_LOG_LEVEL"] = "warning"  # Reduce uvicorn log noise
    os.environ["UVICORN_ACCESS_LOG"] = "False"   # Disable access logs

    print("\n" + "="*60)
    print("๐Ÿš€ ARF Demo Starting on Hugging Face Spaces")
    print(f"๐Ÿ“ Working directory: {os.getcwd()}")
    print(f"๐Ÿ“Š Python version: {sys.version}")
    print("="*60 + "\n")

    # 3. Detect if we're running in Hugging Face Spaces
    is_huggingface_space = "SPACE_ID" in os.environ or "HF_SPACE" in os.environ
    if is_huggingface_space:
        print("โœ… Hugging Face Spaces environment detected")
        print("๐Ÿค– Using Spaces-optimized configuration")

    # 4. Check for required files with better error handling
    required_files = ["styles/modern.css", "styles/responsive.css", "ui/components.py"]
    missing_files = []
    for file in required_files:
        if not os.path.exists(file):
            missing_files.append(file)
            print(f"โš ๏ธ Warning: {file} not found")

    if missing_files:
        print(f"โš ๏ธ Missing {len(missing_files)} required files")
        print("โš ๏ธ Some features may not work correctly")

        # Create minimal fallback CSS files if missing
        for css_file in ["styles/modern.css", "styles/responsive.css"]:
            if css_file in missing_files:
                try:
                    os.makedirs(os.path.dirname(css_file), exist_ok=True)
                    with open(css_file, "w") as f:
                        if "modern.css" in css_file:
                            f.write("/* Modern CSS Fallback */\n:root { --color-primary: #3b82f6; }\n")
                        else:
                            f.write("/* Responsive CSS Fallback */\n@media (max-width: 768px) { .container { padding: 1rem; } }\n")
                    print(f"โœ… Created fallback {css_file}")
                except Exception as e:
                    print(f"โš ๏ธ Could not create {css_file}: {e}")

    # 5. Import gradio early to prevent threading issues
    try:
        import gradio as gr
        logger.info(f"โœ… Gradio {gr.__version__} loaded successfully")
    except Exception as e:
        logger.error(f"โŒ Failed to load gradio: {e}")
        print("โŒ CRITICAL: Gradio failed to load")
        raise

    # 6. Start the main application with better error handling
    try:
        main()
    except Exception as e:
        logger.error(f"โŒ Main application crashed: {e}", exc_info=True)
        print(f"\nโŒ FATAL ERROR: {e}")
        print("๐Ÿ’ก Troubleshooting tips:")
        print("1. Check all required files exist")
        print("2. Verify Python package versions")
        print("3. Check Hugging Face Spaces logs for details")

        # Try a minimal fallback launch if main() fails
        try:
            print("\n๐Ÿ”„ Attempting minimal fallback launch...")
            import gradio as gr

            def fallback_app():
                # Tiny recovery UI so the Space stays responsive after a crash.
                with gr.Blocks(title="ARF Fallback") as demo:
                    gr.Markdown("# ๐Ÿšจ ARF System Recovery")
                    gr.Markdown("The main application failed, but the system is still running.")
                    gr.Markdown("**Error:** " + str(e))
                return demo

            demo = fallback_app()
            demo.launch(
                server_name="0.0.0.0",
                server_port=7860,
                quiet=True,
                show_error=False
            )
        except Exception as fallback_error:
            print(f"โŒ Fallback also failed: {fallback_error}")
            sys.exit(1)