Spaces:
Sleeping
Sleeping
File size: 11,356 Bytes
b8300d6 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 | from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from transformers import pipeline
from better_profanity import profanity
from typing import List, Dict
import re
# Mild words that the profanity library must not flag. They are handed to
# better_profanity through its whitelist_words parameter when the censor list
# is loaded below.
MILD_WORDS_WHITELIST = [
    "damn", "hell", "crap", "dang", "heck", "shoot", "frick", "freaking",
    "sucks", "suck", "bloody", "piss", "pissed",
]

# (Re)load the censor list with the mild words excluded.
profanity.load_censor_words(whitelist_words=MILD_WORDS_WHITELIST)

# Superset used by the manual cleanup fallback: the mild words plus harmless
# multi-word phrases.
PROFANITY_WHITELIST = {*MILD_WORDS_WHITELIST, "keep it up", "great post"}

# One pre-compiled, case-insensitive, whole-word pattern per whitelisted phrase.
PROFANITY_WHITELIST_PATTERNS = {
    phrase: re.compile(rf"\b{re.escape(phrase)}\b", re.IGNORECASE)
    for phrase in PROFANITY_WHITELIST
}
def is_whitelisted(text: str) -> bool:
    """Tell whether *text* is clean once whitelisted words and phrases are removed.

    The text is lower-cased, every pattern in ``PROFANITY_WHITELIST_PATTERNS``
    is deleted from it, and the remainder is re-checked with the profanity
    library. Returns True when the remainder contains no profanity.
    """
    remainder = text.lower()
    for mild_pattern in PROFANITY_WHITELIST_PATTERNS.values():
        remainder = mild_pattern.sub("", remainder)
    still_profane = profanity.contains_profanity(remainder)
    return not still_profane
# Keyword-based insult/threat detector to catch what the ML model misses.
#
# Optional apostrophe between "you" and "re": ASCII (') or a curly single quote
# (U+2018 / U+2019). It is spelled with \u escapes, which the `re` module
# resolves itself, so the character class cannot be corrupted by a source-file
# encoding mishap. "youre", "you're" and "you\u2019re" are all matched.
_APOSTROPHE = r"['\u2018\u2019]?"

INSULT_KEYWORDS = [
    # --- English insults / threats ---
    rf"\byou{_APOSTROPHE}re so dumb\b",
    r"\bwhat a loser\b",
    r"\bi will find you\b",
    r"\byou deserve to die\b",
    r"\bi hate you\b",
    rf"\byou{_APOSTROPHE}re disgusting\b",
    r"\bnobody likes you\b",
    rf"\byou{_APOSTROPHE}re pathetic\b",
    r"\bget lost\b",
    r"\bnobody asked\b",
    rf"\byou{_APOSTROPHE}re worthless\b",
    rf"\byou{_APOSTROPHE}re trash\b",
    r"\bkill yourself\b",
    r"\bgo kill yourself\b",
    rf"\byou{_APOSTROPHE}re ugly\b",
    r"\bshut up\b",
    rf"\byou{_APOSTROPHE}re annoying\b",
    r"\bgo to hell\b",
    r"\bstupid ga\b",
    r"\bwaste fellow\b",
    rf"\byou{_APOSTROPHE}re an idiot\b",
    r"\bthis is garbage\b",
    r"\byou are stupid\b",
    r"\byou are an idiot\b",
    rf"\byou{_APOSTROPHE}re dumb\b",
    r"\bstupid idiot\b",
    r"\bbloody fool\b",
    # --- Telugu-English compound insults: [insult word] + gadu/fellow/vaadu ---
    r"\b(?:buffalo|monkey|mental|psycho|cheap|nasty|dirty|useless|worst|scoundrel)"
    r"\s+(?:gadu|fellow|vaadu|ra)\b",
    r"\b(?:rascal|buffoon|loafer|fraud|basthi|chapri|local|rowdy|420|kothi|waste)"
    r"\s+(?:gadu|fellow|vaadu|ra)\b",
    r"\b(?:third\s+class|low\s+class|third-class|low-class)\s+(?:gadu|fellow|vaadu)\b",
    r"\b(?:buffalo|monkey|mental|psycho|cheap|nasty|dirty|useless|worst|scoundrel|rascal|buffoon|loafer|fraud)\s+fellow\b",
    # --- Telugu standalone insult suffixes ---
    r"\bkothi\s+vedhava\b",
]

# Single alternation of every keyword pattern, matched case-insensitively.
INSULT_PATTERN = re.compile("|".join(INSULT_KEYWORDS), re.IGNORECASE | re.UNICODE)
def contains_insult_keyword(text: str) -> bool:
    """Return True when any pattern of ``INSULT_PATTERN`` occurs anywhere in *text*."""
    hit = INSULT_PATTERN.search(text)
    return hit is not None
# Load the custom Telugu-English bad-word list and register it with the
# profanity filter. The file holds base64-encoded UTF-8 text, one word per
# line (base64 only obfuscates the list; it is not encryption).
# Loading is best-effort: a missing or unreadable file only prints a warning.
import base64
import os

try:
    secure_file_path = "data/secure_words.bin"
    if not os.path.exists(secure_file_path):
        print("Warning: Secure bad words file not found.")
    else:
        with open(secure_file_path, "rb") as f:
            encoded_data = f.read()
        decoded_data = base64.b64decode(encoded_data).decode("utf-8")
        # Trim every line and keep only the non-empty ones.
        trimmed_lines = (line.strip() for line in decoded_data.splitlines())
        custom_words = [word for word in trimmed_lines if word]
        profanity.add_censor_words(custom_words)
        print(f"Loaded {len(custom_words)} custom bad words from secure storage.")
except Exception as e:
    print(f"Warning: Could not load custom bad words: {e}")
# Load the offensive emoji list: one entry per line, blank lines and lines
# starting with '#' are skipped. Loading is best-effort; on any failure a
# warning is printed and whatever was collected so far is kept.
offensive_emojis = set()
try:
    emoji_file_path = "data/bad_emojis.txt"
    if not os.path.exists(emoji_file_path):
        print("Warning: Offensive emojis file not found.")
    else:
        with open(emoji_file_path, "r", encoding="utf-8") as f:
            for raw_line in f:
                entry = raw_line.strip()
                if not entry or entry.startswith("#"):
                    continue
                offensive_emojis.add(entry)
        print(f"Loaded {len(offensive_emojis)} offensive emojis.")
except Exception as e:
    print(f"Warning: Could not load offensive emojis: {e}")
def contains_offensive_emoji(text: str) -> bool:
    """Return True when at least one entry of ``offensive_emojis`` is a substring of *text*."""
    return any(symbol in text for symbol in offensive_emojis)
# FastAPI application object; the route decorators below register on it.
app = FastAPI(title="AI Comment Moderation API")
# CORS is left fully open: any origin, method and header is accepted.
# NOTE(review): FastAPI's CORS documentation states that wildcard ("*")
# origins/methods/headers must not be combined with allow_credentials=True.
# Either list the allowed origins explicitly or drop allow_credentials if the
# frontend does not send cookies/authorization — confirm against the client.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Toxicity classification pipeline setup.
# A plain sentiment model such as distilbert-base-uncased-finetuned-sst-2-english
# would only give simple sentiment. For toxicity detection in Telugu-English
# code-mixed content, MuRIL (Multilingual Representations for Indian Languages)
# BERT is preferred over standard DistilBERT or toxic-bert: it is trained on
# Indian languages and handles code-switching much better.
# Current production model: google/muril-base-cased (fine-tuned)
import torch

# Upper bound on PyTorch CPU threads, so inference does not max out the CPU and
# lag the whole OS. (The previous code read this from an undefined `config`
# object, which raised NameError and silently skipped the limit.)
CPU_THREAD_LIMIT = 4

# Pick the pipeline device: 0 = first CUDA GPU, -1 = CPU.
try:
    if torch.cuda.is_available():
        device = 0  # Use GPU
        print("β CUDA GPU detected, running models on GPU for faster inference.")
    else:
        device = -1  # Use CPU
        torch.set_num_threads(CPU_THREAD_LIMIT)
        print(f"β CPU detected, limited PyTorch to {torch.get_num_threads()} threads to prevent system lag.")
except Exception as e:
    # Best-effort: never block startup on device probing, but say what failed.
    device = -1
    print(f"Warning: Could not configure PyTorch device, falling back to CPU: {e}")
# Build the text-classification pipeline. A fine-tuned model directory
# ("model_output" next to this file, containing config.json) is preferred;
# otherwise the base google/muril-base-cased checkpoint is used. On any
# failure `classifier` is set to None and the endpoints handle that case.
try:
    # Use fine-tuned model if available (produced by train_model.py)
    fine_tuned_path = os.path.join(os.path.dirname(__file__), "model_output")
    has_fine_tuned = os.path.exists(fine_tuned_path) and os.path.exists(
        os.path.join(fine_tuned_path, "config.json")
    )
    if has_fine_tuned:
        print(f"β Loading fine-tuned model from: {fine_tuned_path}")
        model_source = fine_tuned_path
    else:
        print("Loading default model: google/muril-base-cased (Fallback)")
        print("Note: MuRIL is highly recommended for Telugu-English code-mixed content.")
        model_source = "google/muril-base-cased"
    # top_k=None asks the pipeline for the score of every label.
    classifier = pipeline("text-classification", model=model_source, top_k=None, device=device)
except Exception as e:
    print(f"Error loading model: {e}")
    classifier = None
# Request payload accepted by the /analyze and /submit endpoints.
class CommentRequest(BaseModel):
    # Raw comment text to moderate.
    text: str
    # Selects the ML threshold in analyze_comment: "high" uses 0.4, any other
    # value uses 0.7.
    strictness: str = "high" # "high" (Celeb) or "low" (Friend)
# One label/score pair in an analysis result. Rule-based checks in
# analyze_comment emit a single entry with score 1.0; the ML path emits one
# entry per label returned by the classifier.
class Score(BaseModel):
    label: str
    score: float
# Response body of the /analyze endpoint.
class AnalysisResponse(BaseModel):
    # The request text, echoed back as received.
    text: str
    # Scores behind the verdict; empty when the text is too short for the model
    # or the classifier is not loaded.
    results: List[Score]
    # Final verdict.
    is_toxic: bool
# Liveness endpoint: returns a fixed message confirming the API is up.
@app.get("/")
def read_root():
    status = {"message": "AI Comment Moderation API is running"}
    return status
def _rule_hit(text: str, label: str) -> "AnalysisResponse":
    """Build the toxic response returned when a rule-based check fires.

    Rule-based hits carry a single score of 1.0 under *label*.
    """
    return AnalysisResponse(
        text=text,
        results=[Score(label=label, score=1.0)],
        is_toxic=True,
    )


@app.post("/analyze", response_model=AnalysisResponse)
def analyze_comment(request: CommentRequest):
    """Decide whether a comment is toxic.

    Rule-based checks run first and return on the first hit: profanity list,
    insult/threat keywords, offensive emojis. Text shorter than 5 characters
    that passes them is reported as non-toxic without consulting the model.
    Otherwise the ML classifier scores the text, and the comment is toxic when
    a toxic label exceeds the threshold selected by ``request.strictness``.
    If the classifier is not loaded, the comment is reported as non-toxic.

    Raises HTTPException (400) when the text is empty after stripping.
    """
    text = request.text.strip()
    if not text:
        raise HTTPException(status_code=400, detail="Text cannot be empty")

    # 1. Strict "Bad Word" check (rule-based).
    # MILD_WORDS_WHITELIST is already excluded from the library's censor list.
    # is_whitelisted() additionally strips the whitelisted multi-word phrases
    # and re-checks, so a hit caused only by those falls through to the deeper
    # checks below.
    if profanity.contains_profanity(text) and not is_whitelisted(text):
        return _rule_hit(request.text, "profanity_strict")

    # 2. Keyword-based insult/threat detector (catches ML model blind spots).
    if contains_insult_keyword(text):
        return _rule_hit(request.text, "insult_keyword")

    # 3. Offensive emoji check.
    if contains_offensive_emoji(text):
        return _rule_hit(request.text, "offensive_emoji")

    # 4. Short-text heuristic: too little context for the model.
    if len(text) < 5:
        return AnalysisResponse(text=request.text, results=[], is_toxic=False)

    # 5. ML model check (context-based).
    if not classifier:
        print("Classifier not loaded, skipping ML check.")
        return AnalysisResponse(text=request.text, results=[], is_toxic=False)

    # Truncate long comments instead of letting the model raise on inputs
    # beyond its window. 512 tokens is the limit of BERT-style encoders such as
    # MuRIL — confirm if the fine-tuned model uses a different limit.
    results = classifier(text, truncation=True, max_length=512)
    # The pipeline wraps a single input's label list in an outer list; accept
    # a flat list of label dicts as well.
    scores = results[0] if results and isinstance(results[0], list) else results

    # Threshold by strictness:
    #   "high" (Celeb)  -> 0.4 (strict)
    #   anything else   -> 0.7 (balanced, "low" / Friend)
    threshold = 0.4 if request.strictness == "high" else 0.7

    # Labels that indicate toxicity. Ignores 'LABEL_0', 'non-toxic', 'neutral', etc.
    TOXIC_LABELS = {"toxic", "severe_toxic", "obscene", "threat", "insult", "identity_hate", "LABEL_1"}

    formatted_scores = [Score(label=item["label"], score=item["score"]) for item in scores]
    # Toxic only if a label from the toxic set exceeds the threshold.
    is_toxic = any(
        item["label"] in TOXIC_LABELS and item["score"] > threshold
        for item in scores
    )

    return AnalysisResponse(
        text=request.text,
        results=formatted_scores,
        is_toxic=is_toxic,
    )
@app.post("/submit")
def submit_comment(request: CommentRequest):
    """Accept a comment after re-checking it server-side (mock: nothing is stored).

    The comment goes through the same decision as /analyze, so the frontend
    check cannot be bypassed. The previous check treated any label scoring
    above 0.5 as toxic, including non-toxic labels, and skipped the rule-based
    checks.

    Raises HTTPException (500) when the model is not loaded, and (400) when the
    text is empty or the comment is judged toxic.
    """
    # This is a mock endpoint. In a real app, this would save to DB.
    if not classifier:
        raise HTTPException(status_code=500, detail="Model not loaded")

    verdict = analyze_comment(request)
    if verdict.is_toxic:
        raise HTTPException(status_code=400, detail="Comment rejected due to toxicity.")
    return {"message": "Comment posted successfully", "text": request.text}
if __name__ == "__main__":
    import uvicorn
    import os

    # Look for the SSL key/certificate in data/ first, then in the working directory.
    key_file = "key.pem"
    if os.path.exists("data/key.pem"):
        key_file = "data/key.pem"
    cert_file = "cert.pem"
    if os.path.exists("data/cert.pem"):
        cert_file = "data/cert.pem"

    # HTTPS is enabled only when both files exist; otherwise plain HTTP.
    ssl_options = {}
    if os.path.exists(key_file) and os.path.exists(cert_file):
        print(f"Starting server with SSL/HTTPS enabled using {cert_file} and {key_file}...")
        ssl_options = {"ssl_keyfile": key_file, "ssl_certfile": cert_file}
    else:
        print("SSL certificates not found. Starting server in HTTP mode.")

    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True, **ssl_options)
|