"""Policy Algebra – Composable, typed policies for infrastructure governance.

A policy inspects an infrastructure intent and returns a list of violation
messages; an empty list means the intent is allowed.  Policies compose with
``&`` (AND), ``|`` (OR) and ``~`` (NOT).
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import (
    Any,
    Callable,
    ClassVar,
    Dict,
    List,
    Optional,
    Protocol,
    Set,
    TypeVar,
    Union,
)

from pydantic import BaseModel

# Relative imports for intents
from .intents import (
    InfrastructureIntent,
    ProvisionResourceIntent,
    GrantAccessIntent,
    ResourceType,
    PermissionLevel,
    Environment,
)

# A violation is a human-readable message; an evaluation yields zero or more.
Violation = str
EvalResult = List[Violation]


class Policy(ABC):
    """Base class for every policy; supports ``&``, ``|`` and ``~`` composition."""

    @abstractmethod
    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Return the violations *intent* triggers (empty list if none)."""

    def __and__(self, other: Policy) -> Policy:
        return AndPolicy(self, other)

    def __or__(self, other: Policy) -> Policy:
        return OrPolicy(self, other)

    def __invert__(self) -> Policy:
        return NotPolicy(self)


class AtomicPolicy(Policy, ABC):
    """Marker base class for leaf (non-composite) policies."""


@dataclass(frozen=True)
class RegionAllowedPolicy(AtomicPolicy):
    """Reject resource provisioning outside ``allowed_regions``.

    Intents that are not ``ProvisionResourceIntent`` are never rejected.
    """

    # NOTE(review): a plain ``set`` here makes instances unhashable even though
    # the dataclass is frozen; pass a ``frozenset`` if hashing is needed.
    allowed_regions: Set[str]

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Return one violation if the intent's region is not allowed."""
        if (
            isinstance(intent, ProvisionResourceIntent)
            and intent.region not in self.allowed_regions
        ):
            return [
                f"Region '{intent.region}' not allowed. "
                f"Allowed: {self.allowed_regions}"
            ]
        return []


@dataclass(frozen=True)
class ResourceTypeRestrictedPolicy(AtomicPolicy):
    """Reject provisioning of any resource type in ``forbidden_types``.

    Intents that are not ``ProvisionResourceIntent`` are never rejected.
    """

    forbidden_types: Set[ResourceType]

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Return one violation if the intent's resource type is forbidden."""
        if (
            isinstance(intent, ProvisionResourceIntent)
            and intent.resource_type in self.forbidden_types
        ):
            return [f"Resource type '{intent.resource_type.value}' is forbidden."]
        return []


@dataclass(frozen=True)
class MaxPermissionLevelPolicy(AtomicPolicy):
    """Reject access grants whose permission level exceeds ``max_level``.

    Intents that are not ``GrantAccessIntent`` are never rejected.
    """

    max_level: PermissionLevel

    # Ranking used to compare levels.  Declared as ClassVar so the dataclass
    # machinery treats it as shared class state rather than an instance field.
    _LEVEL_ORDER: ClassVar[Dict[PermissionLevel, int]] = {
        PermissionLevel.READ: 1,
        PermissionLevel.WRITE: 2,
        PermissionLevel.ADMIN: 3,
    }

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Return one violation if the requested level ranks above ``max_level``.

        Raises:
            KeyError: if either level is missing from ``_LEVEL_ORDER``.
        """
        if isinstance(intent, GrantAccessIntent):
            requested = self._LEVEL_ORDER[intent.permission_level]
            ceiling = self._LEVEL_ORDER[self.max_level]
            if requested > ceiling:
                return [
                    f"Permission level '{intent.permission_level.value}' "
                    f"exceeds max allowed '{self.max_level.value}'."
                ]
        return []


@dataclass(frozen=True)
class CostThresholdPolicy(AtomicPolicy):
    """Reject actions whose estimated cost exceeds ``max_cost_usd``.

    The cost is not part of the intent; it must be supplied by the caller.
    ``PolicyEvaluator`` supplies it from ``context['cost_estimate']``.
    """

    max_cost_usd: float

    def evaluate(
        self, intent: InfrastructureIntent, cost: Optional[float] = None
    ) -> EvalResult:
        """Return one violation if *cost* is given and exceeds the threshold.

        Args:
            intent: The intent being evaluated (not inspected by this policy).
            cost: Estimated cost in USD, or ``None`` when no estimate exists.

        Returns:
            An empty list when *cost* is ``None`` or within the threshold.
        """
        # Previously ``cost`` was accepted but ignored, so a direct call could
        # never report a violation.
        if cost is not None and cost > self.max_cost_usd:
            return [
                f"Cost ${cost:.2f} exceeds threshold ${self.max_cost_usd:.2f}"
            ]
        return []


@dataclass(frozen=True)
class AndPolicy(Policy):
    """Conjunction: violations of both operands, left first."""

    left: Policy
    right: Policy

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Return the concatenated violations of ``left`` and ``right``."""
        return self.left.evaluate(intent) + self.right.evaluate(intent)


@dataclass(frozen=True)
class OrPolicy(Policy):
    """Disjunction: passes when at least one operand reports no violations."""

    left: Policy
    right: Policy

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Return ``[]`` if either side passes, else both sides' violations."""
        left_violations = self.left.evaluate(intent)
        right_violations = self.right.evaluate(intent)
        if left_violations and right_violations:
            return left_violations + right_violations
        return []


@dataclass(frozen=True)
class NotPolicy(Policy):
    """Negation: passes exactly when the inner policy reports a violation."""

    inner: Policy

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        """Return ``[]`` if ``inner`` is violated, else a generic violation."""
        if self.inner.evaluate(intent):
            return []
        return ["Condition not met (NOT policy)"]


class PolicyEvaluator:
    """Evaluate a policy tree against an intent plus an external context.

    The context carries data that is not part of the intent itself; currently
    only ``'cost_estimate'`` is read (for ``CostThresholdPolicy``).
    """

    def __init__(self, root_policy: Policy):
        self._root = root_policy

    def evaluate(
        self,
        intent: InfrastructureIntent,
        context: Optional[Dict[str, Any]] = None,
    ) -> EvalResult:
        """Return all violations of the root policy for *intent*.

        Args:
            intent: The intent to check.
            context: Optional extra data; ``None`` is treated as empty.
        """
        return self._evaluate_recursive(self._root, intent, context or {})

    def _evaluate_recursive(
        self,
        policy: Policy,
        intent: InfrastructureIntent,
        context: Dict[str, Any],
    ) -> EvalResult:
        """Walk the policy tree, threading *context* through the combinators."""
        if isinstance(policy, CostThresholdPolicy):
            # The cost lives in the context, not in the intent.
            return policy.evaluate(intent, cost=context.get('cost_estimate'))

        # Combinators are re-implemented here (instead of calling their own
        # ``evaluate``) so that the context reaches nested cost policies.
        if isinstance(policy, AndPolicy):
            return (
                self._evaluate_recursive(policy.left, intent, context)
                + self._evaluate_recursive(policy.right, intent, context)
            )

        if isinstance(policy, OrPolicy):
            left = self._evaluate_recursive(policy.left, intent, context)
            right = self._evaluate_recursive(policy.right, intent, context)
            if left and right:
                return left + right
            return []

        if isinstance(policy, NotPolicy):
            if self._evaluate_recursive(policy.inner, intent, context):
                return []
            return ["Condition not met (NOT policy)"]

        # Atomic policies and any custom Policy subclass: defer to the policy
        # itself.  Returning [] for unrecognised types would silently approve
        # the intent (fail-open).
        return policy.evaluate(intent)


class _AllowAll(AtomicPolicy):
    """Policy that never reports a violation."""

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        return []


class _DenyAll(AtomicPolicy):
    """Policy that always reports a single violation."""

    def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
        return ["Action denied by default policy"]


def allow_all() -> Policy:
    """Return a policy that accepts every intent."""
    return _AllowAll()


def deny_all() -> Policy:
    """Return a policy that rejects every intent."""
    return _DenyAll()


__all__ = [
    "Policy",
    "RegionAllowedPolicy",
    "ResourceTypeRestrictedPolicy",
    "MaxPermissionLevelPolicy",
    "CostThresholdPolicy",
    "AndPolicy",
    "OrPolicy",
    "NotPolicy",
    "PolicyEvaluator",
    "allow_all",
    "deny_all",
]