petter2025 commited on
Commit
72db17b
·
verified ·
1 Parent(s): d688dc2

Create policies.py

Browse files
Files changed (1) hide show
  1. infrastructure/policies.py +244 -0
infrastructure/policies.py ADDED
@@ -0,0 +1,244 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # agentic_reliability_framework/infrastructure/policies.py
2
+ """
3
+ Policy Algebra – Composable, typed policies for infrastructure governance.
4
+
5
+ This module defines a composable policy system using a monoid-like structure.
6
+ Policies can be combined (AND, OR) and evaluated against intents. The algebra
7
+ enables building complex rules from simple primitives while maintaining
8
+ deterministic evaluation.
9
+
10
+ The design draws from knowledge engineering (rule-based systems), decision
11
+ engineering (explicit trade-offs), and platform engineering (pluggable backends).
12
+ """
13
+
14
+ from __future__ import annotations
15
+
16
+ from abc import ABC, abstractmethod
17
+ from dataclasses import dataclass
18
+ from typing import Any, Callable, Dict, List, Optional, Protocol, Set, TypeVar, Union
19
+
20
+ from pydantic import BaseModel
21
+
22
+ from agentic_reliability_framework.infrastructure.intents import (
23
+ InfrastructureIntent,
24
+ ProvisionResourceIntent,
25
+ GrantAccessIntent,
26
+ ResourceType,
27
+ PermissionLevel,
28
+ Environment,
29
+ )
30
+
31
+ # -----------------------------------------------------------------------------
32
+ # Type Aliases for Readability
33
+ # -----------------------------------------------------------------------------
34
+ Violation = str
35
+ EvalResult = List[Violation]
36
+
37
+ # -----------------------------------------------------------------------------
38
+ # Abstract Policy Class – using Composite pattern
39
+ # -----------------------------------------------------------------------------
40
+ class Policy(ABC):
41
+ """Abstract base for all policies. Evaluates an intent and returns violations."""
42
+
43
+ @abstractmethod
44
+ def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
45
+ """Return list of violations. Empty list means the policy is satisfied."""
46
+ pass
47
+
48
+ # -------------------------------------------------------------------------
49
+ # Combinators (returning new composite policies)
50
+ # -------------------------------------------------------------------------
51
+ def __and__(self, other: Policy) -> Policy:
52
+ """Logical AND: violations from both policies."""
53
+ return AndPolicy(self, other)
54
+
55
+ def __or__(self, other: Policy) -> Policy:
56
+ """Logical OR: violations only if both policies produce violations."""
57
+ return OrPolicy(self, other)
58
+
59
+ def __invert__(self) -> Policy:
60
+ """Logical NOT: violations if the original policy yields no violations."""
61
+ return NotPolicy(self)
62
+
63
+ # -----------------------------------------------------------------------------
64
+ # Atomic Policies (Primitives)
65
+ # -----------------------------------------------------------------------------
66
+ class AtomicPolicy(Policy, ABC):
67
+ """Base class for policies that don't contain other policies."""
68
+ pass
69
+
70
+ @dataclass(frozen=True)
71
+ class RegionAllowedPolicy(AtomicPolicy):
72
+ """Ensure the intent's region is in an allowed set."""
73
+ allowed_regions: Set[str]
74
+
75
+ def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
76
+ if isinstance(intent, ProvisionResourceIntent):
77
+ if intent.region not in self.allowed_regions:
78
+ return [f"Region '{intent.region}' not allowed. Allowed: {self.allowed_regions}"]
79
+ return []
80
+
81
+ @dataclass(frozen=True)
82
+ class ResourceTypeRestrictedPolicy(AtomicPolicy):
83
+ """Forbid certain resource types."""
84
+ forbidden_types: Set[ResourceType]
85
+
86
+ def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
87
+ if isinstance(intent, ProvisionResourceIntent):
88
+ if intent.resource_type in self.forbidden_types:
89
+ return [f"Resource type '{intent.resource_type.value}' is forbidden."]
90
+ return []
91
+
92
+ @dataclass(frozen=True)
93
+ class MaxPermissionLevelPolicy(AtomicPolicy):
94
+ """Limit the maximum permission level that can be granted."""
95
+ max_level: PermissionLevel
96
+
97
+ # Permission level ordering (read < write < admin)
98
+ _LEVEL_ORDER = {
99
+ PermissionLevel.READ: 1,
100
+ PermissionLevel.WRITE: 2,
101
+ PermissionLevel.ADMIN: 3,
102
+ }
103
+
104
+ def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
105
+ if isinstance(intent, GrantAccessIntent):
106
+ if self._LEVEL_ORDER[intent.permission_level] > self._LEVEL_ORDER[self.max_level]:
107
+ return [f"Permission level '{intent.permission_level.value}' exceeds max allowed '{self.max_level.value}'."]
108
+ return []
109
+
110
+ @dataclass(frozen=True)
111
+ class CostThresholdPolicy(AtomicPolicy):
112
+ """
113
+ Enforce a maximum monthly cost.
114
+ Note: This policy requires the cost estimate to be provided externally.
115
+ We evaluate it only when a cost is supplied via context.
116
+ """
117
+ max_cost_usd: float
118
+
119
+ def evaluate(self, intent: InfrastructureIntent, cost: Optional[float] = None) -> EvalResult:
120
+ # This is a special case – we need cost from the simulator.
121
+ # We'll handle it by allowing context injection. For composition, we'll use a wrapper.
122
+ # In practice, we'll evaluate this in the simulator and add violations manually.
123
+ # For now, we keep it as a marker.
124
+ return []
125
+
126
+ # -----------------------------------------------------------------------------
127
+ # Composite Policies (using Combinators)
128
+ # -----------------------------------------------------------------------------
129
+ @dataclass(frozen=True)
130
+ class AndPolicy(Policy):
131
+ """Logical AND of two policies."""
132
+ left: Policy
133
+ right: Policy
134
+
135
+ def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
136
+ return self.left.evaluate(intent) + self.right.evaluate(intent)
137
+
138
+ @dataclass(frozen=True)
139
+ class OrPolicy(Policy):
140
+ """Logical OR – violations only if both sub-policies have violations."""
141
+ left: Policy
142
+ right: Policy
143
+
144
+ def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
145
+ left_violations = self.left.evaluate(intent)
146
+ right_violations = self.right.evaluate(intent)
147
+ # If either has no violations, overall no violations (OR satisfied)
148
+ if not left_violations or not right_violations:
149
+ return []
150
+ # Both have violations – combine them
151
+ return left_violations + right_violations
152
+
153
+ @dataclass(frozen=True)
154
+ class NotPolicy(Policy):
155
+ """Logical NOT – violations if inner policy has no violations."""
156
+ inner: Policy
157
+
158
+ def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
159
+ inner_violations = self.inner.evaluate(intent)
160
+ if inner_violations:
161
+ return [] # inner failed (had violations), so NOT is satisfied (no violations)
162
+ else:
163
+ return ["Condition not met (NOT policy)"]
164
+
165
+ # -----------------------------------------------------------------------------
166
+ # Context-Aware Evaluation
167
+ # -----------------------------------------------------------------------------
168
+ class PolicyEvaluator:
169
+ """
170
+ Evaluates a policy tree with additional context (e.g., cost estimates).
171
+ This allows policies that depend on external data to be evaluated.
172
+ """
173
+
174
+ def __init__(self, root_policy: Policy):
175
+ self._root = root_policy
176
+
177
+ def evaluate(self, intent: InfrastructureIntent, context: Optional[Dict[str, Any]] = None) -> EvalResult:
178
+ """
179
+ Evaluate the policy tree against the intent, using context for dynamic checks.
180
+ Context may contain 'cost_estimate' for CostThresholdPolicy, etc.
181
+ """
182
+ # For simplicity, we traverse the tree and apply context where needed.
183
+ # A more sophisticated implementation would use a visitor pattern.
184
+ return self._evaluate_recursive(self._root, intent, context or {})
185
+
186
+ def _evaluate_recursive(self, policy: Policy, intent: InfrastructureIntent, context: Dict[str, Any]) -> EvalResult:
187
+ # If policy is CostThresholdPolicy, we use context to evaluate.
188
+ if isinstance(policy, CostThresholdPolicy):
189
+ cost = context.get('cost_estimate')
190
+ if cost is not None and cost > policy.max_cost_usd:
191
+ return [f"Cost ${cost:.2f} exceeds threshold ${policy.max_cost_usd:.2f}"]
192
+ return []
193
+ # For atomic policies, just evaluate (they don't need context)
194
+ if isinstance(policy, AtomicPolicy):
195
+ return policy.evaluate(intent)
196
+ # For composites, recurse
197
+ if isinstance(policy, AndPolicy):
198
+ return self._evaluate_recursive(policy.left, intent, context) + self._evaluate_recursive(policy.right, intent, context)
199
+ if isinstance(policy, OrPolicy):
200
+ left = self._evaluate_recursive(policy.left, intent, context)
201
+ right = self._evaluate_recursive(policy.right, intent, context)
202
+ if not left or not right:
203
+ return []
204
+ return left + right
205
+ if isinstance(policy, NotPolicy):
206
+ inner = self._evaluate_recursive(policy.inner, intent, context)
207
+ if inner:
208
+ return []
209
+ return ["Condition not met (NOT policy)"]
210
+ # Fallback
211
+ return []
212
+
213
+ # -----------------------------------------------------------------------------
214
+ # Policy Builder (DSL) – convenience functions
215
+ # -----------------------------------------------------------------------------
216
+ def allow_all() -> Policy:
217
+ """Policy that never produces violations."""
218
+ class _AllowAll(AtomicPolicy):
219
+ def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
220
+ return []
221
+ return _AllowAll()
222
+
223
+ def deny_all() -> Policy:
224
+ """Policy that always produces a violation."""
225
+ class _DenyAll(AtomicPolicy):
226
+ def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
227
+ return ["Action denied by default policy"]
228
+ return _DenyAll()
229
+
230
+ # Example: policy = (region_allowed({"eastus"}) & ~resource_type_restricted({ResourceType.KUBERNETES_CLUSTER})) | cost_threshold(500)
231
+
232
+ __all__ = [
233
+ "Policy",
234
+ "RegionAllowedPolicy",
235
+ "ResourceTypeRestrictedPolicy",
236
+ "MaxPermissionLevelPolicy",
237
+ "CostThresholdPolicy",
238
+ "AndPolicy",
239
+ "OrPolicy",
240
+ "NotPolicy",
241
+ "PolicyEvaluator",
242
+ "allow_all",
243
+ "deny_all",
244
+ ]