"""
Policy Algebra – Composable, typed policies for infrastructure governance.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Protocol, Set, TypeVar, Union
from pydantic import BaseModel
# Relative imports for intents
from .intents import (
InfrastructureIntent,
ProvisionResourceIntent,
GrantAccessIntent,
ResourceType,
PermissionLevel,
Environment,
)
# A violation is a plain human-readable message string.
Violation = str
# Result of evaluating a policy: the list of violation messages it produced.
# The policies in this module return an empty list when nothing is violated.
EvalResult = List[Violation]
class Policy(ABC):
    """Abstract base class for every policy in the algebra.

    Subclasses implement :meth:`evaluate`. Policies compose with the
    operators ``&`` (builds an ``AndPolicy``), ``|`` (builds an ``OrPolicy``)
    and ``~`` (builds a ``NotPolicy``).
    """

    @abstractmethod
    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Return the list of violations produced for ``intent``."""

    def __and__(self, other: Policy) -> Policy:
        """Return ``AndPolicy(self, other)``.

        Returns ``NotImplemented`` when ``other`` is not a ``Policy`` so that
        Python raises ``TypeError`` at composition time instead of building a
        composite that only fails later, when it is evaluated.
        """
        if not isinstance(other, Policy):
            return NotImplemented
        return AndPolicy(self, other)

    def __or__(self, other: Policy) -> Policy:
        """Return ``OrPolicy(self, other)``.

        Returns ``NotImplemented`` when ``other`` is not a ``Policy``.
        """
        if not isinstance(other, Policy):
            return NotImplemented
        return OrPolicy(self, other)

    def __invert__(self) -> Policy:
        """Return ``NotPolicy(self)``."""
        return NotPolicy(self)
class AtomicPolicy(Policy, ABC):
    """Marker base class for leaf policies (policies that do not wrap other policies)."""
@dataclass(frozen=True)
class RegionAllowedPolicy(AtomicPolicy):
    """Restrict resource provisioning to an allow-list of regions."""

    # Regions in which provisioning is permitted.
    allowed_regions: Set[str]

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Return one violation when a provisioning intent targets a region outside the allow-list.

        Intents that are not ``ProvisionResourceIntent`` produce no violations.
        """
        # Guard clause: this policy only constrains provisioning intents.
        if not isinstance(intent, ProvisionResourceIntent):
            return []
        if intent.region in self.allowed_regions:
            return []
        return [f"Region '{intent.region}' not allowed. Allowed: {self.allowed_regions}"]
@dataclass(frozen=True)
class ResourceTypeRestrictedPolicy(AtomicPolicy):
    """Forbid provisioning of specific resource types."""

    # Resource types that must not be provisioned.
    forbidden_types: Set[ResourceType]

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Return one violation when a provisioning intent requests a forbidden resource type.

        Intents that are not ``ProvisionResourceIntent`` produce no violations.
        """
        # Guard clause: this policy only constrains provisioning intents.
        if not isinstance(intent, ProvisionResourceIntent):
            return []
        if intent.resource_type not in self.forbidden_types:
            return []
        return [f"Resource type '{intent.resource_type.value}' is forbidden."]
@dataclass(frozen=True)
class MaxPermissionLevelPolicy(AtomicPolicy):
    """Cap the permission level that an access grant may request."""

    # Highest permission level that may be granted.
    max_level: PermissionLevel

    # Numeric rank of each permission level, used for ordering comparisons.
    # Left unannotated so the dataclass machinery treats it as a shared class
    # attribute rather than a constructor field.
    _LEVEL_ORDER = {
        PermissionLevel.READ: 1,
        PermissionLevel.WRITE: 2,
        PermissionLevel.ADMIN: 3,
    }

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Return one violation when a grant requests a level ranked above ``max_level``.

        Intents that are not ``GrantAccessIntent`` produce no violations.
        """
        # Guard clause: this policy only constrains access grants.
        if not isinstance(intent, GrantAccessIntent):
            return []
        requested_rank = self._LEVEL_ORDER[intent.permission_level]
        ceiling_rank = self._LEVEL_ORDER[self.max_level]
        if requested_rank <= ceiling_rank:
            return []
        return [f"Permission level '{intent.permission_level.value}' exceeds max allowed '{self.max_level.value}'."]
@dataclass(frozen=True)
class CostThresholdPolicy(AtomicPolicy):
    """Reject actions whose estimated cost exceeds a fixed USD threshold."""

    # Maximum acceptable cost, in US dollars.
    max_cost_usd: float

    def evaluate(self, intent: InfrastructureIntent, cost: Optional[float] = None) -> EvalResult:
        """Return one violation when ``cost`` is given and exceeds ``max_cost_usd``.

        Args:
            intent: The intent being evaluated; not inspected by this policy.
            cost: Estimated cost in USD. When ``None`` (the default) no cost
                is known, so no violation is reported.

        Returns:
            A single-element list describing the overrun, or an empty list.
        """
        # Previously the ``cost`` argument was accepted but ignored, so a
        # direct call could never report an overrun.
        if cost is not None and cost > self.max_cost_usd:
            return [f"Cost ${cost:.2f} exceeds threshold ${self.max_cost_usd:.2f}"]
        return []
@dataclass(frozen=True)
class AndPolicy(Policy):
    """Conjunction of two policies: violations from both sides are reported."""

    left: Policy
    right: Policy

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Evaluate the left then the right operand and return their violations concatenated."""
        left_result = self.left.evaluate(intent)
        right_result = self.right.evaluate(intent)
        return left_result + right_result
@dataclass(frozen=True)
class OrPolicy(Policy):
    """Disjunction of two policies: satisfied as soon as either side has no violations."""

    left: Policy
    right: Policy

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Return no violations if either operand passes, else both operands' violations.

        Both operands are always evaluated, left first.
        """
        from_left = self.left.evaluate(intent)
        from_right = self.right.evaluate(intent)
        # Only when both sides fail does the disjunction fail.
        if from_left and from_right:
            return from_left + from_right
        return []
@dataclass(frozen=True)
class NotPolicy(Policy):
    """Negation of a policy: passes exactly when the wrapped policy reports violations."""

    inner: Policy

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Return no violations if the inner policy is violated, else a single generic violation."""
        return [] if self.inner.evaluate(intent) else ["Condition not met (NOT policy)"]
class PolicyEvaluator:
    """Evaluate a policy tree against an intent, with optional external context.

    The evaluator walks ``AndPolicy`` / ``OrPolicy`` / ``NotPolicy`` nodes
    itself so that ``CostThresholdPolicy`` leaves can be checked against the
    ``'cost_estimate'`` entry of the context dictionary.
    """

    def __init__(self, root_policy: Policy):
        """Store ``root_policy`` as the root of the tree to evaluate."""
        self._root = root_policy

    def evaluate(self, intent: InfrastructureIntent, context: Optional[Dict[str, Any]] = None) -> EvalResult:
        """Return the violations the root policy produces for ``intent``.

        Args:
            intent: The intent to check.
            context: Optional extra data; the key ``'cost_estimate'`` is read
                when a ``CostThresholdPolicy`` is encountered. ``None`` or an
                empty mapping is treated as an empty context.
        """
        return self._evaluate_recursive(self._root, intent, context or {})

    def _evaluate_recursive(self, policy: Policy, intent: InfrastructureIntent, context: Dict[str, Any]) -> EvalResult:
        """Evaluate ``policy`` (and, for composites, its children) against ``intent``."""
        # Cost thresholds are checked here because the cost estimate lives in
        # the context, not on the intent.
        if isinstance(policy, CostThresholdPolicy):
            cost = context.get('cost_estimate')
            if cost is not None and cost > policy.max_cost_usd:
                return [f"Cost ${cost:.2f} exceeds threshold ${policy.max_cost_usd:.2f}"]
            return []
        if isinstance(policy, AtomicPolicy):
            return policy.evaluate(intent)
        if isinstance(policy, AndPolicy):
            return self._evaluate_recursive(policy.left, intent, context) + self._evaluate_recursive(policy.right, intent, context)
        if isinstance(policy, OrPolicy):
            left = self._evaluate_recursive(policy.left, intent, context)
            right = self._evaluate_recursive(policy.right, intent, context)
            # Satisfied when at least one branch reports no violations.
            if not left or not right:
                return []
            return left + right
        if isinstance(policy, NotPolicy):
            inner = self._evaluate_recursive(policy.inner, intent, context)
            if inner:
                return []
            return ["Condition not met (NOT policy)"]
        # Any other Policy implementation evaluates itself. Returning [] here
        # (as before) would silently ignore custom policies, i.e. fail open.
        return policy.evaluate(intent)
def allow_all() -> Policy:
    """Return a policy that never reports a violation."""

    class _AllowAll(AtomicPolicy):
        """Atomic policy that accepts every intent."""

        def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
            return []

    permissive = _AllowAll()
    return permissive
def deny_all() -> Policy:
    """Return a policy that reports a violation for every intent."""

    class _DenyAll(AtomicPolicy):
        """Atomic policy that rejects every intent."""

        def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
            return ["Action denied by default policy"]

    restrictive = _DenyAll()
    return restrictive
# Names exported by ``from <module> import *``.
__all__ = [
    "Policy",
    "RegionAllowedPolicy",
    "ResourceTypeRestrictedPolicy",
    "MaxPermissionLevelPolicy",
    "CostThresholdPolicy",
    "AndPolicy",
    "OrPolicy",
    "NotPolicy",
    "PolicyEvaluator",
    "allow_all",
    "deny_all",
]