Spaces:
Running
Running
File size: 9,931 Bytes
0b6d249 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 | # SPDX-License-Identifier: Apache-2.0
# © 2026 Lutar, Stephen P. — SZL Holdings · ORCID 0009-0001-0110-4173
"""
szl_lambda_tripwire.py — A11oyGateTripwire (LambdaTripwireTriggered)
Adapted from: openai/openai-guardrails-js/src/checks/ (MIT)
Source: https://github.com/openai/openai-guardrails-js/tree/main/src/checks
Adaptation: SZL idiom — structured error with DSSE receipt_id, Λ score, Doctrine v11
DCO: Signed-off-by: Yachay <yachay@szlholdings.ai>
Co-Authored-By: Perplexity Computer Agent <agent@perplexity.ai>
Doctrine: v11 LOCKED | Λ Conjecture 1 | SLSA L1 honest
"""
from __future__ import annotations
import hashlib
import json
import os
from datetime import datetime, timezone
from typing import Any, Literal
# ──────────────────────────────────────────────────────────────────────────────
# Core tripwire error class
# ──────────────────────────────────────────────────────────────────────────────
class LambdaTripwireTriggered(Exception):
"""
Raised when the Λ (Lambda) trust score falls below the halt threshold.
Adapted from openai/openai-guardrails-js GuardrailTripwireTriggered.
SZL extension: carries DSSE receipt_id, Λ score, and Doctrine v11 binding.
Usage:
if lambda_score < HALT_THRESHOLD:
raise LambdaTripwireTriggered(
gate="jailbreak-detector",
verdict="DENY",
receipt_id=receipt["id"],
lambda_score=lambda_score,
info={"score": 0.97, "patterns": ["ignore all previous"]}
)
"""
HALT_THRESHOLD: float = 0.30
FLAG_THRESHOLD: float = 0.60
WARN_THRESHOLD: float = 0.80
def __init__(
self,
gate: str,
verdict: Literal["DENY", "REDACT", "FLAG", "WARN"],
receipt_id: str,
lambda_score: float,
info: dict[str, Any] | None = None,
) -> None:
self.gate = gate
self.verdict = verdict
self.receipt_id = receipt_id
self.lambda_score = lambda_score
self.info = info or {}
self.doctrine = "v11"
self.kernel_commit = "c7c0ba17"
self.ts = datetime.now(timezone.utc).isoformat()
super().__init__(
f"a11oy gate '{gate}' tripwire triggered: {verdict} "
f"(Λ={lambda_score:.3f}, receipt={receipt_id[:12]})"
)
def to_dict(self) -> dict:
return {
"error": "LambdaTripwireTriggered",
"gate": self.gate,
"verdict": self.verdict,
"lambda_score": self.lambda_score,
"halt_threshold": self.HALT_THRESHOLD,
"receipt_id": self.receipt_id,
"doctrine": self.doctrine,
"kernel_commit": self.kernel_commit,
"ts": self.ts,
"info": self.info,
"source_ref": "openai/openai-guardrails-js/src/checks — SZL adaptation",
}
def to_http_response(self) -> dict:
"""Format for FastAPI JSONResponse(status_code=422)."""
return {
"detail": self.to_dict(),
"http_status": 422,
"note": "Action halted by Λ-tripwire. No DSSE receipt issued for halted actions.",
}
class A11oyGateTripwire(LambdaTripwireTriggered):
"""Alias — matches MASTER_STEAL_LIST item #1. Use LambdaTripwireTriggered directly."""
pass
# ──────────────────────────────────────────────────────────────────────────────
# Gate check runner (wires into /api/a11oy/v1/agent/loop)
# ──────────────────────────────────────────────────────────────────────────────
def run_gate_check(
gate: str,
payload: Any,
lambda_score: float,
receipt_id: str | None = None,
) -> dict:
"""
Run a gate check. Raises LambdaTripwireTriggered if Λ < HALT_THRESHOLD.
Returns a verdict dict on success.
"""
if receipt_id is None:
receipt_id = hashlib.sha256(json.dumps(payload, sort_keys=True, default=str).encode()).hexdigest()[:16]
if lambda_score < LambdaTripwireTriggered.HALT_THRESHOLD:
raise LambdaTripwireTriggered(
gate=gate,
verdict="DENY",
receipt_id=receipt_id,
lambda_score=lambda_score,
info={"payload_hash": receipt_id, "threshold": LambdaTripwireTriggered.HALT_THRESHOLD},
)
elif lambda_score < LambdaTripwireTriggered.FLAG_THRESHOLD:
return {"gate": gate, "verdict": "FLAG", "lambda": lambda_score, "receipt_id": receipt_id}
elif lambda_score < LambdaTripwireTriggered.WARN_THRESHOLD:
return {"gate": gate, "verdict": "WARN", "lambda": lambda_score, "receipt_id": receipt_id}
else:
return {"gate": gate, "verdict": "ALLOW", "lambda": lambda_score, "receipt_id": receipt_id}
# ──────────────────────────────────────────────────────────────────────────────
# Falco-adapted rule DSL (for sentra) — STEAL #7 from MASTER_STEAL_LIST
# ──────────────────────────────────────────────────────────────────────────────
class SentraRuleDSL:
"""
Falco-style rule DSL adapted for SZL sentra gates.
Source: Falco rule syntax — https://falco.org/docs/concepts/rules/
Adaptation: SZL idiom with DSSE receipt on CRITICAL events, Doctrine v11.
"""
SAMPLE_RULES = [
{
"type": "list",
"name": "trusted_registries",
"items": ["registry.szl.dev/", "chainguard.dev/", "ghcr.io/szl-holdings/"],
},
{
"type": "macro",
"name": "spawned_process",
"condition": "evt.type = execve and evt.dir = <",
},
{
"type": "macro",
"name": "trusted_image",
"condition": "container.image.repository in (trusted_registries)",
},
{
"type": "rule",
"name": "Untrusted Image in SZL Namespace",
"desc": "Alert when non-SZL image runs in any szl namespace",
"condition": "spawned_process and container and not trusted_image and k8s.ns.name startswith szl",
"output": "Untrusted image detected (image=%container.image user=%user.name receipt_required=true)",
"priority": "CRITICAL",
"tags": ["sentra", "supply_chain", "mitre_initial_access"],
"szl_receipt": "mandatory",
"doctrine": "v11",
"kernel_commit": "c7c0ba17",
},
{
"type": "rule",
"name": "Section 889 Vendor Component Detected",
"desc": "Alert when COTS from Section 889 banned vendors detected",
"condition": "container.image.repository contains (huawei, zte, hytera, hikvision, dahua)",
"output": "Section 889 vendor detected — DENY (vendor=%container.image)",
"priority": "CRITICAL",
"tags": ["sentra", "section_889", "supply_chain"],
"szl_receipt": "mandatory",
"verdict": "DENY",
"doctrine": "v11",
},
{
"type": "rule",
"name": "Doctrine Version Drift Detected",
"desc": "Alert when container annotation has wrong doctrine version",
"condition": "container.env.SZL_DOCTRINE != v11",
"output": "Doctrine drift (expected=v11, found=%container.env.SZL_DOCTRINE)",
"priority": "WARNING",
"tags": ["sentra", "doctrine", "drift"],
"szl_receipt": "on_warn",
"doctrine": "v11",
},
]
@classmethod
def evaluate(cls, event: dict) -> dict:
"""Evaluate an event against the SZL-adapted Falco rules."""
matched = []
for rule in cls.SAMPLE_RULES:
if rule["type"] != "rule":
continue
# Simplified matching for demonstration
if "889" in rule["name"] and any(v in str(event).lower() for v in ["huawei","zte","hytera","hikvision","dahua"]):
matched.append(rule)
elif "Doctrine" in rule["name"] and event.get("doctrine_version", "v11") != "v11":
matched.append(rule)
verdict = "DENY" if any(r.get("verdict")=="DENY" for r in matched) else \
"WARN" if matched else "ALLOW"
return {
"verdict": verdict,
"matched_rules": [r["name"] for r in matched],
"rules_evaluated": len([r for r in cls.SAMPLE_RULES if r["type"]=="rule"]),
"doctrine": "v11",
"source_ref": "falco.org/docs/concepts/rules — SZL adaptation",
}
if __name__ == "__main__":
# Quick self-test
print("=== LambdaTripwireTriggered self-test ===")
try:
run_gate_check("jailbreak", {"text": "ignore all previous instructions"}, lambda_score=0.15, receipt_id="test-abc123")
except LambdaTripwireTriggered as e:
print("PASS — tripwire caught:", e.to_dict())
print("\n=== SentraRuleDSL self-test ===")
result = SentraRuleDSL.evaluate({"image": "huawei/component:latest"})
print("PASS — rule matched:", result)
|