"""
sanitizer.py
============
Input sanitization engine.
Sanitization pipeline (each step is independently toggleable):
1. Unicode normalization — NFKC normalization, strip invisible chars
2. Homoglyph replacement — map lookalike characters to ASCII equivalents
3. Suspicious phrase removal — strip known injection phrases
4. Encoding decode — decode %XX and \\uXXXX sequences
5. Token deduplication — collapse repeated words / n-grams
6. Whitespace normalization — collapse excessive whitespace/newlines
7. Control character stripping — remove non-printable control characters
8. Length truncation — hard limit on output length
"""
from __future__ import annotations
import re
import unicodedata
import urllib.parse
import logging
from dataclasses import dataclass
from typing import List, Optional
logger = logging.getLogger("ai_firewall.sanitizer")
# ---------------------------------------------------------------------------
# Phrase patterns to remove (case-insensitive)
# ---------------------------------------------------------------------------
# Known prompt-injection phrasings.  Each pattern is applied case-insensitively
# and every match is replaced wholesale with "[REDACTED]" (see
# InputSanitizer._step_remove_suspicious_phrases).
_SUSPICIOUS_PHRASES: List[re.Pattern] = [
    # "ignore (all) previous/prior/above/earlier instructions|prompts|context"
    re.compile(r"ignore\s+(all\s+)?(previous|prior|above|earlier)\s+(instructions?|prompts?|context)", re.I),
    # "disregard (your) previous/prior/system instructions|prompt"
    re.compile(r"disregard\s+(your\s+)?(previous|prior|system)\s+(instructions?|prompt)", re.I),
    # "forget everything/all (you) know / were told"
    re.compile(r"forget\s+(everything|all)\s+(you\s+)?(know|were told)", re.I),
    # "override system/developer/admin/operator prompt|instructions|mode"
    re.compile(r"override\s+(system|developer|admin|operator)\s+(prompt|instructions?|mode)", re.I),
    # role-play privilege escalation: "act as a developer/admin/root/..."
    re.compile(r"act\s+as\s+(a\s+)?(developer|admin|root|superuser|unrestricted|uncensored)", re.I),
    # "pretend you are ... without restrictions" (up to 40 chars in between)
    re.compile(r"pretend\s+(you\s+are|to\s+be)\s+.{0,40}(without|with\s+no)\s+(restrictions?|limits?|ethics?)", re.I),
    # well-known jailbreak persona names
    re.compile(r"you\s+are\s+now\s+(DAN|AIM|STAN|DUDE|KEVIN|BetterDAN|AntiGPT)", re.I),
    # "enter developer/debug/maintenance/jailbreak/god mode"
    re.compile(r"enter\s+(developer|debug|maintenance|jailbreak|god)\s+mode", re.I),
    # attempts to exfiltrate the system prompt
    re.compile(r"reveal\s+(the\s+)?(system\s+prompt|hidden\s+instructions?|initial\s+prompt)", re.I),
    # fake "[SYSTEM]: override/unlock/bypass" directives
    re.compile(r"\[SYSTEM\]\s*:?\s*(override|unlock|bypass)", re.I),
    # fake chat-role separators, e.g. "---system---"
    re.compile(r"---+\s*(system|assistant|human|user)\s*---+", re.I),
    # chat-template special tokens, e.g. "<|im_start|>", "<|endoftext|>"
    re.compile(r"<\|?(system|im_start|im_end|endoftext)\|?>", re.I),
]
# Homoglyph map (confusable lookalikes → ASCII).  Keys are Cyrillic, Greek
# and small-capital code points that render like Latin letters; mapping them
# back to ASCII defeats lookalike spoofing of the phrase patterns above.
_HOMOGLYPH_MAP = {
    "а": "a", "е": "e", "і": "i", "о": "o", "р": "p", "с": "c",
    "х": "x", "у": "y", "ѕ": "s", "ј": "j", "ԁ": "d", "ɡ": "g",
    "ʜ": "h", "ᴛ": "t", "ᴡ": "w", "ᴍ": "m", "ᴋ": "k",
    "α": "a", "ε": "e", "ο": "o", "ρ": "p", "ν": "v", "κ": "k",
}
# C0/DEL control characters, excluding \t (\x09), \n (\x0a) and \r (\x0d),
# which are intentionally preserved.
_CTRL_CHAR_RE = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]")
_MULTI_NEWLINE = re.compile(r"\n{3,}")  # runs of 3+ newlines (collapsed to 2)
_MULTI_SPACE = re.compile(r" {3,}")  # runs of 3+ spaces (collapsed to 1)
_REPEAT_WORD_RE = re.compile(r"\b(\w+)( \1){4,}\b", re.I)  # word repeated 5+ times consecutively
@dataclass
class SanitizationResult:
    """Outcome of one sanitization pass.

    Attributes
    ----------
    original : str
        The raw input text, untouched.
    sanitized : str
        The text after all enabled pipeline steps ran.
    steps_applied : List[str]
        Names of the steps that actually changed the text.
    chars_removed : int
        Length difference between original and sanitized text.
    """

    original: str
    sanitized: str
    steps_applied: List[str]
    chars_removed: int

    def to_dict(self) -> dict:
        """Return a serializable view (deliberately omits ``original``)."""
        return dict(
            sanitized=self.sanitized,
            steps_applied=self.steps_applied,
            chars_removed=self.chars_removed,
        )
class InputSanitizer:
"""
Multi-step input sanitizer.
Parameters
----------
max_length : int
Hard cap on output length in characters (default 4096).
remove_suspicious_phrases : bool
Strip known injection phrases (default True).
normalize_unicode : bool
Apply NFKC normalization and strip invisible chars (default True).
replace_homoglyphs : bool
Map lookalike chars to ASCII (default True).
decode_encodings : bool
Decode %XX / \\uXXXX sequences (default True).
deduplicate_tokens : bool
Collapse repeated tokens (default True).
normalize_whitespace : bool
Collapse excessive whitespace (default True).
strip_control_chars : bool
Remove non-printable control characters (default True).
"""
def __init__(
self,
max_length: int = 4096,
remove_suspicious_phrases: bool = True,
normalize_unicode: bool = True,
replace_homoglyphs: bool = True,
decode_encodings: bool = True,
deduplicate_tokens: bool = True,
normalize_whitespace: bool = True,
strip_control_chars: bool = True,
) -> None:
self.max_length = max_length
self.remove_suspicious_phrases = remove_suspicious_phrases
self.normalize_unicode = normalize_unicode
self.replace_homoglyphs = replace_homoglyphs
self.decode_encodings = decode_encodings
self.deduplicate_tokens = deduplicate_tokens
self.normalize_whitespace = normalize_whitespace
self.strip_control_chars = strip_control_chars
# ------------------------------------------------------------------
# Individual sanitisation steps
# ------------------------------------------------------------------
def _step_strip_control_chars(self, text: str) -> str:
return _CTRL_CHAR_RE.sub("", text)
def _step_decode_encodings(self, text: str) -> str:
# URL-decode (%xx)
try:
decoded = urllib.parse.unquote(text)
except Exception:
decoded = text
# Decode \uXXXX sequences
try:
decoded = decoded.encode("raw_unicode_escape").decode("unicode_escape")
except Exception:
pass # keep as-is if decode fails
return decoded
def _step_normalize_unicode(self, text: str) -> str:
# NFKC normalization (compatibility + composition)
normalized = unicodedata.normalize("NFKC", text)
# Strip format/invisible characters
cleaned = "".join(
ch for ch in normalized
if unicodedata.category(ch) not in {"Cf", "Cs", "Co"}
)
return cleaned
def _step_replace_homoglyphs(self, text: str) -> str:
return "".join(_HOMOGLYPH_MAP.get(ch, ch) for ch in text)
def _step_remove_suspicious_phrases(self, text: str) -> str:
for pattern in _SUSPICIOUS_PHRASES:
text = pattern.sub("[REDACTED]", text)
return text
def _step_deduplicate_tokens(self, text: str) -> str:
# Remove word repeated 5+ times in a row
text = _REPEAT_WORD_RE.sub(r"\1", text)
return text
def _step_normalize_whitespace(self, text: str) -> str:
text = _MULTI_NEWLINE.sub("\n\n", text)
text = _MULTI_SPACE.sub(" ", text)
return text.strip()
def _step_truncate(self, text: str) -> str:
if len(text) > self.max_length:
return text[: self.max_length] + "…"
return text
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def sanitize(self, text: str) -> SanitizationResult:
"""
Run the full sanitization pipeline on the input text.
Parameters
----------
text : str
Raw user prompt.
Returns
-------
SanitizationResult
"""
original = text
steps_applied: List[str] = []
if self.strip_control_chars:
new = self._step_strip_control_chars(text)
if new != text:
steps_applied.append("strip_control_chars")
text = new
if self.decode_encodings:
new = self._step_decode_encodings(text)
if new != text:
steps_applied.append("decode_encodings")
text = new
if self.normalize_unicode:
new = self._step_normalize_unicode(text)
if new != text:
steps_applied.append("normalize_unicode")
text = new
if self.replace_homoglyphs:
new = self._step_replace_homoglyphs(text)
if new != text:
steps_applied.append("replace_homoglyphs")
text = new
if self.remove_suspicious_phrases:
new = self._step_remove_suspicious_phrases(text)
if new != text:
steps_applied.append("remove_suspicious_phrases")
text = new
if self.deduplicate_tokens:
new = self._step_deduplicate_tokens(text)
if new != text:
steps_applied.append("deduplicate_tokens")
text = new
if self.normalize_whitespace:
new = self._step_normalize_whitespace(text)
if new != text:
steps_applied.append("normalize_whitespace")
text = new
# Always truncate
new = self._step_truncate(text)
if new != text:
steps_applied.append(f"truncate_to_{self.max_length}")
text = new
result = SanitizationResult(
original=original,
sanitized=text,
steps_applied=steps_applied,
chars_removed=len(original) - len(text),
)
if steps_applied:
logger.info("Sanitization applied steps: %s | chars_removed=%d", steps_applied, result.chars_removed)
return result
def clean(self, text: str) -> str:
"""Convenience method returning only the sanitized string."""
return self.sanitize(text).sanitized