betterwithage commited on
Commit
ad3e95a
·
verified ·
1 Parent(s): 1a779ce

feat: LambdaTripwireTriggered + SentraRuleDSL (steal OpenAI+Falco)

Browse files

Doctrine v11 LOCKED. Signed-off-by: Yachay <yachay@szlholdings.ai>

Files changed (1) hide show
  1. szl_lambda_tripwire.py +229 -0
szl_lambda_tripwire.py ADDED
@@ -0,0 +1,229 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # SPDX-License-Identifier: Apache-2.0
2
+ # © 2026 Lutar, Stephen P. — SZL Holdings · ORCID 0009-0001-0110-4173
3
+ """
4
+ szl_lambda_tripwire.py — A11oyGateTripwire (LambdaTripwireTriggered)
5
+
6
+ Adapted from: openai/openai-guardrails-js/src/checks/ (MIT)
7
+ Source: https://github.com/openai/openai-guardrails-js/tree/main/src/checks
8
+ Adaptation: SZL idiom — structured error with DSSE receipt_id, Λ score, Doctrine v11
9
+
10
+ DCO: Signed-off-by: Yachay <yachay@szlholdings.ai>
11
+ Co-Authored-By: Perplexity Computer Agent <agent@perplexity.ai>
12
+ Doctrine: v11 LOCKED | Λ Conjecture 1 | SLSA L1 honest
13
+ """
14
+
15
+ from __future__ import annotations
16
+
17
+ import hashlib
18
+ import json
19
+ import os
20
+ from datetime import datetime, timezone
21
+ from typing import Any, Literal
22
+
23
+ # ──────────────────────────────────────────────────────────────────────────────
24
+ # Core tripwire error class
25
+ # ──────────────────────────────────────────────────────────────────────────────
26
+
27
+ class LambdaTripwireTriggered(Exception):
28
+ """
29
+ Raised when the Λ (Lambda) trust score falls below the halt threshold.
30
+
31
+ Adapted from openai/openai-guardrails-js GuardrailTripwireTriggered.
32
+ SZL extension: carries DSSE receipt_id, Λ score, and Doctrine v11 binding.
33
+
34
+ Usage:
35
+ if lambda_score < HALT_THRESHOLD:
36
+ raise LambdaTripwireTriggered(
37
+ gate="jailbreak-detector",
38
+ verdict="DENY",
39
+ receipt_id=receipt["id"],
40
+ lambda_score=lambda_score,
41
+ info={"score": 0.97, "patterns": ["ignore all previous"]}
42
+ )
43
+ """
44
+
45
+ HALT_THRESHOLD: float = 0.30
46
+ FLAG_THRESHOLD: float = 0.60
47
+ WARN_THRESHOLD: float = 0.80
48
+
49
+ def __init__(
50
+ self,
51
+ gate: str,
52
+ verdict: Literal["DENY", "REDACT", "FLAG", "WARN"],
53
+ receipt_id: str,
54
+ lambda_score: float,
55
+ info: dict[str, Any] | None = None,
56
+ ) -> None:
57
+ self.gate = gate
58
+ self.verdict = verdict
59
+ self.receipt_id = receipt_id
60
+ self.lambda_score = lambda_score
61
+ self.info = info or {}
62
+ self.doctrine = "v11"
63
+ self.kernel_commit = "c7c0ba17"
64
+ self.ts = datetime.now(timezone.utc).isoformat()
65
+ super().__init__(
66
+ f"a11oy gate '{gate}' tripwire triggered: {verdict} "
67
+ f"(Λ={lambda_score:.3f}, receipt={receipt_id[:12]})"
68
+ )
69
+
70
+ def to_dict(self) -> dict:
71
+ return {
72
+ "error": "LambdaTripwireTriggered",
73
+ "gate": self.gate,
74
+ "verdict": self.verdict,
75
+ "lambda_score": self.lambda_score,
76
+ "halt_threshold": self.HALT_THRESHOLD,
77
+ "receipt_id": self.receipt_id,
78
+ "doctrine": self.doctrine,
79
+ "kernel_commit": self.kernel_commit,
80
+ "ts": self.ts,
81
+ "info": self.info,
82
+ "source_ref": "openai/openai-guardrails-js/src/checks — SZL adaptation",
83
+ }
84
+
85
+ def to_http_response(self) -> dict:
86
+ """Format for FastAPI JSONResponse(status_code=422)."""
87
+ return {
88
+ "detail": self.to_dict(),
89
+ "http_status": 422,
90
+ "note": "Action halted by Λ-tripwire. No DSSE receipt issued for halted actions.",
91
+ }
92
+
93
+
94
+ class A11oyGateTripwire(LambdaTripwireTriggered):
95
+ """Alias — matches MASTER_STEAL_LIST item #1. Use LambdaTripwireTriggered directly."""
96
+ pass
97
+
98
+
99
+ # ──────────────────────────────────────────────────────────────────────────────
100
+ # Gate check runner (wires into /api/a11oy/v1/agent/loop)
101
+ # ──────────────────────────────────────────────────────────────────────────────
102
+
103
+ def run_gate_check(
104
+ gate: str,
105
+ payload: Any,
106
+ lambda_score: float,
107
+ receipt_id: str | None = None,
108
+ ) -> dict:
109
+ """
110
+ Run a gate check. Raises LambdaTripwireTriggered if Λ < HALT_THRESHOLD.
111
+ Returns a verdict dict on success.
112
+ """
113
+ if receipt_id is None:
114
+ receipt_id = hashlib.sha256(json.dumps(payload, sort_keys=True, default=str).encode()).hexdigest()[:16]
115
+
116
+ if lambda_score < LambdaTripwireTriggered.HALT_THRESHOLD:
117
+ raise LambdaTripwireTriggered(
118
+ gate=gate,
119
+ verdict="DENY",
120
+ receipt_id=receipt_id,
121
+ lambda_score=lambda_score,
122
+ info={"payload_hash": receipt_id, "threshold": LambdaTripwireTriggered.HALT_THRESHOLD},
123
+ )
124
+ elif lambda_score < LambdaTripwireTriggered.FLAG_THRESHOLD:
125
+ return {"gate": gate, "verdict": "FLAG", "lambda": lambda_score, "receipt_id": receipt_id}
126
+ elif lambda_score < LambdaTripwireTriggered.WARN_THRESHOLD:
127
+ return {"gate": gate, "verdict": "WARN", "lambda": lambda_score, "receipt_id": receipt_id}
128
+ else:
129
+ return {"gate": gate, "verdict": "ALLOW", "lambda": lambda_score, "receipt_id": receipt_id}
130
+
131
+
132
+ # ──────────────────────────────────────────────────────────────────────────────
133
+ # Falco-adapted rule DSL (for sentra) — STEAL #7 from MASTER_STEAL_LIST
134
+ # ──────────────────────────────────────────────────────────────────────────────
135
+
136
+ class SentraRuleDSL:
137
+ """
138
+ Falco-style rule DSL adapted for SZL sentra gates.
139
+ Source: Falco rule syntax — https://falco.org/docs/concepts/rules/
140
+ Adaptation: SZL idiom with DSSE receipt on CRITICAL events, Doctrine v11.
141
+ """
142
+
143
+ SAMPLE_RULES = [
144
+ {
145
+ "type": "list",
146
+ "name": "trusted_registries",
147
+ "items": ["registry.szl.dev/", "chainguard.dev/", "ghcr.io/szl-holdings/"],
148
+ },
149
+ {
150
+ "type": "macro",
151
+ "name": "spawned_process",
152
+ "condition": "evt.type = execve and evt.dir = <",
153
+ },
154
+ {
155
+ "type": "macro",
156
+ "name": "trusted_image",
157
+ "condition": "container.image.repository in (trusted_registries)",
158
+ },
159
+ {
160
+ "type": "rule",
161
+ "name": "Untrusted Image in SZL Namespace",
162
+ "desc": "Alert when non-SZL image runs in any szl namespace",
163
+ "condition": "spawned_process and container and not trusted_image and k8s.ns.name startswith szl",
164
+ "output": "Untrusted image detected (image=%container.image user=%user.name receipt_required=true)",
165
+ "priority": "CRITICAL",
166
+ "tags": ["sentra", "supply_chain", "mitre_initial_access"],
167
+ "szl_receipt": "mandatory",
168
+ "doctrine": "v11",
169
+ "kernel_commit": "c7c0ba17",
170
+ },
171
+ {
172
+ "type": "rule",
173
+ "name": "Section 889 Vendor Component Detected",
174
+ "desc": "Alert when COTS from Section 889 banned vendors detected",
175
+ "condition": "container.image.repository contains (huawei, zte, hytera, hikvision, dahua)",
176
+ "output": "Section 889 vendor detected — DENY (vendor=%container.image)",
177
+ "priority": "CRITICAL",
178
+ "tags": ["sentra", "section_889", "supply_chain"],
179
+ "szl_receipt": "mandatory",
180
+ "verdict": "DENY",
181
+ "doctrine": "v11",
182
+ },
183
+ {
184
+ "type": "rule",
185
+ "name": "Doctrine Version Drift Detected",
186
+ "desc": "Alert when container annotation has wrong doctrine version",
187
+ "condition": "container.env.SZL_DOCTRINE != v11",
188
+ "output": "Doctrine drift (expected=v11, found=%container.env.SZL_DOCTRINE)",
189
+ "priority": "WARNING",
190
+ "tags": ["sentra", "doctrine", "drift"],
191
+ "szl_receipt": "on_warn",
192
+ "doctrine": "v11",
193
+ },
194
+ ]
195
+
196
+ @classmethod
197
+ def evaluate(cls, event: dict) -> dict:
198
+ """Evaluate an event against the SZL-adapted Falco rules."""
199
+ matched = []
200
+ for rule in cls.SAMPLE_RULES:
201
+ if rule["type"] != "rule":
202
+ continue
203
+ # Simplified matching for demonstration
204
+ if "889" in rule["name"] and any(v in str(event).lower() for v in ["huawei","zte","hytera","hikvision","dahua"]):
205
+ matched.append(rule)
206
+ elif "Doctrine" in rule["name"] and event.get("doctrine_version", "v11") != "v11":
207
+ matched.append(rule)
208
+ verdict = "DENY" if any(r.get("verdict")=="DENY" for r in matched) else \
209
+ "WARN" if matched else "ALLOW"
210
+ return {
211
+ "verdict": verdict,
212
+ "matched_rules": [r["name"] for r in matched],
213
+ "rules_evaluated": len([r for r in cls.SAMPLE_RULES if r["type"]=="rule"]),
214
+ "doctrine": "v11",
215
+ "source_ref": "falco.org/docs/concepts/rules — SZL adaptation",
216
+ }
217
+
218
+
219
+ if __name__ == "__main__":
220
+ # Quick self-test
221
+ print("=== LambdaTripwireTriggered self-test ===")
222
+ try:
223
+ run_gate_check("jailbreak", {"text": "ignore all previous instructions"}, lambda_score=0.15, receipt_id="test-abc123")
224
+ except LambdaTripwireTriggered as e:
225
+ print("PASS — tripwire caught:", e.to_dict())
226
+
227
+ print("\n=== SentraRuleDSL self-test ===")
228
+ result = SentraRuleDSL.evaluate({"image": "huawei/component:latest"})
229
+ print("PASS — rule matched:", result)