""" Pythonic data models integrated with actual ARF OSS package """ from dataclasses import dataclass, asdict from enum import Enum from typing import Dict, List, Optional, Any, Tuple import datetime # Import from actual ARF OSS package try: from agentic_reliability_framework.arf_core.models.healing_intent import ( HealingIntent, create_scale_out_intent, create_rollback_intent, create_restart_intent ) from agentic_reliability_framework.arf_core.engine.simple_mcp_client import OSSMCPClient ARF_OSS_AVAILABLE = True except ImportError: ARF_OSS_AVAILABLE = False # Fallback mock classes for demo class HealingIntent: def __init__(self, **kwargs): self.data = kwargs class OSSMCPClient: def analyze(self, *args, **kwargs): return {"status": "OSS Analysis Complete"} class IncidentSeverity(Enum): """Enum for incident severity levels""" LOW = "LOW" MEDIUM = "MEDIUM" HIGH = "HIGH" CRITICAL = "CRITICAL" class DemoMode(Enum): """Enum for demo modes""" QUICK = "quick" COMPREHENSIVE = "comprehensive" INVESTOR = "investor" @dataclass class OSSAnalysis: """Structured OSS analysis results - using actual ARF""" status: str recommendations: List[str] estimated_time: str engineers_needed: str manual_effort: str confidence_score: float = 0.95 healing_intent: Optional[Dict] = None def to_dict(self) -> Dict: """Convert to dictionary, including healing intent if available""" data = asdict(self) if self.healing_intent: data["healing_intent"] = { "type": "HealingIntent", "recommendations": self.recommendations, "requires_execution": True } return data @classmethod def from_arf_analysis(cls, arf_result: Dict, scenario_name: str) -> 'OSSAnalysis': """Create from actual ARF analysis result""" # This would be connected to actual ARF OSS analysis recommendations = arf_result.get("recommendations", [ "Increase resource allocation", "Implement monitoring", "Add circuit breakers", "Optimize configuration" ]) return cls( status="✅ ARF OSS Analysis Complete", recommendations=recommendations, estimated_time="45-90 minutes", engineers_needed="2-3 engineers", manual_effort="High", confidence_score=0.92, healing_intent={ "scenario": scenario_name, "actions": recommendations, "execution_required": True, "auto_execution": False # OSS is advisory only } ) @dataclass class EnterpriseResults: """Structured enterprise execution results""" actions_completed: List[str] metrics_improvement: Dict[str, str] business_impact: Dict[str, Any] approval_required: bool = True execution_time: str = "" healing_intent_executed: bool = True def to_dict(self) -> Dict: data = asdict(self) data["arf_enterprise"] = { "execution_complete": True, "learning_applied": True, "audit_trail_created": True } return data @dataclass class IncidentScenario: """Pythonic incident scenario model with ARF integration""" name: str severity: IncidentSeverity metrics: Dict[str, str] impact: Dict[str, str] arf_pattern: str = "" # ARF pattern name for RAG recall oss_analysis: Optional[OSSAnalysis] = None enterprise_results: Optional[EnterpriseResults] = None def to_dict(self) -> Dict: """Convert to dictionary for JSON serialization""" data = { "name": self.name, "severity": self.severity.value, "metrics": self.metrics, "impact": self.impact, "arf_oss_available": ARF_OSS_AVAILABLE } if self.oss_analysis: data["oss_analysis"] = self.oss_analysis.to_dict() if self.enterprise_results: data["enterprise_results"] = self.enterprise_results.to_dict() return data @dataclass class DemoStep: """Demo step for presenter guidance""" title: str scenario: Optional[str] action: str message: str icon: str = "🎯" arf_integration: bool = False # Whether this step uses actual ARF