| import os |
| import json |
| import numpy as np |
| import gradio as gr |
| import requests |
| import pandas as pd |
| import datetime |
| from typing import List, Dict, Any |
| import hashlib |
| import asyncio |
|
|
| |
| from models import ReliabilityEvent, EventSeverity, AnomalyResult, HealingAction |
| from healing_policies import PolicyEngine |
|
|
| |
# Hugging Face Inference API configuration.
# HF_TOKEN is optional: when absent, call_huggingface_analysis() falls back
# to canned offline insights instead of calling the remote endpoint.
HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
HF_API_URL = "https://router.huggingface.co/hf-inference/v1/completions"
HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
|
|
| |
# Optional vector-memory stack: sentence-transformers for embeddings plus
# FAISS for similarity search over past incidents. Both are optional — if
# either import fails, `index` and `model` stay None and incident memory
# is silently disabled (callers check `index is not None`).
try:
    from sentence_transformers import SentenceTransformer
    import faiss

    VECTOR_DIM = 384  # embedding size of all-MiniLM-L6-v2
    INDEX_FILE = "incident_vectors.index"
    TEXTS_FILE = "incident_texts.json"

    try:
        model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    except Exception as e:
        # NOTE(review): this fallback re-imports the same class under an
        # alias and retries the identical load — it only helps for
        # transient failures (e.g. a flaky download), not persistent ones.
        print(f"Model loading warning: {e}")
        from sentence_transformers import SentenceTransformer as ST
        model = ST("sentence-transformers/all-MiniLM-L6-v2")

    # Restore a previously persisted index and its parallel text list,
    # or start with an empty L2 flat index.
    if os.path.exists(INDEX_FILE):
        index = faiss.read_index(INDEX_FILE)
        with open(TEXTS_FILE, "r") as f:
            incident_texts = json.load(f)
    else:
        index = faiss.IndexFlatL2(VECTOR_DIM)
        incident_texts = []

except ImportError as e:
    print(f"Warning: FAISS or SentenceTransformers not available: {e}")
    index = None
    incident_texts = []
    model = None
|
|
def save_index():
    """Persist the FAISS index and the parallel incident-text list to disk.

    No-op when the optional vector-memory stack failed to import
    (i.e. the module-level `index` is None).
    """
    if index is None:
        return
    faiss.write_index(index, INDEX_FILE)
    with open(TEXTS_FILE, "w") as f:
        json.dump(incident_texts, f)
|
|
| |
# Shared module-level state: the healing-policy engine and an append-only
# in-memory log of every ReliabilityEvent processed this session.
policy_engine = PolicyEngine()
events_history: List[ReliabilityEvent] = []
|
|
class BusinessImpactCalculator:
    """Translate raw telemetry anomalies into estimated business impact.

    The dollar/user figures are heuristic estimates, not measurements:
    a fixed revenue baseline is scaled by how badly latency, errors and
    CPU pressure deviate from healthy ranges.
    """

    def __init__(self, revenue_per_request: float = 0.01,
                 base_revenue_per_minute: float = 100.0):
        # NOTE(review): revenue_per_request is retained for backward
        # compatibility but is not read by calculate_impact().
        self.revenue_per_request = revenue_per_request
        # Baseline revenue rate used by calculate_impact(); previously a
        # hard-coded constant (100), now tunable per deployment.
        self.base_revenue_per_minute = base_revenue_per_minute

    def calculate_impact(self, event: "ReliabilityEvent",
                         duration_minutes: int = 5) -> Dict[str, Any]:
        """Estimate revenue loss, affected users and a severity label.

        Args:
            event: telemetry snapshot; reads latency_p99, error_rate
                and (optionally) cpu_util.
            duration_minutes: assumed incident duration.

        Returns:
            Dict with revenue_loss_estimate, affected_users_estimate,
            severity_level and throughput_reduction_pct.
        """
        # Degradation multiplier: each breached threshold adds a bump.
        impact_multiplier = 1.0
        if event.latency_p99 > 300:
            impact_multiplier += 0.5
        if event.error_rate > 0.1:
            impact_multiplier += 0.8
        if event.cpu_util and event.cpu_util > 0.9:
            impact_multiplier += 0.3

        # NOTE(review): dividing by 60 treats the baseline as per-hour
        # despite its per-minute name; kept as-is to preserve existing
        # estimates — confirm intended units with the metric owner.
        revenue_loss = self.base_revenue_per_minute * impact_multiplier * (duration_minutes / 60)

        # User impact scales with error rate and excess latency over 100ms.
        base_users_affected = 1000
        user_impact_multiplier = (event.error_rate * 10) + (max(0, event.latency_p99 - 100) / 500)
        affected_users = int(base_users_affected * user_impact_multiplier)

        # Bucket into a coarse severity label for dashboards/alerts.
        if revenue_loss > 500 or affected_users > 5000:
            severity = "CRITICAL"
        elif revenue_loss > 100 or affected_users > 1000:
            severity = "HIGH"
        elif revenue_loss > 50 or affected_users > 500:
            severity = "MEDIUM"
        else:
            severity = "LOW"

        return {
            'revenue_loss_estimate': round(revenue_loss, 2),
            'affected_users_estimate': affected_users,
            'severity_level': severity,
            'throughput_reduction_pct': round(min(100, user_impact_multiplier * 100), 1)
        }
|
|
# Default impact calculator shared by the event-processing pipeline.
business_calculator = BusinessImpactCalculator()
|
|
class AdvancedAnomalyDetector:
    """Threshold-based anomaly detector whose latency threshold adapts
    to the recent distribution of observed events."""

    def __init__(self):
        # Rolling window of recent events; bounded to 100 in _update_thresholds.
        self.historical_data = []
        # Static starting thresholds; latency_p99 becomes adaptive once
        # enough history accumulates.
        self.adaptive_thresholds = {
            'latency_p99': 150,
            'error_rate': 0.05
        }

    def detect_anomaly(self, event: "ReliabilityEvent") -> bool:
        """Return True if the event breaches any latency/error/resource threshold.

        Side effect: records the event and re-fits the adaptive latency
        threshold (see _update_thresholds).
        """
        latency_anomaly = event.latency_p99 > self.adaptive_thresholds['latency_p99']
        error_anomaly = event.error_rate > self.adaptive_thresholds['error_rate']

        # Resource pressure: either CPU or memory above 90% counts.
        resource_anomaly = bool(
            (event.cpu_util and event.cpu_util > 0.9)
            or (event.memory_util and event.memory_util > 0.9)
        )

        # Learn from this event *after* judging it against current thresholds.
        self._update_thresholds(event)

        return latency_anomaly or error_anomaly or resource_anomaly

    def _update_thresholds(self, event: "ReliabilityEvent"):
        """Append the event to history and refresh the adaptive latency threshold."""
        self.historical_data.append(event)

        # Bound the window to the last 100 events. Deleting the head slice
        # replaces the O(n)-per-call list.pop(0) of the previous version.
        if len(self.historical_data) > 100:
            del self.historical_data[:-100]

        # Once we have some history, set the latency threshold to the
        # 90th percentile of the last 20 observations.
        if len(self.historical_data) > 10:
            recent_latencies = [e.latency_p99 for e in self.historical_data[-20:]]
            # float() keeps the threshold a plain Python float rather
            # than letting a numpy scalar leak into the state dict.
            self.adaptive_thresholds['latency_p99'] = float(np.percentile(recent_latencies, 90))
|
|
# Module-wide detector instance; stateful (maintains adaptive thresholds).
anomaly_detector = AdvancedAnomalyDetector()
|
|
| |
| from enum import Enum |
|
|
class AgentSpecialization(Enum):
    """Roles that specialized analysis agents can fill in orchestration."""
    DETECTIVE = "anomaly_detection"        # anomaly detection & scoring
    DIAGNOSTICIAN = "root_cause_analysis"  # root-cause analysis
|
|
class BaseAgent:
    """Abstract base for specialized analysis agents."""

    def __init__(self, specialization: AgentSpecialization):
        # Role tag; its .value keys this agent's entry in orchestration results.
        self.specialization = specialization

    async def analyze(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Analyze one telemetry event; concrete agents must override."""
        raise NotImplementedError
|
|
class AnomalyDetectionAgent(BaseAgent):
    """Detective agent: scores how anomalous an event is (0-1) and
    reports which metrics drive the score."""

    def __init__(self):
        super().__init__(AgentSpecialization.DETECTIVE)

    async def analyze(self, event: "ReliabilityEvent") -> Dict[str, Any]:
        """Return confidence, severity tier, affected metrics and
        per-metric investigation recommendations for one event."""
        anomaly_score = self._calculate_anomaly_score(event)
        # Compute the affected-metric list once and reuse it for both the
        # findings and the recommendations (it was computed twice before).
        affected_metrics = self._identify_affected_metrics(event)

        return {
            'specialization': self.specialization.value,
            'confidence': anomaly_score,
            'findings': {
                'anomaly_score': anomaly_score,
                'severity_tier': self._classify_severity(anomaly_score),
                'primary_metrics_affected': affected_metrics
            },
            'recommendations': [
                f"Investigate {metric} anomalies" for metric in affected_metrics
            ]
        }

    def _calculate_anomaly_score(self, event: "ReliabilityEvent") -> float:
        """Weighted anomaly score in [0, 1]: latency contributes up to 0.4,
        errors up to 0.3, and CPU/memory pressure up to 0.15 each."""
        scores = []

        # Latency above the 150ms baseline, saturating at +500ms.
        if event.latency_p99 > 150:
            latency_score = min(1.0, (event.latency_p99 - 150) / 500)
            scores.append(0.4 * latency_score)

        # Error rate above 5%, saturating at 30%.
        if event.error_rate > 0.05:
            error_score = min(1.0, event.error_rate / 0.3)
            scores.append(0.3 * error_score)

        # Resource pressure above 80% utilization, saturating at 100%.
        resource_score = 0
        if event.cpu_util and event.cpu_util > 0.8:
            resource_score += 0.15 * min(1.0, (event.cpu_util - 0.8) / 0.2)
        if event.memory_util and event.memory_util > 0.8:
            resource_score += 0.15 * min(1.0, (event.memory_util - 0.8) / 0.2)
        scores.append(resource_score)

        return min(1.0, sum(scores))

    def _identify_affected_metrics(self, event: "ReliabilityEvent") -> List[str]:
        """List metric names currently breaching their soft thresholds."""
        affected = []
        if event.latency_p99 > 150:
            affected.append("latency")
        if event.error_rate > 0.05:
            affected.append("error_rate")
        if event.cpu_util and event.cpu_util > 0.8:
            affected.append("cpu_utilization")
        if event.memory_util and event.memory_util > 0.8:
            affected.append("memory_utilization")
        return affected

    def _classify_severity(self, anomaly_score: float) -> str:
        """Map a 0-1 anomaly score onto a coarse severity label."""
        if anomaly_score > 0.8:
            return "CRITICAL"
        elif anomaly_score > 0.6:
            return "HIGH"
        elif anomaly_score > 0.4:
            return "MEDIUM"
        else:
            return "LOW"
|
|
class RootCauseAgent(BaseAgent):
    """Diagnostician agent: proposes likely root causes and supporting
    evidence for an anomalous event."""

    def __init__(self):
        super().__init__(AgentSpecialization.DIAGNOSTICIAN)

    async def analyze(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Return likely causes, evidence patterns and an investigation priority."""
        likely_causes = self._analyze_potential_causes(event)

        return {
            'specialization': self.specialization.value,
            'confidence': 0.7,
            'findings': {
                'likely_root_causes': likely_causes,
                'evidence_patterns': self._identify_evidence(event),
                'investigation_priority': self._prioritize_investigation(likely_causes)
            },
            'recommendations': [
                f"Check {cause} for issues" for cause in likely_causes[:2]
            ]
        }

    def _analyze_potential_causes(self, event: ReliabilityEvent) -> List[str]:
        """Map the metric pattern onto a short list of candidate causes.

        Patterns are mutually exclusive and checked most-specific first;
        only the first matching pattern contributes causes.
        """
        if event.latency_p99 > 300 and event.error_rate > 0.1:
            return ["database_connection_pool", "external_dependency_timeout"]
        if event.cpu_util and event.cpu_util > 0.9:
            return ["resource_exhaustion", "memory_leak"]
        if event.error_rate > 0.2:
            return ["recent_deployment", "configuration_change"]
        return ["unknown_cause_requires_investigation"]

    def _identify_evidence(self, event: ReliabilityEvent) -> List[str]:
        """Collect metric correlations that support the cause hypotheses."""
        evidence = []
        if event.latency_p99 > event.error_rate * 1000:
            evidence.append("latency_disproportionate_to_errors")
        both_saturated = (event.cpu_util and event.cpu_util > 0.8
                          and event.memory_util and event.memory_util > 0.8)
        if both_saturated:
            evidence.append("correlated_resource_exhaustion")
        return evidence

    def _prioritize_investigation(self, causes: List[str]) -> str:
        """HIGH for infrastructure-level causes, MEDIUM otherwise."""
        high_priority = {"database_connection_pool", "resource_exhaustion"}
        return "HIGH" if high_priority & set(causes) else "MEDIUM"
|
|
class OrchestrationManager:
    """Fans an event out to every registered agent and merges their findings."""

    # Per-agent wall-clock budget in seconds; slower agents are dropped.
    AGENT_TIMEOUT_S = 5.0

    def __init__(self):
        self.agents = {
            AgentSpecialization.DETECTIVE: AnomalyDetectionAgent(),
            AgentSpecialization.DIAGNOSTICIAN: RootCauseAgent(),
        }

    async def orchestrate_analysis(self, event: "ReliabilityEvent") -> Dict[str, Any]:
        """Run all agents concurrently and synthesize whichever results arrive.

        Previously agents were awaited one after another, so the 5s
        timeouts stacked up; gather() bounds total latency to the slowest
        single agent. Agents that time out are skipped (as before); any
        other agent failure is re-raised (as before).
        """
        specs = list(self.agents)
        outcomes = await asyncio.gather(
            *(asyncio.wait_for(self.agents[spec].analyze(event),
                               timeout=self.AGENT_TIMEOUT_S)
              for spec in specs),
            return_exceptions=True,
        )

        agent_results = {}
        for spec, outcome in zip(specs, outcomes):
            if isinstance(outcome, asyncio.TimeoutError):
                # Slow agent: drop its contribution, keep the rest.
                continue
            if isinstance(outcome, BaseException):
                # Preserve original semantics: non-timeout errors propagate.
                raise outcome
            agent_results[spec.value] = outcome

        return self._synthesize_agent_findings(event, agent_results)

    def _synthesize_agent_findings(self, event: "ReliabilityEvent", agent_results: Dict) -> Dict[str, Any]:
        """Combine detective + diagnostician outputs into a single report.

        The detective's result is mandatory; the diagnostician's is optional
        and degrades to empty insights when missing.
        """
        detective_result = agent_results.get(AgentSpecialization.DETECTIVE.value)
        diagnostician_result = agent_results.get(AgentSpecialization.DIAGNOSTICIAN.value)

        if not detective_result:
            return {'error': 'No agent results available'}

        synthesis = {
            'incident_summary': {
                'severity': detective_result['findings'].get('severity_tier', 'UNKNOWN'),
                'anomaly_confidence': detective_result['confidence'],
                'primary_metrics_affected': detective_result['findings'].get('primary_metrics_affected', [])
            },
            'root_cause_insights': diagnostician_result['findings'] if diagnostician_result else {},
            'recommended_actions': self._prioritize_actions(
                detective_result.get('recommendations', []),
                diagnostician_result.get('recommendations', []) if diagnostician_result else []
            ),
            'agent_metadata': {
                'participating_agents': list(agent_results.keys()),
                'analysis_timestamp': datetime.datetime.now().isoformat()
            }
        }

        return synthesis

    def _prioritize_actions(self, detection_actions: List[str], diagnosis_actions: List[str]) -> List[str]:
        """Merge the two action lists, dedupe preserving order, cap at 4."""
        seen = set()
        unique_actions = []
        for action in detection_actions + diagnosis_actions:
            if action not in seen:
                seen.add(action)
                unique_actions.append(action)
        return unique_actions[:4]
|
|
| |
# Shared orchestrator used by EnhancedReliabilityEngine.
orchestration_manager = OrchestrationManager()
|
|
class EnhancedReliabilityEngine:
    """End-to-end event pipeline: builds a ReliabilityEvent, runs the
    multi-agent analysis, anomaly detection, healing policies and business
    impact, and optionally stores the incident in the FAISS memory."""

    def __init__(self):
        # Session counters exposed for observability.
        self.performance_metrics = {
            'total_incidents_processed': 0,
            'multi_agent_analyses': 0
        }

    async def process_event_enhanced(self, component: str, latency: float, error_rate: float,
                                     throughput: float = 1000, cpu_util: float = None,
                                     memory_util: float = None) -> Dict[str, Any]:
        """Enhanced event processing with multi-agent orchestration.

        Returns a JSON-friendly dict consumed by the UI layer. Side
        effects: mutates the global adaptive detector, appends to
        events_history, and may write the FAISS index to disk.
        """

        # Wrap raw telemetry in a ReliabilityEvent. Dependency wiring is
        # hard-coded: only "api-service" declares upstream dependencies.
        event = ReliabilityEvent(
            component=component,
            latency_p99=latency,
            error_rate=error_rate,
            throughput=throughput,
            cpu_util=cpu_util,
            memory_util=memory_util,
            upstream_deps=["auth-service", "database"] if component == "api-service" else []
        )

        # Multi-agent analysis (detective + diagnostician synthesis).
        agent_analysis = await orchestration_manager.orchestrate_analysis(event)

        # Threshold check; also updates the detector's adaptive state.
        is_anomaly = anomaly_detector.detect_anomaly(event)

        # Policy engine decides healing actions for this event.
        healing_actions = policy_engine.evaluate_policies(event)

        # Business impact is only estimated for anomalous events.
        business_impact = business_calculator.calculate_impact(event) if is_anomaly else None

        # Record anomalies in the vector memory when FAISS is available.
        # NOTE(review): assumes `model` is non-None whenever `index` is
        # (true for the import fallback above). Also indexes [0] of
        # recommended_actions — would raise if that list were empty;
        # confirm the orchestrator always returns at least one action.
        if index is not None and is_anomaly:
            analysis_text = agent_analysis.get('recommended_actions', ['No analysis'])[0]
            vector_text = f"{component} {latency} {error_rate} {analysis_text}"
            vec = model.encode([vector_text])
            index.add(np.array(vec, dtype=np.float32))
            incident_texts.append(vector_text)
            save_index()

        # Flatten everything into the result dict consumed by the UI.
        result = {
            "timestamp": event.timestamp,
            "component": component,
            "latency_p99": latency,
            "error_rate": error_rate,
            "throughput": throughput,
            "status": "ANOMALY" if is_anomaly else "NORMAL",
            "multi_agent_analysis": agent_analysis,
            "healing_actions": [action.value for action in healing_actions],
            "business_impact": business_impact,
            "severity": event.severity.value,
            "similar_incidents_count": len(incident_texts) if is_anomaly else 0,
            "processing_metadata": {
                "agents_used": agent_analysis.get('agent_metadata', {}).get('participating_agents', []),
                "analysis_confidence": agent_analysis.get('incident_summary', {}).get('anomaly_confidence', 0)
            }
        }

        # Every event (anomalous or not) is appended to the session history.
        events_history.append(event)
        self.performance_metrics['total_incidents_processed'] += 1
        self.performance_metrics['multi_agent_analyses'] += 1

        return result
|
|
| |
# Singleton engine instance used by the UI submit handler.
enhanced_engine = EnhancedReliabilityEngine()
|
|
def call_huggingface_analysis(prompt: str) -> str:
    """Use HF Inference API or fallback simulation.

    With no HF_TOKEN configured, returns a random canned insight so the
    demo still produces output offline. Otherwise posts a completion
    request and returns the first line of the generated analysis.
    """
    if not HF_TOKEN:
        # Offline/demo fallback: pick one plausible-sounding insight.
        fallback_insights = [
            "High latency detected - possible resource contention or network issues",
            "Error rate increase suggests recent deployment instability",
            "Latency spike correlates with increased user traffic patterns",
            "Intermittent failures indicate potential dependency service degradation",
            "Performance degradation detected - consider scaling compute resources"
        ]
        import random
        return random.choice(fallback_insights)

    try:
        # Wrap the caller's telemetry summary in an SRE-analyst prompt.
        enhanced_prompt = f"""
As a senior reliability engineer, analyze this telemetry event and provide a concise root cause analysis:

{prompt}

Focus on:
- Potential infrastructure or application issues
- Correlation between metrics
- Business impact assessment
- Recommended investigation areas

Provide 1-2 sentences maximum with actionable insights.
"""

        payload = {
            "model": "mistralai/Mixtral-8x7B-Instruct-v0.1",
            "prompt": enhanced_prompt,
            "max_tokens": 150,
            "temperature": 0.4,
        }
        response = requests.post(HF_API_URL, headers=HEADERS, json=payload, timeout=15)
        if response.status_code == 200:
            result = response.json()
            analysis_text = result.get("choices", [{}])[0].get("text", "").strip()
            # Keep only the first line of substantive answers.
            # NOTE(review): short/empty completions fall through and are
            # returned as-is (possibly ""); confirm callers tolerate that.
            if analysis_text and len(analysis_text) > 10:
                return analysis_text.split('\n')[0]
            return analysis_text
        else:
            return f"API Error {response.status_code}: Service temporarily unavailable"
    except Exception as e:
        # Network/timeout/JSON errors degrade to a readable message.
        return f"Analysis service error: {str(e)}"
|
|
| |
def create_enhanced_ui():
    """Create enhanced UI with multi-agent capabilities.

    Builds the Gradio Blocks app: telemetry input controls on the left,
    multi-agent analysis output on the right, informational accordions,
    and the async submit handler wired to the click event.
    """
    with gr.Blocks(title="π§ Enterprise Agentic Reliability Framework", theme="soft") as demo:
        gr.Markdown("""
# π§ Enterprise Agentic Reliability Framework
**Multi-Agent AI System for Production Reliability**

*Specialized AI agents working together to detect, diagnose, and heal system issues*
""")

        with gr.Row():
            # Left column: telemetry input controls.
            with gr.Column(scale=1):
                gr.Markdown("### π Telemetry Input")
                component = gr.Dropdown(
                    choices=["api-service", "auth-service", "payment-service", "database", "cache-service"],
                    value="api-service",
                    label="Component",
                    info="Select the service being monitored"
                )
                latency = gr.Slider(
                    minimum=10, maximum=1000, value=100, step=1,
                    label="Latency P99 (ms)",
                    info="Alert threshold: >150ms (adaptive)"
                )
                error_rate = gr.Slider(
                    minimum=0, maximum=0.5, value=0.02, step=0.001,
                    label="Error Rate",
                    info="Alert threshold: >0.05"
                )
                throughput = gr.Number(
                    value=1000,
                    label="Throughput (req/sec)",
                    info="Current request rate"
                )
                cpu_util = gr.Slider(
                    minimum=0, maximum=1, value=0.4, step=0.01,
                    label="CPU Utilization",
                    info="0.0 - 1.0 scale"
                )
                memory_util = gr.Slider(
                    minimum=0, maximum=1, value=0.3, step=0.01,
                    label="Memory Utilization",
                    info="0.0 - 1.0 scale"
                )
                submit_btn = gr.Button("π Submit Telemetry Event", variant="primary", size="lg")

            # Right column: synthesized agent output.
            with gr.Column(scale=2):
                gr.Markdown("### π Multi-Agent Analysis")
                output_text = gr.Textbox(
                    label="Agent Synthesis",
                    placeholder="AI agents are analyzing...",
                    lines=5
                )

                with gr.Accordion("π€ Agent Specialists Analysis", open=False):
                    gr.Markdown("""
**Specialized AI Agents:**
- π΅οΈ **Detective**: Anomaly detection & pattern recognition
- π **Diagnostician**: Root cause analysis & investigation
""")

                    agent_insights = gr.JSON(
                        label="Detailed Agent Findings",
                        value={}
                    )

                gr.Markdown("### π Recent Events (Last 15)")
                events_table = gr.Dataframe(
                    headers=["Timestamp", "Component", "Latency", "Error Rate", "Throughput", "Severity", "Analysis"],
                    label="Event History",
                    wrap=True,
                )

        with gr.Accordion("βΉοΈ Framework Capabilities", open=False):
            gr.Markdown("""
- **π€ Multi-Agent AI**: Specialized agents for detection, diagnosis, and healing
- **π§ Policy-Based Healing**: Automated recovery actions based on severity and context
- **π° Business Impact**: Revenue and user impact quantification
- **π― Adaptive Detection**: ML-powered thresholds that learn from your environment
- **π Vector Memory**: FAISS-based incident memory for similarity detection
- **β‘ Production Ready**: Circuit breakers, cooldowns, and enterprise features
""")

        with gr.Accordion("π§ Healing Policies", open=False):
            # Render one markdown line per enabled policy.
            policy_info = []
            for policy in policy_engine.policies:
                if policy.enabled:
                    actions = ", ".join([action.value for action in policy.actions])
                    policy_info.append(f"**{policy.name}**: {actions} (Priority: {policy.priority})")

            gr.Markdown("\n\n".join(policy_info))

        async def submit_event_enhanced(component, latency, error_rate, throughput, cpu_util, memory_util):
            """Handle one telemetry submission: run the engine, format the
            status message, and refresh the events table."""
            try:
                # Coerce UI values to numbers.
                # NOTE(review): `if cpu_util` treats 0.0 the same as None;
                # downstream code also relies on truthiness so behavior is
                # consistent — confirm 0% utilization should be ignored.
                latency = float(latency)
                error_rate = float(error_rate)
                throughput = float(throughput) if throughput else 1000
                cpu_util = float(cpu_util) if cpu_util else None
                memory_util = float(memory_util) if memory_util else None

                result = await enhanced_engine.process_event_enhanced(
                    component, latency, error_rate, throughput, cpu_util, memory_util
                )

                # Rebuild the table from the 15 most recent events.
                table_data = []
                for event in events_history[-15:]:
                    table_data.append([
                        event.timestamp[:19],
                        event.component,
                        event.latency_p99,
                        f"{event.error_rate:.3f}",
                        event.throughput,
                        event.severity.value.upper(),
                        "Multi-agent analysis" if 'multi_agent_analysis' in result else 'N/A'
                    ])

                # FIX: this literal was split across two source lines
                # (corrupted checkmark character), a syntax error as
                # written; rejoined onto one line. The emoji themselves
                # appear mojibake-encoded file-wide — TODO confirm the
                # intended glyphs and re-encode the file as UTF-8.
                status_emoji = "π¨" if result["status"] == "ANOMALY" else "β"
                output_msg = f"{status_emoji} {result['status']}"

                if "multi_agent_analysis" in result:
                    analysis = result["multi_agent_analysis"]
                    confidence = analysis.get('incident_summary', {}).get('anomaly_confidence', 0)
                    output_msg += f"\nπ― Confidence: {confidence*100:.1f}%"

                    if analysis.get('recommended_actions'):
                        output_msg += f"\nπ‘ Insights: {', '.join(analysis['recommended_actions'][:2])}"

                if result["business_impact"]:
                    impact = result["business_impact"]
                    output_msg += f"\nπ° Business Impact: ${impact['revenue_loss_estimate']} | π₯ {impact['affected_users_estimate']} users | π¨ {impact['severity_level']}"

                if result["healing_actions"] and result["healing_actions"] != ["no_action"]:
                    actions = ", ".join(result["healing_actions"])
                    output_msg += f"\nπ§ Auto-Actions: {actions}"

                agent_insights_data = result.get("multi_agent_analysis", {})

                return (
                    output_msg,
                    agent_insights_data,
                    gr.Dataframe(
                        headers=["Timestamp", "Component", "Latency", "Error Rate", "Throughput", "Severity", "Analysis"],
                        value=table_data,
                        wrap=True
                    )
                )

            except Exception as e:
                # Surface the failure in the UI instead of crashing the app.
                return f"β Error processing event: {str(e)}", {}, gr.Dataframe(value=[])

        submit_btn.click(
            fn=submit_event_enhanced,
            inputs=[component, latency, error_rate, throughput, cpu_util, memory_util],
            outputs=[output_text, agent_insights, events_table]
        )

    return demo
|
|
if __name__ == "__main__":
    # Launch the Gradio app; binding 0.0.0.0 exposes it on all interfaces
    # (required for containerized / hosted-Spaces deployments).
    demo = create_enhanced_ui()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False
    )