""" Risk Scoring Engine – Multi-factor probabilistic risk model. """ from typing import Dict, List, Optional, Tuple, Any, Callable from dataclasses import dataclass, field from .intents import ( InfrastructureIntent, ProvisionResourceIntent, GrantAccessIntent, DeployConfigurationIntent, PermissionLevel, Environment, ) @dataclass(frozen=True) class RiskFactor: name: str weight: float score_fn: Callable[[InfrastructureIntent, Optional[float], List[str]], float] description: str = "" def __call__(self, intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float: return self.score_fn(intent, cost, violations) def intent_type_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float: mapping = { "provision_resource": 0.1, "grant_access": 0.3, "deploy_config": 0.2, } return mapping.get(intent.intent_type, 0.1) def cost_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float: if not isinstance(intent, ProvisionResourceIntent) or cost is None: return 0.0 return min(cost / 5000.0, 1.0) def permission_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float: if not isinstance(intent, GrantAccessIntent): return 0.0 mapping = { PermissionLevel.READ: 0.1, PermissionLevel.WRITE: 0.4, PermissionLevel.ADMIN: 0.8, } return mapping.get(intent.permission_level, 0.5) def scope_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float: if not isinstance(intent, DeployConfigurationIntent): return 0.0 mapping = { "single_instance": 0.1, "canary": 0.2, "global": 0.6, } return mapping.get(intent.change_scope, 0.3) def environment_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float: if hasattr(intent, "environment") and intent.environment == Environment.PROD: return 0.1 return 0.0 def policy_violation_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float: return min(len(violations) * 0.2, 0.8) class RiskEngine: DEFAULT_FACTORS = [ RiskFactor("intent_type", 1.0, intent_type_factor, "Base risk from intent type"), RiskFactor("cost", 0.3, cost_factor, "Normalized cost estimate"), RiskFactor("permission", 0.3, permission_factor, "Permission level being granted"), RiskFactor("scope", 0.2, scope_factor, "Deployment scope"), RiskFactor("environment", 0.1, environment_factor, "Production environment"), RiskFactor("policy_violations", 0.2, policy_violation_factor, "Number of policy violations"), ] def __init__(self, factors: Optional[List[RiskFactor]] = None): self.factors = factors if factors is not None else self.DEFAULT_FACTORS def calculate_risk( self, intent: InfrastructureIntent, cost_estimate: Optional[float], policy_violations: List[str], ) -> Tuple[float, str, Dict[str, float]]: total = 0.0 contributions = {} for factor in self.factors: raw_score = factor(intent, cost_estimate, policy_violations) weighted = raw_score * factor.weight contributions[factor.name] = weighted total += weighted total = max(0.0, min(total, 1.0)) lines = [f"Total risk score: {total:.2f}"] for factor in self.factors: contrib = contributions[factor.name] if contrib > 0.0: lines.append(f" - {factor.name}: {contrib:.2f} ({factor.description})") explanation = "\n".join(lines) return total, explanation, contributions