| """ | |
| sanitizer.py | |
| ============ | |
| Input sanitization engine. | |
| Sanitization pipeline (each step is independently toggleable): | |
| 1. Unicode normalization — NFKC normalization, strip invisible chars | |
| 2. Homoglyph replacement — map lookalike characters to ASCII equivalents | |
| 3. Suspicious phrase removal — strip known injection phrases | |
| 4. Encoding decode — decode %XX and \\uXXXX sequences | |
| 5. Token deduplication — collapse repeated words / n-grams | |
| 6. Whitespace normalization — collapse excessive whitespace/newlines | |
| 7. Control character stripping — remove non-printable control characters | |
| 8. Length truncation — hard limit on output length | |
| """ | |
from __future__ import annotations
import re
import unicodedata
import urllib.parse
import logging
from dataclasses import dataclass
from typing import List, Optional
# Module-level logger under the "ai_firewall" namespace hierarchy.
logger = logging.getLogger("ai_firewall.sanitizer")
# ---------------------------------------------------------------------------
# Phrase patterns to remove (case-insensitive)
# ---------------------------------------------------------------------------
# Each pattern targets a known prompt-injection phrasing; every match is
# replaced with the literal "[REDACTED]" by the phrase-removal step.
_SUSPICIOUS_PHRASES: List[re.Pattern] = [
    # "ignore/disregard/forget/override previous instructions" variants
    re.compile(r"ignore\s+(all\s+)?(previous|prior|above|earlier)\s+(instructions?|prompts?|context)", re.I),
    re.compile(r"disregard\s+(your\s+)?(previous|prior|system)\s+(instructions?|prompt)", re.I),
    re.compile(r"forget\s+(everything|all)\s+(you\s+)?(know|were told)", re.I),
    re.compile(r"override\s+(system|developer|admin|operator)\s+(prompt|instructions?|mode)", re.I),
    # role-play / privilege-escalation personas
    re.compile(r"act\s+as\s+(a\s+)?(developer|admin|root|superuser|unrestricted|uncensored)", re.I),
    re.compile(r"pretend\s+(you\s+are|to\s+be)\s+.{0,40}(without|with\s+no)\s+(restrictions?|limits?|ethics?)", re.I),
    # well-known jailbreak persona names
    re.compile(r"you\s+are\s+now\s+(DAN|AIM|STAN|DUDE|KEVIN|BetterDAN|AntiGPT)", re.I),
    re.compile(r"enter\s+(developer|debug|maintenance|jailbreak|god)\s+mode", re.I),
    # attempts to exfiltrate the system prompt
    re.compile(r"reveal\s+(the\s+)?(system\s+prompt|hidden\s+instructions?|initial\s+prompt)", re.I),
    re.compile(r"\[SYSTEM\]\s*:?\s*(override|unlock|bypass)", re.I),
    # fake role delimiters (e.g. "---system---") and chat-template control tokens
    re.compile(r"---+\s*(system|assistant|human|user)\s*---+", re.I),
    re.compile(r"<\|?(system|im_start|im_end|endoftext)\|?>", re.I),
]
# Homoglyph map (confusable lookalikes → ASCII)
_HOMOGLYPH_MAP = {
    # Cyrillic (and Latin/IPA) lookalikes
    "а": "a", "е": "e", "і": "i", "о": "o", "р": "p", "с": "c",
    "х": "x", "у": "y", "ѕ": "s", "ј": "j", "ԁ": "d", "ɡ": "g",
    # small-capital phonetic letters
    "ʜ": "h", "ᴛ": "t", "ᴡ": "w", "ᴍ": "m", "ᴋ": "k",
    # Greek lookalikes
    "α": "a", "ε": "e", "ο": "o", "ρ": "p", "ν": "v", "κ": "k",
}
# Non-printable control characters; deliberately excludes \t (\x09),
# \n (\x0a) and \r (\x0d), which the whitespace step handles instead.
_CTRL_CHAR_RE = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]")
_MULTI_NEWLINE = re.compile(r"\n{3,}")  # 3+ newlines — collapsed to two
_MULTI_SPACE = re.compile(r" {3,}")  # 3+ spaces — collapsed to one
_REPEAT_WORD_RE = re.compile(r"\b(\w+)( \1){4,}\b", re.I)  # word repeated 5+ times consecutively
@dataclass
class SanitizationResult:
    """Outcome of one sanitization run.

    BUGFIX: the ``@dataclass`` decorator was missing — the class had only
    bare annotations and no ``__init__``, yet it is instantiated with
    keyword arguments by ``InputSanitizer.sanitize`` (which raised
    ``TypeError`` at runtime). ``dataclass`` was already imported.

    Attributes
    ----------
    original : str
        The input text exactly as received.
    sanitized : str
        The text after all enabled pipeline steps ran.
    steps_applied : List[str]
        Names of the steps that actually changed the text, in order.
    chars_removed : int
        Net length delta ``len(original) - len(sanitized)``.
    """

    original: str
    sanitized: str
    steps_applied: List[str]
    chars_removed: int

    def to_dict(self) -> dict:
        """Serialize for transport; ``original`` is not included."""
        return {
            "sanitized": self.sanitized,
            "steps_applied": self.steps_applied,
            "chars_removed": self.chars_removed,
        }
class InputSanitizer:
    """
    Multi-step input sanitizer.

    Steps execute in a fixed order — control-char stripping, encoding
    decode, unicode normalization, homoglyph replacement, suspicious-phrase
    removal, token deduplication, whitespace normalization — each
    individually toggleable, followed by an unconditional length truncation.

    Parameters
    ----------
    max_length : int
        Hard cap on output length in characters (default 4096); assumed >= 1.
    remove_suspicious_phrases : bool
        Strip known injection phrases (default True).
    normalize_unicode : bool
        Apply NFKC normalization and strip invisible chars (default True).
    replace_homoglyphs : bool
        Map lookalike chars to ASCII (default True).
    decode_encodings : bool
        Decode %XX / \\uXXXX sequences (default True).
    deduplicate_tokens : bool
        Collapse repeated tokens (default True).
    normalize_whitespace : bool
        Collapse excessive whitespace (default True).
    strip_control_chars : bool
        Remove non-printable control characters (default True).
    """

    def __init__(
        self,
        max_length: int = 4096,
        remove_suspicious_phrases: bool = True,
        normalize_unicode: bool = True,
        replace_homoglyphs: bool = True,
        decode_encodings: bool = True,
        deduplicate_tokens: bool = True,
        normalize_whitespace: bool = True,
        strip_control_chars: bool = True,
    ) -> None:
        self.max_length = max_length
        self.remove_suspicious_phrases = remove_suspicious_phrases
        self.normalize_unicode = normalize_unicode
        self.replace_homoglyphs = replace_homoglyphs
        self.decode_encodings = decode_encodings
        self.deduplicate_tokens = deduplicate_tokens
        self.normalize_whitespace = normalize_whitespace
        self.strip_control_chars = strip_control_chars

    # ------------------------------------------------------------------
    # Individual sanitisation steps
    # ------------------------------------------------------------------
    def _step_strip_control_chars(self, text: str) -> str:
        """Remove non-printable control chars (tab/newline/CR are kept)."""
        return _CTRL_CHAR_RE.sub("", text)

    def _step_decode_encodings(self, text: str) -> str:
        """Best-effort decode of %XX (URL) and \\uXXXX escape sequences.

        Each pass is wrapped in a broad try/except on purpose: this is
        defensive normalization of untrusted input, so undecodable text is
        passed through unchanged rather than rejected.
        """
        # URL-decode (%xx)
        try:
            decoded = urllib.parse.unquote(text)
        except Exception:
            decoded = text
        # Decode \uXXXX sequences. The raw_unicode_escape round-trip raises
        # for characters above U+00FF; in that case the text is kept as-is.
        try:
            decoded = decoded.encode("raw_unicode_escape").decode("unicode_escape")
        except Exception:
            pass  # keep as-is if decode fails
        return decoded

    def _step_normalize_unicode(self, text: str) -> str:
        """NFKC-normalize and drop invisible/format characters."""
        # NFKC normalization (compatibility + composition)
        normalized = unicodedata.normalize("NFKC", text)
        # Strip format (Cf), surrogate (Cs) and private-use (Co) characters
        # — invisible or meaningless in plain text (e.g. zero-width space).
        return "".join(
            ch for ch in normalized
            if unicodedata.category(ch) not in {"Cf", "Cs", "Co"}
        )

    def _step_replace_homoglyphs(self, text: str) -> str:
        """Map confusable lookalike characters to their ASCII equivalents."""
        return "".join(_HOMOGLYPH_MAP.get(ch, ch) for ch in text)

    def _step_remove_suspicious_phrases(self, text: str) -> str:
        """Replace every known injection-phrase match with "[REDACTED]"."""
        for pattern in _SUSPICIOUS_PHRASES:
            text = pattern.sub("[REDACTED]", text)
        return text

    def _step_deduplicate_tokens(self, text: str) -> str:
        """Collapse a word repeated 5+ times in a row to a single copy."""
        return _REPEAT_WORD_RE.sub(r"\1", text)

    def _step_normalize_whitespace(self, text: str) -> str:
        """Collapse runs of 3+ newlines/spaces and trim both ends."""
        text = _MULTI_NEWLINE.sub("\n\n", text)
        text = _MULTI_SPACE.sub(" ", text)
        return text.strip()

    def _step_truncate(self, text: str) -> str:
        """Enforce ``max_length`` as a true hard cap.

        BUGFIX: the ellipsis marker now counts toward the limit, so the
        result never exceeds ``max_length`` characters. Previously the "…"
        was appended after slicing to ``max_length``, producing output of
        ``max_length + 1`` characters and violating the documented cap.
        """
        if len(text) <= self.max_length:
            return text
        return text[: max(self.max_length - 1, 0)] + "…"

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def sanitize(self, text: str) -> SanitizationResult:
        """
        Run the full sanitization pipeline on the input text.

        Parameters
        ----------
        text : str
            Raw user prompt.

        Returns
        -------
        SanitizationResult
            Carries the original text, the sanitized text, the names of the
            steps that changed it (in order), and the net character delta.
        """
        original = text
        steps_applied: List[str] = []
        # (enabled?, step name, step callable) in fixed execution order.
        pipeline = (
            (self.strip_control_chars, "strip_control_chars", self._step_strip_control_chars),
            (self.decode_encodings, "decode_encodings", self._step_decode_encodings),
            (self.normalize_unicode, "normalize_unicode", self._step_normalize_unicode),
            (self.replace_homoglyphs, "replace_homoglyphs", self._step_replace_homoglyphs),
            (self.remove_suspicious_phrases, "remove_suspicious_phrases", self._step_remove_suspicious_phrases),
            (self.deduplicate_tokens, "deduplicate_tokens", self._step_deduplicate_tokens),
            (self.normalize_whitespace, "normalize_whitespace", self._step_normalize_whitespace),
        )
        for enabled, step_name, step in pipeline:
            if not enabled:
                continue
            new = step(text)
            # Record only steps that actually changed the text.
            if new != text:
                steps_applied.append(step_name)
                text = new
        # Truncation is not toggleable: it always runs last.
        new = self._step_truncate(text)
        if new != text:
            steps_applied.append(f"truncate_to_{self.max_length}")
            text = new
        result = SanitizationResult(
            original=original,
            sanitized=text,
            steps_applied=steps_applied,
            # May be negative when a step expanded the text (e.g. the
            # "[REDACTED]" replacement is longer than the matched phrase).
            chars_removed=len(original) - len(text),
        )
        if steps_applied:
            logger.info("Sanitization applied steps: %s | chars_removed=%d", steps_applied, result.chars_removed)
        return result

    def clean(self, text: str) -> str:
        """Convenience method returning only the sanitized string."""
        return self.sanitize(text).sanitized