Spaces:
Sleeping
Sleeping
FORGIVING TOWNS: map any building kind (never reject), district reroute, restore god's voice (content-only moderation), town few-shot pushes build_district+roads, honest fail on empty
baf9d60 verified | """Layered wish moderation. Moderation runs BEFORE execution, always. | |
| Layers (each one a hard gate): | |
| 1. charset/length — ≤140 printable chars (newlines/tabs are folded to | |
| spaces first; true control / zero-width chars are denied). | |
| 2. wordlist/regex — slurs, sexual content, hate symbols & terms. Robust to | |
| leetspeak (n1gg3r), unicode confusables (Cyrillic/Greek lookalikes via an | |
| explicit map on top of NFKC), diacritics (NFKD strip) and separator | |
| padding (skeleton substring pass). | |
| 3. optional async LLM judge — an injected callable `judge(text) -> | |
| {"allowed": bool, "category": str|None}` (dict, JSON string, or Verdict). | |
| DEFAULT-DENY on any judge exception, timeout, or parse failure. | |
| The blocklists below exist solely to keep this content OFF the shared world. | |
| Rejections are poetic but final. | |
| Pure stdlib. Only `check()` is async (the judge hook may be a coroutine). | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import inspect | |
| import json | |
| import re | |
| import unicodedata | |
| from dataclasses import dataclass | |
| from typing import Any, Callable, Optional | |
# Upper bound on a wish, in characters, applied after whitespace folding.
# A wish of exactly this length is still accepted (the gate is `len > MAX`).
MAX_WISH_LEN = 140

# User-facing rejection text keyed by moderation category. Lookups elsewhere
# in this module fall back to the "judged" entry for unknown categories.
POETIC = {
    "empty": "the god heard only wind; speak a wish",
    # Derived from MAX_WISH_LEN and worded as an inclusive bound so the
    # message always states the limit that is actually enforced.
    "length": (
        "the god hears only brief prayers — "
        f"no more than {MAX_WISH_LEN} marks"
    ),
    "charset": "the god cannot read marks from beyond the world",
    "hate": "the god will not plant venom in its soil; this wish is declined",
    "sexual": "the god turns this wish to mist; it is declined",
    "violence": "the god makes; it does not unmake — this wish is declined",
    "uncertain": "the god weighed this wish in silence and set it down",
    "judged": "the god declines this wish",
}
@dataclass
class Verdict:
    """Outcome of a moderation check.

    Declared as a dataclass so that it can be built positionally as
    ``Verdict(allowed, category, poetic_reason)``, which is how the rest of
    this module constructs it. Without the decorator the class has only bare
    annotations and no ``__init__``, so those calls raise ``TypeError``.
    """

    # True when the text may proceed.
    allowed: bool
    # Moderation category of a denial; None when the text is allowed.
    category: Optional[str]
    # User-facing rejection message; None when the text is allowed.
    poetic_reason: Optional[str]
def _deny(category: str) -> Verdict:
    """Build a denial verdict for *category*.

    The message comes from POETIC; categories without an entry of their own
    receive the generic "judged" text.
    """
    fallback = POETIC["judged"]
    message = POETIC.get(category, fallback)
    return Verdict(False, category, message)
| # --------------------------------------------------------------- normalization | |
# Unicode lookalikes NFKC does NOT fold (Cyrillic / Greek / phonetic Latin).
# One table per source script, merged below; each key is a single character
# and each value is the Latin letter it visually imitates.
_CYRILLIC_LOOKALIKES = {
    "а": "a", "в": "b", "е": "e", "ё": "e", "є": "e", "і": "i",
    "ї": "i", "ј": "j", "к": "k", "м": "m", "н": "h", "о": "o",
    "р": "p", "с": "c", "т": "t", "у": "y", "х": "x", "г": "r",
    "ѕ": "s", "ԁ": "d", "ԛ": "q", "ԝ": "w", "ь": "b",
}
_GREEK_LOOKALIKES = {
    "α": "a", "β": "b", "γ": "y", "ε": "e", "ζ": "z", "η": "n",
    "ι": "i", "κ": "k", "ν": "v", "ο": "o", "ρ": "p", "ς": "s",
    "σ": "s", "τ": "t", "υ": "u", "χ": "x", "ω": "w",
}
_LATIN_LOOKALIKES = {
    "ɡ": "g", "ɑ": "a", "ı": "i", "ø": "o", "đ": "d", "ƒ": "f",
}
_CONFUSABLES = {
    **_CYRILLIC_LOOKALIKES,
    **_GREEK_LOOKALIKES,
    **_LATIN_LOOKALIKES,
}
# Leetspeak translation table for `str.translate`: each character in the
# first string is replaced by the letter at the same position in the second.
_LEET = str.maketrans(
    dict(
        zip(
            "0123456789@$!+|€£",
            "oizeasgtbgasitlel",
        )
    )
)
# Phonetic Cyrillic→Latin (catches transliterated terms, e.g. "хитлер";
# distinct from _CONFUSABLES, which folds visual lookalikes only).
# Written as whitespace-separated "cyrillic:latin" entries; an entry with
# nothing after the colon (ъ, ь) maps that letter to the empty string.
_CYRILLIC_TRANSLIT = dict(
    entry.split(":")
    for entry in """
        а:a б:b в:v г:g д:d е:e ё:e
        ж:zh з:z и:i й:i к:k л:l м:m
        н:n о:o п:p р:r с:s т:t у:u
        ф:f х:h ц:ts ч:ch ш:sh щ:sh
        ъ: ы:y ь: э:e ю:yu я:ya
        і:i ї:i є:e ѕ:s ј:j
    """.split()
)
def _base_fold(text: str) -> str:
    """Return *text* NFKC-normalized, casefolded and stripped of diacritics.

    This is the common first step of every matching variant: compatibility
    forms are folded, case is removed, and after an NFKD decomposition all
    combining marks are dropped.
    """
    composed = unicodedata.normalize("NFKC", text)
    decomposed = unicodedata.normalize("NFKD", composed.casefold())
    kept = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    return "".join(kept)
def _normalize(text: str) -> str:
    """Base-fold *text*, then swap visual lookalikes for Latin letters.

    Characters with no entry in _CONFUSABLES pass through unchanged.
    """
    folded = _base_fold(text)
    replaced = [_CONFUSABLES.get(ch, ch) for ch in folded]
    return "".join(replaced)
def _translit(text: str) -> str:
    """Base-fold *text*, then transliterate Cyrillic letters phonetically.

    Characters with no entry in _CYRILLIC_TRANSLIT pass through unchanged.
    """
    folded = _base_fold(text)
    pieces = [_CYRILLIC_TRANSLIT.get(ch, ch) for ch in folded]
    return "".join(pieces)
def _leetfold(text: str) -> str:
    """Replace leetspeak digits and symbols in *text* using the _LEET table."""
    unleeted = text.translate(_LEET)
    return unleeted
def _skeleton(text: str) -> str:
    """Keep only the alphabetic characters of *text*.

    Removing spaces, digits and punctuation defeats separator padding
    (letters split up by dots or spaces).
    """
    return "".join(filter(str.isalpha, text))
| # --------------------------------------------------------------- blocklists | |
# Token-bounded terms (word-boundary match; safe against raccoon/sussex/etc).
# Each category is a whitespace-separated word list split into a tuple, so
# the order of the terms is the order in which they are written.
# "gitler" and "svastika" are common Cyrillic transliterations.
_TERMS = {
    "hate": tuple(
        """
        nigger niggers nigga niggas niglet niglets
        faggot faggots fag fags dyke dykes kike kikes
        spic spics chink chinks gook gooks
        wetback wetbacks beaner beaners coon coons
        jigaboo jigaboos golliwog golliwogs
        tranny trannies shemale shemales paki pakis
        raghead ragheads towelhead towelheads injun injuns
        retard retards retarded
        hitler hitlers nazi nazis neonazi neonazis
        swastika swastikas hakenkreuz kkk klansman klansmen
        fuhrer goebbels himmler holocaust reich
        gitler svastika
        genocide lynch lynching lynchings
        """.split()
    ),
    "sexual": tuple(
        """
        porn porno pornography pornographic
        blowjob blowjobs handjob handjobs rimjob rimjobs
        cum cumshot cumshots jizz semen
        dick dicks cock cocks pussy pussies
        tits titties boobs penis penises vagina vaginas
        cunt cunts clit clitoris
        whore whores slut sluts hentai milf milfs
        dildo dildos anal anus orgasm orgasms orgy
        orgies fellatio cunnilingus
        masturbate masturbates masturbation
        bukkake deepthroat gangbang creampie
        futa futanari loli lolicon shota shotacon
        rape rapes raped raping rapist rapists
        molest molests molested molester molestation
        pedophile pedophiles pedo pedos
        paedophile paedophiles incest bestiality necrophilia
        sex sexy sexual nudes boner boners
        """.split()
    ),
}
# Multi-word / pattern matches (checked on normalized AND leet-folded text).
# Each entry is (regex source, category). Order matters: the first pattern
# that matches decides the category of the denial.
_PHRASES = (
    (r"heil\s+hitler", "hate"),
    (r"sieg\s+heil", "hate"),
    (r"white\s+power", "hate"),
    (r"white\s+supremacy", "hate"),
    (r"blood\s+and\s+soil", "hate"),
    (r"great\s+replacement", "hate"),
    (r"ethnic\s+cleansing", "hate"),
    (r"gas\s+the\s+(?:jews|blacks|gays|muslims|immigrants)", "hate"),
    (r"ku\s+klux", "hate"),
    (r"\b14\s*/?\s*88\b", "hate"),
    # Leading \b keeps "skill all" / "skill every" from being read as
    # "kill all" / "kill every" and denied as violence.
    (r"\bkill\s+(?:all|every)\b", "violence"),
    (r"school\s+shoot(?:ing|ings|er|ers)?", "violence"),
    (r"mass\s+shooting", "violence"),
    (r"child\s+porn\w*", "sexual"),
    (r"kiddie\s+porn\w*", "sexual"),
)
# High-severity substrings checked on the letters-only skeleton (defeats
# s p a c e d and dotted spellings; terms chosen for near-zero false positives).
# Grouped by category here and flattened into (term, category) pairs, with
# every "hate" pair ahead of every "sexual" pair.
_SKELETON_TERMS = tuple(
    (term, category)
    for category, group in (
        (
            "hate",
            (
                "nigger", "nigga", "faggot",
                "swastika", "svastika",
                "hakenkreuz", "heilhitler",
                "siegheil", "kukluxklan", "whitepower",
                "jigaboo", "porchmonkey", "towelhead",
                "raghead", "wetback",
            ),
        ),
        (
            "sexual",
            (
                "blowjob", "deepthroat", "childporn",
                "bukkake", "gangbang", "lolicon",
                "cumshot",
            ),
        ),
    )
    for term in group
)
# Hate symbols checked on the raw text before any folding.
# Matching is a plain substring test, so each entry is kept exactly as typed.
_SYMBOLS = (
    "卐",  # U+5350
    "卍",  # U+534D
    "ᛋᛋ",  # U+16CB, twice
)
def _compile_tokens(terms: tuple[str, ...]) -> re.Pattern:
    """Compile *terms* into a single word-bounded alternation.

    Every term is regex-escaped, and longer terms are placed first so that
    the alternation prefers the longest spelling; terms of equal length
    keep their given order.
    """
    escaped = [re.escape(term) for term in terms]
    escaped.sort(key=len, reverse=True)
    alternation = "|".join(escaped)
    return re.compile(r"\b(?:" + alternation + r")\b")
# One compiled token regex per category, keyed and ordered like _TERMS.
_TOKEN_RX = dict(zip(_TERMS, map(_compile_tokens, _TERMS.values())))
# Phrase patterns compiled once at import, in _PHRASES order, each paired
# with its category.
_PHRASE_RX = tuple(
    (re.compile(source), label)
    for source, label in _PHRASES
)
| # --------------------------------------------------------------- moderator | |
class Moderator:
    """`precheck` = sync layers 1-2 (the API fast path); `check` adds the judge.

    Args:
        judge: Optional callable that takes the wish text and returns a
            verdict (dict, JSON string/bytes, or Verdict), either directly
            or as an awaitable. When None, `check` stops after `precheck`.
        judge_timeout: Seconds to wait for an awaitable judge result. A
            falsy value (None or 0) awaits without a time limit.
    """

    def __init__(
        self,
        judge: Optional[Callable[[str], Any]] = None,
        judge_timeout: Optional[float] = 30.0,
    ) -> None:
        self._judge = judge
        self._judge_timeout = judge_timeout

    # -- layers 1 + 2 (sync, cheap, deterministic) --
    def precheck(self, text: Any) -> Verdict:
        """Full gate for USER wish input: length + charset bounds + content.

        Returns a denial with category "empty" for non-strings and blank
        input, "length" above MAX_WISH_LEN characters, "charset" when any
        non-printable character remains after whitespace folding, and
        otherwise whatever `check_content` decides.
        """
        if not isinstance(text, str):
            return _deny("empty")
        # fold benign line breaks/tabs to spaces before the printable gate
        raw = re.sub(r"\s+", " ", text).strip()
        if not raw:
            return _deny("empty")
        if len(raw) > MAX_WISH_LEN:
            return _deny("length")
        if any(not ch.isprintable() for ch in raw):
            return _deny("charset")
        return self.check_content(raw)

    def check_content(self, text: Any) -> Verdict:
        """Content-only check (slurs/hate/sexual/etc.) WITHOUT the wish-input
        length/charset bounds. Use this to re-moderate MODEL-composed text — the
        god's reading is intentionally long (~700 chars); applying the 140-char
        wish limit here silenced every reading (June 12 regression).

        Checks run in a fixed order and the first hit decides the category:
        raw hate symbols, phrase patterns, token patterns, then skeleton
        substrings.
        """
        if not isinstance(text, str):
            return _deny("empty")
        raw = re.sub(r"\s+", " ", text).strip()
        if not raw:
            return _deny("empty")
        # Symbols are matched on the raw text, before any folding.
        for symbol in _SYMBOLS:
            if symbol in raw:
                return _deny("hate")
        norm = _normalize(raw)
        folded = _leetfold(norm)
        translit = _translit(raw)
        # The transliterated form is only a separate variant when it differs
        # from the lookalike-folded form.
        variants = (norm, folded, translit) if translit != norm else (norm, folded)
        for rx, category in _PHRASE_RX:
            if any(rx.search(v) for v in variants):
                return _deny(category)
        for category, rx in _TOKEN_RX.items():
            if any(rx.search(v) for v in variants):
                return _deny(category)
        # Letters-only forms catch terms padded with spaces or punctuation.
        skeletons = {_skeleton(folded), _skeleton(translit)}
        for term, category in _SKELETON_TERMS:
            if any(term in s for s in skeletons):
                return _deny(category)
        return Verdict(True, None, None)

    # -- layer 3 (async LLM judge, default-deny on uncertainty) --
    async def check(self, text: Any) -> Verdict:
        """Run `precheck`, then the judge if one is configured.

        A denial from `precheck` is returned as is and the judge is never
        called. Any exception from the judge (including a timeout on an
        awaitable result) yields an "uncertain" denial; cancellation is
        re-raised. A judge that returns a plain value is called inline and
        is not subject to `judge_timeout`.
        """
        verdict = self.precheck(text)
        if not verdict.allowed or self._judge is None:
            return verdict
        try:
            result = self._judge(text)
            if inspect.isawaitable(result):
                if self._judge_timeout:
                    result = await asyncio.wait_for(result, self._judge_timeout)
                else:
                    result = await result
        except asyncio.CancelledError:
            raise
        except Exception:
            # Default-deny: a failing judge must never let a wish through.
            return _deny("uncertain")
        return self._parse_judge(result)

    # Declared static because it takes no `self`; `check` calls it through
    # the instance, which would otherwise pass the instance as `raw` plus
    # one argument too many and raise TypeError.
    @staticmethod
    def _parse_judge(raw: Any) -> Verdict:
        """Strict parse of the judge's reply; anything murky → deny.

        Accepts a Verdict (returned unchanged), a dict, or a str/bytes
        holding a JSON object (text around the outermost braces is
        ignored). "allowed" must be a real bool or the string
        "true"/"false"; every other shape is an "uncertain" denial.
        """
        try:
            if isinstance(raw, Verdict):
                return raw
            data = raw
            if isinstance(data, (bytes, bytearray)):
                data = data.decode("utf-8", "replace")
            if isinstance(data, str):
                start, end = data.find("{"), data.rfind("}")
                if start < 0 or end <= start:
                    return _deny("uncertain")
                data = json.loads(data[start : end + 1])
            if not isinstance(data, dict):
                return _deny("uncertain")
            allowed = data.get("allowed")
            if isinstance(allowed, str):
                lowered = allowed.strip().lower()
                allowed = {"true": True, "false": False}.get(lowered, allowed)
            # Identity checks so truthy non-bools (1, "yes") never pass.
            if allowed is True:
                return Verdict(True, None, None)
            if allowed is False:
                category = data.get("category")
                category = str(category) if category else "judged"
                return Verdict(False, category, POETIC.get(category, POETIC["judged"]))
            return _deny("uncertain")
        except Exception:
            return _deny("uncertain")