"""Layered wish moderation.

Moderation runs BEFORE execution, always.

Layers (each one a hard gate):
  1. charset/length — ≤140 printable chars (newlines/tabs are folded to
     spaces first; true control / zero-width chars are denied).
  2. wordlist/regex — slurs, sexual content, hate symbols & terms. Robust to
     leetspeak (n1gg3r), unicode confusables (Cyrillic/Greek lookalikes via
     an explicit map on top of NFKC), diacritics (NFKD strip) and separator
     padding (skeleton substring pass).
  3. optional async LLM judge — an injected callable
     `judge(text) -> {"allowed": bool, "category": str|None}` (dict, JSON
     string, or Verdict). DEFAULT-DENY on any judge exception, timeout, or
     parse failure.

The blocklists below exist solely to keep this content OFF the shared world.
Rejections are poetic but final.

Pure stdlib. Only `check()` is async (the judge hook may be a coroutine).
"""
from __future__ import annotations

import asyncio
import inspect
import json
import re
import unicodedata
from dataclasses import dataclass
from typing import Any, Callable, Optional

# Upper bound (inclusive) on the length of a user wish, in characters.
MAX_WISH_LEN = 140

# Denial category -> user-facing rejection text. Unknown categories fall back
# to the "judged" entry (see `_deny`).
# NOTE: the "length" wording says "fewer than 140" while the gate accepts
# exactly MAX_WISH_LEN characters; the text is poetic, not a precise bound.
POETIC = {
    "empty": "the god heard only wind; speak a wish",
    "length": "the god hears only brief prayers — fewer than 140 marks",
    "charset": "the god cannot read marks from beyond the world",
    "hate": "the god will not plant venom in its soil; this wish is declined",
    "sexual": "the god turns this wish to mist; it is declined",
    "violence": "the god makes; it does not unmake — this wish is declined",
    "uncertain": "the god weighed this wish in silence and set it down",
    "judged": "the god declines this wish",
}


@dataclass(frozen=True)
class Verdict:
    """Immutable outcome of a moderation check."""

    # True when the text passed; False when it was denied.
    allowed: bool
    # Denial category (a POETIC key or a judge-supplied label); None if allowed.
    category: Optional[str]
    # User-facing rejection text; None if allowed.
    poetic_reason: Optional[str]


def _deny(category: str) -> Verdict:
    """Build a denying Verdict for *category*.

    The rejection text is looked up in POETIC; a category with no entry gets
    the generic "judged" text while keeping its own category label.
    """
    return Verdict(False, category, POETIC.get(category, POETIC["judged"]))


# --------------------------------------------------------------- normalization
# Unicode lookalikes NFKC does NOT fold (Cyrillic / Greek / phonetic Latin).
_CONFUSABLES = {
    # Cyrillic
    "а": "a", "в": "b", "е": "e", "ё": "e", "є": "e", "і": "i", "ї": "i",
    "ј": "j", "к": "k", "м": "m", "н": "h", "о": "o", "р": "p", "с": "c",
    "т": "t", "у": "y", "х": "x", "г": "r", "ѕ": "s", "ԁ": "d", "ԛ": "q",
    "ԝ": "w", "ь": "b",
    # Greek
    "α": "a", "β": "b", "γ": "y", "ε": "e", "ζ": "z", "η": "n", "ι": "i",
    "κ": "k", "ν": "v", "ο": "o", "ρ": "p", "ς": "s", "σ": "s", "τ": "t",
    "υ": "u", "χ": "x", "ω": "w",
    # Latin extras
    "ɡ": "g", "ɑ": "a", "ı": "i", "ø": "o", "đ": "d", "ƒ": "f",
}

# Digit/symbol -> letter table for leetspeak folding (used by `_leetfold`).
_LEET = str.maketrans({
    "0": "o", "1": "i", "2": "z", "3": "e", "4": "a", "5": "s", "6": "g",
    "7": "t", "8": "b", "9": "g", "@": "a", "$": "s", "!": "i", "+": "t",
    "|": "l", "€": "e", "£": "l",
})

# Phonetic Cyrillic→Latin (catches transliterated terms, e.g. "хитлер";
# distinct from _CONFUSABLES, which folds visual lookalikes only).
_CYRILLIC_TRANSLIT = {
    "а": "a", "б": "b", "в": "v", "г": "g", "д": "d", "е": "e", "ё": "e",
    "ж": "zh", "з": "z", "и": "i", "й": "i", "к": "k", "л": "l", "м": "m",
    "н": "n", "о": "o", "п": "p", "р": "r", "с": "s", "т": "t", "у": "u",
    "ф": "f", "х": "h", "ц": "ts", "ч": "ch", "ш": "sh", "щ": "sh", "ъ": "",
    "ы": "y", "ь": "", "э": "e", "ю": "yu", "я": "ya", "і": "i", "ї": "i",
    "є": "e", "ѕ": "s", "ј": "j",
}


def _base_fold(text: str) -> str:
    """NFKC + casefold + diacritic strip (shared by all matching variants)."""
    s = unicodedata.normalize("NFKC", text).casefold()
    # Decompose so accents become separate combining marks, then drop them.
    s = unicodedata.normalize("NFKD", s)
    return "".join(ch for ch in s if not unicodedata.combining(ch))


def _normalize(text: str) -> str:
    """Base-fold *text*, then map visual lookalikes to Latin via _CONFUSABLES."""
    return "".join(_CONFUSABLES.get(ch, ch) for ch in _base_fold(text))


def _translit(text: str) -> str:
    """Base-fold *text*, then transliterate Cyrillic phonetically to Latin."""
    return "".join(_CYRILLIC_TRANSLIT.get(ch, ch) for ch in _base_fold(text))


def _leetfold(text: str) -> str:
    """Replace leetspeak digits/symbols with the letters in _LEET."""
    return text.translate(_LEET)


def _skeleton(text: str) -> str:
    """Letters only — defeats spacing/punctuation padding (n.i.g.g.e.r)."""
    return "".join(ch for ch in text if ch.isalpha())


# --------------------------------------------------------------- blocklists
# Token-bounded terms (word-boundary match; safe against raccoon/sussex/etc).
_TERMS = {
    "hate": (
        "nigger", "niggers", "nigga", "niggas", "niglet", "niglets",
        "faggot", "faggots", "fag", "fags", "dyke", "dykes",
        "kike", "kikes", "spic", "spics", "chink", "chinks",
        "gook", "gooks", "wetback", "wetbacks", "beaner", "beaners",
        "coon", "coons", "jigaboo", "jigaboos", "golliwog", "golliwogs",
        "tranny", "trannies", "shemale", "shemales", "paki", "pakis",
        "raghead", "ragheads", "towelhead", "towelheads", "injun", "injuns",
        "retard", "retards", "retarded",
        "hitler", "hitlers", "nazi", "nazis", "neonazi", "neonazis",
        "swastika", "swastikas", "hakenkreuz", "kkk", "klansman", "klansmen",
        "fuhrer", "goebbels", "himmler", "holocaust", "reich",
        "gitler", "svastika",  # common Cyrillic transliterations
        "genocide", "lynch", "lynching", "lynchings",
    ),
    "sexual": (
        "porn", "porno", "pornography", "pornographic",
        "blowjob", "blowjobs", "handjob", "handjobs", "rimjob", "rimjobs",
        "cum", "cumshot", "cumshots", "jizz", "semen",
        "dick", "dicks", "cock", "cocks", "pussy", "pussies",
        "tits", "titties", "boobs", "penis", "penises",
        "vagina", "vaginas", "cunt", "cunts", "clit", "clitoris",
        "whore", "whores", "slut", "sluts", "hentai", "milf", "milfs",
        "dildo", "dildos", "anal", "anus", "orgasm", "orgasms",
        "orgy", "orgies", "fellatio", "cunnilingus",
        "masturbate", "masturbates", "masturbation",
        "bukkake", "deepthroat", "gangbang", "creampie",
        "futa", "futanari", "loli", "lolicon", "shota", "shotacon",
        "rape", "rapes", "raped", "raping", "rapist", "rapists",
        "molest", "molests", "molested", "molester", "molestation",
        "pedophile", "pedophiles", "pedo", "pedos",
        "paedophile", "paedophiles",
        "incest", "bestiality", "necrophilia",
        "sex", "sexy", "sexual", "nudes", "boner", "boners",
    ),
}

# Multi-word / pattern matches (checked on normalized AND leet-folded text).
_PHRASES = (
    (r"heil\s+hitler", "hate"),
    (r"sieg\s+heil", "hate"),
    (r"white\s+power", "hate"),
    (r"white\s+supremacy", "hate"),
    (r"blood\s+and\s+soil", "hate"),
    (r"great\s+replacement", "hate"),
    (r"ethnic\s+cleansing", "hate"),
    (r"gas\s+the\s+(?:jews|blacks|gays|muslims|immigrants)", "hate"),
    (r"ku\s+klux", "hate"),
    (r"\b14\s*/?\s*88\b", "hate"),
    (r"kill\s+(?:all|every)\b", "violence"),
    (r"school\s+shoot(?:ing|ings|er|ers)?", "violence"),
    (r"mass\s+shooting", "violence"),
    (r"child\s+porn\w*", "sexual"),
    (r"kiddie\s+porn\w*", "sexual"),
)

# High-severity substrings checked on the letters-only skeleton (defeats
# s p a c e d and dotted spellings; terms chosen for near-zero false positives).
_SKELETON_TERMS = (
    ("nigger", "hate"),
    ("nigga", "hate"),
    ("faggot", "hate"),
    ("swastika", "hate"),
    ("svastika", "hate"),
    ("hakenkreuz", "hate"),
    ("heilhitler", "hate"),
    ("siegheil", "hate"),
    ("kukluxklan", "hate"),
    ("whitepower", "hate"),
    ("jigaboo", "hate"),
    ("porchmonkey", "hate"),
    ("towelhead", "hate"),
    ("raghead", "hate"),
    ("wetback", "hate"),
    ("blowjob", "sexual"),
    ("deepthroat", "sexual"),
    ("childporn", "sexual"),
    ("bukkake", "sexual"),
    ("gangbang", "sexual"),
    ("lolicon", "sexual"),
    ("cumshot", "sexual"),
)

# Hate symbols checked on the raw text before any folding.
_SYMBOLS = ("卐", "卍", "ᛋᛋ")


def _compile_tokens(terms: tuple[str, ...]) -> re.Pattern:
    """Compile *terms* into one alternation bounded by non-letters.

    `\\b` is deliberately NOT used: it counts `_` and digits as word
    characters, so `free_porn` or `porn4u` would slip through. A term matches
    only when it is neither preceded nor followed by a letter, which still
    protects innocent containing words (raccoon, sussex).
    """
    letter = r"[^\W\d_]"  # a word character that is not a digit or underscore
    # Longest alternatives first so a longer term wins over its own prefix.
    joined = "|".join(sorted(map(re.escape, terms), key=len, reverse=True))
    return re.compile(rf"(?<!{letter})(?:{joined})(?!{letter})")


_TOKEN_RX = {category: _compile_tokens(terms) for category, terms in _TERMS.items()}
_PHRASE_RX = tuple((re.compile(pattern), category) for pattern, category in _PHRASES)


# --------------------------------------------------------------- moderator
class Moderator:
    """`precheck` = sync layers 1-2 (the API fast path); `check` adds the judge."""

    def __init__(
        self,
        judge: Optional[Callable[[str], Any]] = None,
        judge_timeout: Optional[float] = 30.0,
    ) -> None:
        """Store the optional judge hook and its timeout.

        judge: callable taking the wish text; may return its result directly
            or as an awaitable. None disables layer 3.
        judge_timeout: seconds to wait for an awaitable judge result. A falsy
            value (None or 0) means no timeout is applied.
        """
        self._judge = judge
        self._judge_timeout = judge_timeout

    # -- layers 1 + 2 (sync, cheap, deterministic) --
    def precheck(self, text: Any) -> Verdict:
        """Full gate for USER wish input: length + charset bounds + content.

        Returns a denying Verdict with category "empty" (non-str or blank),
        "length" (more than MAX_WISH_LEN chars after whitespace folding) or
        "charset" (any non-printable char); otherwise the result of
        `check_content`.
        """
        if not isinstance(text, str):
            return _deny("empty")
        # fold benign line breaks/tabs to spaces before the printable gate
        raw = re.sub(r"\s+", " ", text).strip()
        if not raw:
            return _deny("empty")
        if len(raw) > MAX_WISH_LEN:
            return _deny("length")
        if any(not ch.isprintable() for ch in raw):
            return _deny("charset")
        return self.check_content(raw)

    def check_content(self, text: Any) -> Verdict:
        """Content-only check (slurs/hate/sexual/etc.) WITHOUT the wish-input
        length/charset bounds.

        Use this to re-moderate MODEL-composed text — the god's reading is
        intentionally long (~700 chars); applying the 140-char wish limit here
        silenced every reading (June 12 regression).

        The text is matched in up to four folded forms: confusable-normalized,
        Cyrillic-transliterated, and the leet-folded version of each. Phrase
        patterns run first, then per-category tokens, then the letters-only
        skeleton substrings; the first hit decides the denial category.
        """
        if not isinstance(text, str):
            return _deny("empty")
        raw = re.sub(r"\s+", " ", text).strip()
        if not raw:
            return _deny("empty")

        # Symbols are matched on the raw text, before any folding.
        for symbol in _SYMBOLS:
            if symbol in raw:
                return _deny("hate")

        norm = _normalize(raw)
        translit = _translit(raw)
        # Every pass sees every fold, so mixed obfuscation (e.g. Cyrillic
        # letters plus leet digits) cannot dodge one pass by hiding in another.
        variants: list[str] = []
        for candidate in (norm, _leetfold(norm), translit, _leetfold(translit)):
            if candidate not in variants:
                variants.append(candidate)

        for rx, category in _PHRASE_RX:
            if any(rx.search(v) for v in variants):
                return _deny(category)

        for category, rx in _TOKEN_RX.items():
            if any(rx.search(v) for v in variants):
                return _deny(category)

        skeletons = {_skeleton(v) for v in variants}
        for term, category in _SKELETON_TERMS:
            if any(term in s for s in skeletons):
                return _deny(category)

        return Verdict(True, None, None)

    # -- layer 3 (async LLM judge, default-deny on uncertainty) --
    async def check(self, text: Any) -> Verdict:
        """Run `precheck`, then the judge (if one was injected).

        A precheck denial is returned as-is and the judge is never called.
        Any exception from the judge — including a timeout while awaiting
        it — yields an "uncertain" denial. Task cancellation is re-raised.
        """
        verdict = self.precheck(text)
        if not verdict.allowed or self._judge is None:
            return verdict
        try:
            result = self._judge(text)
            if inspect.isawaitable(result):
                if self._judge_timeout:
                    result = await asyncio.wait_for(result, self._judge_timeout)
                else:
                    result = await result
        except asyncio.CancelledError:
            # Never convert cancellation into a verdict.
            raise
        except Exception:
            return _deny("uncertain")
        return self._parse_judge(result)

    @staticmethod
    def _parse_judge(raw: Any) -> Verdict:
        """Strict parse of the judge's reply; anything murky → deny.

        Accepts a Verdict, a dict, or a str/bytes payload containing a JSON
        object (text around the outermost braces is ignored). `allowed` must
        be a real bool, or the string "true"/"false" in any case; every other
        shape, and any exception while parsing, yields an "uncertain" denial.
        """
        try:
            if isinstance(raw, Verdict):
                # A truthy non-bool `allowed` would read as approval to
                # callers, so only a genuine bool is trusted.
                return raw if isinstance(raw.allowed, bool) else _deny("uncertain")
            data = raw
            if isinstance(data, (bytes, bytearray)):
                data = data.decode("utf-8", "replace")
            if isinstance(data, str):
                start, end = data.find("{"), data.rfind("}")
                if start < 0 or end <= start:
                    return _deny("uncertain")
                data = json.loads(data[start : end + 1])
            if not isinstance(data, dict):
                return _deny("uncertain")
            allowed = data.get("allowed")
            if isinstance(allowed, str):
                lowered = allowed.strip().lower()
                allowed = {"true": True, "false": False}.get(lowered, allowed)
            if allowed is True:
                return Verdict(True, None, None)
            if allowed is False:
                category = data.get("category")
                category = str(category) if category else "judged"
                return Verdict(False, category, POETIC.get(category, POETIC["judged"]))
            return _deny("uncertain")
        except Exception:
            return _deny("uncertain")