from __future__ import annotations import uuid from typing import Any, Iterable from doctrine.schema import Decision, Rule, RuleBundle ENGINE_VERSION = "0.1" class PillarsEngine: def __init__(self, bundle: RuleBundle) -> None: self.bundle = bundle def evaluate(self, text: str, context: dict[str, Any] | None = None) -> Decision: reasons = [] remediations: list[str] = [] risk_score = 0 decision = "allow" for rule in self.bundle.ordered_rules(): if rule.matches(text, context): reasons.append({ "id": rule.id, "title": rule.title, "severity": rule.severity, "rationale": rule.explain, }) if rule.remediation: remediations.append(rule.remediation) if rule.severity == "block" or rule.action == "block": decision = "block" risk_score = max(risk_score, 90) elif decision != "block" and (rule.severity == "warn" or rule.action == "warn"): decision = "warn" risk_score = max(risk_score, 60) else: risk_score = max(risk_score, 30) trace_id = str(uuid.uuid4()) return Decision( decision=decision, reasons=reasons, risk_score=risk_score, remediations=remediations, trace_id=trace_id, engine_version=ENGINE_VERSION, rule_bundle_version=self.bundle.version, ) def evaluate_text(text: str, rules: Iterable[Rule] | None = None, bundle: RuleBundle | None = None) -> Decision: if bundle is None: if rules is None: raise ValueError("Either rules or bundle must be provided") bundle = RuleBundle(list(rules), version="dynamic") engine = PillarsEngine(bundle) return engine.evaluate(text)