Spaces:
Sleeping
Sleeping
| # moderator.py | |
| import re | |
| import unicodedata | |
| from typing import Dict, List, Tuple | |
| from datetime import datetime | |
| import torch | |
| from transformers import pipeline | |
class SimpleModerator:
    """Hybrid text moderator: rule patterns first, ML models second.

    Pattern matching against normalized text gives fast, high-confidence
    decisions; two HuggingFace classifiers (toxic-bert and dehatebert)
    refine anything the rules do not catch. Final actions are "delete",
    "flag" or "allow".
    """

    def __init__(self):
        # Prefer GPU when available; the pipelines themselves are created
        # lazily in load_models() so construction stays cheap.
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.toxic_model = None
        self.hate_model = None
        # Enhanced leet speak mapping
        self.leet_map = {
            "@": "a", "4": "a", "3": "e", "1": "i", "!": "i",
            "0": "o", "$": "s", "5": "s", "7": "t", "+": "t",
            "2": "to", "&": "and", "8": "ate", "6": "g"
        }
        # Character normalization
        self.char_normalize = {
            'à': 'a', 'á': 'a', 'â': 'a', 'ã': 'a', 'ä': 'a',
            'è': 'e', 'é': 'e', 'ê': 'e', 'ë': 'e',
            'ì': 'i', 'í': 'i', 'î': 'i', 'ï': 'i',
            'ò': 'o', 'ó': 'o', 'ô': 'o', 'õ': 'o', 'ö': 'o',
            'ù': 'u', 'ú': 'u', 'û': 'u', 'ü': 'u',
            'ý': 'y', 'ÿ': 'y',
            'ç': 'c', 'ñ': 'n'
        }
        # DELETE patterns
        self.delete_patterns = [
            "kill yourself", "kill urself", "kys", "commit suicide", "end your life",
            "take your life", "suicide", "kill myself", "kill himself", "kill herself",
            "i will kill you", "ill kill you", "gonna kill you", "murder you",
            "i will murder", "going to kill", "gonna murder", "i will hurt you",
            "should die", "deserve to die", "hope you die", "wish you were dead",
            "should be dead", "drop dead", "go die", "just die",
        ]
        # Strong slurs - immediate DELETE
        self.slur_patterns = [
            "nigger", "nigga", "faggot", "fag", "retard", "spic", "kike",
            "chink", "cunt", "whore", "slut"
        ]
        # FLAG patterns
        self.flag_patterns = [
            "stupid", "idiot", "dumb", "moron", "loser", "ugly", "fat",
            "worthless", "pathetic", "useless", "failure", "imbecile",
            "disgusting", "terrible", "awful", "despise",
            "shut up", "shut the fuck", "piss off"
        ]
        # Words that indicate hate/toxicity
        self.toxic_words = [
            "hate", "hating", "hated", "hater",
            "fuck", "fucking", "fucked", "bitch", "asshole", "ass",
            "damn", "hell", "crap", "shit", "piss"
        ]
        # --- Precompiled internals (the public lists above are unchanged) ---
        # One translation table covers both maps: their key sets are disjoint
        # and no replacement value contains another key, so a single
        # str.translate() pass is equivalent to the old sequential replaces.
        self._char_table = str.maketrans({**self.char_normalize, **self.leet_map})
        # Word-boundary regexes fix substring false positives of the previous
        # `pattern in text` checks, e.g. "kys" inside "skyscraper", "fat"
        # inside "father", "cunt" inside "scunthorpe".
        self._delete_res = self._compile_bounded(self.delete_patterns)
        # Slurs additionally tolerate a trailing plural "s" ("fags", "retards")
        # that the plain word-boundary form would otherwise miss.
        self._slur_res = self._compile_bounded(self.slur_patterns, plural=True)
        self._flag_res = self._compile_bounded(self.flag_patterns)
        self._toxic_set = set(self.toxic_words)  # O(1) membership in the hot loop
        print(f"🖥️ Device: {self.device}")

    @staticmethod
    def _compile_bounded(patterns: List[str], plural: bool = False) -> List[Tuple[str, "re.Pattern"]]:
        """Compile each pattern to a word-boundary regex, keeping the raw text.

        Returns (original_pattern, compiled_regex) pairs so matches can be
        reported with the human-readable pattern string.
        """
        suffix = "s?" if plural else ""
        return [(p, re.compile(rf"\b{re.escape(p)}{suffix}\b")) for p in patterns]

    def load_models(self):
        """Load the ML models.

        Returns True when both pipelines are ready, False on any failure
        (the moderator then degrades to pattern matching only).
        """
        print("📥 Loading models...")
        try:
            print("Loading toxic-bert model...")
            self.toxic_model = pipeline(
                "text-classification",
                model="unitary/toxic-bert",
                device=0 if self.device == "cuda" else -1,
                truncation=True,
                max_length=512
            )
            print("✅ Toxic-BERT loaded")
            print("Loading dehatebert model...")
            self.hate_model = pipeline(
                "text-classification",
                model="Hate-speech-CNERG/dehatebert-mono-english",
                device=0 if self.device == "cuda" else -1,
                truncation=True,
                max_length=512
            )
            print("✅ DeHateBERT loaded")
            print("🎉 Models ready!")
            return True
        except Exception as e:
            print(f"❌ Error loading models: {e}")
            import traceback
            traceback.print_exc()
            return False

    def normalize_text(self, text: str) -> str:
        """Enhanced normalization for better pattern matching.

        Lowercases, maps accented/leet characters to plain ASCII in one
        translate pass, strips punctuation, collapses whitespace, and
        squeezes runs of 3+ repeated characters down to 2 ("soooo" -> "soo").
        """
        text = text.lower()
        # Single pass replaces both accented chars and leet substitutions.
        text = text.translate(self._char_table)
        # NFKD splits any remaining accented char into base + combining mark;
        # the punctuation strip below then removes the marks.
        text = unicodedata.normalize("NFKD", text)
        text = re.sub(r'[^\w\s]', ' ', text)
        text = re.sub(r'\s+', ' ', text)
        text = re.sub(r'(.)\1{2,}', r'\1\1', text)
        return text.strip()

    def check_patterns(self, text: str) -> Tuple[str, List[str], float]:
        """Check text against patterns and return decision with confidence.

        Returns ("DELETE"|"FLAG"|"ALLOW", matched pattern labels, confidence).
        DELETE matches short-circuit at confidence 1.0.
        """
        normalized = self.normalize_text(text)
        words = normalized.split()
        matched_patterns = []
        # Check DELETE patterns (highest priority). Word-boundary search
        # avoids the old substring false positives.
        for pattern, rx in self._delete_res:
            if rx.search(normalized):
                matched_patterns.append(f"DELETE:{pattern}")
        # Check slurs (immediate DELETE)
        for slur, rx in self._slur_res:
            if rx.search(normalized):
                matched_patterns.append(f"DELETE:slur:{slur}")
        if matched_patterns:
            return "DELETE", matched_patterns, 1.0
        # Check toxic words (FLAG, boosting confidence per hit)
        toxic_count = 0
        for word in words:
            if word in self._toxic_set:
                toxic_count += 1
                matched_patterns.append(f"FLAG:toxic_word:{word}")
        # Check FLAG patterns
        for pattern, rx in self._flag_res:
            if rx.search(normalized):
                matched_patterns.append(f"FLAG:{pattern}")
        if matched_patterns:
            # Base 0.7, +0.1 per toxic word, capped below the DELETE band.
            confidence = min(0.7 + (toxic_count * 0.1), 0.95)
            return "FLAG", matched_patterns, confidence
        return "ALLOW", [], 0.0

    def get_model_scores(self, text: str) -> Dict:
        """Get model predictions with proper error handling.

        Returns a dict with toxic_score/toxic_label and hate_score/hate_label;
        scores default to 0.0 when a model is unavailable or errors out.
        """
        scores = {
            "toxic_score": 0.0,
            "toxic_label": "unknown",
            "hate_score": 0.0,
            "hate_label": "unknown"
        }
        # Try toxic model
        if self.toxic_model is not None:
            try:
                # NOTE(review): the top label's score is used as-is; assumes
                # toxic-bert's top label scores monotonically with toxicity
                # — confirm against the model card.
                toxic_result = self.toxic_model(text[:512])[0]
                scores["toxic_score"] = float(toxic_result["score"])
                scores["toxic_label"] = toxic_result["label"]
                print(f"Toxic score: {scores['toxic_score']:.3f} ({scores['toxic_label']})")
            except Exception as e:
                print(f"Toxic model error: {e}")
        else:
            print("Toxic model not loaded")
        # Try hate model
        if self.hate_model is not None:
            try:
                hate_result = self.hate_model(text[:512])[0]
                hate_score = float(hate_result["score"])
                # DeHateBERT returns NON_HATE/HATE labels; invert NON_HATE so
                # hate_score always means "probability of hate".
                if hate_result["label"] == "NON_HATE":
                    scores["hate_score"] = 1.0 - hate_score
                    scores["hate_label"] = "non_hate"
                else:
                    scores["hate_score"] = hate_score
                    scores["hate_label"] = "hate"
                print(f"Hate score: {scores['hate_score']:.3f} ({scores['hate_label']})")
            except Exception as e:
                print(f"Hate model error: {e}")
        else:
            print("Hate model not loaded")
        return scores

    def moderate(self, text: str) -> Dict:
        """Main moderation function - combines pattern matching and ML models.

        Pattern DELETE/FLAG decisions take priority; otherwise model scores
        drive the action via fixed thresholds. A pattern FLAG plus a very
        high model score escalates to delete.
        """
        # Pattern matching (primary, fast)
        pattern_decision, matched, pattern_confidence = self.check_patterns(text)
        # Model scores (secondary, more nuanced)
        scores = self.get_model_scores(text)
        toxic_score = scores["toxic_score"]
        hate_score = scores["hate_score"]
        # Combine for final decision
        action = "allow"
        reason = "No issues detected"
        final_confidence = 0.0
        # Pattern overrides (highest priority)
        if pattern_decision == "DELETE":
            action = "delete"
            reason = f"Pattern match: {matched[0].replace('DELETE:', '')}"
            final_confidence = 1.0
        elif pattern_decision == "FLAG":
            action = "flag"
            reason = f"Pattern match: {matched[0].replace('FLAG:', '')}"
            final_confidence = pattern_confidence
        # Model-based decisions (if no pattern match or pattern is weak)
        elif toxic_score > 0.90:
            action = "delete"
            reason = f"Extreme toxicity detected: {toxic_score:.2f}"
            final_confidence = toxic_score
        elif hate_score > 0.85:
            action = "delete"
            reason = f"Extreme hate speech detected: {hate_score:.2f}"
            final_confidence = hate_score
        elif toxic_score > 0.70:
            action = "flag"
            reason = f"High toxicity: {toxic_score:.2f}"
            final_confidence = toxic_score
        elif hate_score > 0.50:
            action = "flag"
            reason = f"Hate speech indicators: {hate_score:.2f}"
            final_confidence = hate_score
        # If both pattern and model agree on FLAG, escalate to DELETE
        if pattern_decision == "FLAG" and (toxic_score > 0.95 or hate_score > 0.90):
            action = "delete"
            reason = f"Pattern + Model agreement: {reason}"
            final_confidence = max(pattern_confidence, toxic_score, hate_score)
        normalized_text = self.normalize_text(text)
        return {
            "action": action,
            "reason": reason,
            "toxic_score": toxic_score,
            "hate_score": hate_score,
            "pattern_matches": matched,
            "pattern_confidence": pattern_confidence,
            "model_confidence": max(toxic_score, hate_score),
            "final_confidence": final_confidence,
            "normalized_text": normalized_text,
            "timestamp": datetime.now().isoformat()
        }
# Process-wide singleton
_moderator_instance = None


def get_moderator():
    """Return the shared moderator, creating and initializing it on first call."""
    global _moderator_instance
    if _moderator_instance is not None:
        return _moderator_instance
    print("🔄 Creating new moderator instance...")
    instance = SimpleModerator()
    if not instance.load_models():
        print("⚠️ Warning: Models failed to load, using pattern matching only")
    _moderator_instance = instance
    return _moderator_instance