File size: 9,963 Bytes
72db17b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 | # agentic_reliability_framework/infrastructure/policies.py
"""
Policy Algebra β Composable, typed policies for infrastructure governance.
This module defines a composable policy system using a monoid-like structure.
Policies can be combined (AND, OR) and evaluated against intents. The algebra
enables building complex rules from simple primitives while maintaining
deterministic evaluation.
The design draws from knowledge engineering (rule-based systems), decision
engineering (explicit trade-offs), and platform engineering (pluggable backends).
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Protocol, Set, TypeVar, Union
from pydantic import BaseModel
from agentic_reliability_framework.infrastructure.intents import (
InfrastructureIntent,
ProvisionResourceIntent,
GrantAccessIntent,
ResourceType,
PermissionLevel,
Environment,
)
# -----------------------------------------------------------------------------
# Type Aliases for Readability
# -----------------------------------------------------------------------------
Violation = str
EvalResult = List[Violation]
# -----------------------------------------------------------------------------
# Abstract Policy Class β using Composite pattern
# -----------------------------------------------------------------------------
class Policy(ABC):
"""Abstract base for all policies. Evaluates an intent and returns violations."""
@abstractmethod
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
"""Return list of violations. Empty list means the policy is satisfied."""
pass
# -------------------------------------------------------------------------
# Combinators (returning new composite policies)
# -------------------------------------------------------------------------
def __and__(self, other: Policy) -> Policy:
"""Logical AND: violations from both policies."""
return AndPolicy(self, other)
def __or__(self, other: Policy) -> Policy:
"""Logical OR: violations only if both policies produce violations."""
return OrPolicy(self, other)
def __invert__(self) -> Policy:
"""Logical NOT: violations if the original policy yields no violations."""
return NotPolicy(self)
# -----------------------------------------------------------------------------
# Atomic Policies (Primitives)
# -----------------------------------------------------------------------------
class AtomicPolicy(Policy, ABC):
"""Base class for policies that don't contain other policies."""
pass
@dataclass(frozen=True)
class RegionAllowedPolicy(AtomicPolicy):
"""Ensure the intent's region is in an allowed set."""
allowed_regions: Set[str]
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
if isinstance(intent, ProvisionResourceIntent):
if intent.region not in self.allowed_regions:
return [f"Region '{intent.region}' not allowed. Allowed: {self.allowed_regions}"]
return []
@dataclass(frozen=True)
class ResourceTypeRestrictedPolicy(AtomicPolicy):
"""Forbid certain resource types."""
forbidden_types: Set[ResourceType]
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
if isinstance(intent, ProvisionResourceIntent):
if intent.resource_type in self.forbidden_types:
return [f"Resource type '{intent.resource_type.value}' is forbidden."]
return []
@dataclass(frozen=True)
class MaxPermissionLevelPolicy(AtomicPolicy):
"""Limit the maximum permission level that can be granted."""
max_level: PermissionLevel
# Permission level ordering (read < write < admin)
_LEVEL_ORDER = {
PermissionLevel.READ: 1,
PermissionLevel.WRITE: 2,
PermissionLevel.ADMIN: 3,
}
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
if isinstance(intent, GrantAccessIntent):
if self._LEVEL_ORDER[intent.permission_level] > self._LEVEL_ORDER[self.max_level]:
return [f"Permission level '{intent.permission_level.value}' exceeds max allowed '{self.max_level.value}'."]
return []
@dataclass(frozen=True)
class CostThresholdPolicy(AtomicPolicy):
"""
Enforce a maximum monthly cost.
Note: This policy requires the cost estimate to be provided externally.
We evaluate it only when a cost is supplied via context.
"""
max_cost_usd: float
def evaluate(self, intent: InfrastructureIntent, cost: Optional[float] = None) -> EvalResult:
# This is a special case β we need cost from the simulator.
# We'll handle it by allowing context injection. For composition, we'll use a wrapper.
# In practice, we'll evaluate this in the simulator and add violations manually.
# For now, we keep it as a marker.
return []
# -----------------------------------------------------------------------------
# Composite Policies (using Combinators)
# -----------------------------------------------------------------------------
@dataclass(frozen=True)
class AndPolicy(Policy):
"""Logical AND of two policies."""
left: Policy
right: Policy
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
return self.left.evaluate(intent) + self.right.evaluate(intent)
@dataclass(frozen=True)
class OrPolicy(Policy):
"""Logical OR β violations only if both sub-policies have violations."""
left: Policy
right: Policy
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
left_violations = self.left.evaluate(intent)
right_violations = self.right.evaluate(intent)
# If either has no violations, overall no violations (OR satisfied)
if not left_violations or not right_violations:
return []
# Both have violations β combine them
return left_violations + right_violations
@dataclass(frozen=True)
class NotPolicy(Policy):
"""Logical NOT β violations if inner policy has no violations."""
inner: Policy
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
inner_violations = self.inner.evaluate(intent)
if inner_violations:
return [] # inner failed (had violations), so NOT is satisfied (no violations)
else:
return ["Condition not met (NOT policy)"]
# -----------------------------------------------------------------------------
# Context-Aware Evaluation
# -----------------------------------------------------------------------------
class PolicyEvaluator:
"""
Evaluates a policy tree with additional context (e.g., cost estimates).
This allows policies that depend on external data to be evaluated.
"""
def __init__(self, root_policy: Policy):
self._root = root_policy
def evaluate(self, intent: InfrastructureIntent, context: Optional[Dict[str, Any]] = None) -> EvalResult:
"""
Evaluate the policy tree against the intent, using context for dynamic checks.
Context may contain 'cost_estimate' for CostThresholdPolicy, etc.
"""
# For simplicity, we traverse the tree and apply context where needed.
# A more sophisticated implementation would use a visitor pattern.
return self._evaluate_recursive(self._root, intent, context or {})
def _evaluate_recursive(self, policy: Policy, intent: InfrastructureIntent, context: Dict[str, Any]) -> EvalResult:
# If policy is CostThresholdPolicy, we use context to evaluate.
if isinstance(policy, CostThresholdPolicy):
cost = context.get('cost_estimate')
if cost is not None and cost > policy.max_cost_usd:
return [f"Cost ${cost:.2f} exceeds threshold ${policy.max_cost_usd:.2f}"]
return []
# For atomic policies, just evaluate (they don't need context)
if isinstance(policy, AtomicPolicy):
return policy.evaluate(intent)
# For composites, recurse
if isinstance(policy, AndPolicy):
return self._evaluate_recursive(policy.left, intent, context) + self._evaluate_recursive(policy.right, intent, context)
if isinstance(policy, OrPolicy):
left = self._evaluate_recursive(policy.left, intent, context)
right = self._evaluate_recursive(policy.right, intent, context)
if not left or not right:
return []
return left + right
if isinstance(policy, NotPolicy):
inner = self._evaluate_recursive(policy.inner, intent, context)
if inner:
return []
return ["Condition not met (NOT policy)"]
# Fallback
return []
# -----------------------------------------------------------------------------
# Policy Builder (DSL) β convenience functions
# -----------------------------------------------------------------------------
def allow_all() -> Policy:
"""Policy that never produces violations."""
class _AllowAll(AtomicPolicy):
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
return []
return _AllowAll()
def deny_all() -> Policy:
"""Policy that always produces a violation."""
class _DenyAll(AtomicPolicy):
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
return ["Action denied by default policy"]
return _DenyAll()
# Example: policy = (region_allowed({"eastus"}) & ~resource_type_restricted({ResourceType.KUBERNETES_CLUSTER})) | cost_threshold(500)
__all__ = [
"Policy",
"RegionAllowedPolicy",
"ResourceTypeRestrictedPolicy",
"MaxPermissionLevelPolicy",
"CostThresholdPolicy",
"AndPolicy",
"OrPolicy",
"NotPolicy",
"PolicyEvaluator",
"allow_all",
"deny_all",
] |