# Hosting-page residue captured together with this file (not part of the module):
#   petter2025's picture
#   Update core/data_models.py
#   2ec94ec verified
#   raw / history blame / 4.53 kB
"""
Pythonic data models integrated with actual ARF OSS package
"""
from dataclasses import dataclass, asdict
from enum import Enum
from typing import Dict, List, Optional, Any, Tuple
import datetime
# Import from actual ARF OSS package
try:
    from agentic_reliability_framework.arf_core.models.healing_intent import (
        HealingIntent,
        create_scale_out_intent,
        create_rollback_intent,
        create_restart_intent,
    )
    from agentic_reliability_framework.arf_core.engine.simple_mcp_client import OSSMCPClient
    ARF_OSS_AVAILABLE = True
except ImportError:
    # The ARF OSS package is optional. When it cannot be imported, every name
    # the ``try`` branch would have bound is given a lightweight stand-in so
    # that code referring to those names keeps working in demo mode.
    ARF_OSS_AVAILABLE = False

    class HealingIntent:
        """Fallback stand-in that stores its keyword arguments in ``self.data``."""

        def __init__(self, **kwargs):
            self.data = kwargs

    class OSSMCPClient:
        """Fallback stand-in whose ``analyze`` returns a fixed status payload."""

        def analyze(self, *args, **kwargs):
            """Ignore all arguments and return a canned status dictionary."""
            return {"status": "OSS Analysis Complete"}

    def _make_fallback_intent_factory(action):
        """Build a stand-in for one of the ``create_*_intent`` helpers.

        The real helpers' signatures are not known here, so the stand-in
        accepts any arguments and records them on a fallback
        ``HealingIntent`` under the keys ``action``, ``args`` and ``params``.
        """

        def factory(*args, **kwargs):
            return HealingIntent(action=action, args=args, params=kwargs)

        factory.__name__ = f"create_{action}_intent"
        return factory

    # Previously these three names were left undefined on the fallback path.
    create_scale_out_intent = _make_fallback_intent_factory("scale_out")
    create_rollback_intent = _make_fallback_intent_factory("rollback")
    create_restart_intent = _make_fallback_intent_factory("restart")
class IncidentSeverity(Enum):
    """Severity level attached to an incident.

    Each member's value is the upper-case string form of its own name.
    Members are declared from least to most severe.
    """

    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"
    CRITICAL = "CRITICAL"
class DemoMode(Enum):
    """Selectable mode for running the demo.

    Each member's value is the lower-case string form of its own name.
    """

    QUICK = "quick"
    COMPREHENSIVE = "comprehensive"
    INVESTOR = "investor"
@dataclass
class OSSAnalysis:
    """Structured OSS analysis results.

    Attributes:
        status: Human-readable status line for the analysis.
        recommendations: Recommended actions, in display order.
        estimated_time: Free-text estimate of manual resolution time.
        engineers_needed: Free-text estimate of required engineers.
        manual_effort: Free-text description of the manual effort.
        confidence_score: Confidence in the analysis (defaults to 0.95).
        healing_intent: Optional dictionary describing the proposed healing
            intent; ``None`` when no intent is attached.
    """

    status: str
    recommendations: List[str]
    estimated_time: str
    engineers_needed: str
    manual_effort: str
    confidence_score: float = 0.95
    healing_intent: Optional[Dict] = None

    def to_dict(self) -> Dict:
        """Convert to a dictionary, summarising the healing intent if present.

        Returns:
            A dictionary of all fields. When ``healing_intent`` is truthy its
            entry is replaced by a summary holding the fixed type tag, a copy
            of the recommendations and ``requires_execution=True``.
        """
        data = asdict(self)
        if self.healing_intent:
            data["healing_intent"] = {
                "type": "HealingIntent",
                # Copy so that mutating the returned dictionary can never
                # change this instance (asdict already copies other fields).
                "recommendations": list(self.recommendations or []),
                "requires_execution": True,
            }
        return data

    @classmethod
    def from_arf_analysis(cls, arf_result: Dict, scenario_name: str) -> 'OSSAnalysis':
        """Create an analysis from an ARF analysis result.

        Args:
            arf_result: Result mapping; only its ``"recommendations"`` entry
                is read. A missing or ``None`` entry (or a ``None`` mapping)
                falls back to a built-in list of generic recommendations.
            scenario_name: Scenario name recorded in the healing intent.

        Returns:
            A new ``OSSAnalysis`` with fixed status/estimate fields and a
            healing intent marked as requiring manual execution.
        """
        raw = (arf_result or {}).get("recommendations")
        if raw is None:
            recommendations = [
                "Increase resource allocation",
                "Implement monitoring",
                "Add circuit breakers",
                "Optimize configuration",
            ]
        elif isinstance(raw, str):
            # A bare string is one recommendation, not a sequence of characters.
            recommendations = [raw]
        else:
            # Copy so the instance does not share the caller's list object.
            recommendations = list(raw)

        return cls(
            status="✅ ARF OSS Analysis Complete",
            recommendations=recommendations,
            estimated_time="45-90 minutes",
            engineers_needed="2-3 engineers",
            manual_effort="High",
            confidence_score=0.92,
            healing_intent={
                "scenario": scenario_name,
                # Independent copy: editing ``recommendations`` later must not
                # silently rewrite the recorded intent (and vice versa).
                "actions": list(recommendations),
                "execution_required": True,
                "auto_execution": False,  # OSS is advisory only
            },
        )
@dataclass
class EnterpriseResults:
    """Outcome of an enterprise execution run.

    Holds the completed actions, the reported metric improvements and the
    business impact, plus flags for approval and healing-intent execution.
    """

    actions_completed: List[str]
    metrics_improvement: Dict[str, str]
    business_impact: Dict[str, Any]
    approval_required: bool = True
    execution_time: str = ""
    healing_intent_executed: bool = True

    def to_dict(self) -> Dict:
        """Return all fields as a dictionary plus a fixed ``arf_enterprise`` entry."""
        enterprise_flags = {
            "execution_complete": True,
            "learning_applied": True,
            "audit_trail_created": True,
        }
        # Field entries first, then the extra entry, matching insertion order.
        return {**asdict(self), "arf_enterprise": enterprise_flags}
@dataclass
class IncidentScenario:
    """Incident scenario model with optional analysis and execution results.

    Attributes:
        name: Scenario name.
        severity: Severity of the incident.
        metrics: Metric name to displayed value.
        impact: Impact label to displayed value.
        arf_pattern: ARF pattern name for RAG recall (empty when unset).
        oss_analysis: OSS analysis attached to the scenario, if any.
        enterprise_results: Enterprise execution results, if any.
    """

    name: str
    severity: IncidentSeverity
    metrics: Dict[str, str]
    impact: Dict[str, str]
    arf_pattern: str = ""  # ARF pattern name for RAG recall
    oss_analysis: Optional[OSSAnalysis] = None
    enterprise_results: Optional[EnterpriseResults] = None

    def to_dict(self) -> Dict:
        """Convert to a dictionary for JSON serialization.

        Returns:
            A dictionary with the scenario's own fields, the module-level
            ``ARF_OSS_AVAILABLE`` flag and, when present, the nested
            ``oss_analysis`` / ``enterprise_results`` dictionaries.
        """
        data = {
            "name": self.name,
            # Dataclasses do not enforce annotations, so tolerate a severity
            # that was supplied as a plain string instead of an enum member.
            "severity": getattr(self.severity, "value", self.severity),
            "metrics": self.metrics,
            "impact": self.impact,
            # Previously omitted, which lost the field on serialization.
            "arf_pattern": self.arf_pattern,
            "arf_oss_available": ARF_OSS_AVAILABLE,
        }
        if self.oss_analysis is not None:
            data["oss_analysis"] = self.oss_analysis.to_dict()
        if self.enterprise_results is not None:
            data["enterprise_results"] = self.enterprise_results.to_dict()
        return data
@dataclass
class DemoStep:
    """Single step of the demo, used for presenter guidance.

    ``scenario`` may be ``None``; ``icon`` and ``arf_integration`` have
    defaults and can be omitted.
    """

    title: str
    scenario: Optional[str]
    action: str
    message: str
    icon: str = "🎯"
    # Whether this step uses actual ARF
    arf_integration: bool = False