| """ |
| Policy Algebra – Composable, typed policies for infrastructure governance. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from abc import ABC, abstractmethod |
| from dataclasses import dataclass |
| from typing import Any, Callable, Dict, List, Optional, Protocol, Set, TypeVar, Union |
|
|
| from pydantic import BaseModel |
|
|
| |
| from .intents import ( |
| InfrastructureIntent, |
| ProvisionResourceIntent, |
| GrantAccessIntent, |
| ResourceType, |
| PermissionLevel, |
| Environment, |
| ) |
|
|
| Violation = str |
| EvalResult = List[Violation] |
|
|
| class Policy(ABC): |
| @abstractmethod |
| def evaluate(self, intent: InfrastructureIntent) -> EvalResult: |
| pass |
|
|
| def __and__(self, other: Policy) -> Policy: |
| return AndPolicy(self, other) |
|
|
| def __or__(self, other: Policy) -> Policy: |
| return OrPolicy(self, other) |
|
|
| def __invert__(self) -> Policy: |
| return NotPolicy(self) |
|
|
| class AtomicPolicy(Policy, ABC): |
| pass |
|
|
| @dataclass(frozen=True) |
| class RegionAllowedPolicy(AtomicPolicy): |
| allowed_regions: Set[str] |
|
|
| def evaluate(self, intent: InfrastructureIntent) -> EvalResult: |
| if isinstance(intent, ProvisionResourceIntent): |
| if intent.region not in self.allowed_regions: |
| return [f"Region '{intent.region}' not allowed. Allowed: {self.allowed_regions}"] |
| return [] |
|
|
| @dataclass(frozen=True) |
| class ResourceTypeRestrictedPolicy(AtomicPolicy): |
| forbidden_types: Set[ResourceType] |
|
|
| def evaluate(self, intent: InfrastructureIntent) -> EvalResult: |
| if isinstance(intent, ProvisionResourceIntent): |
| if intent.resource_type in self.forbidden_types: |
| return [f"Resource type '{intent.resource_type.value}' is forbidden."] |
| return [] |
|
|
| @dataclass(frozen=True) |
| class MaxPermissionLevelPolicy(AtomicPolicy): |
| max_level: PermissionLevel |
|
|
| _LEVEL_ORDER = { |
| PermissionLevel.READ: 1, |
| PermissionLevel.WRITE: 2, |
| PermissionLevel.ADMIN: 3, |
| } |
|
|
| def evaluate(self, intent: InfrastructureIntent) -> EvalResult: |
| if isinstance(intent, GrantAccessIntent): |
| if self._LEVEL_ORDER[intent.permission_level] > self._LEVEL_ORDER[self.max_level]: |
| return [f"Permission level '{intent.permission_level.value}' exceeds max allowed '{self.max_level.value}'."] |
| return [] |
|
|
| @dataclass(frozen=True) |
| class CostThresholdPolicy(AtomicPolicy): |
| max_cost_usd: float |
|
|
| def evaluate(self, intent: InfrastructureIntent, cost: Optional[float] = None) -> EvalResult: |
| return [] |
|
|
| @dataclass(frozen=True) |
| class AndPolicy(Policy): |
| left: Policy |
| right: Policy |
|
|
| def evaluate(self, intent: InfrastructureIntent) -> EvalResult: |
| return self.left.evaluate(intent) + self.right.evaluate(intent) |
|
|
| @dataclass(frozen=True) |
| class OrPolicy(Policy): |
| left: Policy |
| right: Policy |
|
|
| def evaluate(self, intent: InfrastructureIntent) -> EvalResult: |
| left_violations = self.left.evaluate(intent) |
| right_violations = self.right.evaluate(intent) |
| if not left_violations or not right_violations: |
| return [] |
| return left_violations + right_violations |
|
|
| @dataclass(frozen=True) |
| class NotPolicy(Policy): |
| inner: Policy |
|
|
| def evaluate(self, intent: InfrastructureIntent) -> EvalResult: |
| inner_violations = self.inner.evaluate(intent) |
| if inner_violations: |
| return [] |
| else: |
| return ["Condition not met (NOT policy)"] |
|
|
| class PolicyEvaluator: |
| def __init__(self, root_policy: Policy): |
| self._root = root_policy |
|
|
| def evaluate(self, intent: InfrastructureIntent, context: Optional[Dict[str, Any]] = None) -> EvalResult: |
| return self._evaluate_recursive(self._root, intent, context or {}) |
|
|
| def _evaluate_recursive(self, policy: Policy, intent: InfrastructureIntent, context: Dict[str, Any]) -> EvalResult: |
| if isinstance(policy, CostThresholdPolicy): |
| cost = context.get('cost_estimate') |
| if cost is not None and cost > policy.max_cost_usd: |
| return [f"Cost ${cost:.2f} exceeds threshold ${policy.max_cost_usd:.2f}"] |
| return [] |
| if isinstance(policy, AtomicPolicy): |
| return policy.evaluate(intent) |
| if isinstance(policy, AndPolicy): |
| return self._evaluate_recursive(policy.left, intent, context) + self._evaluate_recursive(policy.right, intent, context) |
| if isinstance(policy, OrPolicy): |
| left = self._evaluate_recursive(policy.left, intent, context) |
| right = self._evaluate_recursive(policy.right, intent, context) |
| if not left or not right: |
| return [] |
| return left + right |
| if isinstance(policy, NotPolicy): |
| inner = self._evaluate_recursive(policy.inner, intent, context) |
| if inner: |
| return [] |
| return ["Condition not met (NOT policy)"] |
| return [] |
|
|
| def allow_all() -> Policy: |
| class _AllowAll(AtomicPolicy): |
| def evaluate(self, intent: InfrastructureIntent) -> EvalResult: |
| return [] |
| return _AllowAll() |
|
|
| def deny_all() -> Policy: |
| class _DenyAll(AtomicPolicy): |
| def evaluate(self, intent: InfrastructureIntent) -> EvalResult: |
| return ["Action denied by default policy"] |
| return _DenyAll() |
|
|
| __all__ = [ |
| "Policy", |
| "RegionAllowedPolicy", |
| "ResourceTypeRestrictedPolicy", |
| "MaxPermissionLevelPolicy", |
| "CostThresholdPolicy", |
| "AndPolicy", |
| "OrPolicy", |
| "NotPolicy", |
| "PolicyEvaluator", |
| "allow_all", |
| "deny_all", |
| ] |