petter2025's picture
Update infrastructure/risk_engine.py
0b9fac2 verified
raw
history blame
3.86 kB
"""
Risk Scoring Engine – Multi-factor probabilistic risk model.
"""
from typing import Dict, List, Optional, Tuple, Any, Callable
from dataclasses import dataclass, field
from .intents import (
InfrastructureIntent,
ProvisionResourceIntent,
GrantAccessIntent,
DeployConfigurationIntent,
PermissionLevel,
Environment,
)
@dataclass(frozen=True)
class RiskFactor:
    """A named, weighted scoring function used by the risk engine.

    Instances are immutable and callable: calling one forwards the
    arguments to ``score_fn`` and returns its (unweighted) result.
    """

    # Identifier for this factor.
    name: str
    # Multiplier stored alongside the scorer; it is NOT applied by __call__.
    weight: float
    # Callable taking (intent, cost, violations) and returning a raw score.
    score_fn: Callable[[InfrastructureIntent, Optional[float], List[str]], float]
    # Optional human-readable summary of the factor.
    description: str = ""

    def __call__(
        self,
        intent: InfrastructureIntent,
        cost: Optional[float],
        violations: List[str],
    ) -> float:
        """Return the raw score produced by ``score_fn`` for these inputs."""
        scorer = self.score_fn
        raw_score = scorer(intent, cost, violations)
        return raw_score
def intent_type_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
    """Return the base risk associated with ``intent.intent_type``.

    Known types map to fixed scores; any other type scores 0.1.
    ``cost`` and ``violations`` are accepted for signature parity and ignored.
    """
    base_risk_by_type = dict(
        provision_resource=0.1,
        grant_access=0.3,
        deploy_config=0.2,
    )
    kind = intent.intent_type
    if kind in base_risk_by_type:
        return base_risk_by_type[kind]
    # Unrecognised intent types fall back to the lowest base score.
    return 0.1
def cost_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
    """Return the cost estimate normalised into the range [0.0, 1.0].

    Only ``ProvisionResourceIntent`` instances carry cost risk. The cost is
    divided by 5000.0 and clamped, so 5000 or more scores 1.0.

    Args:
        intent: The intent being scored.
        cost: Cost estimate, or ``None`` when no estimate is available.
        violations: Unused; accepted for signature parity with other factors.

    Returns:
        0.0 for non-provisioning intents, a missing (``None``) or NaN cost,
        or a negative cost; otherwise ``cost / 5000.0`` capped at 1.0.
    """
    import math  # function-scope import keeps the block self-contained

    if not isinstance(intent, ProvisionResourceIntent) or cost is None:
        return 0.0
    # NaN would propagate into the engine total, which then clamps to 0.0;
    # treat it like a missing estimate instead of letting it poison the sum.
    if math.isnan(cost):
        return 0.0
    # Lower bound matters: a negative estimate must not reduce overall risk.
    return max(0.0, min(cost / 5000.0, 1.0))
def permission_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
    """Return the risk of the permission level requested by a grant intent.

    Intents that are not ``GrantAccessIntent`` score 0.0. For grant intents,
    READ/WRITE/ADMIN map to fixed scores and any other level scores 0.5.
    ``cost`` and ``violations`` are accepted for signature parity and ignored.
    """
    if isinstance(intent, GrantAccessIntent):
        risk_by_level = {
            PermissionLevel.READ: 0.1,
            PermissionLevel.WRITE: 0.4,
            PermissionLevel.ADMIN: 0.8,
        }
        requested_level = intent.permission_level
        return risk_by_level.get(requested_level, 0.5)
    return 0.0
def scope_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
    """Return the risk of a configuration deployment's change scope.

    Intents that are not ``DeployConfigurationIntent`` score 0.0. For deploy
    intents, known scopes map to fixed scores and any other scope scores 0.3.
    ``cost`` and ``violations`` are accepted for signature parity and ignored.
    """
    if isinstance(intent, DeployConfigurationIntent):
        risk_by_scope = dict(
            single_instance=0.1,
            canary=0.2,
        )
        risk_by_scope["global"] = 0.6  # "global" is a keyword, so set it by key
        return risk_by_scope.get(intent.change_scope, 0.3)
    return 0.0
def environment_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
    """Return 0.1 when the intent targets production, otherwise 0.0.

    Intents without an ``environment`` attribute score 0.0.
    ``cost`` and ``violations`` are accepted for signature parity and ignored.
    """
    if not hasattr(intent, "environment"):
        return 0.0
    targets_prod = intent.environment == Environment.PROD
    return 0.1 if targets_prod else 0.0
def policy_violation_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
    """Return 0.2 per policy violation, capped at 0.8.

    ``intent`` and ``cost`` are accepted for signature parity and ignored.
    """
    penalty = len(violations) * 0.2
    if 0.8 < penalty:
        return 0.8
    return penalty
class RiskEngine:
    """Combine weighted risk factors into a single score clamped to [0, 1].

    Each factor is called with the intent, the cost estimate and the policy
    violations; its raw score is multiplied by the factor's weight and the
    weighted values are summed.
    """

    # Factors used when the caller supplies none. This list lives on the
    # class, so instances take a copy instead of aliasing it.
    DEFAULT_FACTORS = [
        RiskFactor("intent_type", 1.0, intent_type_factor, "Base risk from intent type"),
        RiskFactor("cost", 0.3, cost_factor, "Normalized cost estimate"),
        RiskFactor("permission", 0.3, permission_factor, "Permission level being granted"),
        RiskFactor("scope", 0.2, scope_factor, "Deployment scope"),
        RiskFactor("environment", 0.1, environment_factor, "Production environment"),
        RiskFactor("policy_violations", 0.2, policy_violation_factor, "Number of policy violations"),
    ]

    def __init__(self, factors: Optional[List[RiskFactor]] = None):
        """Create an engine.

        Args:
            factors: Factors to evaluate. ``None`` selects ``DEFAULT_FACTORS``;
                an explicit empty list is respected and yields a zero score.
        """
        chosen = self.DEFAULT_FACTORS if factors is None else factors
        # Copy so that mutating ``engine.factors`` can never alter the shared
        # class-level defaults (or the caller's own list).
        self.factors = list(chosen)

    def calculate_risk(
        self,
        intent: InfrastructureIntent,
        cost_estimate: Optional[float],
        policy_violations: Optional[List[str]],
    ) -> Tuple[float, str, Dict[str, float]]:
        """Score an intent and explain the result.

        Args:
            intent: The intent to evaluate.
            cost_estimate: Cost estimate passed to every factor, or ``None``.
            policy_violations: Violation messages passed to every factor;
                ``None`` is treated as no violations.

        Returns:
            A tuple ``(total, explanation, contributions)`` where ``total`` is
            the weighted sum clamped to [0.0, 1.0], ``explanation`` is a
            newline-joined summary listing every factor that contributed more
            than zero, and ``contributions`` maps factor name to its weighted
            (unclamped) contribution.
        """
        if policy_violations is None:
            policy_violations = []

        total = 0.0
        contributions: Dict[str, float] = {}
        per_factor = []  # (factor, weighted score) in evaluation order
        for factor in self.factors:
            weighted = factor(intent, cost_estimate, policy_violations) * factor.weight
            per_factor.append((factor, weighted))
            # Factors sharing a name are summed rather than overwritten, so
            # the mapping always adds up to the unclamped total.
            if factor.name in contributions:
                contributions[factor.name] += weighted
            else:
                contributions[factor.name] = weighted
            total += weighted

        total = max(0.0, min(total, 1.0))

        lines = [f"Total risk score: {total:.2f}"]
        # Report each factor's own weighted score, not the per-name aggregate.
        lines.extend(
            f" - {factor.name}: {weighted:.2f} ({factor.description})"
            for factor, weighted in per_factor
            if weighted > 0.0
        )
        explanation = "\n".join(lines)
        return total, explanation, contributions