petter2025's picture
Update infrastructure/healing_intent.py
9537d6e verified
raw
history blame
44.1 kB
"""
Healing Intent - OSS creates, Enterprise executes
"""
from dataclasses import dataclass, field, asdict
from typing import Dict, Any, Optional, List, ClassVar, Tuple, Union
from datetime import datetime
import hashlib
import json
import time
import uuid
from enum import Enum
import numpy as np
from .intents import InfrastructureIntent
from .constants import (
OSS_EDITION,
OSS_LICENSE,
ENTERPRISE_UPGRADE_URL,
EXECUTION_ALLOWED,
MAX_SIMILARITY_CACHE,
SIMILARITY_THRESHOLD,
MAX_POLICY_VIOLATIONS,
MAX_RISK_FACTORS,
MAX_COST_PROJECTIONS,
MAX_DECISION_TREE_DEPTH,
MAX_ALTERNATIVE_ACTIONS,
OSSBoundaryError,
)
class HealingIntentError(Exception): pass
class SerializationError(HealingIntentError): pass
class ValidationError(HealingIntentError): pass
class IntentSource(str, Enum):
OSS_ANALYSIS = "oss_analysis"
HUMAN_OVERRIDE = "human_override"
AUTOMATED_LEARNING = "automated_learning"
SCHEDULED_ACTION = "scheduled_action"
RAG_SIMILARITY = "rag_similarity"
INFRASTRUCTURE_ANALYSIS = "infrastructure_analysis"
POLICY_VIOLATION = "policy_violation"
COST_OPTIMIZATION = "cost_optimization"
class IntentStatus(str, Enum):
CREATED = "created"
PENDING_EXECUTION = "pending_execution"
EXECUTING = "executing"
EXECUTING_PARTIAL = "executing_partial"
COMPLETED = "completed"
COMPLETED_PARTIAL = "completed_partial"
FAILED = "failed"
REJECTED = "rejected"
CANCELLED = "cancelled"
ROLLED_BACK = "rolled_back"
OSS_ADVISORY_ONLY = "oss_advisory_only"
PENDING_APPROVAL = "pending_approval"
APPROVED = "approved"
APPROVED_WITH_OVERRIDES = "approved_with_overrides"
class RecommendedAction(str, Enum):
APPROVE = "approve"
DENY = "deny"
ESCALATE = "escalate"
DEFER = "defer"
class ConfidenceDistribution:
def __init__(self, mean: float, std: float = 0.05, samples: Optional[List[float]] = None):
self.mean = max(0.0, min(mean, 1.0))
self.std = max(0.0, min(std, 0.5))
self._samples = samples or list(np.random.normal(self.mean, self.std, 1000).clip(0, 1))
@property
def p5(self) -> float:
return float(np.percentile(self._samples, 5))
@property
def p50(self) -> float:
return float(np.percentile(self._samples, 50))
@property
def p95(self) -> float:
return float(np.percentile(self._samples, 95))
@property
def confidence_interval(self) -> Tuple[float, float]:
return (self.p5, self.p95)
def to_dict(self) -> Dict[str, float]:
return {
"mean": self.mean,
"std": self.std,
"p5": self.p5,
"p50": self.p50,
"p95": self.p95
}
@classmethod
def from_dict(cls, data: Dict[str, float]) -> "ConfidenceDistribution":
return cls(mean=data["mean"], std=data.get("std", 0.05), samples=None)
def __repr__(self) -> str:
return f"ConfidenceDistribution(mean={self.mean:.3f}, 95% CI=[{self.p5:.3f}, {self.p95:.3f}])"
@dataclass(frozen=True, slots=True)
class HealingIntent:
action: str
component: str
parameters: Dict[str, Any] = field(default_factory=dict)
justification: str = ""
confidence: float = 0.85
confidence_distribution: Optional[Dict[str, float]] = None
incident_id: str = ""
detected_at: float = field(default_factory=time.time)
risk_score: Optional[float] = None
risk_factors: Optional[Dict[str, float]] = None
cost_projection: Optional[float] = None
cost_confidence_interval: Optional[Tuple[float, float]] = None
recommended_action: Optional[RecommendedAction] = None
decision_tree: Optional[List[Dict[str, Any]]] = None
alternative_actions: Optional[List[Dict[str, Any]]] = None
risk_profile: Optional[str] = None
reasoning_chain: Optional[List[Dict[str, Any]]] = None
similar_incidents: Optional[List[Dict[str, Any]]] = None
rag_similarity_score: Optional[float] = None
source: IntentSource = IntentSource.OSS_ANALYSIS
intent_id: str = field(default_factory=lambda: f"intent_{uuid.uuid4().hex[:16]}")
created_at: float = field(default_factory=time.time)
status: IntentStatus = IntentStatus.CREATED
execution_id: Optional[str] = None
executed_at: Optional[float] = None
execution_result: Optional[Dict[str, Any]] = None
enterprise_metadata: Dict[str, Any] = field(default_factory=dict)
human_overrides: List[Dict[str, Any]] = field(default_factory=list)
approvals: List[Dict[str, Any]] = field(default_factory=list)
comments: List[Dict[str, Any]] = field(default_factory=list)
oss_edition: str = OSS_EDITION
oss_license: str = OSS_LICENSE
requires_enterprise: bool = True
execution_allowed: bool = EXECUTION_ALLOWED
infrastructure_intent_id: Optional[str] = None
policy_violations: List[str] = field(default_factory=list)
infrastructure_intent: Optional[Dict[str, Any]] = None
MIN_CONFIDENCE: ClassVar[float] = 0.0
MAX_CONFIDENCE: ClassVar[float] = 1.0
MAX_JUSTIFICATION_LENGTH: ClassVar[int] = 5000
MAX_PARAMETERS_SIZE: ClassVar[int] = 100
MAX_SIMILAR_INCIDENTS: ClassVar[int] = MAX_SIMILARITY_CACHE
VERSION: ClassVar[str] = "2.0.0"
def __post_init__(self) -> None:
self._validate_oss_boundaries()
self._validate_risk_integration()
def _validate_oss_boundaries(self) -> None:
errors: List[str] = []
if not (self.MIN_CONFIDENCE <= self.confidence <= self.MAX_CONFIDENCE):
errors.append(f"Confidence must be between {self.MIN_CONFIDENCE} and {self.MAX_CONFIDENCE}, got {self.confidence}")
if len(self.justification) > self.MAX_JUSTIFICATION_LENGTH:
errors.append(f"Justification exceeds max length {self.MAX_JUSTIFICATION_LENGTH}")
if not self.action or not self.action.strip():
errors.append("Action cannot be empty")
if not self.component or not self.component.strip():
errors.append("Component cannot be empty")
if len(self.parameters) > self.MAX_PARAMETERS_SIZE:
errors.append(f"Too many parameters: {len(self.parameters)} > {self.MAX_PARAMETERS_SIZE}")
try:
json.dumps(self.parameters)
except (TypeError, ValueError) as e:
errors.append(f"Parameters must be JSON serializable: {e}")
if self.similar_incidents and len(self.similar_incidents) > self.MAX_SIMILAR_INCIDENTS:
errors.append(f"Too many similar incidents: {len(self.similar_incidents)} > {self.MAX_SIMILAR_INCIDENTS}")
if self.oss_edition == OSS_EDITION:
if self.execution_allowed:
errors.append("Execution not allowed in OSS edition")
if self.status == IntentStatus.EXECUTING:
errors.append("EXECUTING status not allowed in OSS edition")
if self.executed_at is not None:
errors.append("executed_at should not be set in OSS edition")
if self.execution_id is not None:
errors.append("execution_id should not be set in OSS edition")
if errors:
raise ValidationError("HealingIntent validation failed:\n" + "\n".join(f" • {error}" for error in errors))
def _validate_risk_integration(self) -> None:
if self.risk_score is not None and not (0.0 <= self.risk_score <= 1.0):
raise ValidationError(f"Risk score must be between 0 and 1, got {self.risk_score}")
if self.cost_projection is not None and self.cost_projection < 0:
raise ValidationError(f"Cost projection cannot be negative, got {self.cost_projection}")
if self.cost_confidence_interval is not None:
low, high = self.cost_confidence_interval
if low > high:
raise ValidationError(f"Invalid confidence interval: [{low}, {high}]")
@property
def deterministic_id(self) -> str:
data = {
"action": self.action,
"component": self.component,
"parameters": self._normalize_parameters(self.parameters),
"incident_id": self.incident_id,
"detected_at": int(self.detected_at),
"oss_edition": self.oss_edition,
}
json_str = json.dumps(data, sort_keys=True, default=str)
hash_digest = hashlib.sha256(json_str.encode()).hexdigest()
return f"intent_{hash_digest[:16]}"
@property
def age_seconds(self) -> float:
return time.time() - self.created_at
@property
def is_executable(self) -> bool:
if self.oss_edition == OSS_EDITION:
return False
return self.status in [IntentStatus.CREATED, IntentStatus.PENDING_EXECUTION, IntentStatus.APPROVED]
@property
def is_oss_advisory(self) -> bool:
return self.oss_edition == OSS_EDITION and not self.execution_allowed
@property
def requires_enterprise_upgrade(self) -> bool:
return self.requires_enterprise and self.oss_edition == OSS_EDITION
@property
def confidence_interval(self) -> Optional[Tuple[float, float]]:
if self.confidence_distribution:
return (self.confidence_distribution.get("p5", self.confidence), self.confidence_distribution.get("p95", self.confidence))
return None
def to_enterprise_request(self) -> Dict[str, Any]:
return {
"intent_id": self.deterministic_id,
"action": self.action,
"component": self.component,
"parameters": self.parameters,
"justification": self.justification,
"confidence": self.confidence,
"confidence_interval": self.confidence_interval,
"risk_score": self.risk_score,
"cost_projection": self.cost_projection,
"incident_id": self.incident_id,
"detected_at": self.detected_at,
"created_at": self.created_at,
"source": self.source.value,
"recommended_action": self.recommended_action.value if self.recommended_action else None,
"oss_edition": self.oss_edition,
"oss_license": self.oss_license,
"requires_enterprise": self.requires_enterprise,
"execution_allowed": self.execution_allowed,
"version": self.VERSION,
"oss_metadata": {
"similar_incidents_count": len(self.similar_incidents) if self.similar_incidents else 0,
"rag_similarity_score": self.rag_similarity_score,
"has_reasoning_chain": self.reasoning_chain is not None,
"source": self.source.value,
"is_oss_advisory": self.is_oss_advisory,
"risk_factors": self.risk_factors,
"policy_violations_count": len(self.policy_violations) if self.policy_violations else 0,
"confidence_basis": self._get_confidence_basis(),
"learning_applied": False,
"learning_reason": "OSS advisory mode does not persist or learn from outcomes",
},
"upgrade_url": ENTERPRISE_UPGRADE_URL,
"enterprise_features": [
"autonomous_execution",
"approval_workflows",
"persistent_storage",
"learning_engine",
"audit_trails",
"compliance_reports",
"multi_tenant_support",
"sso_integration",
"24_7_support",
"probabilistic_confidence",
"risk_analytics",
"cost_optimization"
]
}
def _get_confidence_basis(self) -> str:
if self.recommended_action == RecommendedAction.DENY and self.policy_violations:
return "policy_violation"
if self.rag_similarity_score and self.rag_similarity_score > SIMILARITY_THRESHOLD:
return "historical_similarity"
if self.risk_score is not None:
return "risk_based"
return "policy_only"
def to_dict(self, include_oss_context: bool = False) -> Dict[str, Any]:
data = asdict(self)
if "source" in data and isinstance(data["source"], IntentSource):
data["source"] = self.source.value
if "status" in data and isinstance(data["status"], IntentStatus):
data["status"] = self.status.value
if "recommended_action" in data and isinstance(data["recommended_action"], RecommendedAction):
data["recommended_action"] = self.recommended_action.value if self.recommended_action else None
if not include_oss_context:
data.pop("reasoning_chain", None)
data.pop("similar_incidents", None)
data.pop("rag_similarity_score", None)
data.pop("decision_tree", None)
data.pop("alternative_actions", None)
data.pop("infrastructure_intent", None)
data["deterministic_id"] = self.deterministic_id
data["age_seconds"] = self.age_seconds
data["is_executable"] = self.is_executable
data["is_oss_advisory"] = self.is_oss_advisory
data["requires_enterprise_upgrade"] = self.requires_enterprise_upgrade
data["version"] = self.VERSION
data["confidence_interval"] = self.confidence_interval
return data
def with_execution_result(self, execution_id: str, executed_at: float, result: Dict[str, Any],
status: IntentStatus = IntentStatus.COMPLETED, metadata: Optional[Dict[str, Any]] = None) -> "HealingIntent":
return HealingIntent(
action=self.action, component=self.component, parameters=self.parameters, justification=self.justification,
confidence=self.confidence, confidence_distribution=self.confidence_distribution, incident_id=self.incident_id,
detected_at=self.detected_at, risk_score=self.risk_score, risk_factors=self.risk_factors,
cost_projection=self.cost_projection, cost_confidence_interval=self.cost_confidence_interval,
recommended_action=self.recommended_action, decision_tree=self.decision_tree,
alternative_actions=self.alternative_actions, risk_profile=self.risk_profile,
reasoning_chain=self.reasoning_chain, similar_incidents=self.similar_incidents,
rag_similarity_score=self.rag_similarity_score, source=self.source, intent_id=self.intent_id,
created_at=self.created_at, oss_edition=self.oss_edition, oss_license=self.oss_license,
requires_enterprise=self.requires_enterprise, execution_allowed=self.execution_allowed,
infrastructure_intent_id=self.infrastructure_intent_id, policy_violations=self.policy_violations,
infrastructure_intent=self.infrastructure_intent, status=status, execution_id=execution_id,
executed_at=executed_at, execution_result=result,
enterprise_metadata={**(self.enterprise_metadata or {}), **(metadata or {})},
human_overrides=self.human_overrides, approvals=self.approvals, comments=self.comments
)
def with_human_approval(self, approver: str, approval_time: float, comments: Optional[str] = None,
overrides: Optional[Dict[str, Any]] = None) -> "HealingIntent":
approval_record = {"approver": approver, "timestamp": approval_time, "comments": comments, "overrides": overrides}
new_overrides = list(self.human_overrides)
if overrides:
new_overrides.append({"overrider": approver, "timestamp": approval_time, "overrides": overrides, "reason": comments})
new_approvals = list(self.approvals) + [approval_record]
new_comments = list(self.comments)
if comments:
new_comments.append({"author": approver, "timestamp": approval_time, "comment": comments})
return HealingIntent(
action=self.action, component=self.component, parameters=self.parameters, justification=self.justification,
confidence=self.confidence, confidence_distribution=self.confidence_distribution, incident_id=self.incident_id,
detected_at=self.detected_at, risk_score=self.risk_score, risk_factors=self.risk_factors,
cost_projection=self.cost_projection, cost_confidence_interval=self.cost_confidence_interval,
recommended_action=self.recommended_action, decision_tree=self.decision_tree,
alternative_actions=self.alternative_actions, risk_profile=self.risk_profile,
reasoning_chain=self.reasoning_chain, similar_incidents=self.similar_incidents,
rag_similarity_score=self.rag_similarity_score, source=self.source, intent_id=self.intent_id,
created_at=self.created_at, status=IntentStatus.APPROVED_WITH_OVERRIDES if overrides else IntentStatus.APPROVED,
execution_id=self.execution_id, executed_at=self.executed_at, execution_result=self.execution_result,
enterprise_metadata=self.enterprise_metadata, human_overrides=new_overrides, approvals=new_approvals,
comments=new_comments, oss_edition=self.oss_edition, oss_license=self.oss_license,
requires_enterprise=self.requires_enterprise, execution_allowed=self.execution_allowed,
infrastructure_intent_id=self.infrastructure_intent_id, policy_violations=self.policy_violations,
infrastructure_intent=self.infrastructure_intent
)
def mark_as_sent_to_enterprise(self) -> "HealingIntent":
return HealingIntent(
action=self.action, component=self.component, parameters=self.parameters, justification=self.justification,
confidence=self.confidence, confidence_distribution=self.confidence_distribution, incident_id=self.incident_id,
detected_at=self.detected_at, risk_score=self.risk_score, risk_factors=self.risk_factors,
cost_projection=self.cost_projection, cost_confidence_interval=self.cost_confidence_interval,
recommended_action=self.recommended_action, decision_tree=self.decision_tree,
alternative_actions=self.alternative_actions, risk_profile=self.risk_profile,
reasoning_chain=self.reasoning_chain, similar_incidents=self.similar_incidents,
rag_similarity_score=self.rag_similarity_score, source=self.source, intent_id=self.intent_id,
created_at=self.created_at, status=IntentStatus.PENDING_EXECUTION, execution_id=self.execution_id,
executed_at=self.executed_at, execution_result=self.execution_result, enterprise_metadata=self.enterprise_metadata,
human_overrides=self.human_overrides, approvals=self.approvals, comments=self.comments,
oss_edition=self.oss_edition, oss_license=self.oss_license, requires_enterprise=self.requires_enterprise,
execution_allowed=self.execution_allowed, infrastructure_intent_id=self.infrastructure_intent_id,
policy_violations=self.policy_violations, infrastructure_intent=self.infrastructure_intent
)
def mark_as_oss_advisory(self) -> "HealingIntent":
return HealingIntent(
action=self.action, component=self.component, parameters=self.parameters, justification=self.justification,
confidence=self.confidence, confidence_distribution=self.confidence_distribution, incident_id=self.incident_id,
detected_at=self.detected_at, risk_score=self.risk_score, risk_factors=self.risk_factors,
cost_projection=self.cost_projection, cost_confidence_interval=self.cost_confidence_interval,
recommended_action=self.recommended_action, decision_tree=self.decision_tree,
alternative_actions=self.alternative_actions, risk_profile=self.risk_profile,
reasoning_chain=self.reasoning_chain, similar_incidents=self.similar_incidents,
rag_similarity_score=self.rag_similarity_score, source=self.source, intent_id=self.intent_id,
created_at=self.created_at, status=IntentStatus.OSS_ADVISORY_ONLY, execution_id=self.execution_id,
executed_at=self.executed_at, execution_result=self.execution_result, enterprise_metadata=self.enterprise_metadata,
human_overrides=self.human_overrides, approvals=self.approvals, comments=self.comments,
oss_edition=self.oss_edition, oss_license=self.oss_license, requires_enterprise=self.requires_enterprise,
execution_allowed=False, infrastructure_intent_id=self.infrastructure_intent_id,
policy_violations=self.policy_violations, infrastructure_intent=self.infrastructure_intent
)
@classmethod
def from_infrastructure_intent(cls, infrastructure_intent: Any, action: str, component: str,
parameters: Dict[str, Any], justification: str, confidence: float = 0.85,
risk_score: Optional[float] = None, risk_factors: Optional[Dict[str, float]] = None,
cost_projection: Optional[float] = None, policy_violations: Optional[List[str]] = None,
recommended_action: Optional[RecommendedAction] = None,
source: IntentSource = IntentSource.INFRASTRUCTURE_ANALYSIS) -> "HealingIntent":
infrastructure_intent_id = getattr(infrastructure_intent, 'intent_id', None)
if hasattr(infrastructure_intent, 'model_dump'):
intent_dict = infrastructure_intent.model_dump()
elif hasattr(infrastructure_intent, 'to_dict'):
intent_dict = infrastructure_intent.to_dict()
else:
intent_dict = {"type": str(type(infrastructure_intent))}
return cls(
action=action, component=component, parameters=parameters, justification=justification, confidence=confidence,
risk_score=risk_score, risk_factors=risk_factors, cost_projection=cost_projection,
policy_violations=policy_violations or [], recommended_action=recommended_action, source=source,
infrastructure_intent_id=infrastructure_intent_id, infrastructure_intent=intent_dict,
oss_edition=OSS_EDITION, requires_enterprise=True, execution_allowed=False
)
@classmethod
def from_analysis(cls, action: str, component: str, parameters: Dict[str, Any], justification: str,
confidence: float, confidence_std: float = 0.05,
similar_incidents: Optional[List[Dict[str, Any]]] = None,
reasoning_chain: Optional[List[Dict[str, Any]]] = None, incident_id: str = "",
source: IntentSource = IntentSource.OSS_ANALYSIS, rag_similarity_score: Optional[float] = None,
risk_score: Optional[float] = None, cost_projection: Optional[float] = None) -> "HealingIntent":
if similar_incidents and len(similar_incidents) > cls.MAX_SIMILAR_INCIDENTS:
similar_incidents = similar_incidents[:cls.MAX_SIMILAR_INCIDENTS]
conf_dist = ConfidenceDistribution(confidence, confidence_std)
enhanced_confidence = confidence
if similar_incidents:
similarity_scores = [inc.get("similarity", 0.0) for inc in similar_incidents if "similarity" in inc]
if similarity_scores:
avg_similarity = sum(similarity_scores) / len(similarity_scores)
confidence_boost = min(0.2, avg_similarity * 0.3)
enhanced_confidence = min(confidence * (1.0 + confidence_boost), cls.MAX_CONFIDENCE)
final_rag_score = rag_similarity_score
if final_rag_score is None and similar_incidents and len(similar_incidents) > 0:
top_similarities = [inc.get("similarity", 0.0) for inc in similar_incidents[:3] if "similarity" in inc]
if top_similarities:
final_rag_score = sum(top_similarities) / len(top_similarities)
return cls(
action=action, component=component, parameters=parameters, justification=justification,
confidence=enhanced_confidence, confidence_distribution=conf_dist.to_dict(), incident_id=incident_id,
similar_incidents=similar_incidents, reasoning_chain=reasoning_chain, rag_similarity_score=final_rag_score,
source=source, risk_score=risk_score, cost_projection=cost_projection,
oss_edition=OSS_EDITION, requires_enterprise=True, execution_allowed=False
)
@classmethod
def from_rag_recommendation(cls, action: str, component: str, parameters: Dict[str, Any],
rag_similarity_score: float, similar_incidents: List[Dict[str, Any]],
justification_template: str = "Based on {count} similar historical incidents with {success_rate:.0%} success rate",
success_rate: Optional[float] = None, risk_score: Optional[float] = None,
cost_projection: Optional[float] = None) -> "HealingIntent":
if not similar_incidents:
raise ValidationError("RAG recommendation requires similar incidents")
if success_rate is None:
if len(similar_incidents) == 0:
success_rate = 0.0
else:
successful = sum(1 for inc in similar_incidents if inc.get("success", False))
success_rate = successful / len(similar_incidents)
justification = justification_template.format(count=len(similar_incidents), success_rate=success_rate or 0.0,
action=action, component=component)
base_confidence = rag_similarity_score * 0.8
if success_rate:
base_confidence = base_confidence * (0.7 + success_rate * 0.3)
return cls.from_analysis(action=action, component=component, parameters=parameters, justification=justification,
confidence=min(base_confidence, 0.95), similar_incidents=similar_incidents,
incident_id=similar_incidents[0].get("incident_id", "") if similar_incidents else "",
source=IntentSource.RAG_SIMILARITY, rag_similarity_score=rag_similarity_score,
risk_score=risk_score, cost_projection=cost_projection)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "HealingIntent":
version = data.get("version", "1.0.0")
clean_data = data.copy()
if "source" in clean_data and isinstance(clean_data["source"], str):
clean_data["source"] = IntentSource(clean_data["source"])
if "status" in clean_data and isinstance(clean_data["status"], str):
clean_data["status"] = IntentStatus(clean_data["status"])
if "recommended_action" in clean_data and isinstance(clean_data["recommended_action"], str):
try:
clean_data["recommended_action"] = RecommendedAction(clean_data["recommended_action"])
except ValueError:
clean_data["recommended_action"] = None
clean_data.pop("deterministic_id", None)
clean_data.pop("age_seconds", None)
clean_data.pop("is_executable", None)
clean_data.pop("is_oss_advisory", None)
clean_data.pop("requires_enterprise_upgrade", None)
clean_data.pop("version", None)
clean_data.pop("confidence_interval", None)
return cls(**clean_data)
def _normalize_parameters(self, params: Dict[str, Any]) -> Dict[str, Any]:
normalized: Dict[str, Any] = {}
for key, value in sorted(params.items()):
normalized[key] = self._normalize_value(value)
return normalized
def _normalize_value(self, value: Any) -> Any:
if isinstance(value, (int, float, str, bool, type(None))):
return value
elif isinstance(value, (list, tuple, set)):
return tuple(sorted(self._normalize_value(v) for v in value))
elif isinstance(value, dict):
return self._normalize_parameters(value)
elif hasattr(value, '__dict__'):
return self._normalize_parameters(value.__dict__)
else:
try:
return str(value)
except Exception:
return f"<unserializable:{type(value).__name__}>"
def get_oss_context(self) -> Dict[str, Any]:
return {
"reasoning_chain": self.reasoning_chain,
"similar_incidents": self.similar_incidents,
"rag_similarity_score": self.rag_similarity_score,
"decision_tree": self.decision_tree,
"alternative_actions": self.alternative_actions,
"analysis_timestamp": datetime.fromtimestamp(self.detected_at).isoformat(),
"source": self.source.value,
"created_at": datetime.fromtimestamp(self.created_at).isoformat(),
"oss_edition": self.oss_edition,
"is_oss_advisory": self.is_oss_advisory,
"infrastructure_intent": self.infrastructure_intent,
}
def get_execution_summary(self) -> Dict[str, Any]:
summary = {
"intent_id": self.deterministic_id,
"action": self.action,
"component": self.component,
"confidence": self.confidence,
"confidence_interval": self.confidence_interval,
"risk_score": self.risk_score,
"cost_projection": self.cost_projection,
"status": self.status.value,
"created_at": datetime.fromtimestamp(self.created_at).isoformat(),
"age_seconds": self.age_seconds,
"oss_edition": self.oss_edition,
"requires_enterprise": self.requires_enterprise,
"is_oss_advisory": self.is_oss_advisory,
"source": self.source.value,
"policy_violations_count": len(self.policy_violations) if self.policy_violations else 0,
"confidence_basis": self._get_confidence_basis(),
}
if self.executed_at:
summary["executed_at"] = datetime.fromtimestamp(self.executed_at).isoformat()
summary["execution_duration_seconds"] = self.executed_at - self.created_at
if self.execution_result:
summary["execution_success"] = self.execution_result.get("success", False)
summary["execution_message"] = self.execution_result.get("message", "")
if self.rag_similarity_score:
summary["rag_similarity_score"] = self.rag_similarity_score
if self.similar_incidents:
summary["similar_incidents_count"] = len(self.similar_incidents)
if self.approvals:
summary["approvals_count"] = len(self.approvals)
summary["approved_by"] = [a.get("approver") for a in self.approvals if a.get("approver")]
if self.human_overrides:
summary["overrides_count"] = len(self.human_overrides)
return summary
def is_immutable(self) -> bool:
try:
object.__setattr__(self, '_test_immutable', True)
return False
except Exception:
return True
def __repr__(self) -> str:
return (f"HealingIntent(id={self.deterministic_id[:8]}..., action={self.action}, "
f"component={self.component}, confidence={self.confidence:.2f}, "
f"risk={self.risk_score:.2f if self.risk_score else 'N/A'}, status={self.status.value})")
class HealingIntentSerializer:
SCHEMA_VERSION: ClassVar[str] = "2.0.0"
@classmethod
def serialize(cls, intent: HealingIntent, version: str = "2.0.0") -> Dict[str, Any]:
try:
if version == "2.0.0":
return {
"version": version,
"schema_version": cls.SCHEMA_VERSION,
"data": intent.to_dict(include_oss_context=True),
"metadata": {
"serialized_at": time.time(),
"deterministic_id": intent.deterministic_id,
"is_executable": intent.is_executable,
"is_oss_advisory": intent.is_oss_advisory,
"requires_enterprise_upgrade": intent.requires_enterprise_upgrade,
"oss_edition": intent.oss_edition,
"has_probabilistic_confidence": intent.confidence_distribution is not None,
"has_risk_assessment": intent.risk_score is not None,
"has_cost_projection": intent.cost_projection is not None,
}
}
elif version in ["1.1.0", "1.0.0"]:
data = intent.to_dict(include_oss_context=True)
data.pop("confidence_distribution", None)
data.pop("risk_score", None)
data.pop("risk_factors", None)
data.pop("cost_projection", None)
data.pop("cost_confidence_interval", None)
data.pop("recommended_action", None)
data.pop("decision_tree", None)
data.pop("alternative_actions", None)
data.pop("risk_profile", None)
data.pop("human_overrides", None)
data.pop("approvals", None)
data.pop("comments", None)
data.pop("infrastructure_intent_id", None)
data.pop("policy_violations", None)
data.pop("infrastructure_intent", None)
if data.get("status") in [
IntentStatus.EXECUTING_PARTIAL.value,
IntentStatus.COMPLETED_PARTIAL.value,
IntentStatus.ROLLED_BACK.value,
IntentStatus.PENDING_APPROVAL.value,
IntentStatus.APPROVED.value,
IntentStatus.APPROVED_WITH_OVERRIDES.value
]:
data["status"] = IntentStatus.PENDING_EXECUTION.value
return {
"version": version,
"schema_version": "1.1.0" if version == "1.1.0" else "1.0.0",
"data": data,
"metadata": {
"serialized_at": time.time(),
"deterministic_id": intent.deterministic_id,
"is_executable": intent.is_executable,
"is_oss_advisory": intent.is_oss_advisory,
}
}
else:
raise SerializationError(f"Unsupported version: {version}")
except Exception as e:
raise SerializationError(f"Failed to serialize HealingIntent: {e}") from e
@classmethod
def deserialize(cls, data: Dict[str, Any]) -> HealingIntent:
try:
version = data.get("version", "1.0.0")
intent_data = data.get("data", data)
if version in ["2.0.0", "1.1.0", "1.0.0"]:
if version.startswith("1."):
intent_data.setdefault("confidence_distribution", None)
intent_data.setdefault("risk_score", None)
intent_data.setdefault("risk_factors", None)
intent_data.setdefault("cost_projection", None)
intent_data.setdefault("cost_confidence_interval", None)
intent_data.setdefault("recommended_action", None)
intent_data.setdefault("decision_tree", None)
intent_data.setdefault("alternative_actions", None)
intent_data.setdefault("risk_profile", None)
intent_data.setdefault("human_overrides", [])
intent_data.setdefault("approvals", [])
intent_data.setdefault("comments", [])
intent_data.setdefault("infrastructure_intent_id", None)
intent_data.setdefault("policy_violations", [])
intent_data.setdefault("infrastructure_intent", None)
return HealingIntent.from_dict(intent_data)
else:
raise SerializationError(f"Unsupported version: {version}")
except KeyError as e:
raise SerializationError(f"Missing required field in serialized data: {e}") from e
except Exception as e:
raise SerializationError(f"Failed to deserialize HealingIntent: {e}") from e
@classmethod
def to_json(cls, intent: HealingIntent, pretty: bool = False) -> str:
try:
serialized = cls.serialize(intent)
return json.dumps(serialized, indent=2 if pretty else None, default=str)
except Exception as e:
raise SerializationError(f"Failed to convert to JSON: {e}") from e
@classmethod
def from_json(cls, json_str: str) -> HealingIntent:
try:
data = json.loads(json_str)
return cls.deserialize(data)
except json.JSONDecodeError as e:
raise SerializationError(f"Invalid JSON: {e}") from e
except Exception as e:
raise SerializationError(f"Failed to parse JSON: {e}") from e
@classmethod
def to_enterprise_json(cls, intent: HealingIntent) -> str:
try:
enterprise_request = intent.to_enterprise_request()
return json.dumps(enterprise_request, default=str)
except Exception as e:
raise SerializationError(f"Failed to create Enterprise JSON: {e}") from e
@classmethod
def validate_for_oss(cls, intent: HealingIntent) -> bool:
try:
if intent.oss_edition != OSS_EDITION:
return False
if intent.execution_allowed:
return False
if intent.similar_incidents and len(intent.similar_incidents) > HealingIntent.MAX_SIMILAR_INCIDENTS:
return False
if not intent.is_immutable():
return False
if intent.executed_at is not None or intent.execution_id is not None:
return False
return True
except Exception:
return False
# Factory functions
def create_infrastructure_healing_intent(infrastructure_result: Any, action_mapping: Optional[Dict[str, str]] = None) -> HealingIntent:
if action_mapping is None:
action_mapping = {"approve": "execute", "deny": "block", "escalate": "escalate", "defer": "defer"}
recommended_action = getattr(infrastructure_result, 'recommended_action', None)
if recommended_action and hasattr(recommended_action, 'value'):
action = action_mapping.get(recommended_action.value, "review")
else:
action = "review"
parameters = {
"infrastructure_intent_id": getattr(infrastructure_result, 'intent_id', None),
"risk_score": getattr(infrastructure_result, 'risk_score', None),
"cost_projection": getattr(infrastructure_result, 'cost_projection', None),
"policy_violations": getattr(infrastructure_result, 'policy_violations', []),
"evaluation_details": getattr(infrastructure_result, 'evaluation_details', {})
}
justification_parts = [getattr(infrastructure_result, 'justification', "Infrastructure analysis completed")]
policy_violations = getattr(infrastructure_result, 'policy_violations', [])
if policy_violations:
justification_parts.append(f"Policy violations: {'; '.join(policy_violations)}")
return HealingIntent.from_infrastructure_intent(
infrastructure_intent=getattr(infrastructure_result, 'infrastructure_intent', None),
action=action,
component="infrastructure",
parameters=parameters,
justification=" ".join(justification_parts),
confidence=getattr(infrastructure_result, 'confidence_score', 0.85),
risk_score=getattr(infrastructure_result, 'risk_score', None),
policy_violations=policy_violations,
recommended_action=recommended_action,
source=IntentSource.INFRASTRUCTURE_ANALYSIS
).mark_as_oss_advisory()
def create_rollback_intent(component: str, revision: str = "previous", justification: str = "",
incident_id: str = "", similar_incidents: Optional[List[Dict[str, Any]]] = None,
rag_similarity_score: Optional[float] = None, risk_score: Optional[float] = None,
cost_projection: Optional[float] = None) -> HealingIntent:
if not justification:
justification = f"Rollback {component} to {revision} revision"
return HealingIntent.from_analysis(
action="rollback",
component=component,
parameters={"revision": revision},
justification=justification,
confidence=0.9,
similar_incidents=similar_incidents,
incident_id=incident_id,
rag_similarity_score=rag_similarity_score,
risk_score=risk_score,
cost_projection=cost_projection,
).mark_as_oss_advisory()
def create_restart_intent(component: str, container_id: Optional[str] = None, justification: str = "",
incident_id: str = "", similar_incidents: Optional[List[Dict[str, Any]]] = None,
rag_similarity_score: Optional[float] = None, risk_score: Optional[float] = None,
cost_projection: Optional[float] = None) -> HealingIntent:
parameters = {}
if container_id:
parameters["container_id"] = container_id
if not justification:
justification = f"Restart container for {component}"
return HealingIntent.from_analysis(
action="restart_container",
component=component,
parameters=parameters,
justification=justification,
confidence=0.85,
similar_incidents=similar_incidents,
incident_id=incident_id,
rag_similarity_score=rag_similarity_score,
risk_score=risk_score,
cost_projection=cost_projection,
).mark_as_oss_advisory()
def create_scale_out_intent(component: str, scale_factor: int = 2, justification: str = "",
incident_id: str = "", similar_incidents: Optional[List[Dict[str, Any]]] = None,
rag_similarity_score: Optional[float] = None, risk_score: Optional[float] = None,
cost_projection: Optional[float] = None) -> HealingIntent:
if not justification:
justification = f"Scale out {component} by factor {scale_factor}"
return HealingIntent.from_analysis(
action="scale_out",
component=component,
parameters={"scale_factor": scale_factor},
justification=justification,
confidence=0.8,
similar_incidents=similar_incidents,
incident_id=incident_id,
rag_similarity_score=rag_similarity_score,
risk_score=risk_score,
cost_projection=cost_projection,
).mark_as_oss_advisory()
def create_oss_advisory_intent(action: str, component: str, parameters: Dict[str, Any], justification: str,
confidence: float = 0.85, incident_id: str = "", risk_score: Optional[float] = None,
cost_projection: Optional[float] = None) -> HealingIntent:
return HealingIntent(
action=action,
component=component,
parameters=parameters,
justification=justification,
confidence=confidence,
incident_id=incident_id,
risk_score=risk_score,
cost_projection=cost_projection,
oss_edition=OSS_EDITION,
requires_enterprise=True,
execution_allowed=False,
status=IntentStatus.OSS_ADVISORY_ONLY,
)
__all__ = [
"HealingIntent",
"ConfidenceDistribution",
"HealingIntentSerializer",
"IntentSource",
"IntentStatus",
"RecommendedAction",
"HealingIntentError",
"SerializationError",
"ValidationError",
"create_infrastructure_healing_intent",
"create_rollback_intent",
"create_restart_intent",
"create_scale_out_intent",
"create_oss_advisory_intent",
]