blux-ca / doctrine /engine.py
~JADIS
Add runtime agent pipeline with safety routing
6364e69
from __future__ import annotations
import uuid
from typing import Any, Iterable
from doctrine.schema import Decision, Rule, RuleBundle
ENGINE_VERSION = "0.1"
class PillarsEngine:
def __init__(self, bundle: RuleBundle) -> None:
self.bundle = bundle
def evaluate(self, text: str, context: dict[str, Any] | None = None) -> Decision:
reasons = []
remediations: list[str] = []
risk_score = 0
decision = "allow"
for rule in self.bundle.ordered_rules():
if rule.matches(text, context):
reasons.append({
"id": rule.id,
"title": rule.title,
"severity": rule.severity,
"rationale": rule.explain,
})
if rule.remediation:
remediations.append(rule.remediation)
if rule.severity == "block" or rule.action == "block":
decision = "block"
risk_score = max(risk_score, 90)
elif decision != "block" and (rule.severity == "warn" or rule.action == "warn"):
decision = "warn"
risk_score = max(risk_score, 60)
else:
risk_score = max(risk_score, 30)
trace_id = str(uuid.uuid4())
return Decision(
decision=decision,
reasons=reasons,
risk_score=risk_score,
remediations=remediations,
trace_id=trace_id,
engine_version=ENGINE_VERSION,
rule_bundle_version=self.bundle.version,
)
def evaluate_text(text: str, rules: Iterable[Rule] | None = None, bundle: RuleBundle | None = None) -> Decision:
if bundle is None:
if rules is None:
raise ValueError("Either rules or bundle must be provided")
bundle = RuleBundle(list(rules), version="dynamic")
engine = PillarsEngine(bundle)
return engine.evaluate(text)