petter2025's picture
Update infrastructure/policies.py
e840d23 verified
raw
history blame
5.59 kB
"""
Policy Algebra – Composable, typed policies for infrastructure governance.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Protocol, Set, TypeVar, Union
from pydantic import BaseModel
# Relative imports for intents
from .intents import (
InfrastructureIntent,
ProvisionResourceIntent,
GrantAccessIntent,
ResourceType,
PermissionLevel,
Environment,
)
Violation = str
EvalResult = List[Violation]
class Policy(ABC):
@abstractmethod
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
pass
def __and__(self, other: Policy) -> Policy:
return AndPolicy(self, other)
def __or__(self, other: Policy) -> Policy:
return OrPolicy(self, other)
def __invert__(self) -> Policy:
return NotPolicy(self)
class AtomicPolicy(Policy, ABC):
pass
@dataclass(frozen=True)
class RegionAllowedPolicy(AtomicPolicy):
allowed_regions: Set[str]
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
if isinstance(intent, ProvisionResourceIntent):
if intent.region not in self.allowed_regions:
return [f"Region '{intent.region}' not allowed. Allowed: {self.allowed_regions}"]
return []
@dataclass(frozen=True)
class ResourceTypeRestrictedPolicy(AtomicPolicy):
forbidden_types: Set[ResourceType]
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
if isinstance(intent, ProvisionResourceIntent):
if intent.resource_type in self.forbidden_types:
return [f"Resource type '{intent.resource_type.value}' is forbidden."]
return []
@dataclass(frozen=True)
class MaxPermissionLevelPolicy(AtomicPolicy):
max_level: PermissionLevel
_LEVEL_ORDER = {
PermissionLevel.READ: 1,
PermissionLevel.WRITE: 2,
PermissionLevel.ADMIN: 3,
}
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
if isinstance(intent, GrantAccessIntent):
if self._LEVEL_ORDER[intent.permission_level] > self._LEVEL_ORDER[self.max_level]:
return [f"Permission level '{intent.permission_level.value}' exceeds max allowed '{self.max_level.value}'."]
return []
@dataclass(frozen=True)
class CostThresholdPolicy(AtomicPolicy):
max_cost_usd: float
def evaluate(self, intent: InfrastructureIntent, cost: Optional[float] = None) -> EvalResult:
return []
@dataclass(frozen=True)
class AndPolicy(Policy):
left: Policy
right: Policy
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
return self.left.evaluate(intent) + self.right.evaluate(intent)
@dataclass(frozen=True)
class OrPolicy(Policy):
left: Policy
right: Policy
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
left_violations = self.left.evaluate(intent)
right_violations = self.right.evaluate(intent)
if not left_violations or not right_violations:
return []
return left_violations + right_violations
@dataclass(frozen=True)
class NotPolicy(Policy):
inner: Policy
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
inner_violations = self.inner.evaluate(intent)
if inner_violations:
return []
else:
return ["Condition not met (NOT policy)"]
class PolicyEvaluator:
def __init__(self, root_policy: Policy):
self._root = root_policy
def evaluate(self, intent: InfrastructureIntent, context: Optional[Dict[str, Any]] = None) -> EvalResult:
return self._evaluate_recursive(self._root, intent, context or {})
def _evaluate_recursive(self, policy: Policy, intent: InfrastructureIntent, context: Dict[str, Any]) -> EvalResult:
if isinstance(policy, CostThresholdPolicy):
cost = context.get('cost_estimate')
if cost is not None and cost > policy.max_cost_usd:
return [f"Cost ${cost:.2f} exceeds threshold ${policy.max_cost_usd:.2f}"]
return []
if isinstance(policy, AtomicPolicy):
return policy.evaluate(intent)
if isinstance(policy, AndPolicy):
return self._evaluate_recursive(policy.left, intent, context) + self._evaluate_recursive(policy.right, intent, context)
if isinstance(policy, OrPolicy):
left = self._evaluate_recursive(policy.left, intent, context)
right = self._evaluate_recursive(policy.right, intent, context)
if not left or not right:
return []
return left + right
if isinstance(policy, NotPolicy):
inner = self._evaluate_recursive(policy.inner, intent, context)
if inner:
return []
return ["Condition not met (NOT policy)"]
return []
def allow_all() -> Policy:
class _AllowAll(AtomicPolicy):
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
return []
return _AllowAll()
def deny_all() -> Policy:
class _DenyAll(AtomicPolicy):
def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
return ["Action denied by default policy"]
return _DenyAll()
__all__ = [
"Policy",
"RegionAllowedPolicy",
"ResourceTypeRestrictedPolicy",
"MaxPermissionLevelPolicy",
"CostThresholdPolicy",
"AndPolicy",
"OrPolicy",
"NotPolicy",
"PolicyEvaluator",
"allow_all",
"deny_all",
]