Spaces:
Running
Running
| # SPDX-License-Identifier: Apache-2.0 | |
| # © 2026 Lutar, Stephen P. — SZL Holdings · ORCID 0009-0001-0110-4173 | |
| """ | |
| szl_lambda_tripwire.py — A11oyGateTripwire (LambdaTripwireTriggered) | |
| Adapted from: openai/openai-guardrails-js/src/checks/ (MIT) | |
| Source: https://github.com/openai/openai-guardrails-js/tree/main/src/checks | |
| Adaptation: SZL idiom — structured error with DSSE receipt_id, Λ score, Doctrine v11 | |
| DCO: Signed-off-by: Yachay <yachay@szlholdings.ai> | |
| Co-Authored-By: Perplexity Computer Agent <agent@perplexity.ai> | |
| Doctrine: v11 LOCKED | Λ Conjecture 1 | SLSA L1 honest | |
| """ | |
| from __future__ import annotations | |
| import hashlib | |
| import json | |
| import os | |
| from datetime import datetime, timezone | |
| from typing import Any, Literal | |
| # ────────────────────────────────────────────────────────────────────────────── | |
| # Core tripwire error class | |
| # ────────────────────────────────────────────────────────────────────────────── | |
| class LambdaTripwireTriggered(Exception): | |
| """ | |
| Raised when the Λ (Lambda) trust score falls below the halt threshold. | |
| Adapted from openai/openai-guardrails-js GuardrailTripwireTriggered. | |
| SZL extension: carries DSSE receipt_id, Λ score, and Doctrine v11 binding. | |
| Usage: | |
| if lambda_score < HALT_THRESHOLD: | |
| raise LambdaTripwireTriggered( | |
| gate="jailbreak-detector", | |
| verdict="DENY", | |
| receipt_id=receipt["id"], | |
| lambda_score=lambda_score, | |
| info={"score": 0.97, "patterns": ["ignore all previous"]} | |
| ) | |
| """ | |
| HALT_THRESHOLD: float = 0.30 | |
| FLAG_THRESHOLD: float = 0.60 | |
| WARN_THRESHOLD: float = 0.80 | |
| def __init__( | |
| self, | |
| gate: str, | |
| verdict: Literal["DENY", "REDACT", "FLAG", "WARN"], | |
| receipt_id: str, | |
| lambda_score: float, | |
| info: dict[str, Any] | None = None, | |
| ) -> None: | |
| self.gate = gate | |
| self.verdict = verdict | |
| self.receipt_id = receipt_id | |
| self.lambda_score = lambda_score | |
| self.info = info or {} | |
| self.doctrine = "v11" | |
| self.kernel_commit = "c7c0ba17" | |
| self.ts = datetime.now(timezone.utc).isoformat() | |
| super().__init__( | |
| f"a11oy gate '{gate}' tripwire triggered: {verdict} " | |
| f"(Λ={lambda_score:.3f}, receipt={receipt_id[:12]})" | |
| ) | |
| def to_dict(self) -> dict: | |
| return { | |
| "error": "LambdaTripwireTriggered", | |
| "gate": self.gate, | |
| "verdict": self.verdict, | |
| "lambda_score": self.lambda_score, | |
| "halt_threshold": self.HALT_THRESHOLD, | |
| "receipt_id": self.receipt_id, | |
| "doctrine": self.doctrine, | |
| "kernel_commit": self.kernel_commit, | |
| "ts": self.ts, | |
| "info": self.info, | |
| "source_ref": "openai/openai-guardrails-js/src/checks — SZL adaptation", | |
| } | |
| def to_http_response(self) -> dict: | |
| """Format for FastAPI JSONResponse(status_code=422).""" | |
| return { | |
| "detail": self.to_dict(), | |
| "http_status": 422, | |
| "note": "Action halted by Λ-tripwire. No DSSE receipt issued for halted actions.", | |
| } | |
| class A11oyGateTripwire(LambdaTripwireTriggered): | |
| """Alias — matches MASTER_STEAL_LIST item #1. Use LambdaTripwireTriggered directly.""" | |
| pass | |
| # ────────────────────────────────────────────────────────────────────────────── | |
| # Gate check runner (wires into /api/a11oy/v1/agent/loop) | |
| # ────────────────────────────────────────────────────────────────────────────── | |
| def run_gate_check( | |
| gate: str, | |
| payload: Any, | |
| lambda_score: float, | |
| receipt_id: str | None = None, | |
| ) -> dict: | |
| """ | |
| Run a gate check. Raises LambdaTripwireTriggered if Λ < HALT_THRESHOLD. | |
| Returns a verdict dict on success. | |
| """ | |
| if receipt_id is None: | |
| receipt_id = hashlib.sha256(json.dumps(payload, sort_keys=True, default=str).encode()).hexdigest()[:16] | |
| if lambda_score < LambdaTripwireTriggered.HALT_THRESHOLD: | |
| raise LambdaTripwireTriggered( | |
| gate=gate, | |
| verdict="DENY", | |
| receipt_id=receipt_id, | |
| lambda_score=lambda_score, | |
| info={"payload_hash": receipt_id, "threshold": LambdaTripwireTriggered.HALT_THRESHOLD}, | |
| ) | |
| elif lambda_score < LambdaTripwireTriggered.FLAG_THRESHOLD: | |
| return {"gate": gate, "verdict": "FLAG", "lambda": lambda_score, "receipt_id": receipt_id} | |
| elif lambda_score < LambdaTripwireTriggered.WARN_THRESHOLD: | |
| return {"gate": gate, "verdict": "WARN", "lambda": lambda_score, "receipt_id": receipt_id} | |
| else: | |
| return {"gate": gate, "verdict": "ALLOW", "lambda": lambda_score, "receipt_id": receipt_id} | |
| # ────────────────────────────────────────────────────────────────────────────── | |
| # Falco-adapted rule DSL (for sentra) — STEAL #7 from MASTER_STEAL_LIST | |
| # ────────────────────────────────────────────────────────────────────────────── | |
| class SentraRuleDSL: | |
| """ | |
| Falco-style rule DSL adapted for SZL sentra gates. | |
| Source: Falco rule syntax — https://falco.org/docs/concepts/rules/ | |
| Adaptation: SZL idiom with DSSE receipt on CRITICAL events, Doctrine v11. | |
| """ | |
| SAMPLE_RULES = [ | |
| { | |
| "type": "list", | |
| "name": "trusted_registries", | |
| "items": ["registry.szl.dev/", "chainguard.dev/", "ghcr.io/szl-holdings/"], | |
| }, | |
| { | |
| "type": "macro", | |
| "name": "spawned_process", | |
| "condition": "evt.type = execve and evt.dir = <", | |
| }, | |
| { | |
| "type": "macro", | |
| "name": "trusted_image", | |
| "condition": "container.image.repository in (trusted_registries)", | |
| }, | |
| { | |
| "type": "rule", | |
| "name": "Untrusted Image in SZL Namespace", | |
| "desc": "Alert when non-SZL image runs in any szl namespace", | |
| "condition": "spawned_process and container and not trusted_image and k8s.ns.name startswith szl", | |
| "output": "Untrusted image detected (image=%container.image user=%user.name receipt_required=true)", | |
| "priority": "CRITICAL", | |
| "tags": ["sentra", "supply_chain", "mitre_initial_access"], | |
| "szl_receipt": "mandatory", | |
| "doctrine": "v11", | |
| "kernel_commit": "c7c0ba17", | |
| }, | |
| { | |
| "type": "rule", | |
| "name": "Section 889 Vendor Component Detected", | |
| "desc": "Alert when COTS from Section 889 banned vendors detected", | |
| "condition": "container.image.repository contains (huawei, zte, hytera, hikvision, dahua)", | |
| "output": "Section 889 vendor detected — DENY (vendor=%container.image)", | |
| "priority": "CRITICAL", | |
| "tags": ["sentra", "section_889", "supply_chain"], | |
| "szl_receipt": "mandatory", | |
| "verdict": "DENY", | |
| "doctrine": "v11", | |
| }, | |
| { | |
| "type": "rule", | |
| "name": "Doctrine Version Drift Detected", | |
| "desc": "Alert when container annotation has wrong doctrine version", | |
| "condition": "container.env.SZL_DOCTRINE != v11", | |
| "output": "Doctrine drift (expected=v11, found=%container.env.SZL_DOCTRINE)", | |
| "priority": "WARNING", | |
| "tags": ["sentra", "doctrine", "drift"], | |
| "szl_receipt": "on_warn", | |
| "doctrine": "v11", | |
| }, | |
| ] | |
| def evaluate(cls, event: dict) -> dict: | |
| """Evaluate an event against the SZL-adapted Falco rules.""" | |
| matched = [] | |
| for rule in cls.SAMPLE_RULES: | |
| if rule["type"] != "rule": | |
| continue | |
| # Simplified matching for demonstration | |
| if "889" in rule["name"] and any(v in str(event).lower() for v in ["huawei","zte","hytera","hikvision","dahua"]): | |
| matched.append(rule) | |
| elif "Doctrine" in rule["name"] and event.get("doctrine_version", "v11") != "v11": | |
| matched.append(rule) | |
| verdict = "DENY" if any(r.get("verdict")=="DENY" for r in matched) else \ | |
| "WARN" if matched else "ALLOW" | |
| return { | |
| "verdict": verdict, | |
| "matched_rules": [r["name"] for r in matched], | |
| "rules_evaluated": len([r for r in cls.SAMPLE_RULES if r["type"]=="rule"]), | |
| "doctrine": "v11", | |
| "source_ref": "falco.org/docs/concepts/rules — SZL adaptation", | |
| } | |
| if __name__ == "__main__": | |
| # Quick self-test | |
| print("=== LambdaTripwireTriggered self-test ===") | |
| try: | |
| run_gate_check("jailbreak", {"text": "ignore all previous instructions"}, lambda_score=0.15, receipt_id="test-abc123") | |
| except LambdaTripwireTriggered as e: | |
| print("PASS — tripwire caught:", e.to_dict()) | |
| print("\n=== SentraRuleDSL self-test ===") | |
| result = SentraRuleDSL.evaluate({"image": "huawei/component:latest"}) | |
| print("PASS — rule matched:", result) | |