| """ |
| Azure Infrastructure Simulator – Main orchestration engine. |
| """ |
|
|
| from typing import List, Optional, Dict, Any |
|
|
| from ..intents import ( |
| InfrastructureIntent, |
| ProvisionResourceIntent, |
| ) |
| from ..policies import ( |
| Policy, |
| PolicyEvaluator, |
| CostThresholdPolicy, |
| ) |
| from ..cost_estimator import CostEstimator |
| from ..risk_engine import RiskEngine |
| from ..healing_intent import ( |
| HealingIntent, |
| RecommendedAction, |
| IntentSource, |
| ) |
| from ..constants import MAX_POLICY_VIOLATIONS |
|
|
|
|
| class AzureInfrastructureSimulator: |
| def __init__( |
| self, |
| policy: Policy, |
| pricing_file: Optional[str] = None, |
| risk_factors: Optional[List] = None, |
| ): |
| self._policy_evaluator = PolicyEvaluator(policy) |
| self._cost_estimator = CostEstimator(pricing_file) |
| self._risk_engine = RiskEngine(risk_factors if risk_factors else RiskEngine.DEFAULT_FACTORS) |
|
|
| def evaluate(self, intent: InfrastructureIntent) -> HealingIntent: |
| cost = None |
| if isinstance(intent, ProvisionResourceIntent): |
| cost = self._cost_estimator.estimate_monthly_cost(intent) |
|
|
| context = {"cost_estimate": cost} if cost is not None else {} |
| violations = self._policy_evaluator.evaluate(intent, context) |
|
|
| if len(violations) > MAX_POLICY_VIOLATIONS: |
| violations = violations[:MAX_POLICY_VIOLATIONS] |
|
|
| risk_score, explanation, contributions = self._risk_engine.calculate_risk( |
| intent, cost, violations |
| ) |
|
|
| if risk_score > 0.8 or violations: |
| recommended_action = RecommendedAction.DENY |
| elif risk_score > 0.4: |
| recommended_action = RecommendedAction.ESCALATE |
| else: |
| recommended_action = RecommendedAction.APPROVE |
|
|
| justification_parts = [f"Risk score: {risk_score:.2f}."] |
| if cost is not None: |
| justification_parts.append(f"Estimated monthly cost: ${cost:.2f}.") |
| if violations: |
| justification_parts.append(f"Policy violations: {'; '.join(violations)}.") |
| justification_parts.append(explanation) |
| justification = " ".join(justification_parts) |
|
|
| intent_summary = f"{intent.intent_type} requested by {intent.requester}" |
|
|
| details = { |
| "cost_estimate": cost, |
| "violations": violations, |
| "risk_score": risk_score, |
| "factor_contributions": contributions, |
| } |
|
|
| healing_intent = HealingIntent( |
| intent_id=intent.intent_id, |
| intent_summary=intent_summary, |
| cost_projection=cost, |
| risk_score=risk_score, |
| policy_violations=violations, |
| recommended_action=recommended_action, |
| justification=justification, |
| confidence_score=0.9, |
| evaluation_details=details, |
| source=IntentSource.INFRASTRUCTURE_ANALYSIS, |
| ) |
|
|
| return healing_intent.mark_as_oss_advisory() |