Update infrastructure/risk_engine.py
Browse files
infrastructure/risk_engine.py
CHANGED
|
@@ -1,21 +1,11 @@
|
|
| 1 |
-
# agentic_reliability_framework/infrastructure/risk_engine.py
|
| 2 |
"""
|
| 3 |
Risk Scoring Engine – Multi-factor probabilistic risk model.
|
| 4 |
-
|
| 5 |
-
This module computes a risk score (0-1) for an infrastructure intent by combining
|
| 6 |
-
multiple factors with configurable weights. The model is inspired by Bayesian
|
| 7 |
-
decision theory and multi-criteria decision analysis (MCDA). It produces not only
|
| 8 |
-
a score but also a detailed explanation of each factor's contribution, supporting
|
| 9 |
-
transparency and psychological trust.
|
| 10 |
-
|
| 11 |
-
The risk engine is designed to be extended with additional factors (e.g., historical
|
| 12 |
-
data, anomaly scores) without changing the core API.
|
| 13 |
"""
|
| 14 |
|
| 15 |
from typing import Dict, List, Optional, Tuple, Any, Callable
|
| 16 |
from dataclasses import dataclass, field
|
| 17 |
|
| 18 |
-
from
|
| 19 |
InfrastructureIntent,
|
| 20 |
ProvisionResourceIntent,
|
| 21 |
GrantAccessIntent,
|
|
@@ -24,12 +14,8 @@ from agentic_reliability_framework.infrastructure.intents import (
|
|
| 24 |
Environment,
|
| 25 |
)
|
| 26 |
|
| 27 |
-
# -----------------------------------------------------------------------------
|
| 28 |
-
# Factor Definition
|
| 29 |
-
# -----------------------------------------------------------------------------
|
| 30 |
@dataclass(frozen=True)
|
| 31 |
class RiskFactor:
|
| 32 |
-
"""A single factor contributing to risk, with a weight and a scoring function."""
|
| 33 |
name: str
|
| 34 |
weight: float
|
| 35 |
score_fn: Callable[[InfrastructureIntent, Optional[float], List[str]], float]
|
|
@@ -38,11 +24,7 @@ class RiskFactor:
|
|
| 38 |
def __call__(self, intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
|
| 39 |
return self.score_fn(intent, cost, violations)
|
| 40 |
|
| 41 |
-
# -----------------------------------------------------------------------------
|
| 42 |
-
# Built-in Factors
|
| 43 |
-
# -----------------------------------------------------------------------------
|
| 44 |
def intent_type_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
|
| 45 |
-
"""Base risk from intent type."""
|
| 46 |
mapping = {
|
| 47 |
"provision_resource": 0.1,
|
| 48 |
"grant_access": 0.3,
|
|
@@ -51,14 +33,11 @@ def intent_type_factor(intent: InfrastructureIntent, cost: Optional[float], viol
|
|
| 51 |
return mapping.get(intent.intent_type, 0.1)
|
| 52 |
|
| 53 |
def cost_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
|
| 54 |
-
"""Risk contribution from estimated cost (normalized to [0,1])."""
|
| 55 |
if not isinstance(intent, ProvisionResourceIntent) or cost is None:
|
| 56 |
return 0.0
|
| 57 |
-
# Normalize: $0 → 0, $5000 → 1 (linear)
|
| 58 |
return min(cost / 5000.0, 1.0)
|
| 59 |
|
| 60 |
def permission_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
|
| 61 |
-
"""Risk from permission level being granted."""
|
| 62 |
if not isinstance(intent, GrantAccessIntent):
|
| 63 |
return 0.0
|
| 64 |
mapping = {
|
|
@@ -69,7 +48,6 @@ def permission_factor(intent: InfrastructureIntent, cost: Optional[float], viola
|
|
| 69 |
return mapping.get(intent.permission_level, 0.5)
|
| 70 |
|
| 71 |
def scope_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
|
| 72 |
-
"""Risk from deployment scope (for config changes)."""
|
| 73 |
if not isinstance(intent, DeployConfigurationIntent):
|
| 74 |
return 0.0
|
| 75 |
mapping = {
|
|
@@ -80,27 +58,14 @@ def scope_factor(intent: InfrastructureIntent, cost: Optional[float], violations
|
|
| 80 |
return mapping.get(intent.change_scope, 0.3)
|
| 81 |
|
| 82 |
def environment_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
|
| 83 |
-
"""Additional risk if environment is production."""
|
| 84 |
if hasattr(intent, "environment") and intent.environment == Environment.PROD:
|
| 85 |
return 0.1
|
| 86 |
return 0.0
|
| 87 |
|
| 88 |
def policy_violation_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
|
| 89 |
-
"""Risk from number of policy violations (capped)."""
|
| 90 |
-
# Each violation adds 0.2, max 0.8
|
| 91 |
return min(len(violations) * 0.2, 0.8)
|
| 92 |
|
| 93 |
-
# -----------------------------------------------------------------------------
|
| 94 |
-
# Risk Engine
|
| 95 |
-
# -----------------------------------------------------------------------------
|
| 96 |
class RiskEngine:
|
| 97 |
-
"""
|
| 98 |
-
Computes a weighted risk score from multiple factors.
|
| 99 |
-
|
| 100 |
-
The engine is initialized with a list of factors and their weights.
|
| 101 |
-
The total score is the weighted sum of factor scores, clamped to [0,1].
|
| 102 |
-
"""
|
| 103 |
-
|
| 104 |
DEFAULT_FACTORS = [
|
| 105 |
RiskFactor("intent_type", 1.0, intent_type_factor, "Base risk from intent type"),
|
| 106 |
RiskFactor("cost", 0.3, cost_factor, "Normalized cost estimate"),
|
|
@@ -111,9 +76,6 @@ class RiskEngine:
|
|
| 111 |
]
|
| 112 |
|
| 113 |
def __init__(self, factors: Optional[List[RiskFactor]] = None):
|
| 114 |
-
"""
|
| 115 |
-
Initialize with custom factors. If none provided, uses DEFAULT_FACTORS.
|
| 116 |
-
"""
|
| 117 |
self.factors = factors if factors is not None else self.DEFAULT_FACTORS
|
| 118 |
|
| 119 |
def calculate_risk(
|
|
@@ -122,14 +84,6 @@ class RiskEngine:
|
|
| 122 |
cost_estimate: Optional[float],
|
| 123 |
policy_violations: List[str],
|
| 124 |
) -> Tuple[float, str, Dict[str, float]]:
|
| 125 |
-
"""
|
| 126 |
-
Compute risk score and detailed breakdown.
|
| 127 |
-
|
| 128 |
-
Returns:
|
| 129 |
-
- total_score: float in [0,1]
|
| 130 |
-
- explanation: human-readable string
|
| 131 |
-
- contributions: dict mapping factor names to their weighted contribution
|
| 132 |
-
"""
|
| 133 |
total = 0.0
|
| 134 |
contributions = {}
|
| 135 |
|
|
@@ -139,10 +93,8 @@ class RiskEngine:
|
|
| 139 |
contributions[factor.name] = weighted
|
| 140 |
total += weighted
|
| 141 |
|
| 142 |
-
# Clamp to [0,1]
|
| 143 |
total = max(0.0, min(total, 1.0))
|
| 144 |
|
| 145 |
-
# Build explanation
|
| 146 |
lines = [f"Total risk score: {total:.2f}"]
|
| 147 |
for factor in self.factors:
|
| 148 |
contrib = contributions[factor.name]
|
|
|
|
|
|
|
| 1 |
"""
|
| 2 |
Risk Scoring Engine – Multi-factor probabilistic risk model.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 3 |
"""
|
| 4 |
|
| 5 |
from typing import Dict, List, Optional, Tuple, Any, Callable
|
| 6 |
from dataclasses import dataclass, field
|
| 7 |
|
| 8 |
+
from .intents import (
|
| 9 |
InfrastructureIntent,
|
| 10 |
ProvisionResourceIntent,
|
| 11 |
GrantAccessIntent,
|
|
|
|
| 14 |
Environment,
|
| 15 |
)
|
| 16 |
|
|
|
|
|
|
|
|
|
|
| 17 |
@dataclass(frozen=True)
|
| 18 |
class RiskFactor:
|
|
|
|
| 19 |
name: str
|
| 20 |
weight: float
|
| 21 |
score_fn: Callable[[InfrastructureIntent, Optional[float], List[str]], float]
|
|
|
|
| 24 |
def __call__(self, intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
|
| 25 |
return self.score_fn(intent, cost, violations)
|
| 26 |
|
|
|
|
|
|
|
|
|
|
| 27 |
def intent_type_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
|
|
|
|
| 28 |
mapping = {
|
| 29 |
"provision_resource": 0.1,
|
| 30 |
"grant_access": 0.3,
|
|
|
|
| 33 |
return mapping.get(intent.intent_type, 0.1)
|
| 34 |
|
| 35 |
def cost_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
|
|
|
|
| 36 |
if not isinstance(intent, ProvisionResourceIntent) or cost is None:
|
| 37 |
return 0.0
|
|
|
|
| 38 |
return min(cost / 5000.0, 1.0)
|
| 39 |
|
| 40 |
def permission_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
|
|
|
|
| 41 |
if not isinstance(intent, GrantAccessIntent):
|
| 42 |
return 0.0
|
| 43 |
mapping = {
|
|
|
|
| 48 |
return mapping.get(intent.permission_level, 0.5)
|
| 49 |
|
| 50 |
def scope_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
|
|
|
|
| 51 |
if not isinstance(intent, DeployConfigurationIntent):
|
| 52 |
return 0.0
|
| 53 |
mapping = {
|
|
|
|
| 58 |
return mapping.get(intent.change_scope, 0.3)
|
| 59 |
|
| 60 |
def environment_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
|
|
|
|
| 61 |
if hasattr(intent, "environment") and intent.environment == Environment.PROD:
|
| 62 |
return 0.1
|
| 63 |
return 0.0
|
| 64 |
|
| 65 |
def policy_violation_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
|
|
|
|
|
|
|
| 66 |
return min(len(violations) * 0.2, 0.8)
|
| 67 |
|
|
|
|
|
|
|
|
|
|
| 68 |
class RiskEngine:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 69 |
DEFAULT_FACTORS = [
|
| 70 |
RiskFactor("intent_type", 1.0, intent_type_factor, "Base risk from intent type"),
|
| 71 |
RiskFactor("cost", 0.3, cost_factor, "Normalized cost estimate"),
|
|
|
|
| 76 |
]
|
| 77 |
|
| 78 |
def __init__(self, factors: Optional[List[RiskFactor]] = None):
|
|
|
|
|
|
|
|
|
|
| 79 |
self.factors = factors if factors is not None else self.DEFAULT_FACTORS
|
| 80 |
|
| 81 |
def calculate_risk(
|
|
|
|
| 84 |
cost_estimate: Optional[float],
|
| 85 |
policy_violations: List[str],
|
| 86 |
) -> Tuple[float, str, Dict[str, float]]:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 87 |
total = 0.0
|
| 88 |
contributions = {}
|
| 89 |
|
|
|
|
| 93 |
contributions[factor.name] = weighted
|
| 94 |
total += weighted
|
| 95 |
|
|
|
|
| 96 |
total = max(0.0, min(total, 1.0))
|
| 97 |
|
|
|
|
| 98 |
lines = [f"Total risk score: {total:.2f}"]
|
| 99 |
for factor in self.factors:
|
| 100 |
contrib = contributions[factor.name]
|