killinchu / szl_lambda_tripwire.py
betterwithage's picture
feat: LambdaTripwireTriggered + SentraRuleDSL (steal OpenAI+Falco)
0b6d249 verified
# SPDX-License-Identifier: Apache-2.0
# © 2026 Lutar, Stephen P. — SZL Holdings · ORCID 0009-0001-0110-4173
"""
szl_lambda_tripwire.py — A11oyGateTripwire (LambdaTripwireTriggered)
Adapted from: openai/openai-guardrails-js/src/checks/ (MIT)
Source: https://github.com/openai/openai-guardrails-js/tree/main/src/checks
Adaptation: SZL idiom — structured error with DSSE receipt_id, Λ score, Doctrine v11
DCO: Signed-off-by: Yachay <yachay@szlholdings.ai>
Co-Authored-By: Perplexity Computer Agent <agent@perplexity.ai>
Doctrine: v11 LOCKED | Λ Conjecture 1 | SLSA L1 honest
"""
from __future__ import annotations
import hashlib
import json
import os
from datetime import datetime, timezone
from typing import Any, Literal
# ──────────────────────────────────────────────────────────────────────────────
# Core tripwire error class
# ──────────────────────────────────────────────────────────────────────────────
class LambdaTripwireTriggered(Exception):
"""
Raised when the Λ (Lambda) trust score falls below the halt threshold.
Adapted from openai/openai-guardrails-js GuardrailTripwireTriggered.
SZL extension: carries DSSE receipt_id, Λ score, and Doctrine v11 binding.
Usage:
if lambda_score < HALT_THRESHOLD:
raise LambdaTripwireTriggered(
gate="jailbreak-detector",
verdict="DENY",
receipt_id=receipt["id"],
lambda_score=lambda_score,
info={"score": 0.97, "patterns": ["ignore all previous"]}
)
"""
HALT_THRESHOLD: float = 0.30
FLAG_THRESHOLD: float = 0.60
WARN_THRESHOLD: float = 0.80
def __init__(
self,
gate: str,
verdict: Literal["DENY", "REDACT", "FLAG", "WARN"],
receipt_id: str,
lambda_score: float,
info: dict[str, Any] | None = None,
) -> None:
self.gate = gate
self.verdict = verdict
self.receipt_id = receipt_id
self.lambda_score = lambda_score
self.info = info or {}
self.doctrine = "v11"
self.kernel_commit = "c7c0ba17"
self.ts = datetime.now(timezone.utc).isoformat()
super().__init__(
f"a11oy gate '{gate}' tripwire triggered: {verdict} "
f"(Λ={lambda_score:.3f}, receipt={receipt_id[:12]})"
)
def to_dict(self) -> dict:
return {
"error": "LambdaTripwireTriggered",
"gate": self.gate,
"verdict": self.verdict,
"lambda_score": self.lambda_score,
"halt_threshold": self.HALT_THRESHOLD,
"receipt_id": self.receipt_id,
"doctrine": self.doctrine,
"kernel_commit": self.kernel_commit,
"ts": self.ts,
"info": self.info,
"source_ref": "openai/openai-guardrails-js/src/checks — SZL adaptation",
}
def to_http_response(self) -> dict:
"""Format for FastAPI JSONResponse(status_code=422)."""
return {
"detail": self.to_dict(),
"http_status": 422,
"note": "Action halted by Λ-tripwire. No DSSE receipt issued for halted actions.",
}
class A11oyGateTripwire(LambdaTripwireTriggered):
"""Alias — matches MASTER_STEAL_LIST item #1. Use LambdaTripwireTriggered directly."""
pass
# ──────────────────────────────────────────────────────────────────────────────
# Gate check runner (wires into /api/a11oy/v1/agent/loop)
# ──────────────────────────────────────────────────────────────────────────────
def run_gate_check(
gate: str,
payload: Any,
lambda_score: float,
receipt_id: str | None = None,
) -> dict:
"""
Run a gate check. Raises LambdaTripwireTriggered if Λ < HALT_THRESHOLD.
Returns a verdict dict on success.
"""
if receipt_id is None:
receipt_id = hashlib.sha256(json.dumps(payload, sort_keys=True, default=str).encode()).hexdigest()[:16]
if lambda_score < LambdaTripwireTriggered.HALT_THRESHOLD:
raise LambdaTripwireTriggered(
gate=gate,
verdict="DENY",
receipt_id=receipt_id,
lambda_score=lambda_score,
info={"payload_hash": receipt_id, "threshold": LambdaTripwireTriggered.HALT_THRESHOLD},
)
elif lambda_score < LambdaTripwireTriggered.FLAG_THRESHOLD:
return {"gate": gate, "verdict": "FLAG", "lambda": lambda_score, "receipt_id": receipt_id}
elif lambda_score < LambdaTripwireTriggered.WARN_THRESHOLD:
return {"gate": gate, "verdict": "WARN", "lambda": lambda_score, "receipt_id": receipt_id}
else:
return {"gate": gate, "verdict": "ALLOW", "lambda": lambda_score, "receipt_id": receipt_id}
# ──────────────────────────────────────────────────────────────────────────────
# Falco-adapted rule DSL (for sentra) — STEAL #7 from MASTER_STEAL_LIST
# ──────────────────────────────────────────────────────────────────────────────
class SentraRuleDSL:
"""
Falco-style rule DSL adapted for SZL sentra gates.
Source: Falco rule syntax — https://falco.org/docs/concepts/rules/
Adaptation: SZL idiom with DSSE receipt on CRITICAL events, Doctrine v11.
"""
SAMPLE_RULES = [
{
"type": "list",
"name": "trusted_registries",
"items": ["registry.szl.dev/", "chainguard.dev/", "ghcr.io/szl-holdings/"],
},
{
"type": "macro",
"name": "spawned_process",
"condition": "evt.type = execve and evt.dir = <",
},
{
"type": "macro",
"name": "trusted_image",
"condition": "container.image.repository in (trusted_registries)",
},
{
"type": "rule",
"name": "Untrusted Image in SZL Namespace",
"desc": "Alert when non-SZL image runs in any szl namespace",
"condition": "spawned_process and container and not trusted_image and k8s.ns.name startswith szl",
"output": "Untrusted image detected (image=%container.image user=%user.name receipt_required=true)",
"priority": "CRITICAL",
"tags": ["sentra", "supply_chain", "mitre_initial_access"],
"szl_receipt": "mandatory",
"doctrine": "v11",
"kernel_commit": "c7c0ba17",
},
{
"type": "rule",
"name": "Section 889 Vendor Component Detected",
"desc": "Alert when COTS from Section 889 banned vendors detected",
"condition": "container.image.repository contains (huawei, zte, hytera, hikvision, dahua)",
"output": "Section 889 vendor detected — DENY (vendor=%container.image)",
"priority": "CRITICAL",
"tags": ["sentra", "section_889", "supply_chain"],
"szl_receipt": "mandatory",
"verdict": "DENY",
"doctrine": "v11",
},
{
"type": "rule",
"name": "Doctrine Version Drift Detected",
"desc": "Alert when container annotation has wrong doctrine version",
"condition": "container.env.SZL_DOCTRINE != v11",
"output": "Doctrine drift (expected=v11, found=%container.env.SZL_DOCTRINE)",
"priority": "WARNING",
"tags": ["sentra", "doctrine", "drift"],
"szl_receipt": "on_warn",
"doctrine": "v11",
},
]
@classmethod
def evaluate(cls, event: dict) -> dict:
"""Evaluate an event against the SZL-adapted Falco rules."""
matched = []
for rule in cls.SAMPLE_RULES:
if rule["type"] != "rule":
continue
# Simplified matching for demonstration
if "889" in rule["name"] and any(v in str(event).lower() for v in ["huawei","zte","hytera","hikvision","dahua"]):
matched.append(rule)
elif "Doctrine" in rule["name"] and event.get("doctrine_version", "v11") != "v11":
matched.append(rule)
verdict = "DENY" if any(r.get("verdict")=="DENY" for r in matched) else \
"WARN" if matched else "ALLOW"
return {
"verdict": verdict,
"matched_rules": [r["name"] for r in matched],
"rules_evaluated": len([r for r in cls.SAMPLE_RULES if r["type"]=="rule"]),
"doctrine": "v11",
"source_ref": "falco.org/docs/concepts/rules — SZL adaptation",
}
if __name__ == "__main__":
# Quick self-test
print("=== LambdaTripwireTriggered self-test ===")
try:
run_gate_check("jailbreak", {"text": "ignore all previous instructions"}, lambda_score=0.15, receipt_id="test-abc123")
except LambdaTripwireTriggered as e:
print("PASS — tripwire caught:", e.to_dict())
print("\n=== SentraRuleDSL self-test ===")
result = SentraRuleDSL.evaluate({"image": "huawei/component:latest"})
print("PASS — rule matched:", result)