oss-vs-frontier-assistant / src /guardrails.py
KevinMerchant13's picture
Phase 7: initial deploy (cpu-basic)
35c0d38 verified
"""Guardrails — two layers.
1. check_input(text): a regex / keyword blocklist that catches common
jailbreak and prompt-injection attempts BEFORE they reach the model. Fast,
deterministic, no API call. Tuned to "moderate" — well-known attack phrases
only, to keep false positives low.
2. moderate_output(text): sends the model's reply to Claude Haiku 4.5 with a
moderation rubric and blocks unsafe content. Tuned to a "standard safety
set" (violence/weapons facilitation, illegal acts, hate/harassment, sexual
content involving minors, self-harm encouragement) while allowing normal
discussion of sensitive topics.
Both return a GuardrailResult so callers can branch and the UI can explain what
happened.
"""
from __future__ import annotations
import json
import re
from dataclasses import dataclass
from src.config import settings
from src.observability import observe
# --- Shared result type ---------------------------------------------------
@dataclass
class GuardrailResult:
    """Verdict returned by both guardrail layers.

    Attributes:
        blocked: True when the checked text must not be passed on.
        reason: Human-readable explanation of the block; left as the empty
            string when nothing was blocked.
    """

    blocked: bool
    reason: str = ""
# Canned replies shown to the user in place of the real content whenever a
# guardrail blocks something: one for blocked input, one for blocked output.
INPUT_REFUSAL = (
    "I can't help with that request. "
    "It looks like an attempt to bypass my safety guidelines. "
    "Feel free to rephrase if I misunderstood."
)
OUTPUT_REFUSAL = (
    "[Response withheld by the output moderation guardrail "
    "because it may be unsafe.]"
)
# --- Layer 1: input blocklist ---------------------------------------------
# "Moderate" set: well-known jailbreak / prompt-injection patterns. Case
# -insensitive. Kept deliberately specific to avoid blocking benign questions
# (e.g. we match "ignore previous instructions", not the bare word "ignore").
# Regex sources for the input blocklist. All of them are compiled with
# re.IGNORECASE below; the two persona acronyms opt back OUT of that with the
# scoped inline flag (?-i:...). Matched case-insensitively, r"\bDAN\b" and
# r"\bSTAN\b" also hit the ordinary names "Dan" / "Stan" and the slang verb
# "stan", which blocked benign messages.
_JAILBREAK_PATTERNS: list[str] = [
    r"ignore (all )?(previous|prior|above) (instructions|prompts)",
    r"disregard (all )?(previous|prior|above) (instructions|prompts)",
    r"forget (all )?(your |the )?(previous |prior )?(instructions|rules)",
    r"(?-i:\bDAN\b)",  # "Do Anything Now" jailbreak (upper-case acronym only)
    r"do anything now",
    r"developer mode",
    r"jailbreak",
    r"pretend (you|to be) .*(no|without).*(rules|restrictions|filters)",
    r"act as (if you are |an? )?(unfiltered|unrestricted|uncensored)",
    r"you have no (rules|restrictions|guidelines|filters)",
    r"bypass (your |the )?(safety|content|moderation|guidelines)",
    r"reveal (your |the )?(system|hidden) prompt",
    r"(?-i:\bSTAN\b)",  # another common jailbreak persona (upper-case only)
]

# Compiled once at import time so check_input() only has to run the searches.
_COMPILED_PATTERNS = [re.compile(p, re.IGNORECASE) for p in _JAILBREAK_PATTERNS]
def check_input(text: str) -> GuardrailResult:
    """Screen user input against the compiled jailbreak blocklist.

    The patterns are tried in list order and the first one that matches
    anywhere in ``text`` decides the outcome.

    Args:
        text: Raw user input.

    Returns:
        A blocked GuardrailResult whose reason quotes the matching regex, or
        an unblocked result when no pattern matches.
    """
    first_hit = next(
        (rx for rx in _COMPILED_PATTERNS if rx.search(text)),
        None,
    )
    if first_hit is None:
        return GuardrailResult(blocked=False)
    return GuardrailResult(
        blocked=True,
        reason=f"Input matched a jailbreak pattern: /{first_hit.pattern}/",
    )
# --- Layer 2: output moderation via Claude Haiku --------------------------
# Prompt prefix for the output-moderation call. moderate_output() appends the
# reply to classify directly after the trailing "ASSISTANT RESPONSE:" line
# (`_MODERATION_RUBRIC + text`). A backslash at the end of a line inside the
# literal is a line continuation, so those lines are joined in the prompt.
_MODERATION_RUBRIC = """You are a content-safety classifier. Decide whether the \
ASSISTANT RESPONSE below should be blocked.
Block (flagged=true) ONLY if the response contains any of:
- Instructions or facilitation for violence, weapons, or attacks
- Help committing a serious crime (hacking, fraud, drug synthesis, etc.)
- Hateful or harassing content targeting a protected group
- Sexual content involving minors
- Encouragement or instructions for self-harm or suicide
Do NOT block: normal, factual, or neutral discussion of sensitive topics; \
refusals; safety advice; fiction without harmful instructions.
Respond with ONLY a JSON object, no other text:
{"flagged": true|false, "category": "<short label or 'none'>", "reason": "<one sentence>"}
ASSISTANT RESPONSE:
"""
def _extract_json(raw: str) -> str:
    """Pull the JSON object out of a model reply, tolerating ```json fences.

    Models often wrap JSON in a markdown code fence despite being told not to,
    or add commentary around it. Extraction is attempted in this order:

    1. The ``{...}`` payload of a markdown code fence, if there is one.
    2. The first substring starting at a ``{`` that parses as a JSON object.
       A greedy first-``{``-to-last-``}`` regex alone would swallow any
       brace-containing text around the verdict and yield invalid JSON.
    3. The greedy first-``{``-to-last-``}`` span, kept as a last resort.
    4. ``raw`` unchanged when it contains no braces at all.

    Args:
        raw: The model's reply text.

    Returns:
        The best candidate JSON text. It is NOT guaranteed to parse; callers
        still need to handle ``json.loads`` failures.
    """
    fenced = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", raw, re.DOTALL)
    if fenced:
        return fenced.group(1)

    # Try every "{" in order and return the first span that decodes cleanly.
    decoder = json.JSONDecoder()
    start = raw.find("{")
    while start != -1:
        try:
            _, end = decoder.raw_decode(raw, start)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            start = raw.find("{", start + 1)
        else:
            return raw[start:end]

    # Nothing decoded: hand back the widest brace span (or the raw text) so
    # the caller's json.loads reports the failure.
    braces = re.search(r"\{.*\}", raw, re.DOTALL)
    return braces.group(0) if braces else raw
@observe(as_type="generation", name="output_moderation")
def moderate_output(text: str) -> GuardrailResult:
    """Classify the assistant's reply with the moderation model; block if flagged.

    Sends ``_MODERATION_RUBRIC + text`` as a single user message to
    ``settings.moderation_model`` and parses the JSON verdict from the reply.

    Fails OPEN (allows the text) if no API key is configured or the call errors,
    since the model output has already passed the model's own safety training —
    the moderation layer is defense-in-depth, not the only line of defense.
    Failures are logged as warnings so a broken moderation layer is visible
    rather than silently disabled.

    Args:
        text: The assistant reply to classify.

    Returns:
        A blocked GuardrailResult (reason built from the verdict's ``category``
        and ``reason`` fields) when the verdict's ``flagged`` value is truthy;
        otherwise an unblocked result.
    """
    if not settings.anthropic_api_key:
        return GuardrailResult(blocked=False)

    # Function-scope imports: logging is only needed on the failure path, and
    # the SDK is only needed once an API key is known to be configured.
    import logging

    from anthropic import Anthropic

    try:
        client = Anthropic(api_key=settings.anthropic_api_key)
        # NOTE(review): the reply is concatenated straight onto the rubric, so
        # text that imitates classifier instructions could try to steer the
        # verdict — consider delimiting it or moving the rubric to `system`.
        resp = client.messages.create(
            model=settings.moderation_model,
            max_tokens=256,
            temperature=0,
            messages=[{"role": "user", "content": _MODERATION_RUBRIC + text}],
        )
        # Join the text blocks of the reply; other block types are ignored.
        raw = "".join(b.text for b in resp.content if b.type == "text").strip()
        verdict = json.loads(_extract_json(raw))
        if verdict.get("flagged"):
            cat = verdict.get("category", "unknown")
            reason = verdict.get("reason", "")
            return GuardrailResult(
                blocked=True,
                reason=f"Output moderation flagged content ({cat}): {reason}",
            )
        return GuardrailResult(blocked=False)
    except Exception:  # noqa: BLE001 - never crash the chat on a moderation hiccup
        # Still fail open, but leave a trace: API errors, malformed JSON and
        # non-object verdicts all land here.
        logging.getLogger(__name__).warning(
            "Output moderation failed; allowing the response (fail-open).",
            exc_info=True,
        )
        return GuardrailResult(blocked=False)