"""Comment Guard API - FastAPI + MuRIL BERT."""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from transformers import pipeline
from better_profanity import profanity
from typing import List, Dict
import re
# Mild/acceptable words that better_profanity should NOT flag.
# Using the library's built-in whitelist_words param is the most reliable fix.
MILD_WORDS_WHITELIST = [
"damn", "hell", "crap", "dang", "heck", "shoot", "frick", "freaking",
"sucks", "suck", "bloody", "piss", "pissed",
]
# Initialize profanity filter with whitelisted mild words so they never trigger
profanity.load_censor_words(whitelist_words=MILD_WORDS_WHITELIST)
# Keep a set for the manual cleanup fallback (covers multi-word phrases)
PROFANITY_WHITELIST = set(MILD_WORDS_WHITELIST) | {"keep it up", "great post"}
# Pre-compiled regex patterns for profanity whitelist
PROFANITY_WHITELIST_PATTERNS = {
    word: re.compile(r'\b' + re.escape(word) + r'\b', re.IGNORECASE)
    for word in PROFANITY_WHITELIST
}
def is_whitelisted(text: str) -> bool:
"""Check if the text only triggers profanity due to whitelisted mild words."""
cleaned = text.lower()
for pattern in PROFANITY_WHITELIST_PATTERNS.values():
cleaned = pattern.sub("", cleaned)
return not profanity.contains_profanity(cleaned)
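# Illustrative behaviour (exact hits depend on better_profanity's word list and the custom words loaded below):
#   is_whitelisted("damn, that was a great post")    -> True   (only mild/whitelisted words present)
#   is_whitelisted("damn you, <explicit slur> ...")  -> False  (real profanity remains after cleanup)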
# Keyword-based insult/threat detector to catch what the ML model misses.
# Unicode apostrophe class ['‘’] handles both ASCII (') and curly (’) apostrophes.
INSULT_KEYWORDS = [
# --- English insults / threats ---
r"\byou['‘’]?re so dumb\b",
r"\bwhat a loser\b",
r"\bi will find you\b",
r"\byou deserve to die\b",
r"\bi hate you\b",
r"\byou['‘’]?re disgusting\b",
r"\bnobody likes you\b",
r"\byou['‘’]?re pathetic\b",
r"\bget lost\b",
r"\bnobody asked\b",
r"\byou['‘’]?re worthless\b",
r"\byou['‘’]?re trash\b",
r"\bkill yourself\b",
r"\bgo kill yourself\b",
r"\byou['‘’]?re ugly\b",
r"\bshut up\b",
r"\byou['‘’]?re annoying\b",
r"\bgo to hell\b",
r"\bstupid ga\b",
r"\bwaste fellow\b",
r"\byou['‘’]?re an idiot\b",
r"\bthis is garbage\b",
r"\byou are stupid\b",
r"\byou are an idiot\b",
r"\byou['‘’]?re dumb\b",
r"\bstupid idiot\b",
r"\bbloody fool\b",
# --- Telugu-English compound insults: [insult word] + gadu/fellow/vaadu ---
r"\b(?:buffalo|monkey|mental|psycho|cheap|nasty|dirty|useless|worst|scoundrel)"
r"\s+(?:gadu|fellow|vaadu|ra)\b",
r"\b(?:rascal|buffoon|loafer|fraud|basthi|chapri|local|rowdy|420|kothi|waste)"
r"\s+(?:gadu|fellow|vaadu|ra)\b",
r"\b(?:third\s+class|low\s+class|third-class|low-class)\s+(?:gadu|fellow|vaadu)\b",
r"\b(?:buffalo|monkey|mental|psycho|cheap|nasty|dirty|useless|worst|scoundrel|rascal|buffoon|loafer|fraud)\s+fellow\b",
# --- Telugu standalone insult suffixes ---
r"\bkothi\s+vedhava\b",
]
INSULT_PATTERN = re.compile("|".join(INSULT_KEYWORDS), re.IGNORECASE | re.UNICODE)
def contains_insult_keyword(text: str) -> bool:
"""Check if text contains known insult/threat patterns."""
return bool(INSULT_PATTERN.search(text))
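# Illustrative matches (case-insensitive, tolerates curly apostrophes):
#   contains_insult_keyword("You're so DUMB")          -> True   (English insult phrase)
#   contains_insult_keyword("waste fellow")            -> True   (Telugu-English compound insult)
#   contains_insult_keyword("great video, keep it up") -> False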
# Load Custom Telugu-English Bad Words (Secure)
import base64
import os
try:
secure_file_path = "data/secure_words.bin"
if os.path.exists(secure_file_path):
with open(secure_file_path, "rb") as f:
encoded_data = f.read()
decoded_data = base64.b64decode(encoded_data).decode("utf-8")
custom_words = [line.strip() for line in decoded_data.splitlines() if line.strip()]
profanity.add_censor_words(custom_words)
print(f"Loaded {len(custom_words)} custom bad words from secure storage.")
else:
print("Warning: Secure bad words file not found.")
except Exception as e:
print(f"Warning: Could not load custom bad words: {e}")
# Load Offensive Emojis
offensive_emojis = set()
try:
emoji_file_path = "data/bad_emojis.txt"
if os.path.exists(emoji_file_path):
with open(emoji_file_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
offensive_emojis.add(line)
print(f"Loaded {len(offensive_emojis)} offensive emojis.")
else:
print("Warning: Offensive emojis file not found.")
except Exception as e:
print(f"Warning: Could not load offensive emojis: {e}")
def contains_offensive_emoji(text: str) -> bool:
"""Check if text contains any offensive emojis"""
for emoji in offensive_emojis:
if emoji in text:
return True
return False
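# Simple substring check: a comment is flagged if any listed emoji appears anywhere in it,
# e.g. contains_offensive_emoji("nice video 🖕") -> True only if "🖕" is listed in data/bad_emojis.txt.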
app = FastAPI(title="AI Comment Moderation API")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize the toxicity classification pipeline.
# A plain sentiment model (e.g. distilbert-base-uncased-finetuned-sst-2-english) would be enough for
# simple sentiment, but for toxicity detection in Telugu-English code-mixed content, MuRIL (Multilingual
# Representations for Indian Languages) BERT is preferred over standard DistilBERT or toxic-bert:
# MuRIL is trained specifically on Indian languages and handles code-switching much better.
# Current production model: google/muril-base-cased (fine-tuned)
import torch
# Optimizations to prevent PyTorch from lagging the entire OS when running on CPU
try:
    if torch.cuda.is_available():
        device = 0  # Use GPU
        print("✓ CUDA GPU detected, running models on GPU for faster inference.")
    else:
        device = -1  # Use CPU
        torch.set_num_threads(4)  # Limit to 4 threads rather than maxing out the CPU
        print(f"✓ CPU detected, limited PyTorch to {torch.get_num_threads()} threads to prevent system lag.")
except Exception as e:
    device = -1
    print(f"Warning: Could not configure compute device, defaulting to CPU: {e}")
try:
# Use fine-tuned model if available (produced by train_model.py)
fine_tuned_path = os.path.join(os.path.dirname(__file__), "model_output")
if os.path.exists(fine_tuned_path) and os.path.exists(os.path.join(fine_tuned_path, "config.json")):
print(f"✓ Loading fine-tuned model from: {fine_tuned_path}")
classifier = pipeline("text-classification", model=fine_tuned_path, top_k=None, device=device)
    else:
        print("Loading default model: google/muril-base-cased (fallback)")
        print("Note: the base MuRIL checkpoint has no toxicity fine-tuning; run train_model.py to produce model_output for reliable ML scores.")
        classifier = pipeline("text-classification", model="google/muril-base-cased", top_k=None, device=device)
except Exception as e:
print(f"Error loading model: {e}")
classifier = None
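# Expected pipeline output (assumed from standard transformers text-classification behaviour):
# with top_k=None and a single string, classifier("some text") returns every label's score wrapped in
# an outer list, e.g. [[{'label': 'LABEL_0', 'score': 0.93}, {'label': 'LABEL_1', 'score': 0.07}]],
# which is why the endpoints below index [0] to get the per-label scores for the one comment.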
class CommentRequest(BaseModel):
text: str
strictness: str = "high" # "high" (Celeb) or "low" (Friend)
class Score(BaseModel):
label: str
score: float
class AnalysisResponse(BaseModel):
text: str
results: List[Score]
is_toxic: bool
@app.get("/")
def read_root():
return {"message": "AI Comment Moderation API is running"}
@app.post("/analyze", response_model=AnalysisResponse)
def analyze_comment(request: CommentRequest):
text = request.text.strip()
if not text:
raise HTTPException(status_code=400, detail="Text cannot be empty")
# 1. Strict "Bad Word" Check (Rule-based)
# MILD_WORDS_WHITELIST is already removed from the profanity library's censor list,
# so only genuine profanity (slurs, explicit words) will be flagged here.
if profanity.contains_profanity(text):
# Extra safety: remove any remaining multi-word safe phrases and re-check using PRECOMPILED regex
cleaned_text = text.lower()
for pattern in PROFANITY_WHITELIST_PATTERNS.values():
cleaned_text = pattern.sub("", cleaned_text)
if profanity.contains_profanity(cleaned_text):
return AnalysisResponse(
text=request.text,
results=[Score(label="profanity_strict", score=1.0)],
is_toxic=True
)
# Only multi-word mild phrase triggered it — continue to deeper checks
# 1b. Keyword-based insult/threat detector (catches ML model blind spots)
if contains_insult_keyword(text):
return AnalysisResponse(
text=request.text,
results=[Score(label="insult_keyword", score=1.0)],
is_toxic=True
)
# 2. Offensive Emoji Check
if contains_offensive_emoji(text):
return AnalysisResponse(
text=request.text,
results=[Score(label="offensive_emoji", score=1.0)],
is_toxic=True
)
    # 3. Short Text Heuristic
if len(text) < 5:
return AnalysisResponse(
text=request.text,
results=[],
is_toxic=False
)
    # 4. ML Model Check (Context-based)
if not classifier:
print("Classifier not loaded, skipping ML check.")
return AnalysisResponse(text=request.text, results=[], is_toxic=False)
results = classifier(text)
scores = results[0]
is_toxic = False
formatted_scores = []
# Define Threshold based on Strictness
# High (Celeb) = 0.4 (Strict)
# Low (Friend) = 0.7 (Balanced)
threshold = 0.4 if request.strictness == "high" else 0.7
# Labels that indicate toxicity. Ignores 'LABEL_0', 'non-toxic', 'neutral', etc.
TOXIC_LABELS = {"toxic", "severe_toxic", "obscene", "threat", "insult", "identity_hate", "LABEL_1"}
for item in scores:
label = item['label']
score = item['score']
formatted_scores.append(Score(label=label, score=score))
# Only mark as toxic if the label is in our toxic set AND exceeds threshold
if label in TOXIC_LABELS and score > threshold:
is_toxic = True
return AnalysisResponse(
text=request.text,
results=formatted_scores,
is_toxic=is_toxic
)
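# Example exchange (illustrative values; the label reported depends on which check fires first,
# and the scores depend on the loaded model):
#   POST /analyze {"text": "waste fellow", "strictness": "high"}
#     -> {"text": "waste fellow", "results": [{"label": "insult_keyword", "score": 1.0}], "is_toxic": true}
#   POST /analyze {"text": "great video, keep it up", "strictness": "low"}
#     -> {"text": "great video, keep it up", "results": [...model scores...], "is_toxic": false}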
@app.post("/submit")
def submit_comment(request: CommentRequest):
# This is a mock endpoint. In a real app, this would save to DB.
# We re-check toxicity here to prevent bypassing frontend
if not classifier:
raise HTTPException(status_code=500, detail="Model not loaded")
    results = classifier(request.text)[0]
    # Only toxic-class labels (mirroring /analyze) should reject; a confident non-toxic score must not.
    toxic_labels = {"toxic", "severe_toxic", "obscene", "threat", "insult", "identity_hate", "LABEL_1"}
    is_toxic = any(item['label'] in toxic_labels and item['score'] > 0.5 for item in results)
if is_toxic:
raise HTTPException(status_code=400, detail="Comment rejected due to toxicity.")
return {"message": "Comment posted successfully", "text": request.text}
if __name__ == "__main__":
import uvicorn
import os
# Check for SSL certificates in data directory or root
key_file = "data/key.pem" if os.path.exists("data/key.pem") else "key.pem"
cert_file = "data/cert.pem" if os.path.exists("data/cert.pem") else "cert.pem"
if os.path.exists(key_file) and os.path.exists(cert_file):
print(f"Starting server with SSL/HTTPS enabled using {cert_file} and {key_file}...")
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True, ssl_keyfile=key_file, ssl_certfile=cert_file)
else:
print("SSL certificates not found. Starting server in HTTP mode.")
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
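# Local smoke test once the server is up (assumes HTTP mode on port 8000):
#   curl -X POST http://localhost:8000/analyze \
#        -H "Content-Type: application/json" \
#        -d '{"text": "hello there", "strictness": "high"}'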