godseed / engine /moderation.py
AndresCarreon's picture
FORGIVING TOWNS: map any building kind (never reject), district reroute, restore god's voice (content-only moderation), town few-shot pushes build_district+roads, honest fail on empty
baf9d60 verified
Raw
History Blame Contribute Delete
12.6 kB
"""Layered wish moderation. Moderation runs BEFORE execution, always.
Layers (each one a hard gate):
1. charset/length — ≤140 printable chars (newlines/tabs are folded to
spaces first; true control / zero-width chars are denied).
2. wordlist/regex — slurs, sexual content, hate symbols & terms. Robust to
leetspeak (n1gg3r), unicode confusables (Cyrillic/Greek lookalikes via an
explicit map on top of NFKC), diacritics (NFKD strip) and separator
padding (skeleton substring pass).
3. optional async LLM judge — an injected callable `judge(text) ->
{"allowed": bool, "category": str|None}` (dict, JSON string, or Verdict).
DEFAULT-DENY on any judge exception, timeout, or parse failure.
The blocklists below exist solely to keep this content OFF the shared world.
Rejections are poetic but final.
Pure stdlib. Only `check()` is async (the judge hook may be a coroutine).
"""
from __future__ import annotations
import asyncio
import inspect
import json
import re
import unicodedata
from dataclasses import dataclass
from typing import Any, Callable, Optional
# Hard cap on wish length: precheck() denies with category "length" when the
# whitespace-folded wish is longer than this many characters.
MAX_WISH_LEN = 140
# Denial category -> in-world refusal text, surfaced as Verdict.poetic_reason.
# _deny() falls back to the "judged" entry for any category without a key here.
POETIC = {
# non-string input, or nothing left after whitespace folding
"empty": "the god heard only wind; speak a wish",
# NOTE(review): precheck() only rejects len > MAX_WISH_LEN, so a wish of
# exactly 140 characters is accepted although this text says "fewer than
# 140" — confirm which bound is intended.
"length": "the god hears only brief prayers — fewer than 140 marks",
# at least one non-printable character survived whitespace folding
"charset": "the god cannot read marks from beyond the world",
# content categories produced by the blocklists / phrase patterns
"hate": "the god will not plant venom in its soil; this wish is declined",
"sexual": "the god turns this wish to mist; it is declined",
"violence": "the god makes; it does not unmake — this wish is declined",
# judge raised, timed out, or returned something unparseable
"uncertain": "the god weighed this wish in silence and set it down",
# generic fallback for categories not listed above
"judged": "the god declines this wish",
}
@dataclass(eq=True, frozen=True)
class Verdict:
    """Immutable outcome of a moderation pass.

    Instances are value-comparable and hashable; fields cannot be reassigned
    after construction.
    """

    # True when the text may proceed; False when it was denied.
    allowed: bool
    # Denial category (e.g. "hate", "length"); None for an allowed verdict.
    category: Optional[str]
    # In-world refusal text shown to the wisher; None for an allowed verdict.
    poetic_reason: Optional[str]
def _deny(category: str) -> Verdict:
    """Build a denied Verdict for `category`.

    The refusal text is looked up in POETIC; categories without an entry of
    their own receive the generic "judged" text.
    """
    fallback = POETIC["judged"]
    reason = POETIC.get(category, fallback)
    return Verdict(allowed=False, category=category, poetic_reason=reason)
# --------------------------------------------------------------- normalization
# Unicode lookalikes NFKC does NOT fold (Cyrillic / Greek / phonetic Latin).
# _normalize() applies this map character by character to the output of
# _base_fold(), so keys are matched after casefolding and diacritic stripping.
_CONFUSABLES = {
# Cyrillic
"а": "a", "в": "b", "е": "e", "ё": "e", "є": "e", "і": "i", "ї": "i",
"ј": "j", "к": "k", "м": "m", "н": "h", "о": "o", "р": "p", "с": "c",
"т": "t", "у": "y", "х": "x", "г": "r", "ѕ": "s", "ԁ": "d", "ԛ": "q",
"ԝ": "w", "ь": "b",
# Greek
"α": "a", "β": "b", "γ": "y", "ε": "e", "ζ": "z", "η": "n", "ι": "i",
"κ": "k", "ν": "v", "ο": "o", "ρ": "p", "ς": "s", "σ": "s", "τ": "t",
"υ": "u", "χ": "x", "ω": "w",
# Latin extras
"ɡ": "g", "ɑ": "a", "ı": "i", "ø": "o", "đ": "d", "ƒ": "f",
}
# Leetspeak translation table consumed by _leetfold(): digits and symbols are
# rewritten to the letter they stand in for. Because digits are rewritten,
# digit-based patterns can only match text that has NOT been leet-folded.
_LEET = str.maketrans({
"0": "o", "1": "i", "2": "z", "3": "e", "4": "a", "5": "s", "6": "g",
"7": "t", "8": "b", "9": "g", "@": "a", "$": "s", "!": "i", "+": "t",
"|": "l", "€": "e", "£": "l",
})
# Phonetic Cyrillic→Latin (catches transliterated terms, e.g. "хитлер";
# distinct from _CONFUSABLES, which folds visual lookalikes only).
# _translit() applies this per character to the _base_fold() output; values may
# be several letters long or empty (the hard and soft signs are dropped).
_CYRILLIC_TRANSLIT = {
"а": "a", "б": "b", "в": "v", "г": "g", "д": "d", "е": "e", "ё": "e",
"ж": "zh", "з": "z", "и": "i", "й": "i", "к": "k", "л": "l", "м": "m",
"н": "n", "о": "o", "п": "p", "р": "r", "с": "s", "т": "t", "у": "u",
"ф": "f", "х": "h", "ц": "ts", "ч": "ch", "ш": "sh", "щ": "sh",
"ъ": "", "ы": "y", "ь": "", "э": "e", "ю": "yu", "я": "ya",
"і": "i", "ї": "i", "є": "e", "ѕ": "s", "ј": "j",
}
def _base_fold(text: str) -> str:
"""NFKC + casefold + diacritic strip (shared by all matching variants)."""
s = unicodedata.normalize("NFKC", text).casefold()
s = unicodedata.normalize("NFKD", s)
return "".join(ch for ch in s if not unicodedata.combining(ch))
def _normalize(text: str) -> str:
    """Base-fold `text`, then swap visual lookalikes for their Latin letter.

    Characters without an entry in _CONFUSABLES pass through unchanged.
    """
    folded = _base_fold(text)
    return "".join(map(lambda ch: _CONFUSABLES.get(ch, ch), folded))
def _translit(text: str) -> str:
    """Base-fold `text`, then transliterate Cyrillic letters phonetically.

    Characters without an entry in _CYRILLIC_TRANSLIT pass through unchanged;
    a single input letter may expand to several Latin letters or to nothing.
    """
    pieces = []
    for ch in _base_fold(text):
        pieces.append(_CYRILLIC_TRANSLIT.get(ch, ch))
    return "".join(pieces)
def _leetfold(text: str) -> str:
    """Rewrite leetspeak digits/symbols in `text` to letters via _LEET."""
    unleeted = str.translate(text, _LEET)
    return unleeted
def _skeleton(text: str) -> str:
"""Letters only — defeats spacing/punctuation padding (n.i.g.g.e.r)."""
return "".join(ch for ch in text if ch.isalpha())
# --------------------------------------------------------------- blocklists
# Token-bounded terms (word-boundary match; safe against raccoon/sussex/etc).
# Keys are denial categories (they double as POETIC keys); each tuple is
# compiled into one word-bounded alternation by _compile_tokens() and stored in
# _TOKEN_RX. The patterns are compiled without flags and are run against
# casefolded, diacritic-stripped text, so entries must be lowercase ASCII.
_TERMS = {
"hate": (
"nigger", "niggers", "nigga", "niggas", "niglet", "niglets",
"faggot", "faggots", "fag", "fags", "dyke", "dykes", "kike", "kikes",
"spic", "spics", "chink", "chinks", "gook", "gooks",
"wetback", "wetbacks", "beaner", "beaners", "coon", "coons",
"jigaboo", "jigaboos", "golliwog", "golliwogs",
"tranny", "trannies", "shemale", "shemales", "paki", "pakis",
"raghead", "ragheads", "towelhead", "towelheads", "injun", "injuns",
"retard", "retards", "retarded",
"hitler", "hitlers", "nazi", "nazis", "neonazi", "neonazis",
"swastika", "swastikas", "hakenkreuz", "kkk", "klansman", "klansmen",
"fuhrer", "goebbels", "himmler", "holocaust", "reich",
"gitler", "svastika", # common Cyrillic transliterations
"genocide", "lynch", "lynching", "lynchings",
),
"sexual": (
"porn", "porno", "pornography", "pornographic",
"blowjob", "blowjobs", "handjob", "handjobs", "rimjob", "rimjobs",
"cum", "cumshot", "cumshots", "jizz", "semen",
"dick", "dicks", "cock", "cocks", "pussy", "pussies",
"tits", "titties", "boobs", "penis", "penises", "vagina", "vaginas",
"cunt", "cunts", "clit", "clitoris",
"whore", "whores", "slut", "sluts", "hentai", "milf", "milfs",
"dildo", "dildos", "anal", "anus", "orgasm", "orgasms", "orgy",
"orgies", "fellatio", "cunnilingus",
"masturbate", "masturbates", "masturbation",
"bukkake", "deepthroat", "gangbang", "creampie",
"futa", "futanari", "loli", "lolicon", "shota", "shotacon",
"rape", "rapes", "raped", "raping", "rapist", "rapists",
"molest", "molests", "molested", "molester", "molestation",
"pedophile", "pedophiles", "pedo", "pedos",
"paedophile", "paedophiles", "incest", "bestiality", "necrophilia",
"sex", "sexy", "sexual", "nudes", "boner", "boners",
),
}
# Multi-word / pattern matches (checked on normalized AND leet-folded text).
# Each entry is (regex source, denial category); the sources are compiled
# without flags into _PHRASE_RX, so they must be written in lowercase.
# check_content() tries these before the token lists, so a phrase's category
# wins when both would match.
_PHRASES = (
(r"heil\s+hitler", "hate"),
(r"sieg\s+heil", "hate"),
(r"white\s+power", "hate"),
(r"white\s+supremacy", "hate"),
(r"blood\s+and\s+soil", "hate"),
(r"great\s+replacement", "hate"),
(r"ethnic\s+cleansing", "hate"),
(r"gas\s+the\s+(?:jews|blacks|gays|muslims|immigrants)", "hate"),
(r"ku\s+klux", "hate"),
# digits survive only in the non-leet-folded variant (_LEET rewrites 1/4/8)
(r"\b14\s*/?\s*88\b", "hate"),
(r"kill\s+(?:all|every)\b", "violence"),
(r"school\s+shoot(?:ing|ings|er|ers)?", "violence"),
(r"mass\s+shooting", "violence"),
(r"child\s+porn\w*", "sexual"),
(r"kiddie\s+porn\w*", "sexual"),
)
# High-severity substrings checked on the letters-only skeleton (defeats
# s p a c e d and dotted spellings; terms chosen for near-zero false positives).
# Each entry is (substring, denial category). Matching is a plain `in` test
# with no word boundaries, which is why only long, distinctive terms belong
# here; this pass runs last in check_content().
_SKELETON_TERMS = (
("nigger", "hate"), ("nigga", "hate"), ("faggot", "hate"),
("swastika", "hate"), ("svastika", "hate"),
("hakenkreuz", "hate"), ("heilhitler", "hate"),
("siegheil", "hate"), ("kukluxklan", "hate"), ("whitepower", "hate"),
("jigaboo", "hate"), ("porchmonkey", "hate"), ("towelhead", "hate"),
("raghead", "hate"), ("wetback", "hate"),
("blowjob", "sexual"), ("deepthroat", "sexual"), ("childporn", "sexual"),
("bukkake", "sexual"), ("gangbang", "sexual"), ("lolicon", "sexual"),
("cumshot", "sexual"),
)
# Hate symbols checked on the raw text before any folding.
# A plain substring hit on any entry denies the text with category "hate";
# this is the first content test check_content() performs.
_SYMBOLS = ("卐", "卍", "ᛋᛋ")
def _compile_tokens(terms: tuple[str, ...]) -> re.Pattern:
joined = "|".join(sorted(map(re.escape, terms), key=len, reverse=True))
return re.compile(rf"\b(?:{joined})\b")
# Denial category -> compiled word-bounded alternation of that category's terms.
_TOKEN_RX = {
    label: _compile_tokens(words)
    for label, words in _TERMS.items()
}
# (compiled phrase pattern, denial category) pairs, in _PHRASES order.
_PHRASE_RX = tuple(
    (re.compile(source), label) for source, label in _PHRASES
)
# --------------------------------------------------------------- moderator
class Moderator:
    """Layered wish moderator.

    `precheck` = sync layers 1-2 (the API fast path); `check` adds the judge.
    `check_content` is the content-only part of layer 2, usable on its own for
    text that must not be subject to the wish length/charset bounds.
    """

    def __init__(
        self,
        judge: Optional[Callable[[str], Any]] = None,
        judge_timeout: Optional[float] = 30.0,
    ) -> None:
        """Store the optional judge hook and its timeout.

        Args:
            judge: callable taking the wish text; may return its answer
                directly or as an awaitable. None disables layer 3.
            judge_timeout: seconds to wait for an awaitable judge result; a
                falsy value (None or 0) waits without a bound.
        """
        self._judge = judge
        self._judge_timeout = judge_timeout

    # -- layers 1 + 2 (sync, cheap, deterministic) --
    def precheck(self, text: Any) -> Verdict:
        """Full gate for USER wish input: length + charset bounds + content.

        Returns a denied Verdict with category "empty" (non-string or blank),
        "length" (more than MAX_WISH_LEN characters after whitespace folding)
        or "charset" (any non-printable character), otherwise whatever
        `check_content` decides.
        """
        if not isinstance(text, str):
            return _deny("empty")
        # fold benign line breaks/tabs to spaces before the printable gate
        raw = re.sub(r"\s+", " ", text).strip()
        if not raw:
            return _deny("empty")
        if len(raw) > MAX_WISH_LEN:
            return _deny("length")
        if not raw.isprintable():
            return _deny("charset")
        return self.check_content(raw)

    def check_content(self, text: Any) -> Verdict:
        """Content-only check (slurs/hate/sexual/etc.) WITHOUT the wish-input
        length/charset bounds.

        Use this to re-moderate MODEL-composed text — the god's reading is
        intentionally long (~700 chars); applying the 140-char wish limit here
        silenced every reading (June 12 regression).

        Order of tests (first hit wins): raw hate symbols, phrase patterns,
        token lists, then skeleton substrings. Non-string or blank input is
        denied as "empty".
        """
        if not isinstance(text, str):
            return _deny("empty")
        raw = re.sub(r"\s+", " ", text).strip()
        if not raw:
            return _deny("empty")
        if any(symbol in raw for symbol in _SYMBOLS):
            return _deny("hate")

        norm = _normalize(raw)
        folded = _leetfold(norm)
        translit = _translit(raw)
        variants = [norm, folded]
        skeletons = {_skeleton(folded), _skeleton(translit)}
        if translit != norm:
            # Cyrillic was present. Also leet-fold the transliteration so a
            # transliterated term written with digits/symbols is still seen
            # by the phrase, token and skeleton passes.
            translit_folded = _leetfold(translit)
            variants += [translit, translit_folded]
            skeletons.add(_skeleton(translit_folded))

        for rx, category in _PHRASE_RX:
            if any(rx.search(variant) for variant in variants):
                return _deny(category)
        for category, rx in _TOKEN_RX.items():
            if any(rx.search(variant) for variant in variants):
                return _deny(category)
        for term, category in _SKELETON_TERMS:
            if any(term in skeleton for skeleton in skeletons):
                return _deny(category)
        return Verdict(True, None, None)

    # -- layer 3 (async LLM judge, default-deny on uncertainty) --
    async def check(self, text: Any) -> Verdict:
        """Run `precheck`, then the judge if one is configured.

        A precheck denial is returned as-is and the judge is not consulted.
        The judge receives the original `text`. Any exception from the judge,
        including a timeout, yields a denied "uncertain" verdict; task
        cancellation is propagated rather than swallowed.
        """
        verdict = self.precheck(text)
        if not verdict.allowed or self._judge is None:
            return verdict
        try:
            result = self._judge(text)
            if inspect.isawaitable(result):
                # the timeout only bounds awaitable results; a synchronous
                # judge has already finished by this point
                if self._judge_timeout:
                    result = await asyncio.wait_for(result, self._judge_timeout)
                else:
                    result = await result
        except asyncio.CancelledError:
            raise
        except Exception:
            return _deny("uncertain")
        return self._parse_judge(result)

    @staticmethod
    def _parse_judge(raw: Any) -> Verdict:
        """Strict parse of the judge's reply; anything murky → deny.

        Accepted shapes: a Verdict whose `allowed` is a real bool (returned
        unchanged), a dict, or str/bytes containing one JSON object (text
        around the outermost braces is ignored). `allowed` must be True/False
        or the strings "true"/"false" (any case, surrounding whitespace
        ignored). A denial uses the reply's `category`, or "judged" when it is
        missing or falsy. Everything else, including any parsing error, is
        denied as "uncertain".
        """
        try:
            if isinstance(raw, Verdict):
                # a pre-built verdict gets the same strictness as a dict reply
                if isinstance(raw.allowed, bool):
                    return raw
                return _deny("uncertain")
            data = raw
            if isinstance(data, (bytes, bytearray)):
                data = data.decode("utf-8", "replace")
            if isinstance(data, str):
                start, end = data.find("{"), data.rfind("}")
                if start < 0 or end <= start:
                    return _deny("uncertain")
                data = json.loads(data[start : end + 1])
            if not isinstance(data, dict):
                return _deny("uncertain")
            allowed = data.get("allowed")
            if isinstance(allowed, str):
                lowered = allowed.strip().lower()
                allowed = {"true": True, "false": False}.get(lowered, allowed)
            # identity tests on purpose: 1, 0 and other truthy/falsy values
            # are not accepted as an answer
            if allowed is True:
                return Verdict(True, None, None)
            if allowed is False:
                category = data.get("category")
                return _deny(str(category) if category else "judged")
            return _deny("uncertain")
        except Exception:
            return _deny("uncertain")