File size: 9,963 Bytes
72db17b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# agentic_reliability_framework/infrastructure/policies.py
"""
Policy Algebra – Composable, typed policies for infrastructure governance.

This module defines a composable policy system using a monoid-like structure.
Policies can be combined (AND, OR) and evaluated against intents. The algebra
enables building complex rules from simple primitives while maintaining
deterministic evaluation.

The design draws from knowledge engineering (rule-based systems), decision
engineering (explicit trade-offs), and platform engineering (pluggable backends).
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Protocol, Set, TypeVar, Union

from pydantic import BaseModel

from agentic_reliability_framework.infrastructure.intents import (
    InfrastructureIntent,
    ProvisionResourceIntent,
    GrantAccessIntent,
    ResourceType,
    PermissionLevel,
    Environment,
)

# -----------------------------------------------------------------------------
# Type Aliases for Readability
# -----------------------------------------------------------------------------
# A violation is a human-readable message describing one way a policy was not satisfied.
Violation = str
# What every policy's evaluate() returns: the violations found (an empty list means satisfied).
EvalResult = List[Violation]

# -----------------------------------------------------------------------------
# Abstract Policy Class – using Composite pattern
# -----------------------------------------------------------------------------
class Policy(ABC):
    """Root of the policy hierarchy: a rule that checks an intent and reports violations.

    Concrete subclasses implement ``evaluate``. Policies can be combined with
    the ``&``, ``|`` and ``~`` operators; each operator builds a new composite
    policy and leaves its operands untouched.
    """

    @abstractmethod
    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Check ``intent`` against this policy.

        Returns the violations found; an empty list means the policy is satisfied.
        """

    # -------------------------------------------------------------------------
    # Operator overloads – each one wraps the operands in a composite policy
    # -------------------------------------------------------------------------
    def __and__(self, other: Policy) -> Policy:
        """``self & other``: the combined policy reports violations from both sides."""
        return AndPolicy(left=self, right=other)

    def __or__(self, other: Policy) -> Policy:
        """``self | other``: the combined policy reports violations only when both sides do."""
        return OrPolicy(left=self, right=other)

    def __invert__(self) -> Policy:
        """``~self``: the negated policy reports a violation when this one reports none."""
        return NotPolicy(inner=self)

# -----------------------------------------------------------------------------
# Atomic Policies (Primitives)
# -----------------------------------------------------------------------------
class AtomicPolicy(Policy, ABC):
    """Marker base class for leaf policies, i.e. policies that wrap no other policy.

    It adds no behaviour of its own; ``evaluate`` remains abstract.
    """

@dataclass(frozen=True)
class RegionAllowedPolicy(AtomicPolicy):
    """Restrict resource provisioning to a fixed set of regions."""
    # Regions in which provisioning is permitted.
    allowed_regions: Set[str]

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Report a violation when a provisioning intent targets a region outside the allowed set.

        Intents that are not ``ProvisionResourceIntent`` are not checked and
        always yield an empty list.
        """
        # Only provisioning intents are inspected for a region.
        if not isinstance(intent, ProvisionResourceIntent):
            return []
        if intent.region in self.allowed_regions:
            return []
        return [f"Region '{intent.region}' not allowed. Allowed: {self.allowed_regions}"]

@dataclass(frozen=True)
class ResourceTypeRestrictedPolicy(AtomicPolicy):
    """Block provisioning of specific resource types."""
    # Resource types that must not be provisioned.
    forbidden_types: Set[ResourceType]

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Report a violation when a provisioning intent asks for a forbidden resource type.

        Intents that are not ``ProvisionResourceIntent`` are not checked and
        always yield an empty list.
        """
        is_forbidden = (
            isinstance(intent, ProvisionResourceIntent)
            and intent.resource_type in self.forbidden_types
        )
        if is_forbidden:
            return [f"Resource type '{intent.resource_type.value}' is forbidden."]
        return []

@dataclass(frozen=True)
class MaxPermissionLevelPolicy(AtomicPolicy):
    """Cap the permission level that an access grant may request."""
    # Highest permission level that may be granted.
    max_level: PermissionLevel

    # Numeric rank of each level, so levels can be compared (READ < WRITE < ADMIN).
    # Deliberately left without a type annotation: an annotated name would be
    # picked up by @dataclass as an instance field.
    _LEVEL_ORDER = {
        PermissionLevel.READ: 1,
        PermissionLevel.WRITE: 2,
        PermissionLevel.ADMIN: 3,
    }

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Report a violation when an access grant requests more than ``max_level``.

        Intents that are not ``GrantAccessIntent`` are not checked and always
        yield an empty list.
        """
        if not isinstance(intent, GrantAccessIntent):
            return []
        requested_rank = self._LEVEL_ORDER[intent.permission_level]
        ceiling_rank = self._LEVEL_ORDER[self.max_level]
        if requested_rank <= ceiling_rank:
            return []
        return [f"Permission level '{intent.permission_level.value}' exceeds max allowed '{self.max_level.value}'."]

@dataclass(frozen=True)
class CostThresholdPolicy(AtomicPolicy):
    """
    Enforce a maximum monthly cost.

    The intent itself is not inspected for a cost, so the estimate has to be
    supplied by the caller: either directly through the ``cost`` argument of
    ``evaluate`` or, when going through ``PolicyEvaluator``, via the
    ``'cost_estimate'`` entry of its context. Without an estimate the policy
    cannot be checked and reports no violations.
    """
    # Upper bound for the cost estimate, in US dollars.
    max_cost_usd: float

    def evaluate(self, intent: InfrastructureIntent, cost: Optional[float] = None) -> EvalResult:
        """Compare an externally supplied cost estimate with the threshold.

        Args:
            intent: The intent being evaluated; it is not inspected here.
            cost: Cost estimate for the intent, or ``None`` when unknown.

        Returns:
            A single violation when ``cost`` is given and exceeds
            ``max_cost_usd``; otherwise an empty list. Calling without a cost
            (as the composite policies do) therefore always yields ``[]``.
        """
        # No estimate available: nothing to compare against, so no violation.
        if cost is None:
            return []
        if cost > self.max_cost_usd:
            # Same wording as the violation emitted by PolicyEvaluator.
            return [f"Cost ${cost:.2f} exceeds threshold ${self.max_cost_usd:.2f}"]
        return []

# -----------------------------------------------------------------------------
# Composite Policies (using Combinators)
# -----------------------------------------------------------------------------
@dataclass(frozen=True)
class AndPolicy(Policy):
    """Conjunction of two policies: satisfied only when both operands are."""
    # The two operand policies.
    left: Policy
    right: Policy

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Evaluate both operands and return the left violations followed by the right ones."""
        from_left = self.left.evaluate(intent)
        from_right = self.right.evaluate(intent)
        return from_left + from_right

@dataclass(frozen=True)
class OrPolicy(Policy):
    """Disjunction of two policies: satisfied as soon as either operand is."""
    # The two operand policies.
    left: Policy
    right: Policy

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Evaluate both operands; report violations only when both of them fail.

        Both operands are always evaluated. When both report violations the
        result is the left violations followed by the right ones.
        """
        from_left = self.left.evaluate(intent)
        from_right = self.right.evaluate(intent)
        both_failed = bool(from_left) and bool(from_right)
        # One satisfied operand is enough for the OR to hold.
        return from_left + from_right if both_failed else []

@dataclass(frozen=True)
class NotPolicy(Policy):
    """Negation of a policy: satisfied exactly when the wrapped policy is violated."""
    # The policy being negated.
    inner: Policy

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Return a generic violation when the wrapped policy passes, otherwise nothing.

        The wrapped policy's own violation messages are discarded.
        """
        inner_passed = not self.inner.evaluate(intent)
        return ["Condition not met (NOT policy)"] if inner_passed else []

# -----------------------------------------------------------------------------
# Context-Aware Evaluation
# -----------------------------------------------------------------------------
class PolicyEvaluator:
    """
    Evaluates a policy tree with additional context (e.g., cost estimates).

    Plain ``Policy.evaluate`` only receives the intent. This evaluator walks
    the tree itself so that policies depending on external data (currently
    ``CostThresholdPolicy``) can be checked against values from ``context``.
    """

    def __init__(self, root_policy: Policy):
        """Store the root of the policy tree to evaluate."""
        self._root = root_policy

    def evaluate(self, intent: InfrastructureIntent, context: Optional[Dict[str, Any]] = None) -> EvalResult:
        """
        Evaluate the policy tree against the intent, using context for dynamic checks.

        Args:
            intent: The intent to check.
            context: Optional extra data. The key ``'cost_estimate'`` is read
                when a ``CostThresholdPolicy`` is encountered. ``None`` is
                treated as an empty context.

        Returns:
            The violations found; an empty list means the tree is satisfied.
        """
        return self._evaluate_recursive(self._root, intent, context or {})

    def _evaluate_recursive(self, policy: Policy, intent: InfrastructureIntent, context: Dict[str, Any]) -> EvalResult:
        """Evaluate ``policy`` (and, for composites, its children) with access to ``context``."""
        # Cost policies are the one leaf type that needs context. This check must
        # come first because CostThresholdPolicy is itself an AtomicPolicy.
        if isinstance(policy, CostThresholdPolicy):
            cost = context.get('cost_estimate')
            if cost is not None and cost > policy.max_cost_usd:
                return [f"Cost ${cost:.2f} exceeds threshold ${policy.max_cost_usd:.2f}"]
            return []
        # Built-in composites: recurse so nested cost policies also see the context.
        if isinstance(policy, AndPolicy):
            left = self._evaluate_recursive(policy.left, intent, context)
            right = self._evaluate_recursive(policy.right, intent, context)
            return left + right
        if isinstance(policy, OrPolicy):
            left = self._evaluate_recursive(policy.left, intent, context)
            right = self._evaluate_recursive(policy.right, intent, context)
            # OR is satisfied as soon as one side has no violations.
            if not left or not right:
                return []
            return left + right
        if isinstance(policy, NotPolicy):
            inner = self._evaluate_recursive(policy.inner, intent, context)
            if inner:
                return []
            return ["Condition not met (NOT policy)"]
        # Atomic policies and any other Policy subclass: use the policy's own
        # evaluate(). Previously a Policy subclass outside the built-in types
        # fell through to "no violations", silently approving the intent.
        if isinstance(policy, Policy):
            return policy.evaluate(intent)
        # Not a Policy at all: keep the historical behaviour of reporting nothing.
        return []

# -----------------------------------------------------------------------------
# Policy Builder (DSL) – convenience functions
# -----------------------------------------------------------------------------
def allow_all() -> Policy:
    """Build a policy that accepts every intent (it never reports a violation)."""
    class _AllowAll(AtomicPolicy):
        """Atomic policy whose evaluation is always empty."""

        def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
            no_violations = []
            return no_violations

    permissive = _AllowAll()
    return permissive

def deny_all() -> Policy:
    """Build a policy that rejects every intent with a single fixed violation."""
    class _DenyAll(AtomicPolicy):
        """Atomic policy whose evaluation always contains one violation."""

        def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
            denial = "Action denied by default policy"
            return [denial]

    restrictive = _DenyAll()
    return restrictive

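# Example: policy = (RegionAllowedPolicy({"eastus"}) & ~ResourceTypeRestrictedPolicy({ResourceType.KUBERNETES_CLUSTER})) | CostThresholdPolicy(500)
# Evaluate it with PolicyEvaluator(policy).evaluate(intent, {"cost_estimate": ...}) so the cost threshold is checked.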

# Public API of this module: the names exported by `from ... import *`.
# NOTE(review): AtomicPolicy is defined in this module but not listed here —
# confirm whether it is meant to stay non-exported.
__all__ = [
    "Policy",
    "RegionAllowedPolicy",
    "ResourceTypeRestrictedPolicy",
    "MaxPermissionLevelPolicy",
    "CostThresholdPolicy",
    "AndPolicy",
    "OrPolicy",
    "NotPolicy",
    "PolicyEvaluator",
    "allow_all",
    "deny_all",
]