diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,2407 +1,317 @@ +# agentic_reliability_framework/app.py (OSS-ONLY VERSION) """ -Enterprise Agentic Reliability Framework - PRODUCTION ENTERPRISE VERSION -Multi-Agent AI System for Production Reliability Monitoring +Agentic Reliability Framework - OSS Edition DEMO +Demo application showing OSS advisory capabilities -CRITICAL FIXES FOR ENTERPRISE SALES: -- Enterprise-scale revenue calculations ($5K+/minute, not $100/min) -- Realistic ROI for $47K+ implementations -- Updated demo scenarios with million-dollar impacts -- Enterprise ROI calculator dashboard +NOTES: +- OSS edition: Advisory only, no execution +- HealingIntent creation only, no execution +- In-memory storage only, no persistence """ -import os -import json -import numpy as np -import gradio as gr -import requests -import pandas as pd +import asyncio import datetime -import threading import logging -import asyncio -import tempfile -from typing import List, Dict, Any, Optional, Tuple -from collections import deque, OrderedDict -from dataclasses import dataclass, asdict -from enum import Enum -from concurrent.futures import ProcessPoolExecutor -from queue import Queue -from circuitbreaker import circuit -import atomicwrites +import sys +from typing import Optional, Dict, Any -# Import our modules -from models import ( - ReliabilityEvent, EventSeverity, AnomalyResult, - HealingAction, ForecastResult, PolicyCondition -) -from healing_policies import PolicyEngine, DEFAULT_HEALING_POLICIES +import gradio as gr +import numpy as np -# === Logging Configuration === -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +# Import OSS components directly +from agentic_reliability_framework.arf_core.models.healing_intent import ( + HealingIntent, + create_rollback_intent, + create_restart_intent, + create_scale_out_intent, ) -logger = logging.getLogger(__name__) - -# === ENTERPRISE-SCALE CONSTANTS === -class Constants: - """Enterprise-scale constants for $47K+ implementations""" - - # === ENTERPRISE REVENUE SCALE === - # OLD: BASE_REVENUE_PER_MINUTE = 100.0 # $100/min = $6K/hour (WRONG for enterprise) - # NEW: Enterprise reality for $47K deals: - BASE_REVENUE_PER_MINUTE = 5000.0 # $5K/min = $300K/hour = $7.2M/month business - BASE_USERS = 10000 # 10K active users, not 1K - - # === ENTERPRISE IMPACT MULTIPLIERS === - LATENCY_IMPACT_MULTIPLIER = 0.5 # Every 100ms over threshold costs 0.5% revenue - ERROR_IMPACT_MULTIPLIER = 2.0 # Every 1% error rate costs 2% revenue - RESOURCE_IMPACT_MULTIPLIER = 1.5 # Resource exhaustion compounds impact - - # === ENTERPRISE RESPONSE TIMES === - INDUSTRY_AVG_RESPONSE_MINUTES = 45 # Enterprise reality: 45+ minutes, not 14 - ARF_AVG_RESPONSE_MINUTES = 2.3 - TIME_IMPROVEMENT_PCT = ((45 - 2.3) / 45) * 100 # 95% faster - - # === ENTERPRISE INCIDENT FREQUENCY === - MONTHLY_INCIDENTS_ENTERPRISE = 20 # 20 incidents/month (real enterprise) - ANNUAL_INCIDENTS = 240 # 240 incidents/year - AUTO_HEAL_RATE_ENTERPRISE = 0.7 # 70% auto-heal rate (conservative) - - # === THRESHOLDS === - LATENCY_WARNING = 150.0 - LATENCY_CRITICAL = 300.0 - LATENCY_EXTREME = 500.0 - - ERROR_RATE_WARNING = 0.05 - ERROR_RATE_HIGH = 0.15 - ERROR_RATE_CRITICAL = 0.3 - - CPU_WARNING = 0.8 - CPU_CRITICAL = 0.9 - - MEMORY_WARNING = 0.8 - MEMORY_CRITICAL = 0.9 - - # === FORECASTING === - SLOPE_THRESHOLD_INCREASING = 5.0 - SLOPE_THRESHOLD_DECREASING = -2.0 - - FORECAST_MIN_DATA_POINTS = 5 - FORECAST_LOOKAHEAD_MINUTES = 15 - - # === PERFORMANCE === - HISTORY_WINDOW = 50 - MAX_EVENTS_STORED = 1000 - AGENT_TIMEOUT_SECONDS = 5 - CACHE_EXPIRY_MINUTES = 15 - - # === FAISS === - FAISS_BATCH_SIZE = 10 - FAISS_SAVE_INTERVAL_SECONDS = 30 - VECTOR_DIM = 384 - - # === RATE LIMITING === - MAX_REQUESTS_PER_MINUTE = 60 - MAX_REQUESTS_PER_HOUR = 500 - - -# === Configuration === -class Config: - """Centralized configuration for the reliability framework""" - HF_TOKEN: str = os.getenv("HF_TOKEN", "").strip() - HF_API_URL: str = "https://router.huggingface.co/hf-inference/v1/completions" - - INDEX_FILE: str = os.getenv("INDEX_FILE", "data/incident_vectors.index") - TEXTS_FILE: str = os.getenv("TEXTS_FILE", "data/incident_texts.json") - DATA_DIR: str = os.getenv("DATA_DIR", "data") - - # Create data directory if it doesn't exist - os.makedirs(DATA_DIR, exist_ok=True) +from agentic_reliability_framework.arf_core.engine.simple_mcp_client import OSSMCPClient +logger = logging.getLogger(__name__) -config = Config() -HEADERS = {"Authorization": f"Bearer {config.HF_TOKEN}"} if config.HF_TOKEN else {} +# ============================================================================ +# DEMO SCENARIOS FOR OSS EDITION +# ============================================================================ -# === ENTERPRISE DEMO SCENARIOS === DEMO_SCENARIOS = { - "๐Ÿ›๏ธ Black Friday Crisis": { - "description": "2:47 AM on Black Friday. Payment processing failing. $500K/minute at risk.", - "component": "payment-service", - "latency": 450, - "error_rate": 0.22, - "throughput": 8500, - "cpu_util": 0.95, - "memory_util": 0.88, - "story": """ -**ENTERPRISE SCENARIO: Black Friday Payment Crisis** - -๐Ÿ• **Time:** 2:47 AM EST -๐Ÿ’ฐ **Revenue at Risk:** $500,000 per minute -๐Ÿ‘ฅ **Users Impacted:** 45,000 concurrent customers -๐Ÿ”ฅ **Status:** CRITICAL (SLA violation imminent) - -Your payment service is buckling under Black Friday load. Database connection pool -is exhausted (95% utilization). Customers are abandoning carts at 15x normal rate. - -**Enterprise Impact:** -- $2.5M at risk in next 5 minutes -- Stock price impact: 3-5% if public company -- Regulatory penalties if payment data compromised -- Brand damage: 15% increase in social media complaints - -Traditional monitoring would alert you at 500ms latency - by then you've lost $2M. - -**ARF Enterprise Response:** -1. ๐Ÿ•ต๏ธ Detective detects anomaly in 0.8 seconds -2. ๐Ÿ” Diagnostician identifies DB pool exhaustion -3. ๐Ÿ”ฎ Predictive forecasts crash in 8.5 minutes -4. ๐Ÿ”ง Auto-heals: Scales DB pool 3x (saves $1.8M) - """ - }, - - "๐Ÿšจ Database Meltdown": { - "description": "Connection pool exhausted. Cascading failures across 12 services.", - "component": "database", - "latency": 850, - "error_rate": 0.35, - "throughput": 450, - "cpu_util": 0.78, - "memory_util": 0.98, - "story": """ -**ENTERPRISE SCENARIO: Database Connection Pool Exhaustion** - -๐Ÿ• **Time:** 11:23 AM -โš ๏ธ **Impact:** 12 services affected (cascading) -๐Ÿ’ฐ **Revenue Impact:** $1.2M/hour -๐Ÿ”ฅ **Status:** CRITICAL - -Primary database has hit max connections (500/500). API calls timing out. -Errors cascading to dependent services. Customer support calls spiking 800%. - -**Enterprise Impact:** -- 12 microservices failing (cascading failure) -- 78% of customer transactions failing -- Compliance audit failure risk -- $12K/minute in support escalation costs - -This is a textbook cascading failure requiring immediate root cause analysis. - -**ARF Enterprise Response:** -1. Identifies root cause in 1.2 seconds (DB pool exhaustion) -2. Triggers circuit breakers on affected services -3. Recommends connection pool tuning + failover -4. Prevents $850K in lost revenue - """ - }, - - "โšก Viral Traffic Spike": { - "description": "Viral tweet drives 50x traffic. Infrastructure at breaking point.", + "๐Ÿš€ OSS Demo - Latency Spike": { "component": "api-service", - "latency": 280, - "error_rate": 0.12, - "throughput": 15000, - "cpu_util": 0.88, - "memory_util": 0.65, + "latency": 250.0, + "error_rate": 0.15, "story": """ -**ENTERPRISE SCENARIO: Unexpected Viral Traffic** - -๐Ÿ• **Time:** 3:15 PM -๐Ÿ“ˆ **Traffic Spike:** 50x normal load -๐Ÿ’ฐ **At Risk:** $750K in conversion revenue -โš ๏ธ **Status:** HIGH +**OSS ADVISORY DEMO: Latency Spike Detection** -Celebrity tweeted about your product. Traffic jumped from 300 to 15,000 req/sec. -Auto-scaling struggling to keep up. Latency climbing exponentially. +This is the OSS edition showing advisory capabilities only. -**Enterprise Impact:** -- Conversion rate dropped from 3.2% to 0.8% -- 22% cart abandonment rate (normally 2.8%) -- CDN costs spiking $45K/hour -- Load balancers at 92% capacity +๐Ÿ” **What OSS detects:** +- Latency spike from 85ms to 250ms (294% increase) +- Error rate at 15% (above 5% threshold) +- Pattern matches historical incidents -You have 12 minutes before this becomes a full outage. +๐Ÿค– **What OSS recommends:** +- Create HealingIntent for container restart +- Confidence: 85% (based on similar incidents) +- Advisory only - requires Enterprise for execution -**ARF Enterprise Response:** -1. Predictive agent forecasts capacity exhaustion in 12 minutes -2. Triggers emergency scaling 10x -3. Routes traffic to backup regions -4. Preserves $520K in conversion revenue +๐Ÿ’ผ **Enterprise Upgrade Benefits:** +- Autonomous execution of HealingIntent +- Approval workflows for safety +- Learning engine for continuous improvement +- Persistent storage for incident history """ }, - "๐Ÿ”ฅ Memory Leak Discovery": { - "description": "Slow memory leak detected. $250K at risk in 18 minutes.", - "component": "cache-service", - "latency": 320, - "error_rate": 0.05, - "throughput": 2200, - "cpu_util": 0.45, - "memory_util": 0.94, + "๐Ÿ“ˆ OSS Demo - Error Rate Increase": { + "component": "database-service", + "latency": 120.0, + "error_rate": 0.25, "story": """ -**ENTERPRISE SCENARIO: Memory Leak Time Bomb** +**OSS ADVISORY DEMO: Error Rate Analysis** -๐Ÿ• **Time:** 9:42 PM -๐Ÿ’พ **Memory:** 94% (climbing 2%/hour) -โฐ **Time to Crash:** ~18 minutes -๐Ÿ’ฐ **At Risk:** $250K in international revenue +OSS edition analyzes patterns and creates HealingIntent. -Memory leak growing for 8 hours. Most monitoring tools won't catch this -until OOM crash. At current trajectory, service crashes at 10 PM - exactly -when APAC users come online. +๐Ÿ” **OSS Analysis:** +- Error rate at 25% (critical threshold: 15%) +- Moderate latency increase +- Database connection pattern detected -**Enterprise Impact:** -- 65,000 APAC users impacted at login -- $250K in nightly batch processing at risk -- Data corruption risk if crash during transactions -- 8-hour mean time to detect (traditional monitoring) +๐Ÿค– **OSS Recommendation:** +- Create HealingIntent for rollback to previous version +- Confidence: 78% (based on RAG similarity) +- Advisory analysis complete -**ARF Enterprise Response:** -1. Predictive agent spots trend 17 minutes before crash -2. Identifies memory leak pattern (2%/hour growth) -3. Triggers graceful restart + memory dump for analysis -4. Prevents outage during peak APAC hours +๐Ÿ”’ **OSS Safety:** +- No execution capability (safety feature) +- Creates intent for Enterprise execution +- Full audit trail when upgraded """ }, - - "โœ… Normal Operations": { - "description": "Enterprise-scale healthy operations baseline.", - "component": "api-service", - "latency": 85, - "error_rate": 0.008, - "throughput": 1200, - "cpu_util": 0.35, - "memory_util": 0.42, - "story": """ -**ENTERPRISE SCENARIO: Healthy System Baseline** - -๐Ÿ• **Time:** 2:30 PM -โœ… **Status:** NORMAL -๐Ÿ“Š **All Metrics:** Within enterprise SLAs - -Enterprise-scale operations running smoothly: -- 12,000 concurrent users -- $45K/hour revenue processing -- All services within 99.95% SLA - -**ARF Value:** -- Zero false positives (prevents alert fatigue) -- Adaptive thresholds learning from your environment -- Predictive maintenance forecasting -- 95% faster than human triage for real incidents - -*This baseline shows ARF's intelligence in distinguishing real incidents from normal variance* - """ - } } -# === ENTERPRISE ROI CALCULATOR === -def calculate_enterprise_roi(monthly_revenue: float) -> Dict[str, Any]: - """ - Real ROI calculation for enterprise sales ($47K implementations) - - Based on industry data from Fortune 500 deployments - """ - # Real enterprise metrics - incidents_per_month = Constants.MONTHLY_INCIDENTS_ENTERPRISE - avg_downtime_minutes = 120 # 2 hours average enterprise outage - auto_heal_rate = Constants.AUTO_HEAL_RATE_ENTERPRISE - - # Revenue at risk calculation (30% of revenue is service-dependent) - revenue_per_minute = monthly_revenue / (30 * 24 * 60) * 0.3 - - # Without ARF (traditional monitoring) - traditional_detection = Constants.INDUSTRY_AVG_RESPONSE_MINUTES - traditional_loss = incidents_per_month * (avg_downtime_minutes + traditional_detection) * revenue_per_minute - - # With ARF - arf_detection = Constants.ARF_AVG_RESPONSE_MINUTES - # Auto-healed incidents have minimal downtime - arf_loss = incidents_per_month * ( - (avg_downtime_minutes * (1 - auto_heal_rate)) + # Non-auto-healed - (5 * auto_heal_rate) + # Auto-healed recover in 5 min - arf_detection - ) * revenue_per_minute - - monthly_savings = traditional_loss - arf_loss - annual_savings = monthly_savings * 12 - - implementation_cost = 47500 - - return { - "monthly_revenue": monthly_revenue, - "monthly_incidents": incidents_per_month, - "traditional_monthly_loss": traditional_loss, - "arf_monthly_loss": arf_loss, - "monthly_savings": monthly_savings, - "traditional_annual_loss": traditional_loss * 12, - "arf_annual_loss": arf_loss * 12, - "annual_savings": annual_savings, - "implementation_cost": implementation_cost, - "roi_months": round(implementation_cost / monthly_savings, 1) if monthly_savings > 0 else 999, - "first_year_roi": round((annual_savings - implementation_cost) / implementation_cost * 100, 1), - "first_year_net_gain": annual_savings - implementation_cost - } - +# ============================================================================ +# OSS-ONLY APPLICATION LOGIC +# ============================================================================ -# === Input Validation === -def validate_component_id(component_id: str) -> Tuple[bool, str]: - """Validate component ID format""" - if not isinstance(component_id, str): - return False, "Component ID must be a string" - - if not (1 <= len(component_id) <= 255): - return False, "Component ID must be 1-255 characters" - - import re - if not re.match(r"^[a-z0-9-]+$", component_id): - return False, "Component ID must contain only lowercase letters, numbers, and hyphens" - - return True, "" - - -def validate_inputs( - latency: Any, - error_rate: Any, - throughput: Any, - cpu_util: Any, - memory_util: Any -) -> Tuple[bool, str]: +async def analyze_with_oss( + component: str, + latency: float, + error_rate: float, + scenario_name: str = "OSS Demo" +) -> Dict[str, Any]: """ - Comprehensive input validation with type checking + OSS-only analysis: Creates HealingIntent, never executes + + This is the core OSS capability: analyze and recommend. """ try: - # Type conversion with error handling - try: - latency_f = float(latency) - except (ValueError, TypeError): - return False, "โŒ Invalid latency: must be a number" - - try: - error_rate_f = float(error_rate) - except (ValueError, TypeError): - return False, "โŒ Invalid error rate: must be a number" - - try: - throughput_f = float(throughput) if throughput else 1000.0 - except (ValueError, TypeError): - return False, "โŒ Invalid throughput: must be a number" - - # CPU and memory are optional - cpu_util_f = None - if cpu_util: - try: - cpu_util_f = float(cpu_util) - except (ValueError, TypeError): - return False, "โŒ Invalid CPU utilization: must be a number" + # Create OSS MCP client (advisory only) + client = OSSMCPClient() - memory_util_f = None - if memory_util: - try: - memory_util_f = float(memory_util) - except (ValueError, TypeError): - return False, "โŒ Invalid memory utilization: must be a number" + # Based on metrics, decide which HealingIntent to create + healing_intent = None - # Range validation - if not (0 <= latency_f <= 10000): - return False, "โŒ Invalid latency: must be between 0-10000ms" - - if not (0 <= error_rate_f <= 1): - return False, "โŒ Invalid error rate: must be between 0-1" - - if throughput_f < 0: - return False, "โŒ Invalid throughput: must be positive" - - if cpu_util_f is not None and not (0 <= cpu_util_f <= 1): - return False, "โŒ Invalid CPU utilization: must be between 0-1" - - if memory_util_f is not None and not (0 <= memory_util_f <= 1): - return False, "โŒ Invalid memory utilization: must be between 0-1" - - return True, "" - - except Exception as e: - logger.error(f"Validation error: {e}", exc_info=True) - return False, f"โŒ Validation error: {str(e)}" - - -# === Thread-Safe Data Structures === -class ThreadSafeEventStore: - """Thread-safe storage for reliability events""" - - def __init__(self, max_size: int = Constants.MAX_EVENTS_STORED): - self._events = deque(maxlen=max_size) - self._lock = threading.RLock() - logger.info(f"Initialized ThreadSafeEventStore with max_size={max_size}") - - def add(self, event: ReliabilityEvent) -> None: - """Add event to store""" - with self._lock: - self._events.append(event) - logger.debug(f"Added event for {event.component}: {event.severity.value}") - - def get_recent(self, n: int = 15) -> List[ReliabilityEvent]: - """Get n most recent events""" - with self._lock: - return list(self._events)[-n:] if self._events else [] - - def get_all(self) -> List[ReliabilityEvent]: - """Get all events""" - with self._lock: - return list(self._events) - - def count(self) -> int: - """Get total event count""" - with self._lock: - return len(self._events) - - -# === FAISS Integration === -class ProductionFAISSIndex: - """Production-safe FAISS index with single-writer pattern""" - - def __init__(self, index, texts: List[str]): - self.index = index - self.texts = texts - self._lock = threading.RLock() - - self._shutdown = threading.Event() - - # Single writer thread - self._write_queue: Queue = Queue() - self._writer_thread = threading.Thread( - target=self._writer_loop, - daemon=True, - name="FAISSWriter" - ) - self._writer_thread.start() - - self._encoder_pool = ProcessPoolExecutor(max_workers=2) - - logger.info( - f"Initialized ProductionFAISSIndex with {len(texts)} vectors" - ) - - def add_async(self, vector: np.ndarray, text: str) -> None: - """Add vector and text asynchronously""" - self._write_queue.put((vector, text)) - logger.debug(f"Queued vector for indexing: {text[:50]}...") - - def _writer_loop(self) -> None: - """Single writer thread - processes queue in batches""" - batch = [] - last_save = datetime.datetime.now() - save_interval = datetime.timedelta( - seconds=Constants.FAISS_SAVE_INTERVAL_SECONDS - ) - - while not self._shutdown.is_set(): - try: - import queue - try: - item = self._write_queue.get(timeout=1.0) - batch.append(item) - except queue.Empty: - pass - - if len(batch) >= Constants.FAISS_BATCH_SIZE or \ - (batch and datetime.datetime.now() - last_save > save_interval): - self._flush_batch(batch) - batch = [] - - if datetime.datetime.now() - last_save > save_interval: - self._save_atomic() - last_save = datetime.datetime.now() - - except Exception as e: - logger.error(f"Writer loop error: {e}", exc_info=True) - - def _flush_batch(self, batch: List[Tuple[np.ndarray, str]]) -> None: - """Flush batch to FAISS index""" - if not batch: - return - - try: - vectors = np.vstack([v for v, _ in batch]) - texts = [t for _, t in batch] - - self.index.add(vectors) - - with self._lock: - self.texts.extend(texts) - - logger.info(f"Flushed batch of {len(batch)} vectors to FAISS index") - - except Exception as e: - logger.error(f"Error flushing batch: {e}", exc_info=True) - - def _save_atomic(self) -> None: - """Atomic save with fsync for durability""" - try: - import faiss - - with tempfile.NamedTemporaryFile( - mode='wb', - delete=False, - dir=os.path.dirname(config.INDEX_FILE), - prefix='index_', - suffix='.tmp' - ) as tmp: - temp_path = tmp.name - - faiss.write_index(self.index, temp_path) - - with open(temp_path, 'r+b') as f: - f.flush() - os.fsync(f.fileno()) - - os.replace(temp_path, config.INDEX_FILE) - - with self._lock: - texts_copy = self.texts.copy() - - with atomicwrites.atomic_write( - config.TEXTS_FILE, - mode='w', - overwrite=True - ) as f: - json.dump(texts_copy, f) - - logger.info( - f"Atomically saved FAISS index with {len(texts_copy)} vectors" + if error_rate > 0.2: + # High error rate โ†’ recommend rollback + healing_intent = create_rollback_intent( + component=component, + revision="previous", + justification=f"High error rate ({error_rate*100:.1f}%) detected in {component}", + incident_id=f"oss_demo_{datetime.datetime.now().timestamp()}" ) - - except Exception as e: - logger.error(f"Error saving index: {e}", exc_info=True) - - def get_count(self) -> int: - """Get total count of vectors""" - with self._lock: - return len(self.texts) + self._write_queue.qsize() - - def force_save(self) -> None: - """Force immediate save of pending vectors""" - logger.info("Forcing FAISS index save...") - - timeout = 10.0 - start = datetime.datetime.now() - - while not self._write_queue.empty(): - if (datetime.datetime.now() - start).total_seconds() > timeout: - logger.warning("Force save timeout - queue not empty") - break - import time - time.sleep(0.1) - - self._save_atomic() - - def shutdown(self) -> None: - """Graceful shutdown""" - logger.info("Shutting down FAISS index...") - self._shutdown.set() - self.force_save() - self._writer_thread.join(timeout=5.0) - self._encoder_pool.shutdown(wait=True) - - -# === FAISS & Embeddings Setup === -model = None - -def get_model(): - """Lazy-load SentenceTransformer model on first use""" - global model - if model is None: - from sentence_transformers import SentenceTransformer - logger.info("Loading SentenceTransformer model...") - model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") - logger.info("Model loaded on demand") - return model - -try: - from sentence_transformers import SentenceTransformer - import faiss - - if os.path.exists(config.INDEX_FILE): - logger.info(f"Loading existing FAISS index from {config.INDEX_FILE}") - index = faiss.read_index(config.INDEX_FILE) - - if index.d != Constants.VECTOR_DIM: - logger.warning( - f"Index dimension mismatch: {index.d} != {Constants.VECTOR_DIM}. " - f"Creating new index." + action = "rollback" + elif latency > 200: + # High latency โ†’ recommend restart + healing_intent = create_restart_intent( + component=component, + justification=f"High latency ({latency:.0f}ms) detected in {component}", + incident_id=f"oss_demo_{datetime.datetime.now().timestamp()}" ) - index = faiss.IndexFlatL2(Constants.VECTOR_DIM) - incident_texts = [] + action = "restart_container" else: - with open(config.TEXTS_FILE, "r") as f: - incident_texts = json.load(f) - logger.info(f"Loaded {len(incident_texts)} incident texts") - else: - logger.info("Creating new FAISS index") - index = faiss.IndexFlatL2(Constants.VECTOR_DIM) - incident_texts = [] - - thread_safe_index = ProductionFAISSIndex(index, incident_texts) - -except ImportError as e: - logger.warning(f"FAISS or SentenceTransformers not available: {e}") - index = None - incident_texts = [] - model = None - thread_safe_index = None -except Exception as e: - logger.error(f"Error initializing FAISS: {e}", exc_info=True) - index = None - incident_texts = [] - model = None - thread_safe_index = None - -# === Predictive Models === -class SimplePredictiveEngine: - """Lightweight forecasting engine""" - - def __init__(self, history_window: int = Constants.HISTORY_WINDOW): - self.history_window = history_window - self.service_history: Dict[str, deque] = {} - self.prediction_cache: Dict[str, Tuple[ForecastResult, datetime.datetime]] = {} - self.max_cache_age = datetime.timedelta(minutes=Constants.CACHE_EXPIRY_MINUTES) - self._lock = threading.RLock() - logger.info(f"Initialized SimplePredictiveEngine with history_window={history_window}") - - def add_telemetry(self, service: str, event_data: Dict) -> None: - """Add telemetry data to service history""" - with self._lock: - if service not in self.service_history: - self.service_history[service] = deque(maxlen=self.history_window) - - telemetry_point = { - 'timestamp': datetime.datetime.now(datetime.timezone.utc), - 'latency': event_data.get('latency_p99', 0), - 'error_rate': event_data.get('error_rate', 0), - 'throughput': event_data.get('throughput', 0), - 'cpu_util': event_data.get('cpu_util'), - 'memory_util': event_data.get('memory_util') - } - - self.service_history[service].append(telemetry_point) - self._clean_cache() - - def _clean_cache(self) -> None: - """Remove expired entries from prediction cache""" - now = datetime.datetime.now(datetime.timezone.utc) - expired = [k for k, (_, ts) in self.prediction_cache.items() - if now - ts > self.max_cache_age] - for k in expired: - del self.prediction_cache[k] - - if expired: - logger.debug(f"Cleaned {len(expired)} expired cache entries") - - def forecast_service_health( - self, - service: str, - lookahead_minutes: int = Constants.FORECAST_LOOKAHEAD_MINUTES - ) -> List[ForecastResult]: - """Forecast service health metrics""" - with self._lock: - if service not in self.service_history or \ - len(self.service_history[service]) < Constants.FORECAST_MIN_DATA_POINTS: - return [] - - history = list(self.service_history[service]) - - forecasts = [] - - # Forecast latency - latency_forecast = self._forecast_latency(history, lookahead_minutes) - if latency_forecast: - forecasts.append(latency_forecast) - - # Forecast error rate - error_forecast = self._forecast_error_rate(history, lookahead_minutes) - if error_forecast: - forecasts.append(error_forecast) - - # Forecast resource utilization - resource_forecasts = self._forecast_resources(history, lookahead_minutes) - forecasts.extend(resource_forecasts) - - # Cache results - with self._lock: - for forecast in forecasts: - cache_key = f"{service}_{forecast.metric}" - self.prediction_cache[cache_key] = (forecast, datetime.datetime.now(datetime.timezone.utc)) - - return forecasts - - def _forecast_latency( - self, - history: List, - lookahead_minutes: int - ) -> Optional[ForecastResult]: - """Forecast latency using linear regression""" - try: - latencies = [point['latency'] for point in history[-20:]] - - if len(latencies) < Constants.FORECAST_MIN_DATA_POINTS: - return None - - x = np.arange(len(latencies)) - slope, intercept = np.polyfit(x, latencies, 1) - - next_x = len(latencies) - predicted_latency = slope * next_x + intercept - - residuals = latencies - (slope * x + intercept) - confidence = max(0, 1 - (np.std(residuals) / max(1, np.mean(latencies)))) - - if slope > Constants.SLOPE_THRESHOLD_INCREASING: - trend = "increasing" - risk = "critical" if predicted_latency > Constants.LATENCY_EXTREME else "high" - elif slope < Constants.SLOPE_THRESHOLD_DECREASING: - trend = "decreasing" - risk = "low" - else: - trend = "stable" - risk = "low" if predicted_latency < Constants.LATENCY_WARNING else "medium" - - time_to_critical = None - if slope > 0 and predicted_latency < Constants.LATENCY_EXTREME: - denominator = predicted_latency - latencies[-1] - if abs(denominator) > 0.1: - minutes_to_critical = lookahead_minutes * \ - (Constants.LATENCY_EXTREME - predicted_latency) / denominator - if minutes_to_critical > 0: - time_to_critical = minutes_to_critical - - return ForecastResult( - metric="latency", - predicted_value=predicted_latency, - confidence=confidence, - trend=trend, - time_to_threshold=time_to_critical, - risk_level=risk - ) - - except Exception as e: - logger.error(f"Latency forecast error: {e}", exc_info=True) - return None - - def _forecast_error_rate( - self, - history: List, - lookahead_minutes: int - ) -> Optional[ForecastResult]: - """Forecast error rate using exponential smoothing""" - try: - error_rates = [point['error_rate'] for point in history[-15:]] - - if len(error_rates) < Constants.FORECAST_MIN_DATA_POINTS: - return None - - alpha = 0.3 - forecast = error_rates[0] - for rate in error_rates[1:]: - forecast = alpha * rate + (1 - alpha) * forecast - - predicted_rate = forecast - - recent_trend = np.mean(error_rates[-3:]) - np.mean(error_rates[-6:-3]) - - if recent_trend > 0.02: - trend = "increasing" - risk = "critical" if predicted_rate > Constants.ERROR_RATE_CRITICAL else "high" - elif recent_trend < -0.01: - trend = "decreasing" - risk = "low" - else: - trend = "stable" - risk = "low" if predicted_rate < Constants.ERROR_RATE_WARNING else "medium" - - confidence = max(0, 1 - (np.std(error_rates) / max(0.01, np.mean(error_rates)))) - - return ForecastResult( - metric="error_rate", - predicted_value=predicted_rate, - confidence=confidence, - trend=trend, - risk_level=risk + # Scale out recommendation + healing_intent = create_scale_out_intent( + component=component, + scale_factor=2, + justification=f"Performance degradation in {component}", + incident_id=f"oss_demo_{datetime.datetime.now().timestamp()}" ) - - except Exception as e: - logger.error(f"Error rate forecast error: {e}", exc_info=True) - return None - - def _forecast_resources( - self, - history: List, - lookahead_minutes: int - ) -> List[ForecastResult]: - """Forecast CPU and memory utilization""" - forecasts = [] - - # CPU forecast - cpu_values = [point['cpu_util'] for point in history if point.get('cpu_util') is not None] - if len(cpu_values) >= Constants.FORECAST_MIN_DATA_POINTS: - try: - predicted_cpu = np.mean(cpu_values[-5:]) - trend = "increasing" if cpu_values[-1] > np.mean(cpu_values[-10:-5]) else "stable" - - risk = "low" - if predicted_cpu > Constants.CPU_CRITICAL: - risk = "critical" - elif predicted_cpu > Constants.CPU_WARNING: - risk = "high" - elif predicted_cpu > 0.7: - risk = "medium" - - forecasts.append(ForecastResult( - metric="cpu_util", - predicted_value=predicted_cpu, - confidence=0.7, - trend=trend, - risk_level=risk - )) - except Exception as e: - logger.error(f"CPU forecast error: {e}", exc_info=True) + action = "scale_out" - # Memory forecast - memory_values = [point['memory_util'] for point in history if point.get('memory_util') is not None] - if len(memory_values) >= Constants.FORECAST_MIN_DATA_POINTS: - try: - predicted_memory = np.mean(memory_values[-5:]) - trend = "increasing" if memory_values[-1] > np.mean(memory_values[-10:-5]) else "stable" - - risk = "low" - if predicted_memory > Constants.MEMORY_CRITICAL: - risk = "critical" - elif predicted_memory > Constants.MEMORY_WARNING: - risk = "high" - elif predicted_memory > 0.7: - risk = "medium" - - forecasts.append(ForecastResult( - metric="memory_util", - predicted_value=predicted_memory, - confidence=0.7, - trend=trend, - risk_level=risk - )) - except Exception as e: - logger.error(f"Memory forecast error: {e}", exc_info=True) - - return forecasts - - def get_predictive_insights(self, service: str) -> Dict[str, Any]: - """Generate actionable insights from forecasts""" - forecasts = self.forecast_service_health(service) - - critical_risks = [f for f in forecasts if f.risk_level in ["high", "critical"]] - warnings = [] - recommendations = [] - - for forecast in critical_risks: - if forecast.metric == "latency" and forecast.risk_level in ["high", "critical"]: - warnings.append(f"๐Ÿ“ˆ Latency expected to reach {forecast.predicted_value:.0f}ms") - if forecast.time_to_threshold: - minutes = int(forecast.time_to_threshold) - recommendations.append(f"โฐ Critical latency (~{Constants.LATENCY_EXTREME}ms) in ~{minutes} minutes") - recommendations.append("๐Ÿ”ง Consider scaling or optimizing dependencies") - - elif forecast.metric == "error_rate" and forecast.risk_level in ["high", "critical"]: - warnings.append(f"๐Ÿšจ Errors expected to reach {forecast.predicted_value*100:.1f}%") - recommendations.append("๐Ÿ› Investigate recent deployments or dependency issues") - - elif forecast.metric == "cpu_util" and forecast.risk_level in ["high", "critical"]: - warnings.append(f"๐Ÿ”ฅ CPU expected at {forecast.predicted_value*100:.1f}%") - recommendations.append("โšก Consider scaling compute resources") - - elif forecast.metric == "memory_util" and forecast.risk_level in ["high", "critical"]: - warnings.append(f"๐Ÿ’พ Memory expected at {forecast.predicted_value*100:.1f}%") - recommendations.append("๐Ÿงน Check for memory leaks or optimize usage") + # Get OSS MCP analysis (advisory only) + mcp_result = await client.execute_tool({ + "tool": action, + "component": component, + "parameters": {}, + "justification": healing_intent.justification, + "metadata": { + "scenario": scenario_name, + "latency": latency, + "error_rate": error_rate, + "oss_edition": True + } + }) return { - 'service': service, - 'forecasts': [ - { - 'metric': f.metric, - 'predicted_value': f.predicted_value, - 'confidence': f.confidence, - 'trend': f.trend, - 'risk_level': f.risk_level, - 'time_to_threshold': f.time_to_threshold - } - for f in forecasts - ], - 'warnings': warnings[:3], - 'recommendations': list(dict.fromkeys(recommendations))[:3], - 'critical_risk_count': len(critical_risks), - 'forecast_timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat() + "status": "OSS_ADVISORY_COMPLETE", + "healing_intent": healing_intent.to_enterprise_request(), + "oss_analysis": mcp_result, + "confidence": healing_intent.confidence, + "requires_enterprise": True, + "message": f"โœ… OSS analysis complete. Created HealingIntent for {action} on {component}.", + "enterprise_upgrade_url": "https://arf.dev/enterprise", + "enterprise_features": [ + "Autonomous execution", + "Approval workflows", + "Learning engine", + "Persistent storage", + "Audit trails", + "Compliance reporting" + ] } - - -# === ENTERPRISE BUSINESS IMPACT CALCULATOR === -class BusinessImpactCalculator: - """Enterprise-scale business impact calculation for $47K+ deals""" - - def __init__(self): - logger.info("Initialized Enterprise BusinessImpactCalculator") - - def calculate_impact( - self, - event: ReliabilityEvent, - duration_minutes: int = 5 - ) -> Dict[str, Any]: - """ - Calculate ENTERPRISE business impact for reliability events - - Based on real enterprise data for $1M+/month businesses - """ - # ENTERPRISE: $5K/min baseline for $7.2M/month business - base_revenue_per_minute = Constants.BASE_REVENUE_PER_MINUTE - - impact_multiplier = 1.0 - - # ENTERPRISE impact factors - if event.latency_p99 > Constants.LATENCY_CRITICAL: - latency_impact = (event.latency_p99 - Constants.LATENCY_WARNING) / 100 - impact_multiplier += latency_impact * Constants.LATENCY_IMPACT_MULTIPLIER - - if event.error_rate > Constants.ERROR_RATE_WARNING: - error_impact = (event.error_rate - Constants.ERROR_RATE_WARNING) * 100 - impact_multiplier += error_impact * Constants.ERROR_IMPACT_MULTIPLIER - - if event.cpu_util and event.cpu_util > Constants.CPU_WARNING: - cpu_impact = (event.cpu_util - Constants.CPU_WARNING) * 10 - impact_multiplier += cpu_impact * Constants.RESOURCE_IMPACT_MULTIPLIER - - if event.memory_util and event.memory_util > Constants.MEMORY_WARNING: - memory_impact = (event.memory_util - Constants.MEMORY_WARNING) * 10 - impact_multiplier += memory_impact * Constants.RESOURCE_IMPACT_MULTIPLIER - - # ENTERPRISE revenue impact (thousands, not hundreds) - revenue_loss = base_revenue_per_minute * impact_multiplier * (duration_minutes / 60) - - # ENTERPRISE user impact (thousands, not hundreds) - base_users_affected = Constants.BASE_USERS - user_impact_multiplier = (event.error_rate * 15) + \ - (max(0, event.latency_p99 - 100) / 400) - affected_users = int(base_users_affected * user_impact_multiplier) - - # ENTERPRISE severity classification - if revenue_loss > 50000 or affected_users > 20000: - severity = "CRITICAL" - elif revenue_loss > 10000 or affected_users > 5000: - severity = "HIGH" - elif revenue_loss > 5000 or affected_users > 1000: - severity = "MEDIUM" - else: - severity = "LOW" - - logger.info( - f"Enterprise impact: \${revenue_loss:,.0f} revenue loss, " - f"{affected_users:,} users, {severity} severity" - ) + except Exception as e: + logger.error(f"OSS analysis error: {e}", exc_info=True) return { - 'revenue_loss_estimate': round(revenue_loss, 2), - 'affected_users_estimate': affected_users, - 'severity_level': severity, - 'throughput_reduction_pct': round(min(100, user_impact_multiplier * 100), 1), - 'impact_multiplier': round(impact_multiplier, 2) - } - - -class AdvancedAnomalyDetector: - """Enhanced anomaly detection with adaptive thresholds""" - - def __init__(self): - self.historical_data = deque(maxlen=100) - self.adaptive_thresholds = { - 'latency_p99': Constants.LATENCY_WARNING, - 'error_rate': Constants.ERROR_RATE_WARNING - } - self._lock = threading.RLock() - logger.info("Initialized AdvancedAnomalyDetector") - - def detect_anomaly(self, event: ReliabilityEvent) -> bool: - """Detect if event is anomalous using adaptive thresholds""" - with self._lock: - latency_anomaly = event.latency_p99 > self.adaptive_thresholds['latency_p99'] - error_anomaly = event.error_rate > self.adaptive_thresholds['error_rate'] - - resource_anomaly = False - if event.cpu_util and event.cpu_util > Constants.CPU_CRITICAL: - resource_anomaly = True - if event.memory_util and event.memory_util > Constants.MEMORY_CRITICAL: - resource_anomaly = True - - self._update_thresholds(event) - - is_anomaly = latency_anomaly or error_anomaly or resource_anomaly - - if is_anomaly: - logger.info( - f"Anomaly detected for {event.component}: " - f"latency={latency_anomaly}, error={error_anomaly}, " - f"resource={resource_anomaly}" - ) - - return is_anomaly - - def _update_thresholds(self, event: ReliabilityEvent) -> None: - """Update adaptive thresholds based on historical data""" - self.historical_data.append(event) - - if len(self.historical_data) > 10: - recent_latencies = [e.latency_p99 for e in list(self.historical_data)[-20:]] - new_threshold = np.percentile(recent_latencies, 90) - self.adaptive_thresholds['latency_p99'] = new_threshold - logger.debug(f"Updated adaptive latency threshold to {new_threshold:.2f}ms") - -# === Multi-Agent System === -class AgentSpecialization(Enum): - """Agent specialization types""" - DETECTIVE = "anomaly_detection" - DIAGNOSTICIAN = "root_cause_analysis" - PREDICTIVE = "predictive_analytics" - - -class BaseAgent: - """Base class for all specialized agents""" - - def __init__(self, specialization: AgentSpecialization): - self.specialization = specialization - self.performance_metrics = { - 'processed_events': 0, - 'successful_analyses': 0, - 'average_confidence': 0.0 + "status": "OSS_ERROR", + "message": f"โŒ OSS analysis failed: {str(e)}", + "requires_enterprise": False } - - async def analyze(self, event: ReliabilityEvent) -> Dict[str, Any]: - """Base analysis method to be implemented by specialized agents""" - raise NotImplementedError - - -class AnomalyDetectionAgent(BaseAgent): - """Specialized agent for anomaly detection and pattern recognition""" - - def __init__(self): - super().__init__(AgentSpecialization.DETECTIVE) - logger.info("Initialized AnomalyDetectionAgent") - - async def analyze(self, event: ReliabilityEvent) -> Dict[str, Any]: - """Perform comprehensive anomaly analysis""" - try: - anomaly_score = self._calculate_anomaly_score(event) - - return { - 'specialization': self.specialization.value, - 'confidence': anomaly_score, - 'findings': { - 'anomaly_score': anomaly_score, - 'severity_tier': self._classify_severity(anomaly_score), - 'primary_metrics_affected': self._identify_affected_metrics(event) - }, - 'recommendations': self._generate_detection_recommendations(event, anomaly_score) - } - except Exception as e: - logger.error(f"AnomalyDetectionAgent error: {e}", exc_info=True) - return { - 'specialization': self.specialization.value, - 'confidence': 0.0, - 'findings': {}, - 'recommendations': [f"Analysis error: {str(e)}"] - } - - def _calculate_anomaly_score(self, event: ReliabilityEvent) -> float: - """Calculate comprehensive anomaly score (0-1)""" - scores = [] - - # Latency anomaly (weighted 40%) - if event.latency_p99 > Constants.LATENCY_WARNING: - latency_score = min(1.0, (event.latency_p99 - Constants.LATENCY_WARNING) / 500) - scores.append(0.4 * latency_score) - - # Error rate anomaly (weighted 30%) - if event.error_rate > Constants.ERROR_RATE_WARNING: - error_score = min(1.0, event.error_rate / 0.3) - scores.append(0.3 * error_score) - - # Resource anomaly (weighted 30%) - resource_score = 0 - if event.cpu_util and event.cpu_util > Constants.CPU_WARNING: - resource_score += 0.15 * min(1.0, (event.cpu_util - Constants.CPU_WARNING) / 0.2) - if event.memory_util and event.memory_util > Constants.MEMORY_WARNING: - resource_score += 0.15 * min(1.0, (event.memory_util - Constants.MEMORY_WARNING) / 0.2) - scores.append(resource_score) - - return min(1.0, sum(scores)) - - def _classify_severity(self, anomaly_score: float) -> str: - """Classify severity tier based on anomaly score""" - if anomaly_score > 0.8: - return "CRITICAL" - elif anomaly_score > 0.6: - return "HIGH" - elif anomaly_score > 0.4: - return "MEDIUM" - else: - return "LOW" - - def _identify_affected_metrics(self, event: ReliabilityEvent) -> List[Dict[str, Any]]: - """Identify which metrics are outside normal ranges""" - affected = [] - - # Latency checks - if event.latency_p99 > Constants.LATENCY_EXTREME: - affected.append({ - "metric": "latency", - "value": event.latency_p99, - "severity": "CRITICAL", - "threshold": Constants.LATENCY_WARNING - }) - elif event.latency_p99 > Constants.LATENCY_CRITICAL: - affected.append({ - "metric": "latency", - "value": event.latency_p99, - "severity": "HIGH", - "threshold": Constants.LATENCY_WARNING - }) - elif event.latency_p99 > Constants.LATENCY_WARNING: - affected.append({ - "metric": "latency", - "value": event.latency_p99, - "severity": "MEDIUM", - "threshold": Constants.LATENCY_WARNING - }) - - # Error rate checks - if event.error_rate > Constants.ERROR_RATE_CRITICAL: - affected.append({ - "metric": "error_rate", - "value": event.error_rate, - "severity": "CRITICAL", - "threshold": Constants.ERROR_RATE_WARNING - }) - elif event.error_rate > Constants.ERROR_RATE_HIGH: - affected.append({ - "metric": "error_rate", - "value": event.error_rate, - "severity": "HIGH", - "threshold": Constants.ERROR_RATE_WARNING - }) - elif event.error_rate > Constants.ERROR_RATE_WARNING: - affected.append({ - "metric": "error_rate", - "value": event.error_rate, - "severity": "MEDIUM", - "threshold": Constants.ERROR_RATE_WARNING - }) - - # CPU checks - if event.cpu_util and event.cpu_util > Constants.CPU_CRITICAL: - affected.append({ - "metric": "cpu", - "value": event.cpu_util, - "severity": "CRITICAL", - "threshold": Constants.CPU_WARNING - }) - elif event.cpu_util and event.cpu_util > Constants.CPU_WARNING: - affected.append({ - "metric": "cpu", - "value": event.cpu_util, - "severity": "HIGH", - "threshold": Constants.CPU_WARNING - }) - - # Memory checks - if event.memory_util and event.memory_util > Constants.MEMORY_CRITICAL: - affected.append({ - "metric": "memory", - "value": event.memory_util, - "severity": "CRITICAL", - "threshold": Constants.MEMORY_WARNING - }) - elif event.memory_util and event.memory_util > Constants.MEMORY_WARNING: - affected.append({ - "metric": "memory", - "value": event.memory_util, - "severity": "HIGH", - "threshold": Constants.MEMORY_WARNING - }) - - return affected - - def _generate_detection_recommendations( - self, - event: ReliabilityEvent, - anomaly_score: float - ) -> List[str]: - """Generate actionable recommendations""" - recommendations = [] - affected_metrics = self._identify_affected_metrics(event) - - for metric in affected_metrics: - metric_name = metric["metric"] - severity = metric["severity"] - value = metric["value"] - threshold = metric["threshold"] - - if metric_name == "latency": - if severity == "CRITICAL": - recommendations.append( - f"๐Ÿšจ CRITICAL: Latency {value:.0f}ms (>{threshold}ms) - " - f"Check database & external dependencies" - ) - elif severity == "HIGH": - recommendations.append( - f"โš ๏ธ HIGH: Latency {value:.0f}ms (>{threshold}ms) - " - f"Investigate service performance" - ) - else: - recommendations.append( - f"๐Ÿ“ˆ Latency elevated: {value:.0f}ms (>{threshold}ms) - Monitor trend" - ) - - elif metric_name == "error_rate": - if severity == "CRITICAL": - recommendations.append( - f"๐Ÿšจ CRITICAL: Error rate {value*100:.1f}% (>{threshold*100:.1f}%) - " - f"Check recent deployments" - ) - elif severity == "HIGH": - recommendations.append( - f"โš ๏ธ HIGH: Error rate {value*100:.1f}% (>{threshold*100:.1f}%) - " - f"Review application logs" - ) - else: - recommendations.append( - f"๐Ÿ“ˆ Errors increasing: {value*100:.1f}% (>{threshold*100:.1f}%)" - ) - - elif metric_name == "cpu": - recommendations.append( - f"๐Ÿ”ฅ CPU {severity}: {value*100:.1f}% utilization - Consider scaling" - ) - - elif metric_name == "memory": - recommendations.append( - f"๐Ÿ’พ Memory {severity}: {value*100:.1f}% utilization - Check for memory leaks" - ) - - # Overall severity recommendations - if anomaly_score > 0.8: - recommendations.append("๐ŸŽฏ IMMEDIATE ACTION REQUIRED: Multiple critical metrics affected") - elif anomaly_score > 0.6: - recommendations.append("๐ŸŽฏ INVESTIGATE: Significant performance degradation detected") - elif anomaly_score > 0.4: - recommendations.append("๐Ÿ“Š MONITOR: Early warning signs detected") - - return recommendations[:4] - - -class RootCauseAgent(BaseAgent): - """Specialized agent for root cause analysis""" - - def __init__(self): - super().__init__(AgentSpecialization.DIAGNOSTICIAN) - logger.info("Initialized RootCauseAgent") - - async def analyze(self, event: ReliabilityEvent) -> Dict[str, Any]: - """Perform root cause analysis""" - try: - causes = self._analyze_potential_causes(event) - - return { - 'specialization': self.specialization.value, - 'confidence': 0.7, - 'findings': { - 'likely_root_causes': causes, - 'evidence_patterns': self._identify_evidence(event), - 'investigation_priority': self._prioritize_investigation(causes) - }, - 'recommendations': [ - f"Check {cause['cause']} for issues" for cause in causes[:2] - ] - } - except Exception as e: - logger.error(f"RootCauseAgent error: {e}", exc_info=True) - return { - 'specialization': self.specialization.value, - 'confidence': 0.0, - 'findings': {}, - 'recommendations': [f"Analysis error: {str(e)}"] - } - - def _analyze_potential_causes(self, event: ReliabilityEvent) -> List[Dict[str, Any]]: - """Analyze potential root causes based on event patterns""" - causes = [] - - # Pattern 1: Database/External Dependency Failure - if event.latency_p99 > Constants.LATENCY_EXTREME and event.error_rate > 0.2: - causes.append({ - "cause": "Database/External Dependency Failure", - "confidence": 0.85, - "evidence": f"Extreme latency ({event.latency_p99:.0f}ms) with high errors ({event.error_rate*100:.1f}%)", - "investigation": "Check database connection pool, external API health" - }) - - # Pattern 2: Resource Exhaustion - if (event.cpu_util and event.cpu_util > Constants.CPU_CRITICAL and - event.memory_util and event.memory_util > Constants.MEMORY_CRITICAL): - causes.append({ - "cause": "Resource Exhaustion", - "confidence": 0.90, - "evidence": f"CPU ({event.cpu_util*100:.1f}%) and Memory ({event.memory_util*100:.1f}%) critically high", - "investigation": "Check for memory leaks, infinite loops, insufficient resources" - }) - - # Pattern 3: Application Bug / Configuration Issue - if event.error_rate > Constants.ERROR_RATE_CRITICAL and event.latency_p99 < 200: - causes.append({ - "cause": "Application Bug / Configuration Issue", - "confidence": 0.75, - "evidence": f"High error rate ({event.error_rate*100:.1f}%) without latency impact", - "investigation": "Review recent deployments, configuration changes, application logs" - }) - - # Pattern 4: Gradual Performance Degradation - if (200 <= event.latency_p99 <= 400 and - Constants.ERROR_RATE_WARNING <= event.error_rate <= Constants.ERROR_RATE_HIGH): - causes.append({ - "cause": "Gradual Performance Degradation", - "confidence": 0.65, - "evidence": f"Moderate latency ({event.latency_p99:.0f}ms) and errors ({event.error_rate*100:.1f}%)", - "investigation": "Check resource trends, dependency performance, capacity planning" - }) - - # Default: Unknown pattern - if not causes: - causes.append({ - "cause": "Unknown - Requires Investigation", - "confidence": 0.3, - "evidence": "Pattern does not match known failure modes", - "investigation": "Complete system review needed" - }) - - return causes - - def _identify_evidence(self, event: ReliabilityEvent) -> List[str]: - """Identify evidence patterns in the event data""" - evidence = [] - - if event.latency_p99 > event.error_rate * 1000: - evidence.append("latency_disproportionate_to_errors") - - if (event.cpu_util and event.cpu_util > Constants.CPU_WARNING and - event.memory_util and event.memory_util > Constants.MEMORY_WARNING): - evidence.append("correlated_resource_exhaustion") - - if event.error_rate > Constants.ERROR_RATE_HIGH and event.latency_p99 < Constants.LATENCY_CRITICAL: - evidence.append("errors_without_latency_impact") - - return evidence - - def _prioritize_investigation(self, causes: List[Dict[str, Any]]) -> str: - """Determine investigation priority""" - for cause in causes: - if "Database" in cause["cause"] or "Resource Exhaustion" in cause["cause"]: - return "HIGH" - return "MEDIUM" - - -class PredictiveAgent(BaseAgent): - """Specialized agent for predictive analytics""" - - def __init__(self, engine: SimplePredictiveEngine): - super().__init__(AgentSpecialization.PREDICTIVE) - self.engine = engine - logger.info("Initialized PredictiveAgent") - - async def analyze(self, event: ReliabilityEvent) -> Dict[str, Any]: - """Perform predictive analysis for future risks""" - try: - event_data = { - 'latency_p99': event.latency_p99, - 'error_rate': event.error_rate, - 'throughput': event.throughput, - 'cpu_util': event.cpu_util, - 'memory_util': event.memory_util - } - self.engine.add_telemetry(event.component, event_data) - - insights = self.engine.get_predictive_insights(event.component) - - return { - 'specialization': self.specialization.value, - 'confidence': 0.8 if insights['critical_risk_count'] > 0 else 0.5, - 'findings': insights, - 'recommendations': insights['recommendations'] - } - except Exception as e: - logger.error(f"PredictiveAgent error: {e}", exc_info=True) - return { - 'specialization': self.specialization.value, - 'confidence': 0.0, - 'findings': {}, - 'recommendations': [f"Analysis error: {str(e)}"] - } +# ============================================================================ +# SIMPLE OSS DEMO UI +# ============================================================================ -# Circuit breaker for agent resilience -@circuit(failure_threshold=3, recovery_timeout=30, name="agent_circuit_breaker") -async def call_agent_with_protection(agent: BaseAgent, event: ReliabilityEvent) -> Dict[str, Any]: - """ - Call agent with circuit breaker protection - """ - try: - result = await asyncio.wait_for( - agent.analyze(event), - timeout=Constants.AGENT_TIMEOUT_SECONDS - ) - return result - except asyncio.TimeoutError: - logger.warning(f"Agent {agent.specialization.value} timed out") - raise - except Exception as e: - logger.error(f"Agent {agent.specialization.value} error: {e}", exc_info=True) - raise - - -class OrchestrationManager: - """Orchestrates multiple specialized agents for comprehensive analysis""" - - def __init__( - self, - detective: Optional[AnomalyDetectionAgent] = None, - diagnostician: Optional[RootCauseAgent] = None, - predictive: Optional[PredictiveAgent] = None - ): - """ - Initialize orchestration manager - """ - self.agents = { - AgentSpecialization.DETECTIVE: detective or AnomalyDetectionAgent(), - AgentSpecialization.DIAGNOSTICIAN: diagnostician or RootCauseAgent(), - AgentSpecialization.PREDICTIVE: predictive or PredictiveAgent(SimplePredictiveEngine()), - } - logger.info(f"Initialized OrchestrationManager with {len(self.agents)} agents") - - async def orchestrate_analysis(self, event: ReliabilityEvent) -> Dict[str, Any]: - """ - Coordinate multiple agents for comprehensive analysis - """ - agent_tasks = [] - agent_specs = [] - - for spec, agent in self.agents.items(): - agent_tasks.append(call_agent_with_protection(agent, event)) - agent_specs.append(spec) - - agent_results = {} - - try: - results = await asyncio.wait_for( - asyncio.gather(*agent_tasks, return_exceptions=True), - timeout=Constants.AGENT_TIMEOUT_SECONDS + 1 - ) - - for spec, result in zip(agent_specs, results): - if isinstance(result, Exception): - logger.error(f"Agent {spec.value} failed: {result}") - continue - - agent_results[spec.value] = result - logger.debug(f"Agent {spec.value} completed successfully") - - except asyncio.TimeoutError: - logger.warning("Agent orchestration timed out") - except Exception as e: - logger.error(f"Agent orchestration error: {e}", exc_info=True) - - return self._synthesize_agent_findings(event, agent_results) - - def _synthesize_agent_findings( - self, - event: ReliabilityEvent, - agent_results: Dict - ) -> Dict[str, Any]: - """Combine insights from all specialized agents""" - detective_result = agent_results.get(AgentSpecialization.DETECTIVE.value) - diagnostician_result = agent_results.get(AgentSpecialization.DIAGNOSTICIAN.value) - predictive_result = agent_results.get(AgentSpecialization.PREDICTIVE.value) - - if not detective_result: - logger.warning("No detective agent results available") - return {'error': 'No agent results available'} - - synthesis = { - 'incident_summary': { - 'severity': detective_result['findings'].get('severity_tier', 'UNKNOWN'), - 'anomaly_confidence': detective_result['confidence'], - 'primary_metrics_affected': [ - metric["metric"] for metric in - detective_result['findings'].get('primary_metrics_affected', []) - ] - }, - 'root_cause_insights': diagnostician_result['findings'] if diagnostician_result else {}, - 'predictive_insights': predictive_result['findings'] if predictive_result else {}, - 'recommended_actions': self._prioritize_actions( - detective_result.get('recommendations', []), - diagnostician_result.get('recommendations', []) if diagnostician_result else [], - predictive_result.get('recommendations', []) if predictive_result else [] - ), - 'agent_metadata': { - 'participating_agents': list(agent_results.keys()), - 'analysis_timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat() - } - } - - return synthesis - - def _prioritize_actions( - self, - detection_actions: List[str], - diagnosis_actions: List[str], - predictive_actions: List[str] - ) -> List[str]: - """Combine and prioritize actions from multiple agents""" - all_actions = detection_actions + diagnosis_actions + predictive_actions - seen = set() - unique_actions = [] - for action in all_actions: - if action not in seen: - seen.add(action) - unique_actions.append(action) - return unique_actions[:5] - -# === Enhanced Reliability Engine === -class EnhancedReliabilityEngine: - """ - Main engine for processing reliability events - """ - - def __init__( - self, - orchestrator: Optional[OrchestrationManager] = None, - policy_engine: Optional[PolicyEngine] = None, - event_store: Optional[ThreadSafeEventStore] = None, - anomaly_detector: Optional[AdvancedAnomalyDetector] = None, - business_calculator: Optional[BusinessImpactCalculator] = None - ): - """ - Initialize reliability engine with dependency injection - """ - self.orchestrator = orchestrator or OrchestrationManager() - self.policy_engine = policy_engine or PolicyEngine() - self.event_store = event_store or ThreadSafeEventStore() - self.anomaly_detector = anomaly_detector or AdvancedAnomalyDetector() - self.business_calculator = business_calculator or BusinessImpactCalculator() - - self.performance_metrics = { - 'total_incidents_processed': 0, - 'multi_agent_analyses': 0, - 'anomalies_detected': 0 - } - self._lock = threading.RLock() - logger.info("Initialized EnhancedReliabilityEngine") - - async def process_event_enhanced( - self, - component: str, - latency: float, - error_rate: float, - throughput: float = 1000, - cpu_util: Optional[float] = None, - memory_util: Optional[float] = None - ) -> Dict[str, Any]: - """ - Process a reliability event through the complete analysis pipeline - """ - logger.info( - f"Processing event for {component}: latency={latency}ms, " - f"error_rate={error_rate*100:.1f}%" - ) - - # Validate component ID - is_valid, error_msg = validate_component_id(component) - if not is_valid: - return {'error': error_msg, 'status': 'INVALID'} - - # Create event - try: - event = ReliabilityEvent( - component=component, - latency_p99=latency, - error_rate=error_rate, - throughput=throughput, - cpu_util=cpu_util, - memory_util=memory_util, - upstream_deps=["auth-service", "database"] if component == "api-service" else [] - ) - except Exception as e: - logger.error(f"Event creation error: {e}", exc_info=True) - return {'error': f'Invalid event data: {str(e)}', 'status': 'INVALID'} - - # Multi-agent analysis - agent_analysis = await self.orchestrator.orchestrate_analysis(event) - - # Anomaly detection - is_anomaly = self.anomaly_detector.detect_anomaly(event) - - # Determine severity based on agent confidence - agent_confidence = 0.0 - if agent_analysis and 'incident_summary' in agent_analysis: - agent_confidence = agent_analysis.get('incident_summary', {}).get('anomaly_confidence', 0) - else: - agent_confidence = 0.8 if is_anomaly else 0.1 - - # Set event severity - if agent_confidence > 0.8: - severity = EventSeverity.CRITICAL - elif agent_confidence > 0.6: - severity = EventSeverity.HIGH - elif agent_confidence > 0.4: - severity = EventSeverity.MEDIUM - else: - severity = EventSeverity.LOW - - # Create mutable copy with updated severity - event = event.model_copy(update={'severity': severity}) - - # Evaluate healing policies - healing_actions = self.policy_engine.evaluate_policies(event) - - # Calculate ENTERPRISE business impact - business_impact = self.business_calculator.calculate_impact(event) if is_anomaly else None - - # Store in vector database for similarity detection - if thread_safe_index is not None and model is not None and is_anomaly: - try: - analysis_text = agent_analysis.get('recommended_actions', ['No analysis'])[0] - vector_text = f"{component} {latency} {error_rate} {analysis_text}" - - loop = asyncio.get_event_loop() - vec = await loop.run_in_executor( - thread_safe_index._encoder_pool, - get_model().encode, - [vector_text] - ) - - thread_safe_index.add_async(np.array(vec, dtype=np.float32), vector_text) - except Exception as e: - logger.error(f"Error storing vector: {e}", exc_info=True) - - # Build comprehensive result - result = { - "timestamp": event.timestamp.isoformat(), - "component": component, - "latency_p99": latency, - "error_rate": error_rate, - "throughput": throughput, - "status": "ANOMALY" if is_anomaly else "NORMAL", - "multi_agent_analysis": agent_analysis, - "healing_actions": [action.value for action in healing_actions], - "business_impact": business_impact, - "severity": event.severity.value, - "similar_incidents_count": thread_safe_index.get_count() if thread_safe_index and is_anomaly else 0, - "processing_metadata": { - "agents_used": agent_analysis.get('agent_metadata', {}).get('participating_agents', []), - "analysis_confidence": agent_analysis.get('incident_summary', {}).get('anomaly_confidence', 0) - } - } - - # Store event in history - self.event_store.add(event) +def create_oss_demo_ui(): + """Create simple OSS demo UI""" + with gr.Blocks(title="๐Ÿง  ARF OSS Edition - Advisory Demo", theme="soft") as demo: + gr.Markdown(""" + # ๐Ÿง  Agentic Reliability Framework - OSS Edition + **Advisory Analysis Only - Apache 2.0 Licensed** - # Update performance metrics - with self._lock: - self.performance_metrics['total_incidents_processed'] += 1 - self.performance_metrics['multi_agent_analyses'] += 1 - if is_anomaly: - self.performance_metrics['anomalies_detected'] += 1 + _OSS creates HealingIntent recommendations. Enterprise executes them._ - logger.info(f"Event processed: {result['status']} with {result['severity']} severity") + --- - # Track business metrics for ROI dashboard - if is_anomaly and business_impact: - auto_healed = len(healing_actions) > 0 and healing_actions[0] != HealingAction.NO_ACTION - business_metrics.record_incident( - severity=event.severity.value, - auto_healed=auto_healed, - revenue_loss=business_impact['revenue_loss_estimate'], - detection_time_seconds=120.0 - ) + **OSS Capabilities:** + โœ… Anomaly detection & pattern recognition + โœ… HealingIntent creation (advisory only) + โœ… RAG similarity search (in-memory) + โœ… Safety validation & guardrails - logger.info(f"Event processed: {result['status']} with {result['severity']} severity") - - return result - -# === Initialize Engine === -enhanced_engine = EnhancedReliabilityEngine() - - -# === ENTERPRISE BUSINESS METRICS TRACKER === -class BusinessMetricsTracker: - """Track cumulative ENTERPRISE business metrics for ROI dashboard""" - - def __init__(self): - self.total_incidents = 0 - self.incidents_auto_healed = 0 - self.total_revenue_saved = 0.0 - self.total_revenue_at_risk = 0.0 - self.detection_times = [] - self._lock = threading.RLock() - logger.info("Initialized Enterprise BusinessMetricsTracker") - - def record_incident( - self, - severity: str, - auto_healed: bool, - revenue_loss: float, - detection_time_seconds: float = 120.0 - ): - """Record an incident and update ENTERPRISE metrics""" - with self._lock: - self.total_incidents += 1 - - if auto_healed: - self.incidents_auto_healed += 1 - - # ENTERPRISE: Industry average 45 minutes for enterprises - industry_avg_response_minutes = Constants.INDUSTRY_AVG_RESPONSE_MINUTES - arf_response_minutes = detection_time_seconds / 60 - - # Revenue at risk if using traditional monitoring - revenue_per_minute = revenue_loss / max(1, arf_response_minutes) - traditional_loss = revenue_per_minute * industry_avg_response_minutes - - self.total_revenue_at_risk += traditional_loss - self.total_revenue_saved += (traditional_loss - revenue_loss) - - self.detection_times.append(detection_time_seconds) - - logger.info( - f"Recorded ENTERPRISE incident: auto_healed={auto_healed}, " - f"loss=\${revenue_loss:,.0f}, saved=\${traditional_loss - revenue_loss:,.0f}" - ) - - def get_metrics(self) -> dict: - """Get current cumulative ENTERPRISE metrics""" - with self._lock: - auto_heal_rate = ( - (self.incidents_auto_healed / self.total_incidents * 100) - if self.total_incidents > 0 else 0 - ) - - avg_detection_time = ( - sum(self.detection_times) / len(self.detection_times) - if self.detection_times else 120.0 - ) - - time_improvement = ( - (Constants.INDUSTRY_AVG_RESPONSE_MINUTES - (avg_detection_time / 60)) / - Constants.INDUSTRY_AVG_RESPONSE_MINUTES * 100 - ) - - return { - "total_incidents": self.total_incidents, - "incidents_auto_healed": self.incidents_auto_healed, - "auto_heal_rate": auto_heal_rate, - "total_revenue_saved": self.total_revenue_saved, - "total_revenue_at_risk": self.total_revenue_at_risk, - "avg_detection_time_seconds": avg_detection_time, - "avg_detection_time_minutes": avg_detection_time / 60, - "time_improvement": time_improvement - } - - def reset(self): - """Reset all metrics (for demo purposes)""" - with self._lock: - self.total_incidents = 0 - self.incidents_auto_healed = 0 - self.total_revenue_saved = 0.0 - self.total_revenue_at_risk = 0.0 - self.detection_times = [] - logger.info("Reset Enterprise BusinessMetricsTracker") - - -# Initialize global tracker -business_metrics = BusinessMetricsTracker() - -# === Rate Limiting === -class RateLimiter: - """Simple rate limiter for request throttling""" - - def __init__(self, max_per_minute: int = Constants.MAX_REQUESTS_PER_MINUTE): - self.max_per_minute = max_per_minute - self.requests: deque = deque(maxlen=max_per_minute) - self._lock = threading.RLock() - - def is_allowed(self) -> Tuple[bool, str]: - """Check if request is allowed""" - with self._lock: - now = datetime.datetime.now(datetime.timezone.utc) - - one_minute_ago = now - datetime.timedelta(minutes=1) - while self.requests and self.requests[0] < one_minute_ago: - self.requests.popleft() - - if len(self.requests) >= self.max_per_minute: - return False, f"Rate limit exceeded: {self.max_per_minute} requests/minute" - - self.requests.append(now) - return True, "" - - -rate_limiter = RateLimiter() - -# === Gradio UI === -def create_enhanced_ui(): - """ - Create the comprehensive Gradio UI for ENTERPRISE reliability framework - """ - - with gr.Blocks(title="๐Ÿง  Agentic Reliability Framework", theme="soft") as demo: - gr.Markdown(""" - # ๐Ÿง  Agentic Reliability Framework - **Enterprise Multi-Agent AI System for Production Reliability** - - *Specialized AI agents working together to detect, diagnose, predict, and heal system issues* - *Designed for $1M+/month businesses requiring 99.9%+ uptime* + **Enterprise Upgrade Required For:** + ๐Ÿ”ง Actual tool execution + ๐Ÿ“Š Learning from outcomes + ๐Ÿ’พ Persistent storage + ๐Ÿ“ Audit trails & compliance """) - # === ENTERPRISE ROI DASHBOARD === - with gr.Accordion("๐Ÿ’ฐ Enterprise ROI Calculator", open=True): - gr.Markdown(""" - ### Real Enterprise Impact Analysis - *Based on industry data from Fortune 500 deployments* - """) - - with gr.Row(): - with gr.Column(scale=2): - monthly_revenue = gr.Slider( - minimum=100000, maximum=10000000, value=1000000, step=100000, - label="Monthly Revenue (\$)", - info="Enter your company's monthly revenue", - interactive=True - ) - - calculate_roi_btn = gr.Button("๐Ÿ“ˆ Calculate ROI", variant="primary") - - with gr.Column(scale=1): - gr.Markdown(""" - **Enterprise Baseline:** - - ๐Ÿข 20 incidents/month - - โฑ๏ธ 45 min avg response (industry) - - ๐Ÿ’ธ 70% auto-heal rate (ARF) - - ๐Ÿ“Š 240 incidents/year - """) - - roi_output = gr.Markdown(""" - **Enter your revenue to see enterprise ROI** - - *Example: $1M/month SaaS company:* - - Annual incidents: 240 - - Traditional loss: \$864,000/year - - ARF recovery: \$691,200/year - - **Net Savings: \$172,800/year** - - **ROI: 264% first year** - - **Payback: 3.3 months** - """) - - # ROI calculation function - def calculate_roi_display(revenue): - results = calculate_enterprise_roi(revenue) - return f""" - ### ๐Ÿ“ˆ ENTERPRISE ROI ANALYSIS - **For \${revenue:,.0f}/month Business** - - **Annual Impact:** - - ๐Ÿ“Š **Incidents**: {results['monthly_incidents']}/month ({results['monthly_incidents']*12}/year) - - ๐Ÿ’ธ **Traditional Loss**: \${results['traditional_annual_loss']:,.0f}/year - - ๐Ÿ›ก๏ธ **ARF Protected Loss**: \${results['arf_annual_loss']:,.0f}/year - - โœ… **Annual Savings**: **\${results['annual_savings']:,.0f}** - - **Investment (\$47,500 implementation):** - - ๐Ÿ“… **Payback Period**: {results['roi_months']} months - - ๐Ÿš€ **First Year ROI**: **{results['first_year_roi']}%** - - ๐Ÿ’ฐ **Year 1 Net Gain**: **\${results['first_year_net_gain']:,.0f}** - - **Breakdown:** - - ๐ŸŽฏ 70% incidents auto-healed - - โšก 95% faster detection (45min โ†’ 2.3min) - - ๐Ÿ›ก๏ธ 65% reduction in downtime costs - - ๐Ÿ“ˆ 10:1 ROI in first year - """ - - calculate_roi_btn.click( - fn=calculate_roi_display, - inputs=[monthly_revenue], - outputs=[roi_output] - ) - - # === LIVE METRICS DASHBOARD === - with gr.Accordion("๐Ÿ“Š Live Demo Metrics", open=True): - gr.Markdown(""" - ### Real-Time Demo Metrics - *Track cumulative value delivered in this demo session* - """) - - with gr.Row(): - with gr.Column(scale=1): - total_incidents_display = gr.Number( - label="๐Ÿ“Š Total Incidents Analyzed", - value=0, - interactive=False - ) - with gr.Column(scale=1): - incidents_healed_display = gr.Number( - label="๐Ÿ”ง Incidents Auto-Healed", - value=0, - interactive=False - ) - with gr.Column(scale=1): - auto_heal_rate_display = gr.Number( - label="โšก Auto-Heal Rate (%)", - value=0, - interactive=False, - precision=1 - ) - - with gr.Row(): - with gr.Column(scale=1): - revenue_saved_display = gr.Number( - label="๐Ÿ’ฐ Revenue Saved (\$)", - value=0, - interactive=False, - precision=0 - ) - with gr.Column(scale=1): - avg_detection_display = gr.Number( - label="โฑ๏ธ Avg Detection Time (min)", - value=2.3, - interactive=False, - precision=1 - ) - with gr.Column(scale=1): - time_improvement_display = gr.Number( - label="๐Ÿš€ Time Improvement vs Enterprise (%)", - value=Constants.TIME_IMPROVEMENT_PCT, - interactive=False, - precision=1 - ) - - with gr.Row(): - gr.Markdown(f""" - **๐Ÿ“ˆ Enterprise Comparison:** - - **Industry Average Response:** {Constants.INDUSTRY_AVG_RESPONSE_MINUTES} minutes - - **ARF Average Response:** {Constants.ARF_AVG_RESPONSE_MINUTES} minutes - - **Result:** {(Constants.INDUSTRY_AVG_RESPONSE_MINUTES / Constants.ARF_AVG_RESPONSE_MINUTES):.1f}x faster incident resolution - - *Live metrics update as incidents are processed* - """) - - reset_metrics_btn = gr.Button("๐Ÿ”„ Reset Demo Metrics", size="sm") - - # === TELEMETRY INPUT === with gr.Row(): with gr.Column(scale=1): - gr.Markdown("### ๐Ÿ“Š Enterprise Telemetry Input") - - # Demo Scenarios Dropdown - with gr.Row(): - scenario_dropdown = gr.Dropdown( - choices=["Manual Entry"] + list(DEMO_SCENARIOS.keys()), - value="Manual Entry", - label="๐ŸŽฌ Enterprise Demo Scenario", - info="Select a pre-configured enterprise incident or enter manually" - ) + gr.Markdown("### ๐Ÿ“Š Demo Input") - # Scenario Story Display - scenario_story = gr.Markdown( - value="*Select an enterprise demo scenario above for a pre-configured incident, or enter values manually below.*", - visible=True + scenario = gr.Dropdown( + choices=list(DEMO_SCENARIOS.keys()), + value="๐Ÿš€ OSS Demo - Latency Spike", + label="Demo Scenario" ) - component = gr.Dropdown( - choices=["api-service", "auth-service", "payment-service", "database", "cache-service"], + component = gr.Textbox( value="api-service", label="Component", - info="Select the service being monitored" + interactive=True ) + latency = gr.Slider( - minimum=10, maximum=1000, value=100, step=1, - label="Latency P99 (ms)", - info=f"Enterprise alert threshold: >{Constants.LATENCY_WARNING}ms (adaptive)" + minimum=10, maximum=1000, value=250, + label="Latency (ms)", + info="P99 latency in milliseconds" ) + error_rate = gr.Slider( - minimum=0, maximum=0.5, value=0.02, step=0.001, + minimum=0, maximum=1, value=0.15, step=0.01, label="Error Rate", - info=f"Enterprise alert threshold: >{Constants.ERROR_RATE_WARNING}" - ) - throughput = gr.Number( - value=1000, - label="Throughput (req/sec)", - info="Current enterprise request rate" + info="Error rate (0.0 to 1.0)" ) - cpu_util = gr.Slider( - minimum=0, maximum=1, value=0.4, step=0.01, - label="CPU Utilization", - info="0.0 - 1.0 scale" - ) - memory_util = gr.Slider( - minimum=0, maximum=1, value=0.3, step=0.01, - label="Memory Utilization", - info="0.0 - 1.0 scale" - ) - submit_btn = gr.Button("๐Ÿš€ Submit Enterprise Telemetry", variant="primary", size="lg") + analyze_btn = gr.Button("๐Ÿค– Analyze with OSS", variant="primary") + with gr.Column(scale=2): - gr.Markdown("### ๐Ÿ” Multi-Agent Enterprise Analysis") - output_text = gr.Textbox( - label="Enterprise Agent Synthesis", - placeholder="Enterprise AI agents are analyzing...", - lines=6 - ) - - with gr.Accordion("๐Ÿค– Enterprise Agent Specialists", open=False): - gr.Markdown(""" - **Enterprise Specialized AI Agents:** - - ๐Ÿ•ต๏ธ **Detective**: Anomaly detection & pattern recognition - - ๐Ÿ” **Diagnostician**: Root cause analysis & investigation - - ๐Ÿ”ฎ **Predictive**: Future risk forecasting & trend analysis - """) - - agent_insights = gr.JSON( - label="Detailed Enterprise Findings", - value={} - ) + gr.Markdown("### ๐Ÿ“‹ OSS Analysis Results") - with gr.Accordion("๐Ÿ”ฎ Enterprise Predictive Analytics", open=False): - gr.Markdown(""" - **Enterprise Risk Forecasting:** - - ๐Ÿ“ˆ Latency trends and thresholds - - ๐Ÿšจ Error rate predictions - - ๐Ÿ”ฅ Resource utilization forecasts - - โฐ Time-to-failure estimates - """) - - predictive_insights = gr.JSON( - label="Enterprise Predictive Forecasts", - value={} - ) + scenario_story = gr.Markdown( + value=DEMO_SCENARIOS["๐Ÿš€ OSS Demo - Latency Spike"]["story"] + ) - gr.Markdown("### ๐Ÿ“ˆ Recent Enterprise Events (Last 15)") - events_table = gr.Dataframe( - headers=["Timestamp", "Component", "Latency", "Error Rate", "Throughput", "Severity", "Analysis"], - label="Enterprise Event History", - wrap=True, + output = gr.JSON( + label="OSS Analysis Output", + value={} ) - with gr.Accordion("โ„น๏ธ Enterprise Framework Capabilities", open=False): - gr.Markdown(f""" - **Designed for \$1M+/month businesses:** - - **๐Ÿค– Multi-Agent AI**: Specialized agents for detection, diagnosis, prediction, and healing - - **๐Ÿ”ฎ Predictive Analytics**: Forecast future risks and performance degradation - - **๐Ÿ”ง Policy-Based Healing**: Automated recovery actions based on severity and context - - **๐Ÿ’ฐ Enterprise Impact**: Revenue and user impact quantification at scale - - **๐ŸŽฏ Adaptive Detection**: ML-powered thresholds that learn from your environment - - **๐Ÿ“š Vector Memory**: FAISS-based incident memory for similarity detection - - **โšก Production Ready**: Circuit breakers, cooldowns, thread safety, enterprise features - - **๐Ÿ”’ Security Patched**: All critical CVEs fixed - - **Enterprise ROI:** - - **Implementation Cost**: \$47,500 - - **Typical Payback**: 3-6 months - - **First Year ROI**: 200-500% - - **Annual Savings**: \$100K-\$2M+ depending on revenue - """) - - with gr.Accordion("๐Ÿ”ง Enterprise Healing Policies", open=False): - policy_info = [] - for policy in enhanced_engine.policy_engine.policies: - if policy.enabled: - actions = ", ".join([action.value for action in policy.actions]) - policy_info.append( - f"**{policy.name}** (Priority {policy.priority}): {actions}\n" - f" - Cooldown: {policy.cool_down_seconds}s\n" - f" - Max executions: {policy.max_executions_per_hour}/hour" - ) - - gr.Markdown("\n\n".join(policy_info)) - - # Scenario change handler - def on_scenario_change(scenario_name): - """Update input fields when enterprise demo scenario is selected""" - if scenario_name == "Manual Entry": - return { - scenario_story: gr.update(value="*Enter values manually below.*"), - component: gr.update(value="api-service"), - latency: gr.update(value=100), - error_rate: gr.update(value=0.02), - throughput: gr.update(value=1000), - cpu_util: gr.update(value=0.4), - memory_util: gr.update(value=0.3) - } - - scenario = DEMO_SCENARIOS.get(scenario_name) - if not scenario: - return {} - + def update_scenario(scenario_name): + """Update UI when scenario changes""" + scenario_data = DEMO_SCENARIOS.get(scenario_name, {}) return { - scenario_story: gr.update(value=scenario["story"]), - component: gr.update(value=scenario["component"]), - latency: gr.update(value=scenario["latency"]), - error_rate: gr.update(value=scenario["error_rate"]), - throughput: gr.update(value=scenario["throughput"]), - cpu_util: gr.update(value=scenario.get("cpu_util", 0.5)), - memory_util: gr.update(value=scenario.get("memory_util", 0.5)) + scenario_story: gr.update(value=scenario_data.get("story", "")), + component: gr.update(value=scenario_data.get("component", "api-service")), + latency: gr.update(value=scenario_data.get("latency", 100)), + error_rate: gr.update(value=scenario_data.get("error_rate", 0.05)), } - # Reset metrics handler - def reset_metrics(): - """Reset business metrics for demo purposes""" - business_metrics.reset() - return 0, 0, 0.0, 0.0, Constants.ARF_AVG_RESPONSE_MINUTES, Constants.TIME_IMPROVEMENT_PCT + async def analyze_oss_async(component, latency, error_rate, scenario_name): + """Async analysis handler""" + result = await analyze_with_oss(component, latency, error_rate, scenario_name) + return result - # Connect scenario dropdown to inputs - scenario_dropdown.change( - fn=on_scenario_change, - inputs=[scenario_dropdown], - outputs=[scenario_story, component, latency, error_rate, throughput, cpu_util, memory_util] + # Connect events + scenario.change( + fn=update_scenario, + inputs=[scenario], + outputs=[scenario_story, component, latency, error_rate] ) - # Connect reset button - reset_metrics_btn.click( - fn=reset_metrics, - outputs=[ - total_incidents_display, - incidents_healed_display, - auto_heal_rate_display, - revenue_saved_display, - avg_detection_display, - time_improvement_display - ] + analyze_btn.click( + fn=analyze_oss_async, + inputs=[component, latency, error_rate, scenario], + outputs=[output] ) - - # Event submission handler with ROI tracking - async def submit_event_enhanced_async( - component, latency, error_rate, throughput, cpu_util, memory_util - ): - """ - Async event handler for enterprise telemetry - """ - try: - # Rate limiting check - allowed, rate_msg = rate_limiter.is_allowed() - if not allowed: - logger.warning(f"Rate limit exceeded") - metrics = business_metrics.get_metrics() - return ( - rate_msg, {}, {}, gr.update(value=[]), - metrics["total_incidents"], - metrics["incidents_auto_healed"], - metrics["auto_heal_rate"], - metrics["total_revenue_saved"], - metrics["avg_detection_time_minutes"], - metrics["time_improvement"] - ) - - # Type conversion - try: - latency = float(latency) - error_rate = float(error_rate) - throughput = float(throughput) if throughput else 1000 - cpu_util = float(cpu_util) if cpu_util else None - memory_util = float(memory_util) if memory_util else None - except (ValueError, TypeError) as e: - error_msg = f"โŒ Invalid input types: {str(e)}" - logger.warning(error_msg) - metrics = business_metrics.get_metrics() - return ( - error_msg, {}, {}, gr.update(value=[]), - metrics["total_incidents"], - metrics["incidents_auto_healed"], - metrics["auto_heal_rate"], - metrics["total_revenue_saved"], - metrics["avg_detection_time_minutes"], - metrics["time_improvement"] - ) - - # Input validation - is_valid, error_msg = validate_inputs( - latency, error_rate, throughput, cpu_util, memory_util - ) - if not is_valid: - logger.warning(f"Invalid input: {error_msg}") - metrics = business_metrics.get_metrics() - return ( - error_msg, {}, {}, gr.update(value=[]), - metrics["total_incidents"], - metrics["incidents_auto_healed"], - metrics["auto_heal_rate"], - metrics["total_revenue_saved"], - metrics["avg_detection_time_minutes"], - metrics["time_improvement"] - ) - - # Process event through engine - result = await enhanced_engine.process_event_enhanced( - component, latency, error_rate, throughput, cpu_util, memory_util - ) - - # Handle errors - if 'error' in result: - metrics = business_metrics.get_metrics() - return ( - f"โŒ {result['error']}", {}, {}, gr.update(value=[]), - metrics["total_incidents"], - metrics["incidents_auto_healed"], - metrics["auto_heal_rate"], - metrics["total_revenue_saved"], - metrics["avg_detection_time_minutes"], - metrics["time_improvement"] - ) - - # Build table data (THREAD-SAFE) - table_data = [] - for event in enhanced_engine.event_store.get_recent(15): - table_data.append([ - event.timestamp.strftime("%Y-%m-%d %H:%M:%S"), - event.component, - f"{event.latency_p99:.0f}ms", - f"{event.error_rate:.3f}", - f"{event.throughput:.0f}", - event.severity.value.upper(), - "Enterprise multi-agent analysis" - ]) - - # Format output message with ENTERPRISE impact - status_emoji = "๐Ÿšจ" if result["status"] == "ANOMALY" else "โœ…" - output_msg = f"{status_emoji} **ENTERPRISE {result['status']}**\n" - - if "multi_agent_analysis" in result: - analysis = result["multi_agent_analysis"] - confidence = analysis.get('incident_summary', {}).get('anomaly_confidence', 0) - output_msg += f"๐ŸŽฏ **Confidence**: {confidence*100:.1f}%\n" - - predictive_data = analysis.get('predictive_insights', {}) - if predictive_data.get('critical_risk_count', 0) > 0: - output_msg += f"๐Ÿ”ฎ **PREDICTIVE**: {predictive_data['critical_risk_count']} critical risks forecast\n" - - if analysis.get('recommended_actions'): - actions_preview = ', '.join(analysis['recommended_actions'][:2]) - output_msg += f"๐Ÿ’ก **Enterprise Insights**: {actions_preview}\n" - - if result.get("business_impact"): - impact = result["business_impact"] - output_msg += ( - f"๐Ÿ’ฐ **Enterprise Impact**: \${impact['revenue_loss_estimate']:,.0f} | " - f"๐Ÿ‘ฅ {impact['affected_users_estimate']:,} users | " - f"๐Ÿšจ {impact['severity_level']}\n" - ) - if impact.get('impact_multiplier'): - output_msg += f"๐Ÿ“ˆ **Impact Multiplier**: {impact['impact_multiplier']}x baseline\n" - - if result.get("healing_actions") and result["healing_actions"] != ["no_action"]: - actions = ", ".join(result["healing_actions"]) - output_msg += f"๐Ÿ”ง **Auto-Actions**: {actions}" - - agent_insights_data = result.get("multi_agent_analysis", {}) - predictive_insights_data = agent_insights_data.get('predictive_insights', {}) - - # Get updated metrics - metrics = business_metrics.get_metrics() - - # RETURN THE RESULTS WITH ROI METRICS - return ( - output_msg, - agent_insights_data, - predictive_insights_data, - gr.update(value=table_data), - metrics["total_incidents"], - metrics["incidents_auto_healed"], - metrics["auto_heal_rate"], - metrics["total_revenue_saved"], - metrics["avg_detection_time_minutes"], - metrics["time_improvement"] - ) - - except Exception as e: - error_msg = f"โŒ Error processing enterprise event: {str(e)}" - logger.error(error_msg, exc_info=True) - metrics = business_metrics.get_metrics() - return ( - error_msg, {}, {}, gr.update(value=[]), - metrics["total_incidents"], - metrics["incidents_auto_healed"], - metrics["auto_heal_rate"], - metrics["total_revenue_saved"], - metrics["avg_detection_time_minutes"], - metrics["time_improvement"] - ) - # Connect submit button with all outputs - submit_btn.click( - fn=submit_event_enhanced_async, - inputs=[component, latency, error_rate, throughput, cpu_util, memory_util], - outputs=[ - output_text, - agent_insights, - predictive_insights, - events_table, - total_incidents_display, - incidents_healed_display, - auto_heal_rate_display, - revenue_saved_display, - avg_detection_display, - time_improvement_display - ] - ) + # Footer with upgrade info + gr.Markdown(""" + --- + + **Upgrade to Enterprise Edition:** + + | Feature | OSS Edition | Enterprise Edition | + |---------|-------------|-------------------| + | Execution | โŒ Advisory only | โœ… Autonomous + Approval | + | Storage | โš ๏ธ In-memory only | โœ… Persistent (Neo4j, PostgreSQL) | + | Learning | โŒ None | โœ… Continuous learning engine | + | Audit | โŒ None | โœ… Full audit trails (SOC2, HIPAA) | + | Support | โŒ Community | โœ… 24/7 Enterprise support | + + **Contact:** enterprise@petterjuan.com | **Website:** https://arf.dev + """) return demo -# Create demo at module level for Hugging Face Spaces -demo = create_enhanced_ui() +# ============================================================================ +# MAIN ENTRY POINT +# ============================================================================ + +def main(): + """Main entry point for OSS demo""" + logging.basicConfig(level=logging.INFO) + logger.info("=" * 60) + logger.info("Starting ARF OSS Edition - Advisory Demo") + logger.info("=" * 60) + + demo = create_oss_demo_ui() + demo.launch( + server_name="0.0.0.0", + server_port=7860, + share=False, + show_error=True + ) -# === Main Entry Point === if __name__ == "__main__": - logger.info("=" * 80) - logger.info("Starting ENTERPRISE Agentic Reliability Framework") - logger.info(f"Enterprise Scale: ${Constants.BASE_REVENUE_PER_MINUTE}/min = ${Constants.BASE_REVENUE_PER_MINUTE*60:,.0f}/hour") - logger.info("=" * 80) - logger.info(f"Python version: {os.sys.version}") - logger.info(f"Total events in history: {enhanced_engine.event_store.count()}") - logger.info(f"Vector index size: {thread_safe_index.get_count() if thread_safe_index else 0}") - logger.info(f"Agents initialized: {len(enhanced_engine.orchestrator.agents)}") - logger.info(f"Policies loaded: {len(enhanced_engine.policy_engine.policies)}") - logger.info(f"Enterprise demo scenarios: {len(DEMO_SCENARIOS)}") - logger.info(f"Configuration: HF_TOKEN={'SET' if config.HF_TOKEN else 'NOT SET'}") - logger.info(f"Rate limit: {Constants.MAX_REQUESTS_PER_MINUTE} requests/minute") - logger.info("=" * 80) - - try: - logger.info("Launching ENTERPRISE Gradio UI on 0.0.0.0:7860...") - demo.launch( - server_name="0.0.0.0", - server_port=7860, - share=False, - show_error=True - ) - except KeyboardInterrupt: - logger.info("Received shutdown signal...") - except Exception as e: - logger.error(f"Enterprise application error: {e}", exc_info=True) - finally: - # Graceful shutdown - logger.info("Shutting down gracefully...") - - if thread_safe_index: - logger.info("Saving pending vectors before shutdown...") - thread_safe_index.shutdown() - - logger.info("=" * 80) - logger.info("Enterprise application shutdown complete") - logger.info("=" * 80) \ No newline at end of file + main() \ No newline at end of file