"""
Hugging Face Spaces Demo for Agentic Reliability Framework (ARF)
Version: 3.3.9 OSS vs Enterprise
Proper import structure based on actual repository
"""

import hashlib
import json
import random
import sys
import time
import traceback
import uuid
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import gradio as gr
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

print("=" * 60)
print("🤖 ARF 3.3.9 Hugging Face Spaces Demo - Starting")
print("=" * 60)

# Try to import ARF with multiple possible import paths
ARF_AVAILABLE = False
ARF_MODULES = {}

try:
    # Try direct import
    from agentic_reliability_framework.models import (
        ReliabilityEvent,
        HealingPolicy,
    )
    from agentic_reliability_framework.healing_policies import PolicyEngine
    from agentic_reliability_framework.engine.reliability import ReliabilityEngine
    from agentic_reliability_framework import __version__

    ARF_AVAILABLE = True
    # The imported name may be the ``__version__`` submodule or already the
    # version string; accept both so a string does not raise AttributeError
    # (which the ``except ImportError`` below would not catch).
    ARF_VERSION = getattr(__version__, "__version__", __version__)
    print(f"✅ ARF {ARF_VERSION} successfully imported")

    # Store imported modules
    ARF_MODULES = {
        'ReliabilityEvent': ReliabilityEvent,
        'HealingPolicy': HealingPolicy,
        'PolicyEngine': PolicyEngine,
        'ReliabilityEngine': ReliabilityEngine,
        'version': ARF_VERSION,
    }

except ImportError as e:
    print(f"⚠️ Import error: {e}")
    print("Creating mock implementations for demo...")

    # Create comprehensive mock implementations
    class MockReliabilityEvent:
        """Stand-in event object used when the ARF package cannot be imported."""

        def __init__(self, event_id: str, agent_id: str, action: str,
                     timestamp: datetime, context: Optional[Dict] = None):
            self.event_id = event_id
            self.agent_id = agent_id
            self.action = action
            self.timestamp = timestamp
            self.context = context or {}
            self.metadata = {"source": "demo", "environment": "testing"}

        def to_dict(self) -> Dict:
            """Return the event as a plain dict with an ISO-formatted timestamp."""
            return {
                'event_id': self.event_id,
                'agent_id': self.agent_id,
                'action': self.action,
                'timestamp': self.timestamp.isoformat(),
                'context': self.context,
                'metadata': self.metadata,
            }

    class MockHealingPolicy:
        """Stand-in policy; every instance applies the same keyword/deployment checks."""

        class PolicyAction:
            ALLOW = "allow"
            HEAL = "heal"
            BLOCK = "block"
            WARN = "warn"

        def __init__(self, id: str, name: str, description: str,
                     conditions: Dict, action: str, priority: int = 1):
            self.id = id
            self.name = name
            self.description = description
            self.conditions = conditions
            self.action = action
            self.priority = priority
            now = datetime.now()
            self.created_at = now
            self.updated_at = now

        def evaluate(self, event: 'MockReliabilityEvent', score: float) -> Tuple[bool, str]:
            """Evaluate if policy conditions are met.

            Returns ``(triggered, reason)``; ``reason`` is empty when the
            policy is not triggered.
            """
            action_lower = event.action.lower()
            context = event.context

            # Check for high-risk patterns (plain substring match)
            high_risk_keywords = ['drop database', 'delete from', 'truncate', 'rm -rf', 'format']
            if any(keyword in action_lower for keyword in high_risk_keywords):
                return True, f"High-risk action detected: {event.action}"

            # Check for production environment
            if context.get('environment') == 'production' and 'deploy' in action_lower:
                if score < 0.8:
                    return True, f"Low confidence ({score:.2f}) for production deployment"

            return False, ""

    class MockPolicyEngine:
        """Holds the demo policies and evaluates events against all of them."""

        def __init__(self):
            self.policies = []
            self.load_default_policies()

        def load_default_policies(self):
            """Load default demo policies"""
            self.policies = [
                MockHealingPolicy(
                    id="policy-001",
                    name="High-Risk Database Prevention",
                    description="Prevents irreversible database operations",
                    conditions={"risk_threshold": 0.7},
                    action=MockHealingPolicy.PolicyAction.BLOCK,
                    priority=1,
                ),
                MockHealingPolicy(
                    id="policy-002",
                    name="Production Deployment Safety",
                    description="Ensures safe deployments to production",
                    conditions={"confidence_threshold": 0.8, "environment": "production"},
                    action=MockHealingPolicy.PolicyAction.HEAL,
                    priority=2,
                ),
                MockHealingPolicy(
                    id="policy-003",
                    name="Sensitive Data Access",
                    description="Controls access to sensitive data",
                    conditions={"data_classification": ["pci", "pii", "phi"]},
                    action=MockHealingPolicy.PolicyAction.WARN,
                    priority=3,
                ),
            ]

        def evaluate(self, event: MockReliabilityEvent, reliability_score: float) -> List[Dict]:
            """Evaluate all policies against event and return one dict per triggered policy."""
            violations = []
            for policy in self.policies:
                triggered, reason = policy.evaluate(event, reliability_score)
                if triggered:
                    violations.append({
                        'policy_id': policy.id,
                        'policy_name': policy.name,
                        'action': policy.action,
                        'reason': reason,
                        'priority': policy.priority,
                    })
            return violations

    class MockReliabilityEngine:
        """Heuristic scorer: weighted sum of five factors, each in [0, 1]."""

        def __init__(self):
            self.scoring_factors = {
                'syntax_complexity': 0.2,
                'risk_keywords': 0.3,
                'environment_risk': 0.2,
                'historical_safety': 0.2,
                'time_of_day': 0.1,
            }

        def calculate_score(self, event: MockReliabilityEvent) -> Dict:
            """Calculate reliability score for an event.

            Returns ``overall``, ``confidence``, ``risk_score`` (``1 - overall``)
            and the per-factor ``components``.
            """
            action = event.action.lower()
            context = event.context

            # Calculate individual factors
            components = {
                'syntax_complexity': self._calculate_syntax_complexity(action),
                'risk_keywords': self._calculate_risk_keywords(action),
                'environment_risk': self._calculate_environment_risk(context),
                'historical_safety': 0.8,  # Default historical safety
                'time_of_day': self._calculate_time_of_day(),
            }

            # Weighted sum
            overall = sum(
                components[name] * self.scoring_factors[name]
                for name in components
            )

            # Confidence based on factors considered
            confidence = min(0.95, overall * 1.1)  # Slightly optimistic

            return {
                'overall': round(overall, 3),
                'confidence': round(confidence, 3),
                'risk_score': round(1 - overall, 3),
                'components': components,
            }

        def _calculate_syntax_complexity(self, action: str) -> float:
            """Calculate complexity score based on action syntax"""
            words = len(action.split())
            if words <= 3:
                return 0.9  # Simple commands are safer
            if words <= 6:
                return 0.7
            return 0.5  # Complex commands are riskier

        def _calculate_risk_keywords(self, action: str) -> float:
            """Calculate risk based on keywords (substring match, most severe tier first)"""
            action_lower = action.lower()
            keyword_tiers = (
                (('drop', 'delete', 'truncate', 'remove', 'format', 'rm'), 0.3),  # High risk
                (('update', 'alter', 'modify', 'change', 'grant', 'revoke'), 0.6),  # Medium risk
                (('select', 'read', 'get', 'fetch', 'list', 'show'), 0.9),  # Safe
            )
            for keywords, score in keyword_tiers:
                if any(word in action_lower for word in keywords):
                    return score
            return 0.7  # Default

        def _calculate_environment_risk(self, context: Dict) -> float:
            """Calculate risk based on environment"""
            env = context.get('environment', 'development').lower()
            env_scores = {
                'production': 0.4,
                'staging': 0.7,
                'testing': 0.8,
                'development': 0.9,
                'sandbox': 0.95,
            }
            return env_scores.get(env, 0.7)

        def _calculate_time_of_day(self) -> float:
            """Calculate risk based on time of day (local wall-clock hour)"""
            hour = datetime.now().hour
            # Business hours are safer (9 AM - 6 PM)
            if 9 <= hour < 18:
                return 0.9
            if 18 <= hour < 22:
                return 0.7  # Evening
            return 0.5  # Night/early morning

    # Store mock modules
    ARF_MODULES = {
        'ReliabilityEvent': MockReliabilityEvent,
        'HealingPolicy': MockHealingPolicy,
        'PolicyEngine': MockPolicyEngine,
        'ReliabilityEngine': MockReliabilityEngine,
        'version': '3.3.9 (Mock)',
    }

    print("✅ Mock ARF implementations created successfully")

print("=" * 60)

# Now use ARF_MODULES for all operations
ReliabilityEvent = ARF_MODULES['ReliabilityEvent']
HealingPolicy = ARF_MODULES['HealingPolicy']
PolicyEngine = ARF_MODULES['PolicyEngine']
ReliabilityEngine = ARF_MODULES['ReliabilityEngine']
ARF_VERSION = ARF_MODULES['version']
# Mock Enterprise Components
class MockEnterpriseLicenseManager:
    """Simulates enterprise license validation with mechanical gates.

    ``license_tiers`` maps a tier key to its display name, price,
    enforcement mode, agent limit, feature list and enabled gate names.
    ``active_licenses`` maps a license key to its tier, owner e-mail and
    creation/expiry datetimes (naive local time from ``datetime.now()``).
    """

    TRIAL_DAYS = 14

    def __init__(self):
        self.license_tiers = {
            "trial": {
                "name": "Trial",
                "price": 0,
                "enforcement": "advisory",
                "max_agents": 3,
                "features": ["Basic risk assessment", "7-day trial"],
                "gates_enabled": ["confidence_threshold", "risk_assessment"],
            },
            "starter": {
                "name": "Starter",
                "price": 2000,
                "enforcement": "human_approval",
                "max_agents": 10,
                "features": ["Human-in-loop gates", "Basic audit trail", "Email support"],
                "gates_enabled": ["license_validation", "confidence_threshold",
                                  "risk_assessment", "admin_approval"],
            },
            "professional": {
                "name": "Professional",
                "price": 5000,
                "enforcement": "autonomous",
                "max_agents": 50,
                "features": ["Autonomous execution", "Advanced audit",
                             "Priority support", "SLA 99.5%"],
                "gates_enabled": ["license_validation", "confidence_threshold",
                                  "risk_assessment", "rollback_feasibility",
                                  "budget_check", "auto_scale"],
            },
            "enterprise": {
                "name": "Enterprise",
                "price": 15000,
                "enforcement": "full_mechanical",
                "max_agents": 1000,
                "features": ["Full mechanical enforcement", "Compliance automation",
                             "Custom gates", "24/7 support", "SLA 99.9%",
                             "Differential privacy"],
                "gates_enabled": ["license_validation", "confidence_threshold",
                                  "risk_assessment", "rollback_feasibility",
                                  "compliance_check", "budget_check", "custom_gates",
                                  "emergency_override", "audit_logging"],
            },
        }

        self.active_licenses = {}
        self._create_demo_licenses()

    def _create_demo_licenses(self):
        """Create demo licenses for testing"""
        now = datetime.now()
        self.active_licenses = {
            "ARF-TRIAL-DEMO123": {
                "tier": "trial",
                "email": "demo@arf.dev",
                "expires": now + timedelta(days=self.TRIAL_DAYS),
                "created": now,
                "features": self.license_tiers["trial"]["features"],
            },
            "ARF-STARTER-456XYZ": {
                "tier": "starter",
                "email": "customer@example.com",
                "expires": now + timedelta(days=365),
                "created": now - timedelta(days=30),
                "features": self.license_tiers["starter"]["features"],
            },
            "ARF-PRO-789ABC": {
                "tier": "professional",
                "email": "pro@company.com",
                "expires": now + timedelta(days=365),
                "created": now - timedelta(days=60),
                "features": self.license_tiers["professional"]["features"],
            },
        }

    @staticmethod
    def _rejected(tier: str, message: str) -> Dict:
        """Build the result returned for a missing, unknown or expired key."""
        return {
            "valid": False,
            "tier": tier,
            "enforcement": "none",
            "message": message,
            "gates_available": [],
        }

    def validate_license(self, license_key: str) -> Dict:
        """Validate enterprise license.

        Args:
            license_key: Key as typed by the user; ``None``/empty means OSS
                mode. Surrounding whitespace is ignored.

        Returns:
            Dict with at least ``valid``, ``tier``, ``enforcement``,
            ``message`` and ``gates_available``. Valid licenses also carry
            ``name``, ``features`` and ``expires`` (``YYYY-MM-DD``).
            A registered key whose expiry date has passed is reported as
            ``valid=False`` with tier ``"invalid"``.
        """
        # Keys pasted into a textbox commonly carry stray spaces/newlines.
        license_key = (license_key or "").strip()
        if not license_key:
            return self._rejected("none", "No license provided - using OSS mode")

        now = datetime.now()
        license_data = self.active_licenses.get(license_key)
        if license_data:
            expires = license_data["expires"]
            if expires < now:
                # Must not fall through to the trial-pattern branch below,
                # otherwise an expired trial key would be re-activated.
                return self._rejected(
                    "invalid",
                    f"License expired on {expires.strftime('%Y-%m-%d')}",
                )
            tier = license_data["tier"]
            tier_info = self.license_tiers[tier]
            return {
                "valid": True,
                "tier": tier,
                "name": tier_info["name"],
                "enforcement": tier_info["enforcement"],
                "gates_available": tier_info["gates_enabled"],
                "features": tier_info["features"],
                "expires": expires.strftime("%Y-%m-%d"),
                "message": f"{tier_info['name']} license active",
            }

        # Check if it's a trial license pattern
        if license_key.startswith("ARF-TRIAL-"):
            trial = self.license_tiers["trial"]
            return {
                "valid": True,
                "tier": "trial",
                "name": "Trial",
                "enforcement": "advisory",
                "gates_available": trial["gates_enabled"],
                "features": trial["features"],
                "expires": (now + timedelta(days=self.TRIAL_DAYS)).strftime("%Y-%m-%d"),
                "message": "Trial license activated (14 days)",
            }

        return self._rejected("invalid", "Invalid license key")

    def generate_trial_license(self, email: str) -> Dict:
        """Generate a new trial license and register it in ``active_licenses``.

        The key is ``ARF-TRIAL-`` plus the first 8 hex digits (upper-cased)
        of SHA-256 over the e-mail and the current timestamp.
        """
        # One timestamp for everything so the stored and reported expiry agree.
        now = datetime.now()
        expires = now + timedelta(days=self.TRIAL_DAYS)
        license_hash = hashlib.sha256(f"{email}{now.isoformat()}".encode()).hexdigest()
        license_key = f"ARF-TRIAL-{license_hash[:8].upper()}"
        features = self.license_tiers["trial"]["features"]

        self.active_licenses[license_key] = {
            "tier": "trial",
            "email": email,
            "expires": expires,
            "created": now,
            "features": features,
        }

        return {
            "success": True,
            "license_key": license_key,
            "tier": "trial",
            "expires": expires.strftime("%Y-%m-%d"),
            "features": features,
            "message": "14-day trial license generated successfully",
        }

    def get_tier_comparison(self) -> List[Dict]:
        """Get comparison of all license tiers (one display-ready dict per tier)."""
        return [
            {
                "tier": tier_key,
                "name": tier_info["name"],
                "price": f"${tier_info['price']:,}/mo",
                "enforcement": tier_info["enforcement"].replace("_", " ").title(),
                "max_agents": tier_info["max_agents"],
                "gates": len(tier_info["gates_enabled"]),
                "features": tier_info["features"][:3],  # First 3 features
            }
            for tier_key, tier_info in self.license_tiers.items()
        ]
class MockExecutionAuthorityService:
    """Simulates mechanical gate enforcement for Enterprise.

    ``gate_definitions`` describes every known gate; ``audit_log`` keeps the
    most recent ``MAX_AUDIT_ENTRIES`` evaluation summaries.
    """

    MAX_AUDIT_ENTRIES = 1000

    def __init__(self):
        self.gate_definitions = {
            "license_validation": {
                "description": "Validate enterprise license is active",
                "weight": 0.2,
                "required": True,
                "enterprise_only": True,
            },
            "confidence_threshold": {
                "description": "Confidence score ≥ 0.70",
                "weight": 0.25,
                "required": True,
                "threshold": 0.7,
            },
            "risk_assessment": {
                "description": "Risk score ≤ 0.80",
                "weight": 0.25,
                "required": True,
                "threshold": 0.8,
            },
            "rollback_feasibility": {
                "description": "Rollback plan exists and is feasible",
                "weight": 0.1,
                "required": False,
                "enterprise_only": False,
            },
            "admin_approval": {
                "description": "Human approval for sensitive actions",
                "weight": 0.1,
                "required": False,
                "enterprise_only": True,
            },
            "compliance_check": {
                "description": "Compliance with regulations (GDPR, PCI, etc.)",
                "weight": 0.05,
                "required": False,
                "enterprise_only": True,
                "tiers": ["enterprise"],
            },
            "budget_check": {
                "description": "Within budget limits and forecasts",
                "weight": 0.05,
                "required": False,
                "enterprise_only": True,
            },
        }

        self.audit_log = []

    def evaluate_gates(self, action_data: Dict, license_info: Dict) -> Dict:
        """Evaluate all mechanical gates for an action.

        Args:
            action_data: Reads ``action``, ``confidence`` and ``risk_score``.
            license_info: Reads ``valid``, ``tier`` and ``gates_available``
                (the shape returned by the license manager).

        Returns:
            Dict with per-gate ``gate_results``, the ``required_gates`` /
            ``optional_gates`` name lists, pass counts, the resulting
            ``execution_authority``, the ``audit_id`` of the log entry that
            was written, and a human-readable ``recommendation``.
        """
        gate_results = {}
        required_gates = []
        optional_gates = []

        # Determine which gates to evaluate based on license
        license_tier = license_info.get("tier", "none")
        gates_available = license_info.get("gates_available", [])

        for gate_name, gate_def in self.gate_definitions.items():
            # A gate runs when the license enables it, or when it is not
            # restricted to enterprise licenses at all.
            licensed = gate_name in gates_available
            if not licensed and gate_def.get("enterprise_only", False):
                continue

            bucket = required_gates if gate_def["required"] else optional_gates
            bucket.append(gate_name)
            gate_results[gate_name] = self._evaluate_single_gate(
                gate_name, gate_def, action_data, license_tier
            )

        # Calculate overall status
        passed_gates = sum(1 for r in gate_results.values() if r["passed"])
        total_gates = len(gate_results)
        all_required_passed = all(
            gate_results[gate]["passed"] for gate in required_gates
        )

        # Determine execution authority
        if not license_info.get("valid", False):
            execution_authority = "OSS_ONLY"
        elif license_tier == "trial":
            execution_authority = "ADVISORY_ONLY"
        elif not all_required_passed:
            execution_authority = "BLOCKED"
        elif license_tier in ("professional", "enterprise"):
            execution_authority = "AUTONOMOUS_EXECUTION"
        else:
            execution_authority = "HUMAN_APPROVAL_REQUIRED"

        # Log to audit trail
        audit_entry = self._log_to_audit(
            action_data, gate_results, execution_authority, license_tier
        )

        return {
            "gate_results": gate_results,
            "required_gates": required_gates,
            "optional_gates": optional_gates,
            "passed_gates": passed_gates,
            "total_gates": total_gates,
            "all_required_passed": all_required_passed,
            "execution_authority": execution_authority,
            "audit_id": audit_entry["audit_id"],
            "recommendation": self._generate_recommendation(
                execution_authority, passed_gates, total_gates
            ),
        }

    def _evaluate_single_gate(self, gate_name: str, gate_def: Dict,
                              action_data: Dict, license_tier: str) -> Dict:
        """Evaluate a single gate and return its result dict.

        Missing ``confidence`` / ``risk_score`` default to 0.5; a missing or
        ``None`` action is treated as an empty string. Unknown gate names
        pass by default.
        """
        confidence = action_data.get("confidence", 0.5)
        risk_score = action_data.get("risk_score", 0.5)
        action = action_data.get("action") or ""
        action_lower = action.lower()

        # Gate-specific evaluation logic
        if gate_name == "license_validation":
            # "invalid" is what the license manager reports for unknown keys,
            # so it must not count as a license either.
            passed = license_tier not in ("none", "invalid")
            message = f"License valid: {license_tier}" if passed else "No valid license"

        elif gate_name == "confidence_threshold":
            threshold = gate_def.get("threshold", 0.7)
            passed = confidence >= threshold
            message = (
                f"Confidence {confidence:.2f} ≥ {threshold}" if passed
                else f"Confidence {confidence:.2f} < {threshold}"
            )

        elif gate_name == "risk_assessment":
            threshold = gate_def.get("threshold", 0.8)
            passed = risk_score <= threshold
            message = (
                f"Risk {risk_score:.2f} ≤ {threshold}" if passed
                else f"Risk {risk_score:.2f} > {threshold}"
            )

        elif gate_name == "rollback_feasibility":
            # Destructive statements are treated as impossible to roll back
            irreversible = ("drop", "delete", "truncate", "rm -rf")
            passed = not any(word in action_lower for word in irreversible)
            message = "Rollback feasible" if passed else "No rollback possible"

        elif gate_name == "admin_approval":
            # For demo, always pass if not high risk
            passed = risk_score < 0.7
            message = "Admin approval not required" if passed else "Admin approval needed"

        elif gate_name == "compliance_check":
            # Check for sensitive data access
            sensitive_keywords = ("pci", "credit card", "ssn", "password", "token")
            has_sensitive = any(keyword in action_lower for keyword in sensitive_keywords)
            passed = not has_sensitive or license_tier == "enterprise"
            message = "Compliant" if passed else "Requires Enterprise for sensitive data"

        elif gate_name == "budget_check":
            # Mock budget check
            passed = True
            message = "Within budget limits"

        else:
            passed = True
            message = "Gate evaluation passed"

        return {
            "gate_name": gate_name,
            "description": gate_def["description"],
            "weight": gate_def["weight"],
            "required": gate_def["required"],
            "passed": passed,
            "message": message,
            "details": {
                "confidence": confidence,
                "risk_score": risk_score,
                "action_type": self._categorize_action(action),
            },
        }

    def _categorize_action(self, action: str) -> str:
        """Categorize the action type by substring match; first category wins.

        "update" belongs only to the write category: listing it under
        deployment as well made SQL ``UPDATE`` statements report as
        deployments and left the write entry unreachable.
        """
        action_lower = (action or "").lower()
        categories = (
            ("destructive", ("drop", "delete", "truncate")),
            ("deployment", ("deploy", "release")),
            ("read", ("select", "read", "get", "fetch")),
            ("write", ("insert", "update", "modify")),
            ("access_control", ("grant", "revoke", "permission")),
        )
        for category, keywords in categories:
            if any(word in action_lower for word in keywords):
                return category
        return "other"

    def _log_to_audit(self, action_data: Dict, gate_results: Dict,
                      authority: str, license_tier: str) -> Dict:
        """Append a summary entry to the audit trail and return it."""
        entry = {
            "audit_id": str(uuid.uuid4())[:8],
            "timestamp": datetime.now().isoformat(),
            "action": action_data.get("action", ""),
            "license_tier": license_tier,
            "execution_authority": authority,
            "gate_summary": {
                "passed": sum(1 for r in gate_results.values() if r["passed"]),
                "total": len(gate_results),
            },
            "risk_score": action_data.get("risk_score", 0),
            "confidence": action_data.get("confidence", 0),
        }

        self.audit_log.append(entry)

        # Keep only the most recent entries
        if len(self.audit_log) > self.MAX_AUDIT_ENTRIES:
            self.audit_log = self.audit_log[-self.MAX_AUDIT_ENTRIES:]

        return entry

    def _generate_recommendation(self, authority: str, passed: int, total: int) -> str:
        """Generate recommendation based on gate results"""
        if authority == "AUTONOMOUS_EXECUTION":
            return f"✅ Safe for autonomous execution ({passed}/{total} gates passed)"
        if authority == "HUMAN_APPROVAL_REQUIRED":
            return f"👤 Requires human approval ({passed}/{total} gates passed)"
        if authority == "BLOCKED":
            return f"🚫 Blocked by mechanical gates ({passed}/{total} gates passed)"
        if authority == "ADVISORY_ONLY":
            return f"ℹ️ Trial license - advisory only ({passed}/{total} gates would pass)"
        return "🔵 OSS mode - advisory only"
# Demo Configuration
class DemoConfig:
    """Configuration for the demo: colour themes and risk-level thresholds."""

    OSS_THEME = {
        "primary": "#1E88E5",  # Blue
        "secondary": "#64B5F6",
        "background": "#E3F2FD",
        "text": "#1565C0",
        "border": "#90CAF9",
    }

    ENTERPRISE_THEME = {
        "primary": "#FFB300",  # Gold
        "secondary": "#FFD54F",
        "background": "#FFF8E1",
        "text": "#FF8F00",
        "border": "#FFE082",
    }

    RISK_COLORS = {
        "low": "#4CAF50",       # Green
        "medium": "#FF9800",    # Orange
        "high": "#F44336",      # Red
        "critical": "#D32F2F",  # Dark red
    }

    # (exclusive upper bound, level), checked in order; anything at or above
    # the last bound is "critical".
    RISK_THRESHOLDS = ((0.3, "low"), (0.6, "medium"), (0.8, "high"))

    @staticmethod
    def get_risk_level(score: float) -> Tuple[str, str]:
        """Get risk level and color from score.

        Returns ``(level, hex_color)`` where level is ``low`` (< 0.3),
        ``medium`` (< 0.6), ``high`` (< 0.8) or ``critical`` otherwise.
        """
        level = "critical"
        for upper_bound, name in DemoConfig.RISK_THRESHOLDS:
            if score < upper_bound:
                level = name
                break
        return level, DemoConfig.RISK_COLORS[level]


# Load demo scenarios
try:
    from demo_scenarios import DEMO_SCENARIOS, LICENSE_TIERS, VALUE_PROPOSITIONS
    print("✅ Demo scenarios loaded successfully")
except ImportError:
    print("⚠️ Demo scenarios not found, using built-in scenarios")
    DEMO_SCENARIOS = {
        "database_drop": {
            "name": "High-Risk Database Operation",
            "action": "DROP DATABASE production CASCADE",
            "description": "Irreversible deletion of production database",
            "context": {"environment": "production", "criticality": "critical"},
        },
        "service_deployment": {
            "name": "Safe Service Deployment",
            "action": "deploy_service v1.2.3 to staging with 25% canary",
            "description": "Standard deployment with canary testing",
            "context": {"environment": "staging", "canary": 25},
        },
        "config_change": {
            "name": "Configuration Change",
            "action": "UPDATE config SET timeout=30 WHERE service='payment'",
            "description": "Update payment service timeout configuration",
            "context": {"environment": "production", "service": "payment"},
        },
    }
    # The import above binds three names; define the other two as well so a
    # later reference cannot raise NameError in fallback mode.
    # NOTE(review): empty dicts assumed as neutral defaults - confirm against
    # the real demo_scenarios module.
    LICENSE_TIERS = {}
    VALUE_PROPOSITIONS = {}
# Main ARF Processor
class ARFProcessor:
    """Main processor for ARF operations.

    Wraps the policy engine, reliability engine, license manager and gate
    service, and keeps a history of processed actions for ``get_stats``.
    """

    def __init__(self):
        self.policy_engine = PolicyEngine()
        self.reliability_engine = ReliabilityEngine()
        self.license_manager = MockEnterpriseLicenseManager()
        self.execution_service = MockExecutionAuthorityService()
        self.processed_actions = []

        print(f"✅ ARF Processor initialized with {ARF_VERSION}")

    def _evaluate_oss(self, action: str, context: Optional[Dict] = None) -> Dict:
        """Run the OSS (advisory) evaluation without recording it in the history."""
        start_time = time.time()

        # Create reliability event
        event = ReliabilityEvent(
            event_id=str(uuid.uuid4())[:8],
            agent_id="demo_agent",
            action=action,
            timestamp=datetime.now(),
            context=context or {},
        )

        # Calculate reliability score
        reliability_result = self.reliability_engine.calculate_score(event)

        # Evaluate policies
        if hasattr(self.policy_engine, 'evaluate'):
            # Mock policy engine
            violations = self.policy_engine.evaluate(event, reliability_result['overall'])
        else:
            # Real ARF policy engine
            violations = []
            # Note: Real implementation would call actual policy evaluation

        # Determine recommendation
        risk_level, risk_color = DemoConfig.get_risk_level(reliability_result['risk_score'])
        overall = reliability_result['overall']

        can_execute = False
        if violations:
            recommendation = f"❌ Blocked by {len(violations)} policy violation(s)"
        elif overall >= 0.8:
            can_execute = True
            recommendation = "✅ Safe to execute"
        elif overall >= 0.6:
            recommendation = "⚠️ Review recommended"
        else:
            recommendation = "🚫 High risk - do not execute"

        processing_time = time.time() - start_time

        return {
            "action": action,
            "reliability_score": overall,
            "confidence": reliability_result['confidence'],
            "risk_score": reliability_result['risk_score'],
            "risk_level": risk_level,
            "risk_color": risk_color,
            "policy_violations": len(violations),
            "violation_details": violations[:3] if violations else [],
            "recommendation": recommendation,
            "can_execute": can_execute,
            "processing_time": round(processing_time, 3),
            "engine_version": ARF_VERSION,
            "mode": "OSS",
            "limitation": "Advisory only - human must make final decision",
        }

    def process_oss(self, action: str, context: Dict = None) -> Dict:
        """Process action through ARF OSS and record it in the history."""
        result = self._evaluate_oss(action, context)

        # Store for history
        self.processed_actions.append({
            "timestamp": datetime.now(),
            "result": result,
            "mode": "OSS",
        })

        return result

    def process_enterprise(self, action: str, license_key: str, context: Dict = None) -> Dict:
        """Process action through ARF Enterprise.

        Runs the OSS evaluation, validates the license and evaluates the
        mechanical gates. The action is recorded once, as an Enterprise
        entry; going through ``process_oss`` here would additionally log it
        as an OSS action and double-count it in ``get_stats``.
        """
        # First get OSS result
        oss_result = self._evaluate_oss(action, context)

        # Validate license
        license_info = self.license_manager.validate_license(license_key)

        # Prepare action data for gate evaluation
        action_data = {
            "action": action,
            "confidence": oss_result["confidence"],
            "risk_score": oss_result["risk_score"],
            "reliability": oss_result["reliability_score"],
            "context": context or {},
        }

        # Evaluate mechanical gates
        gate_evaluation = self.execution_service.evaluate_gates(action_data, license_info)

        # Combine results
        enterprise_result = {
            **oss_result,
            "license_info": license_info,
            "gate_evaluation": gate_evaluation,
            "mode": "Enterprise",
            "engine_version": f"{ARF_VERSION} Enterprise",
            "benefit": "Mechanical enforcement with automated decision making",
        }

        # Store for history
        self.processed_actions.append({
            "timestamp": datetime.now(),
            "result": enterprise_result,
            "mode": "Enterprise",
            "license_tier": license_info.get("tier", "none"),
        })

        return enterprise_result

    def get_stats(self) -> Dict:
        """Get processing statistics.

        Always returns the same keys. With no history every value is zero;
        the legacy ``total`` key is still included in that case for callers
        that check it.
        """
        total = len(self.processed_actions)
        if total == 0:
            # The stats bar reads these keys before anything has been
            # processed, so they must exist even when the history is empty.
            return {
                "total": 0,
                "total_processed": 0,
                "oss_actions": 0,
                "enterprise_actions": 0,
                "avg_reliability": 0.0,
                "avg_risk": 0.0,
                "execution_rate": 0,
            }

        results = [entry["result"] for entry in self.processed_actions]
        oss_count = sum(1 for entry in self.processed_actions if entry["mode"] == "OSS")

        # Calculate average scores
        avg_reliability = np.mean([r["reliability_score"] for r in results])
        avg_risk = np.mean([r["risk_score"] for r in results])

        # Count executions
        executed = sum(1 for r in results if r.get("can_execute", False))

        return {
            "total_processed": total,
            "oss_actions": oss_count,
            "enterprise_actions": total - oss_count,
            "avg_reliability": round(float(avg_reliability), 3),
            "avg_risk": round(float(avg_risk), 3),
            "execution_rate": round(executed / total * 100, 1),
        }


# Create global processor instance
arf_processor = ARFProcessor()
âš ī¸ Policy Violations Detected
""" for i, violation in enumerate(violations[:3]): if isinstance(violation, dict): reason = violation.get('reason', 'Policy violation') else: reason = str(violation) violations_html += f"""
â€ĸ {reason[:80]}{'...' if len(reason) > 80 else ''}
""" violations_html += "
" return f"""
đŸ”ĩ

ARF {result.get('engine_version', '3.3.9')} OSS

Open Source Edition
Reliability Score
{result['reliability_score']:.1%}
Risk Level
{result['risk_level'].upper()}
Confidence
{result['confidence']:.1%}
💡

Recommendation

{result['recommendation']}

Processing time: {result['processing_time']}s

{violations_html}
âš ī¸

OSS Limitations

Advisory Only: {result['limitation']}

Execution Authority: {'✅ Can execute (advisory)' if result['can_execute'] else '❌ Cannot execute - human decision required'}

""" def create_enterprise_panel_html(result: Dict) -> str: """Generate HTML for Enterprise results panel""" license_info = result.get("license_info", {}) gate_eval = result.get("gate_evaluation", {}) gate_results = gate_eval.get("gate_results", {}) license_tier = license_info.get("tier", "none") tier_name = license_info.get("name", "No License").title() # Determine panel color based on tier if license_tier == "enterprise": tier_color = "#FF6F00" # Darker orange elif license_tier == "professional": tier_color = "#FFB300" # Gold elif license_tier == "starter": tier_color = "#FFD54F" # Light gold elif license_tier == "trial": tier_color = "#BCAAA4" # Grayish else: tier_color = "#9E9E9E" # Gray # Generate gates HTML gates_html = "" for gate_name, gate_result in gate_results.items(): gate_passed = gate_result.get("passed", False) gate_color = "#4CAF50" if gate_passed else "#F44336" gate_required = gate_result.get("required", False) gates_html += f"""
def create_enterprise_panel_html(result: Dict) -> str:
    """Generate HTML for Enterprise results panel.

    Args:
        result: Enterprise evaluation result. Reads ``license_info``
            (``name``, ``enforcement``), ``gate_evaluation``
            (``gate_results``, ``execution_authority``, ``passed_gates``,
            ``total_gates``, ``recommendation``, ``audit_id``) plus
            ``engine_version``, ``benefit`` and ``processing_time``.
            Every key is optional; missing or ``None`` sub-dicts render as
            the unlicensed / OSS state.

    Returns:
        The panel text with one section per evaluated gate.
    """
    license_info = result.get("license_info") or {}
    gate_eval = result.get("gate_evaluation") or {}
    gate_results = gate_eval.get("gate_results") or {}

    tier_name = license_info.get("name", "No License").title()
    enforcement = license_info.get('enforcement', 'advisory').replace('_', ' ').title()

    # Generate gates HTML
    gate_sections = []
    for gate_name, gate_result in gate_results.items():
        status_icon = '✅' if gate_result.get("passed", False) else '❌'
        optional_tag = '' if gate_result.get("required", False) else ' (Optional)'
        gate_sections.append(f"""
{status_icon}
{gate_name.replace('_', ' ').title()} {optional_tag}
{gate_result.get('weight', 0):.0%}
{gate_result.get('message', '')}
""")
    gates_html = "".join(gate_sections)

    if not gates_html:
        gates_html = """
🔒
Enterprise gates require a valid license
Get a trial license to see gate evaluation
"""

    # Execution authority display: authority -> (icon, message)
    authority_styles = {
        "AUTONOMOUS_EXECUTION": ("✅", "Autonomous execution permitted"),
        "HUMAN_APPROVAL_REQUIRED": ("👤", "Human approval required"),
        "BLOCKED": ("🚫", "Blocked by mechanical gates"),
        "ADVISORY_ONLY": ("ℹ️", "Trial license - advisory only"),
    }
    authority = gate_eval.get("execution_authority", "OSS_ONLY")
    authority_display = authority.replace("_", " ").title()
    authority_icon, authority_message = authority_styles.get(
        authority, ("🔵", "OSS mode - no license")
    )

    passed_gates = gate_eval.get("passed_gates", 0)
    total_gates = gate_eval.get("total_gates", 0)

    return f"""
🟡

ARF {result.get('engine_version', '3.3.9')}

{tier_name} Edition
{enforcement}
License Tier
{tier_name}
Gates Passed
{passed_gates}/{total_gates}
Enforcement
{enforcement}

Mechanical Gates

{gates_html}
{authority_icon}
{authority_display}
{authority_message}
{gate_eval.get('recommendation', '')}
🚀

Enterprise Benefits

Mechanical Enforcement: {result.get('benefit', 'Automated decision making')}

Audit Trail: ID: {gate_eval.get('audit_id', 'Available with license')}

Decision Speed: {result.get('processing_time', 0):.3f}s

"""
def create_comparison_html(oss_result: Dict, enterprise_result: Dict) -> str:
    """Create comparison visualization.

    Args:
        oss_result: OSS evaluation result; reads ``can_execute``,
            ``risk_level``, ``policy_violations`` and ``processing_time``.
        enterprise_result: Enterprise evaluation result; reads
            ``gate_evaluation`` (``execution_authority``, ``passed_gates``,
            ``total_gates``, ``audit_id``) and ``license_info`` (``name``).

    Returns:
        The side-by-side comparison text. All keys are optional; without a
        license the license name is shown as "No License", matching the
        enterprise panel.
    """
    oss_can_execute = oss_result.get('can_execute', False)
    gate_eval = enterprise_result.get('gate_evaluation') or {}
    license_info = enterprise_result.get('license_info') or {}
    enterprise_authority = gate_eval.get('execution_authority') or 'OSS_ONLY'

    # Fixed headline figures shown in the value proposition section
    risk_reduction = 0.92  # 92% risk reduction
    speed_improvement = 100  # 100x faster
    false_positive_reduction = 0.85  # 85% reduction
    cost_reduction = 0.75  # 75% OpEx reduction

    # Determine enterprise value: authority -> (score, message)
    value_by_authority = {
        "AUTONOMOUS_EXECUTION": (95, "Full autonomous execution enabled"),
        "HUMAN_APPROVAL_REQUIRED": (70, "Human oversight with mechanical validation"),
        # Still valuable because it prevented a bad action
        "BLOCKED": (90, "Risk prevented by mechanical gates"),
    }
    value_score, value_message = value_by_authority.get(
        enterprise_authority, (30, "Upgrade needed for mechanical enforcement")
    )

    if enterprise_authority == 'AUTONOMOUS_EXECUTION':
        enterprise_icon = '✅'
    elif enterprise_authority == 'BLOCKED':
        enterprise_icon = '⛔'
    else:
        enterprise_icon = '👤'

    license_name = license_info.get('name', 'No License').title()
    # str() guards against a None audit id before slicing
    audit_id = str(gate_eval.get('audit_id') or 'N/A')[:8]

    return f"""

📊 Side-by-Side Comparison

🔵 OSS 3.3.9

{'⚠️' if not oss_can_execute else '✅'}
{'Advisory Only' if not oss_can_execute else 'Can Execute'}
Human Decision Required
Risk: {oss_result.get('risk_level', 'Unknown').upper()}
{oss_result.get('policy_violations', 0)} Policy Violations
Processing: {oss_result.get('processing_time', 0):.3f}s

🟡 Enterprise

{enterprise_icon}
{enterprise_authority.replace('_', ' ').title()}
Mechanical Enforcement
{gate_eval.get('passed_gates', 0)}/{gate_eval.get('total_gates', 0)} Gates Passed
License: {license_name}
Audit: {audit_id}

🎯 Enterprise Value Proposition

Value Score: {value_score}/100
{value_message}
Risk Reduction
{risk_reduction:.0%}
Decision Speed
{speed_improvement}x
False Positives
-{false_positive_reduction:.0%}
Operational Cost
-{cost_reduction:.0%}
Based on industry benchmarks and customer data

🚀 Ready to Upgrade?

Get mechanical enforcement, audit trails, and autonomous execution

Starter: $2,000/mo
Professional: $5,000/mo
Enterprise: $15,000/mo
"""
  • {feature}
  • ' tiers_html += f"""
    {tier["name"]}
    {tier["price"]}
    {tier["max_agents"]} agents â€ĸ {tier["gates"]} gates
    Enforcement: {tier["enforcement"]}
    Key Features:
      {features_html}
    """ return f"""

    đŸ’ŧ License Tier Comparison

    {tiers_html}
    All plans include the full ARF 3.3.9 OSS engine plus mechanical gates
    """ # Gradio Interface def create_demo_interface(): """Create the main Gradio interface""" with gr.Blocks( title="Agentic Reliability Framework (ARF) 3.3.9 Demo", theme=gr.themes.Soft( primary_hue="blue", secondary_hue="orange", font=gr.themes.GoogleFont("Inter") ), css=""" .gradio-container { max-width: 1400px; margin: 0 auto; padding: 20px; } .demo-header { text-align: center; margin-bottom: 30px; } .action-history { max-height: 300px; overflow-y: auto; border: 1px solid #ddd; border-radius: 8px; padding: 10px; } .stats-box { background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); padding: 15px; border-radius: 10px; margin: 10px 0; } """ ) as demo: # Header gr.Markdown("""

    🤖 Agentic Reliability Framework (ARF) 3.3.9

    OSS Advisory vs Enterprise Mechanical Enforcement

    Experience the difference between open-source advisory recommendations and enterprise mechanical enforcement with automated gate validation.

    """) # Stats bar stats = arf_processor.get_stats() gr.Markdown(f"""
    {stats['total_processed']}
    Total Actions
    {stats['oss_actions']}
    OSS Evaluations
    {stats['enterprise_actions']}
    Enterprise Evaluations
    {stats['execution_rate']}%
    Execution Rate
    """) # Main control section with gr.Row(): with gr.Column(scale=3): # Scenario selection scenario_select = gr.Dropdown( choices=list(DEMO_SCENARIOS.keys()), label="Select Demo Scenario", value="database_drop", elem_id="scenario_select" ) # Action input action_input = gr.Textbox( label="Action to Evaluate", placeholder="Enter any action (e.g., DROP DATABASE production, deploy_service v1.2.3, etc.)", value="DROP DATABASE production CASCADE", lines=2, elem_id="action_input" ) # Context input context_input = gr.Textbox( label="Context (JSON format)", placeholder='{"environment": "production", "criticality": "high"}', value='{"environment": "production"}', lines=2, elem_id="context_input" ) # License input license_input = gr.Textbox( label="Enterprise License Key (Optional)", placeholder="Enter ARF-TRIAL-XXXXXXX or leave blank for OSS only", value="", elem_id="license_input" ) # Process button with gr.Row(): process_btn = gr.Button( "🚀 Process Action", variant="primary", size="lg", elem_id="process_btn" ) clear_btn = gr.Button( "🔄 Clear", variant="secondary", elem_id="clear_btn" ) with gr.Column(scale=2): # Quick actions gr.Markdown("### Quick Actions") quick_actions = gr.Radio( choices=["High Risk", "Medium Risk", "Low Risk", "Custom"], label="Pre-built Examples", value="High Risk", elem_id="quick_actions" ) # License status license_status = gr.HTML( value="
    " "
    No license entered
    " "
    Using OSS mode
    " "
    ", label="License Status" ) # Stats display gr.Markdown("### 📈 Live Statistics") stats_display = gr.HTML(elem_id="stats_display") # Results section with gr.Row(): with gr.Column(scale=1): oss_results_html = gr.HTML( label="ARF OSS Results", elem_id="oss_results" ) with gr.Column(scale=1): enterprise_results_html = gr.HTML( label="ARF Enterprise Results", elem_id="enterprise_results" ) # Comparison section comparison_html = gr.HTML( label="Side-by-Side Comparison", elem_id="comparison_html" ) # License tier section license_tier_html = gr.HTML( value=create_license_tier_html(), label="License Tiers", elem_id="license_tier_html" ) # Trial license section with gr.Accordion("🎁 Get 14-Day Trial License", open=False): with gr.Row(): with gr.Column(scale=2): trial_email = gr.Textbox( label="Work Email", placeholder="you@company.com", elem_id="trial_email" ) trial_btn = gr.Button( "Get Trial License", variant="primary", elem_id="trial_btn" ) with gr.Column(scale=3): trial_output = gr.JSON( label="Your Trial License", elem_id="trial_output" ) # Footer gr.Markdown("""

    ARF 3.3.9 Demo | Website | GitHub | Contact Sales

    This demo uses real ARF 3.3.9 OSS code with simulated Enterprise features for demonstration purposes.
    Actual Enterprise features may vary. Mechanical gates shown are simulated for demo purposes.

    """) # Function to process action def process_action(scenario, action, context_str, license_key, quick_action): """Process an action through both OSS and Enterprise""" try: # Update action based on quick action selection if quick_action != "Custom": if quick_action == "High Risk": action = "DELETE FROM users WHERE created_at < '2023-01-01'" elif quick_action == "Medium Risk": action = "ALTER TABLE payments ADD COLUMN processed_at TIMESTAMP" elif quick_action == "Low Risk": action = "SELECT COUNT(*) FROM logs WHERE level = 'ERROR'" # Parse context try: context = json.loads(context_str) if context_str.strip() else {} except: context = {"environment": "production"} # Use scenario if provided if scenario and scenario in DEMO_SCENARIOS: scenario_data = DEMO_SCENARIOS[scenario] action = scenario_data["action"] context = {**context, **scenario_data.get("context", {})} # Process through OSS oss_result = arf_processor.process_oss(action, context) # Process through Enterprise enterprise_result = arf_processor.process_enterprise(action, license_key, context) # Create HTML displays oss_html = create_oss_panel_html(oss_result) enterprise_html = create_enterprise_panel_html(enterprise_result) comparison = create_comparison_html(oss_result, enterprise_result) # Update stats stats = arf_processor.get_stats() stats_display_html = f"""
    Avg Reliability
    {stats['avg_reliability']:.1%}
    Avg Risk
    {stats['avg_risk']:.1%}
    OSS Actions
    {stats['oss_actions']}
    Enterprise Actions
    {stats['enterprise_actions']}
    """ # Update license status license_info = enterprise_result.get("license_info", {}) if license_info.get("valid", False): license_status_html = f"""
    ✅ {license_info.get('name', 'License').title()} Active
    Expires: {license_info.get('expires', 'N/A')} â€ĸ {len(license_info.get('gates_available', []))} gates enabled
    """ else: license_status_html = """
    No license entered
    Using OSS mode
    """ return ( oss_html, enterprise_html, comparison, stats_display_html, license_status_html, oss_result, # For debugging enterprise_result # For debugging ) except Exception as e: error_html = f"""

    ❌ Error Processing Action

    {str(e)}

    {traceback.format_exc()}
                        
    """ return error_html, error_html, error_html, "", "", {}, {} # Function to generate trial license def generate_trial_license(email: str): """Generate a trial license""" if not email or "@" not in email: return { "error": "Please enter a valid email address", "success": False } try: license_manager = MockEnterpriseLicenseManager() result = license_manager.generate_trial_license(email) # Also update the global processor's license manager arf_processor.license_manager = license_manager return result except Exception as e: return { "error": str(e), "success": False } # Function to clear inputs def clear_inputs(): return "", "", "", "High Risk", "", "", "", "", {} # Function to update quick action def update_quick_action(quick_action): examples = { "High Risk": '{"environment": "production", "criticality": "high"}', "Medium Risk": '{"environment": "staging", "service": "api"}', "Low Risk": '{"environment": "development", "read_only": true}', "Custom": '{}' } actions = { "High Risk": "DELETE FROM users WHERE created_at < '2023-01-01'", "Medium Risk": "ALTER TABLE payments ADD COLUMN processed_at TIMESTAMP", "Low Risk": "SELECT COUNT(*) FROM logs WHERE level = 'ERROR'", "Custom": "" } return actions[quick_action], examples[quick_action] # Function to update scenario def update_scenario(scenario): if scenario in DEMO_SCENARIOS: data = DEMO_SCENARIOS[scenario] action = data["action"] context = json.dumps(data.get("context", {}), indent=2) return action, context return "", "{}" # Connect events process_btn.click( fn=process_action, inputs=[scenario_select, action_input, context_input, license_input, quick_actions], outputs=[oss_results_html, enterprise_results_html, comparison_html, stats_display, license_status, gr.JSON(visible=False), gr.JSON(visible=False)], queue=True ) clear_btn.click( fn=clear_inputs, outputs=[action_input, context_input, license_input, quick_actions, oss_results_html, enterprise_results_html, comparison_html, trial_output], queue=False ) 
quick_actions.change( fn=update_quick_action, inputs=[quick_actions], outputs=[action_input, context_input], queue=False ) scenario_select.change( fn=update_scenario, inputs=[scenario_select], outputs=[action_input, context_input], queue=False ) trial_btn.click( fn=generate_trial_license, inputs=[trial_email], outputs=[trial_output], queue=True ) # Pre-load with demo scenario demo.load( fn=lambda: process_action( "database_drop", "DROP DATABASE production CASCADE", '{"environment": "production", "criticality": "high"}', "", "High Risk" ), outputs=[oss_results_html, enterprise_results_html, comparison_html, stats_display, license_status, gr.JSON(visible=False), gr.JSON(visible=False)], queue=True ) return demo # Main execution if __name__ == "__main__": print("=" * 60) print("🚀 Starting ARF 3.3.9 Hugging Face Spaces Demo") print(f"đŸ“Ļ ARF Version: {ARF_VERSION}") print(f"🤖 ARF Available: {ARF_AVAILABLE}") print("=" * 60) demo = create_demo_interface() # For Hugging Face Spaces, we need to expose the demo demo.launch( server_name="0.0.0.0", server_port=7860, share=False, debug=False )