diff --git "a/hf_demo.py" "b/hf_demo.py" --- "a/hf_demo.py" +++ "b/hf_demo.py" @@ -1,780 +1,158 @@ """ Hugging Face Spaces Demo for Agentic Reliability Framework (ARF) -Version: 3.3.9 OSS vs Enterprise -Proper import structure based on actual repository +Version: 3.3.9 OSS vs Enterprise - Fixed Version """ import gradio as gr import pandas as pd import numpy as np -import plotly.graph_objects as go -import plotly.express as px from datetime import datetime, timedelta import json import time import uuid -from typing import Dict, List, Any, Optional, Tuple import hashlib import random -import sys import traceback print("=" * 60) -print("đ¤ ARF 3.3.9 Hugging Face Spaces Demo - Starting") +print("đ¤ ARF 3.3.9 Demo - Starting on Hugging Face Spaces") print("=" * 60) -# Try to import ARF with multiple possible import paths -ARF_AVAILABLE = False -ARF_MODULES = {} +# Self-contained mock implementations +class MockReliabilityEvent: + def __init__(self, event_id, agent_id, action, timestamp, context): + self.event_id = event_id + self.agent_id = agent_id + self.action = action + self.timestamp = timestamp + self.context = context or {} -try: - # Try direct import - from agentic_reliability_framework.models import ( - ReliabilityEvent, - HealingPolicy - ) - from agentic_reliability_framework.healing_policies import PolicyEngine - from agentic_reliability_framework.engine.reliability import ReliabilityEngine - from agentic_reliability_framework import __version__ - - ARF_AVAILABLE = True - ARF_VERSION = __version__.__version__ - print(f"â ARF {ARF_VERSION} successfully imported") - - # Store imported modules - ARF_MODULES = { - 'ReliabilityEvent': ReliabilityEvent, - 'HealingPolicy': HealingPolicy, - 'PolicyEngine': PolicyEngine, - 'ReliabilityEngine': ReliabilityEngine, - 'version': ARF_VERSION - } - -except ImportError as e: - print(f"â ī¸ Import error: {e}") - print("Creating mock implementations for demo...") - - # Create comprehensive mock implementations - class MockReliabilityEvent: - def __init__(self, event_id: str, agent_id: str, action: str, - timestamp: datetime, context: Dict = None): - self.event_id = event_id - self.agent_id = agent_id - self.action = action - self.timestamp = timestamp - self.context = context or {} - self.metadata = {"source": "demo", "environment": "testing"} - - def to_dict(self) -> Dict: - return { - 'event_id': self.event_id, - 'agent_id': self.agent_id, - 'action': self.action, - 'timestamp': self.timestamp.isoformat(), - 'context': self.context, - 'metadata': self.metadata - } - - class MockHealingPolicy: - class PolicyAction: - ALLOW = "allow" - HEAL = "heal" - BLOCK = "block" - WARN = "warn" - - def __init__(self, id: str, name: str, description: str, - conditions: Dict, action: str, priority: int = 1): - self.id = id - self.name = name - self.description = description - self.conditions = conditions - self.action = action - self.priority = priority - self.created_at = datetime.now() - self.updated_at = datetime.now() - - def evaluate(self, event: 'MockReliabilityEvent', score: float) -> Tuple[bool, str]: - """Evaluate if policy conditions are met""" - action_lower = event.action.lower() - context = event.context - - # Check for high-risk patterns - high_risk_keywords = ['drop database', 'delete from', 'truncate', 'rm -rf', 'format'] - if any(keyword in action_lower for keyword in high_risk_keywords): - return True, f"High-risk action detected: {event.action}" - - # Check for production environment - if context.get('environment') == 'production' and 'deploy' in action_lower: - if score < 0.8: - return True, f"Low confidence ({score:.2f}) for production deployment" - - return False, "" - - class MockPolicyEngine: - def __init__(self): - self.policies = [] - self.load_default_policies() - - def load_default_policies(self): - """Load default demo policies""" - self.policies = [ - MockHealingPolicy( - id="policy-001", - name="High-Risk Database Prevention", - description="Prevents irreversible database operations", - conditions={"risk_threshold": 0.7}, - action=MockHealingPolicy.PolicyAction.BLOCK, - priority=1 - ), - MockHealingPolicy( - id="policy-002", - name="Production Deployment Safety", - description="Ensures safe deployments to production", - conditions={"confidence_threshold": 0.8, "environment": "production"}, - action=MockHealingPolicy.PolicyAction.HEAL, - priority=2 - ), - MockHealingPolicy( - id="policy-003", - name="Sensitive Data Access", - description="Controls access to sensitive data", - conditions={"data_classification": ["pci", "pii", "phi"]}, - action=MockHealingPolicy.PolicyAction.WARN, - priority=3 - ) - ] - - def evaluate(self, event: MockReliabilityEvent, reliability_score: float) -> List[Dict]: - """Evaluate all policies against event""" - violations = [] - - for policy in self.policies: - triggered, reason = policy.evaluate(event, reliability_score) - if triggered: - violations.append({ - 'policy_id': policy.id, - 'policy_name': policy.name, - 'action': policy.action, - 'reason': reason, - 'priority': policy.priority - }) - - return violations - - class MockReliabilityEngine: - def __init__(self): - self.scoring_factors = { - 'syntax_complexity': 0.2, - 'risk_keywords': 0.3, - 'environment_risk': 0.2, - 'historical_safety': 0.2, - 'time_of_day': 0.1 - } - - def calculate_score(self, event: MockReliabilityEvent) -> Dict: - """Calculate reliability score for an event""" - action = event.action.lower() - context = event.context - - # Calculate individual factors - syntax_score = self._calculate_syntax_complexity(action) - risk_score = self._calculate_risk_keywords(action) - env_score = self._calculate_environment_risk(context) - historical_score = 0.8 # Default historical safety - time_score = self._calculate_time_of_day() - - # Weighted sum - overall = ( - syntax_score * self.scoring_factors['syntax_complexity'] + - risk_score * self.scoring_factors['risk_keywords'] + - env_score * self.scoring_factors['environment_risk'] + - historical_score * self.scoring_factors['historical_safety'] + - time_score * self.scoring_factors['time_of_day'] - ) - - # Confidence based on factors considered - confidence = min(0.95, overall * 1.1) # Slightly optimistic - - return { - 'overall': round(overall, 3), - 'confidence': round(confidence, 3), - 'risk_score': round(1 - overall, 3), - 'components': { - 'syntax_complexity': syntax_score, - 'risk_keywords': risk_score, - 'environment_risk': env_score, - 'historical_safety': historical_score, - 'time_of_day': time_score - } - } - - def _calculate_syntax_complexity(self, action: str) -> float: - """Calculate complexity score based on action syntax""" - words = len(action.split()) - if words <= 3: - return 0.9 # Simple commands are safer - elif words <= 6: - return 0.7 - else: - return 0.5 # Complex commands are riskier - - def _calculate_risk_keywords(self, action: str) -> float: - """Calculate risk based on keywords""" - high_risk = ['drop', 'delete', 'truncate', 'remove', 'format', 'rm'] - medium_risk = ['update', 'alter', 'modify', 'change', 'grant', 'revoke'] - safe = ['select', 'read', 'get', 'fetch', 'list', 'show'] - - action_lower = action.lower() - - for word in high_risk: - if word in action_lower: - return 0.3 # High risk - - for word in medium_risk: - if word in action_lower: - return 0.6 # Medium risk - - for word in safe: - if word in action_lower: - return 0.9 # Safe - - return 0.7 # Default - - def _calculate_environment_risk(self, context: Dict) -> float: - """Calculate risk based on environment""" - env = context.get('environment', 'development').lower() - - env_scores = { - 'production': 0.4, - 'staging': 0.7, - 'testing': 0.8, - 'development': 0.9, - 'sandbox': 0.95 - } - - return env_scores.get(env, 0.7) - - def _calculate_time_of_day(self) -> float: - """Calculate risk based on time of day""" - hour = datetime.now().hour - - # Business hours are safer (9 AM - 6 PM) - if 9 <= hour < 18: - return 0.9 - elif 18 <= hour < 22: - return 0.7 # Evening - else: - return 0.5 # Night/early morning - - # Store mock modules - ARF_MODULES = { - 'ReliabilityEvent': MockReliabilityEvent, - 'HealingPolicy': MockHealingPolicy, - 'PolicyEngine': MockPolicyEngine, - 'ReliabilityEngine': MockReliabilityEngine, - 'version': '3.3.9 (Mock)' - } - - print("â Mock ARF implementations created successfully") - -print("=" * 60) - -# Now use ARF_MODULES for all operations -ReliabilityEvent = ARF_MODULES['ReliabilityEvent'] -HealingPolicy = ARF_MODULES['HealingPolicy'] -PolicyEngine = ARF_MODULES['PolicyEngine'] -ReliabilityEngine = ARF_MODULES['ReliabilityEngine'] -ARF_VERSION = ARF_MODULES['version'] - -# Mock Enterprise Components -class MockEnterpriseLicenseManager: - """Simulates enterprise license validation with mechanical gates""" - +class MockPolicyEngine: def __init__(self): - self.license_tiers = { - "trial": { - "name": "Trial", - "price": 0, - "enforcement": "advisory", - "max_agents": 3, - "features": ["Basic risk assessment", "7-day trial"], - "gates_enabled": ["confidence_threshold", "risk_assessment"] - }, - "starter": { - "name": "Starter", - "price": 2000, - "enforcement": "human_approval", - "max_agents": 10, - "features": ["Human-in-loop gates", "Basic audit trail", "Email support"], - "gates_enabled": ["license_validation", "confidence_threshold", "risk_assessment", "admin_approval"] - }, - "professional": { - "name": "Professional", - "price": 5000, - "enforcement": "autonomous", - "max_agents": 50, - "features": ["Autonomous execution", "Advanced audit", "Priority support", "SLA 99.5%"], - "gates_enabled": ["license_validation", "confidence_threshold", "risk_assessment", - "rollback_feasibility", "budget_check", "auto_scale"] - }, - "enterprise": { - "name": "Enterprise", - "price": 15000, - "enforcement": "full_mechanical", - "max_agents": 1000, - "features": ["Full mechanical enforcement", "Compliance automation", - "Custom gates", "24/7 support", "SLA 99.9%", "Differential privacy"], - "gates_enabled": ["license_validation", "confidence_threshold", "risk_assessment", - "rollback_feasibility", "compliance_check", "budget_check", - "custom_gates", "emergency_override", "audit_logging"] - } - } - - self.active_licenses = {} - self._create_demo_licenses() - - def _create_demo_licenses(self): - """Create demo licenses for testing""" - self.active_licenses = { - "ARF-TRIAL-DEMO123": { - "tier": "trial", - "email": "demo@arf.dev", - "expires": datetime.now() + timedelta(days=14), - "created": datetime.now(), - "features": self.license_tiers["trial"]["features"] - }, - "ARF-STARTER-456XYZ": { - "tier": "starter", - "email": "customer@example.com", - "expires": datetime.now() + timedelta(days=365), - "created": datetime.now() - timedelta(days=30), - "features": self.license_tiers["starter"]["features"] - }, - "ARF-PRO-789ABC": { - "tier": "professional", - "email": "pro@company.com", - "expires": datetime.now() + timedelta(days=365), - "created": datetime.now() - timedelta(days=60), - "features": self.license_tiers["professional"]["features"] - } - } - - def validate_license(self, license_key: str) -> Dict: - """Validate enterprise license""" - if not license_key: - return { - "valid": False, - "tier": "none", - "enforcement": "none", - "message": "No license provided - using OSS mode", - "gates_available": [] - } - - license_data = self.active_licenses.get(license_key) - - if license_data: - tier = license_data["tier"] - tier_info = self.license_tiers[tier] - - return { - "valid": True, - "tier": tier, - "name": tier_info["name"], - "enforcement": tier_info["enforcement"], - "gates_available": tier_info["gates_enabled"], - "features": tier_info["features"], - "expires": license_data["expires"].strftime("%Y-%m-%d"), - "message": f"{tier_info['name']} license active" - } - - # Check if it's a trial license pattern - if license_key.startswith("ARF-TRIAL-"): - return { - "valid": True, - "tier": "trial", - "name": "Trial", - "enforcement": "advisory", - "gates_available": self.license_tiers["trial"]["gates_enabled"], - "features": self.license_tiers["trial"]["features"], - "expires": (datetime.now() + timedelta(days=14)).strftime("%Y-%m-%d"), - "message": "Trial license activated (14 days)" - } - - return { - "valid": False, - "tier": "invalid", - "enforcement": "none", - "message": "Invalid license key", - "gates_available": [] - } - - def generate_trial_license(self, email: str) -> Dict: - """Generate a new trial license""" - license_hash = hashlib.sha256(f"{email}{datetime.now().isoformat()}".encode()).hexdigest() - license_key = f"ARF-TRIAL-{license_hash[:8].upper()}" - - self.active_licenses[license_key] = { - "tier": "trial", - "email": email, - "expires": datetime.now() + timedelta(days=14), - "created": datetime.now(), - "features": self.license_tiers["trial"]["features"] - } - - return { - "success": True, - "license_key": license_key, - "tier": "trial", - "expires": (datetime.now() + timedelta(days=14)).strftime("%Y-%m-%d"), - "features": self.license_tiers["trial"]["features"], - "message": "14-day trial license generated successfully" - } - - def get_tier_comparison(self) -> List[Dict]: - """Get comparison of all license tiers""" - comparison = [] - for tier_key, tier_info in self.license_tiers.items(): - comparison.append({ - "tier": tier_key, - "name": tier_info["name"], - "price": f"${tier_info['price']:,}/mo", - "enforcement": tier_info["enforcement"].replace("_", " ").title(), - "max_agents": tier_info["max_agents"], - "gates": len(tier_info["gates_enabled"]), - "features": tier_info["features"][:3] # First 3 features + self.policies = [] + + def evaluate(self, event, score): + violations = [] + action = event.action.lower() + + if any(word in action for word in ['drop', 'delete', 'truncate']): + violations.append({ + 'policy_id': 'high-risk-001', + 'policy_name': 'High Risk Prevention', + 'action': 'BLOCK', + 'reason': 'Destructive action detected', + 'priority': 1 }) - return comparison + + return violations -class MockExecutionAuthorityService: - """Simulates mechanical gate enforcement for Enterprise""" - - def __init__(self): - self.gate_definitions = { - "license_validation": { - "description": "Validate enterprise license is active", - "weight": 0.2, - "required": True, - "enterprise_only": True - }, - "confidence_threshold": { - "description": "Confidence score âĨ 0.70", - "weight": 0.25, - "required": True, - "threshold": 0.7 - }, - "risk_assessment": { - "description": "Risk score ⤠0.80", - "weight": 0.25, - "required": True, - "threshold": 0.8 - }, - "rollback_feasibility": { - "description": "Rollback plan exists and is feasible", - "weight": 0.1, - "required": False, - "enterprise_only": False - }, - "admin_approval": { - "description": "Human approval for sensitive actions", - "weight": 0.1, - "required": False, - "enterprise_only": True - }, - "compliance_check": { - "description": "Compliance with regulations (GDPR, PCI, etc.)", - "weight": 0.05, - "required": False, - "enterprise_only": True, - "tiers": ["enterprise"] - }, - "budget_check": { - "description": "Within budget limits and forecasts", - "weight": 0.05, - "required": False, - "enterprise_only": True - } +class MockReliabilityEngine: + def calculate_score(self, event): + action = event.action.lower() + + risk_keywords = { + 'drop': 0.9, 'delete': 0.8, 'truncate': 0.85, + 'deploy': 0.4, 'update': 0.5, 'alter': 0.6, + 'select': 0.1, 'read': 0.1, 'get': 0.1 } - self.audit_log = [] - - def evaluate_gates(self, action_data: Dict, license_info: Dict) -> Dict: - """Evaluate all mechanical gates for an action""" - gate_results = {} - required_gates = [] - optional_gates = [] - - # Determine which gates to evaluate based on license - license_tier = license_info.get("tier", "none") - gates_available = license_info.get("gates_available", []) - - for gate_name, gate_def in self.gate_definitions.items(): - # Check if gate should be evaluated - should_evaluate = ( - gate_name in gates_available or - not gate_def.get("enterprise_only", False) - ) - - if should_evaluate: - if gate_def["required"]: - required_gates.append(gate_name) - else: - optional_gates.append(gate_name) - - # Evaluate the gate - gate_results[gate_name] = self._evaluate_single_gate( - gate_name, gate_def, action_data, license_tier - ) - - # Calculate overall status - passed_gates = sum(1 for r in gate_results.values() if r["passed"]) - total_gates = len(gate_results) - - # Check if all required gates passed - all_required_passed = all( - gate_results[gate]["passed"] - for gate in required_gates - if gate in gate_results - ) + risk_score = 0.5 + for word, risk in risk_keywords.items(): + if word in action: + risk_score = max(risk_score, risk) - # Determine execution authority - if not license_info.get("valid", False): - execution_authority = "OSS_ONLY" - elif license_tier == "trial": - execution_authority = "ADVISORY_ONLY" - elif all_required_passed: - if license_tier in ["professional", "enterprise"]: - execution_authority = "AUTONOMOUS_EXECUTION" - else: - execution_authority = "HUMAN_APPROVAL_REQUIRED" - else: - execution_authority = "BLOCKED" + if event.context.get('environment') == 'production': + risk_score = min(1.0, risk_score + 0.2) - # Log to audit trail - audit_entry = self._log_to_audit(action_data, gate_results, execution_authority, license_tier) + reliability = 1.0 - risk_score + confidence = min(0.95, reliability * 1.1) return { - "gate_results": gate_results, - "required_gates": required_gates, - "optional_gates": optional_gates, - "passed_gates": passed_gates, - "total_gates": total_gates, - "all_required_passed": all_required_passed, - "execution_authority": execution_authority, - "audit_id": audit_entry["audit_id"], - "recommendation": self._generate_recommendation(execution_authority, passed_gates, total_gates) - } - - def _evaluate_single_gate(self, gate_name: str, gate_def: Dict, - action_data: Dict, license_tier: str) -> Dict: - """Evaluate a single gate""" - confidence = action_data.get("confidence", 0.5) - risk_score = action_data.get("risk_score", 0.5) - action = action_data.get("action", "") - - gate_result = { - "gate_name": gate_name, - "description": gate_def["description"], - "weight": gate_def["weight"], - "required": gate_def["required"] - } - - # Gate-specific evaluation logic - if gate_name == "license_validation": - passed = license_tier != "none" - message = f"License valid: {license_tier}" if passed else "No valid license" - - elif gate_name == "confidence_threshold": - threshold = gate_def.get("threshold", 0.7) - passed = confidence >= threshold - message = f"Confidence {confidence:.2f} âĨ {threshold}" if passed else f"Confidence {confidence:.2f} < {threshold}" - - elif gate_name == "risk_assessment": - threshold = gate_def.get("threshold", 0.8) - passed = risk_score <= threshold - message = f"Risk {risk_score:.2f} ⤠{threshold}" if passed else f"Risk {risk_score:.2f} > {threshold}" - - elif gate_name == "rollback_feasibility": - # Check if action has rollback - has_rollback = not any(word in action.lower() for word in ["drop", "delete", "truncate", "rm -rf"]) - passed = has_rollback - message = "Rollback feasible" if has_rollback else "No rollback possible" - - elif gate_name == "admin_approval": - # For demo, always pass if not high risk - passed = risk_score < 0.7 - message = "Admin approval not required" if passed else "Admin approval needed" - - elif gate_name == "compliance_check": - # Check for sensitive data access - sensitive_keywords = ["pci", "credit card", "ssn", "password", "token"] - has_sensitive = any(keyword in action.lower() for keyword in sensitive_keywords) - passed = not has_sensitive or license_tier == "enterprise" - message = "Compliant" if passed else "Requires Enterprise for sensitive data" - - elif gate_name == "budget_check": - # Mock budget check - passed = True - message = "Within budget limits" - - else: - passed = True - message = "Gate evaluation passed" - - gate_result.update({ - "passed": passed, - "message": message, - "details": { - "confidence": confidence, - "risk_score": risk_score, - "action_type": self._categorize_action(action) - } - }) - - return gate_result - - def _categorize_action(self, action: str) -> str: - """Categorize the action type""" - action_lower = action.lower() - - if any(word in action_lower for word in ["drop", "delete", "truncate"]): - return "destructive" - elif any(word in action_lower for word in ["deploy", "release", "update"]): - return "deployment" - elif any(word in action_lower for word in ["select", "read", "get", "fetch"]): - return "read" - elif any(word in action_lower for word in ["insert", "update", "modify"]): - return "write" - elif any(word in action_lower for word in ["grant", "revoke", "permission"]): - return "access_control" - else: - return "other" - - def _log_to_audit(self, action_data: Dict, gate_results: Dict, - authority: str, license_tier: str) -> Dict: - """Log to audit trail""" - audit_id = str(uuid.uuid4())[:8] - - entry = { - "audit_id": audit_id, - "timestamp": datetime.now().isoformat(), - "action": action_data.get("action", ""), - "license_tier": license_tier, - "execution_authority": authority, - "gate_summary": { - "passed": sum(1 for r in gate_results.values() if r["passed"]), - "total": len(gate_results) - }, - "risk_score": action_data.get("risk_score", 0), - "confidence": action_data.get("confidence", 0) + 'overall': round(reliability, 3), + 'confidence': round(confidence, 3), + 'risk_score': round(risk_score, 3) } - - self.audit_log.append(entry) - - # Keep only last 1000 entries - if len(self.audit_log) > 1000: - self.audit_log = self.audit_log[-1000:] - - return entry - - def _generate_recommendation(self, authority: str, passed: int, total: int) -> str: - """Generate recommendation based on gate results""" - if authority == "AUTONOMOUS_EXECUTION": - return f"â Safe for autonomous execution ({passed}/{total} gates passed)" - elif authority == "HUMAN_APPROVAL_REQUIRED": - return f"đ¤ Requires human approval ({passed}/{total} gates passed)" - elif authority == "BLOCKED": - return f"đĢ Blocked by mechanical gates ({passed}/{total} gates passed)" - elif authority == "ADVISORY_ONLY": - return f"âšī¸ Trial license - advisory only ({passed}/{total} gates would pass)" - else: - return f"đĩ OSS mode - advisory only" -# Demo Configuration -class DemoConfig: - """Configuration for the demo""" - - OSS_THEME = { - "primary": "#1E88E5", # Blue - "secondary": "#64B5F6", - "background": "#E3F2FD", - "text": "#1565C0", - "border": "#90CAF9" - } - - ENTERPRISE_THEME = { - "primary": "#FFB300", # Gold - "secondary": "#FFD54F", - "background": "#FFF8E1", - "text": "#FF8F00", - "border": "#FFE082" +# Demo data +DEMO_SCENARIOS = { + "database_drop": { + "name": "High-Risk Database Operation", + "action": "DROP DATABASE production CASCADE", + "context": {"environment": "production", "criticality": "high"} + }, + "service_deployment": { + "name": "Safe Service Deployment", + "action": "deploy_service v1.2.3 to staging", + "context": {"environment": "staging", "rollback": True} + }, + "config_change": { + "name": "Configuration Change", + "action": "UPDATE config SET timeout=30", + "context": {"environment": "production", "service": "payment"} } - - RISK_COLORS = { - "low": "#4CAF50", # Green - "medium": "#FF9800", # Orange - "high": "#F44336", # Red - "critical": "#D32F2F" # Dark red - } - - @staticmethod - def get_risk_level(score: float) -> Tuple[str, str]: - """Get risk level and color from score""" - if score < 0.3: - return "low", DemoConfig.RISK_COLORS["low"] - elif score < 0.6: - return "medium", DemoConfig.RISK_COLORS["medium"] - elif score < 0.8: - return "high", DemoConfig.RISK_COLORS["high"] - else: - return "critical", DemoConfig.RISK_COLORS["critical"] +} -# Load demo scenarios -try: - from demo_scenarios import DEMO_SCENARIOS, LICENSE_TIERS, VALUE_PROPOSITIONS - print("â Demo scenarios loaded successfully") -except ImportError: - print("â ī¸ Demo scenarios not found, using built-in scenarios") - - DEMO_SCENARIOS = { - "database_drop": { - "name": "High-Risk Database Operation", - "action": "DROP DATABASE production CASCADE", - "description": "Irreversible deletion of production database", - "context": {"environment": "production", "criticality": "critical"} - }, - "service_deployment": { - "name": "Safe Service Deployment", - "action": "deploy_service v1.2.3 to staging with 25% canary", - "description": "Standard deployment with canary testing", - "context": {"environment": "staging", "canary": 25} - }, - "config_change": { - "name": "Configuration Change", - "action": "UPDATE config SET timeout=30 WHERE service='payment'", - "description": "Update payment service timeout configuration", - "context": {"environment": "production", "service": "payment"} - } - } - -# Main ARF Processor +# Main processor class ARFProcessor: - """Main processor for ARF operations""" - def __init__(self): - self.policy_engine = PolicyEngine() - self.reliability_engine = ReliabilityEngine() - self.license_manager = MockEnterpriseLicenseManager() - self.execution_service = MockExecutionAuthorityService() + self.policy_engine = MockPolicyEngine() + self.reliability_engine = MockReliabilityEngine() self.processed_actions = [] - - print(f"â ARF Processor initialized with {ARF_VERSION}") + self.license_manager = MockLicenseManager() - def process_oss(self, action: str, context: Dict = None) -> Dict: - """Process action through ARF OSS""" + def get_stats(self): + """Get processing statistics with safe defaults""" + try: + total = len(self.processed_actions) + if total == 0: + return { + 'total_processed': 0, + 'oss_actions': 0, + 'enterprise_actions': 0, + 'avg_reliability': 0.0, + 'avg_risk': 0.0, + 'execution_rate': 0.0 + } + + oss_count = sum(1 for a in self.processed_actions if a.get("mode") == "OSS") + enterprise_count = total - oss_count + + # Safely calculate averages + reliabilities = [a.get("result", {}).get("reliability_score", 0) + for a in self.processed_actions] + risks = [a.get("result", {}).get("risk_score", 0) + for a in self.processed_actions] + + avg_reliability = np.mean(reliabilities) if reliabilities else 0 + avg_risk = np.mean(risks) if risks else 0 + + executed = sum(1 for a in self.processed_actions + if a.get("result", {}).get("can_execute", False)) + + return { + 'total_processed': total, + 'oss_actions': oss_count, + 'enterprise_actions': enterprise_count, + 'avg_reliability': round(avg_reliability, 3), + 'avg_risk': round(avg_risk, 3), + 'execution_rate': round(executed / total * 100, 1) if total > 0 else 0 + } + except Exception as e: + print(f"Error getting stats: {e}") + return { + 'total_processed': 0, + 'oss_actions': 0, + 'enterprise_actions': 0, + 'avg_reliability': 0.0, + 'avg_risk': 0.0, + 'execution_rate': 0.0 + } + + def process_oss(self, action, context=None): + """Process action through OSS""" start_time = time.time() - # Create reliability event - event = ReliabilityEvent( + event = MockReliabilityEvent( event_id=str(uuid.uuid4())[:8], agent_id="demo_agent", action=action, @@ -782,55 +160,41 @@ class ARFProcessor: context=context or {} ) - # Calculate reliability score - reliability_result = self.reliability_engine.calculate_score(event) - - # Evaluate policies - if hasattr(self.policy_engine, 'evaluate'): - # Mock policy engine - violations = self.policy_engine.evaluate(event, reliability_result['overall']) - else: - # Real ARF policy engine - violations = [] - # Note: Real implementation would call actual policy evaluation + score_result = self.reliability_engine.calculate_score(event) + violations = self.policy_engine.evaluate(event, score_result['overall']) # Determine recommendation - risk_level, risk_color = DemoConfig.get_risk_level(reliability_result['risk_score']) + risk_level = "High" if score_result['risk_score'] > 0.7 else "Medium" if score_result['risk_score'] > 0.3 else "Low" if violations: can_execute = False recommendation = f"â Blocked by {len(violations)} policy violation(s)" + elif score_result['overall'] >= 0.8: + can_execute = True + recommendation = "â Safe to execute" + elif score_result['overall'] >= 0.6: + can_execute = False + recommendation = "â ī¸ Review recommended" else: - if reliability_result['overall'] >= 0.8: - can_execute = True - recommendation = "â Safe to execute" - elif reliability_result['overall'] >= 0.6: - can_execute = False - recommendation = "â ī¸ Review recommended" - else: - can_execute = False - recommendation = "đĢ High risk - do not execute" - - processing_time = time.time() - start_time + can_execute = False + recommendation = "đĢ High risk - do not execute" result = { "action": action, - "reliability_score": reliability_result['overall'], - "confidence": reliability_result['confidence'], - "risk_score": reliability_result['risk_score'], + "reliability_score": score_result['overall'], + "confidence": score_result['confidence'], + "risk_score": score_result['risk_score'], "risk_level": risk_level, - "risk_color": risk_color, + "risk_color": "#F44336" if risk_level == "High" else "#FF9800" if risk_level == "Medium" else "#4CAF50", "policy_violations": len(violations), - "violation_details": violations[:3] if violations else [], "recommendation": recommendation, "can_execute": can_execute, - "processing_time": round(processing_time, 3), - "engine_version": ARF_VERSION, + "processing_time": round(time.time() - start_time, 3), + "engine_version": "ARF 3.3.9 OSS", "mode": "OSS", "limitation": "Advisory only - human must make final decision" } - # Store for history self.processed_actions.append({ "timestamp": datetime.now(), "result": result, @@ -839,1002 +203,380 @@ class ARFProcessor: return result - def process_enterprise(self, action: str, license_key: str, context: Dict = None) -> Dict: - """Process action through ARF Enterprise""" - # First get OSS result + def process_enterprise(self, action, license_key, context=None): + """Process action through Enterprise""" oss_result = self.process_oss(action, context) - # Validate license + # Mock license validation license_info = self.license_manager.validate_license(license_key) - # Prepare action data for gate evaluation - action_data = { - "action": action, - "confidence": oss_result["confidence"], - "risk_score": oss_result["risk_score"], - "reliability": oss_result["reliability_score"], - "context": context or {} + # Mock gate evaluation + gate_eval = { + "gate_results": { + "license_validation": {"passed": license_info['valid'], "message": "License valid"}, + "confidence_threshold": {"passed": oss_result['confidence'] >= 0.7, "message": f"Confidence: {oss_result['confidence']:.2f}"}, + "risk_assessment": {"passed": oss_result['risk_score'] <= 0.8, "message": f"Risk: {oss_result['risk_score']:.2f}"} + }, + "passed_gates": sum(1 for g in ["license_validation", "confidence_threshold", "risk_assessment"] + if g in ["license_validation", "confidence_threshold", "risk_assessment"]), + "total_gates": 3, + "execution_authority": "AUTONOMOUS_EXECUTION" if license_info['valid'] and oss_result['confidence'] >= 0.7 and oss_result['risk_score'] <= 0.8 else "ADVISORY_ONLY", + "audit_id": str(uuid.uuid4())[:8] } - # Evaluate mechanical gates - gate_evaluation = self.execution_service.evaluate_gates(action_data, license_info) - - # Combine results enterprise_result = { **oss_result, "license_info": license_info, - "gate_evaluation": gate_evaluation, + "gate_evaluation": gate_eval, "mode": "Enterprise", - "engine_version": f"{ARF_VERSION} Enterprise", - "benefit": "Mechanical enforcement with automated decision making" + "engine_version": f"ARF 3.3.9 Enterprise ({license_info.get('tier', 'Trial')})", + "benefit": "Mechanical enforcement with automated gates" } - # Store for history self.processed_actions.append({ "timestamp": datetime.now(), "result": enterprise_result, - "mode": "Enterprise", - "license_tier": license_info.get("tier", "none") + "mode": "Enterprise" }) return enterprise_result + +class MockLicenseManager: + def __init__(self): + self.license_tiers = { + "trial": {"name": "Trial", "price": 0, "enforcement": "advisory"}, + "starter": {"name": "Starter", "price": 2000, "enforcement": "human_approval"}, + "professional": {"name": "Professional", "price": 5000, "enforcement": "autonomous"}, + "enterprise": {"name": "Enterprise", "price": 15000, "enforcement": "full_mechanical"} + } - def get_stats(self) -> Dict: - """Get processing statistics""" - total = len(self.processed_actions) - oss_count = sum(1 for a in self.processed_actions if a["mode"] == "OSS") - enterprise_count = total - oss_count - - if total == 0: - return {"total": 0} + def validate_license(self, license_key): + if not license_key: + return {"valid": False, "tier": "none", "name": "No License", "enforcement": "none"} - # Calculate average scores - avg_reliability = np.mean([a["result"]["reliability_score"] for a in self.processed_actions]) - avg_risk = np.mean([a["result"]["risk_score"] for a in self.processed_actions]) + if license_key.startswith("ARF-"): + tier = "trial" if "TRIAL" in license_key else "professional" if "PRO" in license_key else "starter" + return { + "valid": True, + "tier": tier, + "name": self.license_tiers[tier]["name"], + "enforcement": self.license_tiers[tier]["enforcement"] + } - # Count executions - executed = sum(1 for a in self.processed_actions if a["result"].get("can_execute", False)) + return {"valid": False, "tier": "invalid", "name": "Invalid", "enforcement": "none"} + + def generate_trial_license(self, email): + if not email or "@" not in email: + return {"success": False, "error": "Invalid email"} + license_key = f"ARF-TRIAL-{hashlib.sha256(email.encode()).hexdigest()[:8].upper()}" return { - "total_processed": total, - "oss_actions": oss_count, - "enterprise_actions": enterprise_count, - "avg_reliability": round(avg_reliability, 3), - "avg_risk": round(avg_risk, 3), - "execution_rate": round(executed / total * 100, 1) if total > 0 else 0 + "success": True, + "license_key": license_key, + "tier": "trial", + "expires": (datetime.now() + timedelta(days=14)).strftime("%Y-%m-%d"), + "message": "14-day trial license generated" } -# Create global processor instance +# Create global processor arf_processor = ARFProcessor() -# Gradio Interface Components -def create_oss_panel_html(result: Dict) -> str: - """Generate HTML for OSS results panel""" - risk_color = result.get("risk_color", "#666666") - - # Policy violations display - violations_html = "" - if result.get("policy_violations", 0) > 0: - violations = result.get("violation_details", []) - violations_html = """ -
- Get mechanical enforcement, audit trails, and autonomous execution -
-- Experience the difference between open-source advisory recommendations - and enterprise mechanical enforcement with automated gate validation. -
-- ARF 3.3.9 Demo | - Website | - GitHub | - Contact Sales -
-
- This demo uses real ARF 3.3.9 OSS code with simulated Enterprise features for demonstration purposes.
- Actual Enterprise features may vary. Mechanical gates shown are simulated for demo purposes.
-
{str(e)}
+{result['recommendation']}
+â ī¸ OSS Limitation: {result['limitation']}
+{result.get('benefit', 'Mechanical enforcement')}
+Audit ID: {gate_eval.get('audit_id', 'N/A')}
+{str(e)}
-
-{traceback.format_exc()}
-