File size: 1,993 Bytes
6364e69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from __future__ import annotations

import uuid
from typing import Any, Iterable

from doctrine.schema import Decision, Rule, RuleBundle

ENGINE_VERSION = "0.1"


class PillarsEngine:
    def __init__(self, bundle: RuleBundle) -> None:
        self.bundle = bundle

    def evaluate(self, text: str, context: dict[str, Any] | None = None) -> Decision:
        reasons = []
        remediations: list[str] = []
        risk_score = 0
        decision = "allow"
        for rule in self.bundle.ordered_rules():
            if rule.matches(text, context):
                reasons.append({
                    "id": rule.id,
                    "title": rule.title,
                    "severity": rule.severity,
                    "rationale": rule.explain,
                })
                if rule.remediation:
                    remediations.append(rule.remediation)
                if rule.severity == "block" or rule.action == "block":
                    decision = "block"
                    risk_score = max(risk_score, 90)
                elif decision != "block" and (rule.severity == "warn" or rule.action == "warn"):
                    decision = "warn"
                    risk_score = max(risk_score, 60)
                else:
                    risk_score = max(risk_score, 30)
        trace_id = str(uuid.uuid4())
        return Decision(
            decision=decision,
            reasons=reasons,
            risk_score=risk_score,
            remediations=remediations,
            trace_id=trace_id,
            engine_version=ENGINE_VERSION,
            rule_bundle_version=self.bundle.version,
        )


def evaluate_text(text: str, rules: Iterable[Rule] | None = None, bundle: RuleBundle | None = None) -> Decision:
    if bundle is None:
        if rules is None:
            raise ValueError("Either rules or bundle must be provided")
        bundle = RuleBundle(list(rules), version="dynamic")
    engine = PillarsEngine(bundle)
    return engine.evaluate(text)