File size: 3,857 Bytes
1cdc8e7
 
 
 
 
 
 
0b9fac2
1cdc8e7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""
Risk Scoring Engine – Multi-factor probabilistic risk model.
"""

from typing import Dict, List, Optional, Tuple, Any, Callable
from dataclasses import dataclass, field

from .intents import (
    InfrastructureIntent,
    ProvisionResourceIntent,
    GrantAccessIntent,
    DeployConfigurationIntent,
    PermissionLevel,
    Environment,
)

@dataclass(frozen=True)
class RiskFactor:
    name: str
    weight: float
    score_fn: Callable[[InfrastructureIntent, Optional[float], List[str]], float]
    description: str = ""

    def __call__(self, intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
        return self.score_fn(intent, cost, violations)

def intent_type_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
    mapping = {
        "provision_resource": 0.1,
        "grant_access": 0.3,
        "deploy_config": 0.2,
    }
    return mapping.get(intent.intent_type, 0.1)

def cost_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
    if not isinstance(intent, ProvisionResourceIntent) or cost is None:
        return 0.0
    return min(cost / 5000.0, 1.0)

def permission_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
    if not isinstance(intent, GrantAccessIntent):
        return 0.0
    mapping = {
        PermissionLevel.READ: 0.1,
        PermissionLevel.WRITE: 0.4,
        PermissionLevel.ADMIN: 0.8,
    }
    return mapping.get(intent.permission_level, 0.5)

def scope_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
    if not isinstance(intent, DeployConfigurationIntent):
        return 0.0
    mapping = {
        "single_instance": 0.1,
        "canary": 0.2,
        "global": 0.6,
    }
    return mapping.get(intent.change_scope, 0.3)

def environment_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
    if hasattr(intent, "environment") and intent.environment == Environment.PROD:
        return 0.1
    return 0.0

def policy_violation_factor(intent: InfrastructureIntent, cost: Optional[float], violations: List[str]) -> float:
    return min(len(violations) * 0.2, 0.8)

class RiskEngine:
    DEFAULT_FACTORS = [
        RiskFactor("intent_type", 1.0, intent_type_factor, "Base risk from intent type"),
        RiskFactor("cost", 0.3, cost_factor, "Normalized cost estimate"),
        RiskFactor("permission", 0.3, permission_factor, "Permission level being granted"),
        RiskFactor("scope", 0.2, scope_factor, "Deployment scope"),
        RiskFactor("environment", 0.1, environment_factor, "Production environment"),
        RiskFactor("policy_violations", 0.2, policy_violation_factor, "Number of policy violations"),
    ]

    def __init__(self, factors: Optional[List[RiskFactor]] = None):
        self.factors = factors if factors is not None else self.DEFAULT_FACTORS

    def calculate_risk(
        self,
        intent: InfrastructureIntent,
        cost_estimate: Optional[float],
        policy_violations: List[str],
    ) -> Tuple[float, str, Dict[str, float]]:
        total = 0.0
        contributions = {}

        for factor in self.factors:
            raw_score = factor(intent, cost_estimate, policy_violations)
            weighted = raw_score * factor.weight
            contributions[factor.name] = weighted
            total += weighted

        total = max(0.0, min(total, 1.0))

        lines = [f"Total risk score: {total:.2f}"]
        for factor in self.factors:
            contrib = contributions[factor.name]
            if contrib > 0.0:
                lines.append(f"  - {factor.name}: {contrib:.2f} ({factor.description})")
        explanation = "\n".join(lines)

        return total, explanation, contributions