File size: 11,356 Bytes
b8300d6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from transformers import pipeline
from better_profanity import profanity
from typing import List, Dict
import re

# Everyday mild expletives that must never be reported as profanity.
# They are passed to better_profanity through its ``whitelist_words``
# parameter so the library leaves them out of its censor list.
MILD_WORDS_WHITELIST = [
    "damn",
    "hell",
    "crap",
    "dang",
    "heck",
    "shoot",
    "frick",
    "freaking",
    "sucks",
    "suck",
    "bloody",
    "piss",
    "pissed",
]

# Load the library's censor list with the mild words excluded.
profanity.load_censor_words(whitelist_words=MILD_WORDS_WHITELIST)

# Everything the manual cleanup pass strips from a comment before re-checking
# it: the mild words plus a couple of harmless multi-word phrases.
PROFANITY_WHITELIST = {"keep it up", "great post"}.union(MILD_WORDS_WHITELIST)

# One case-insensitive, whole-word regex per whitelisted entry, compiled once
# at import time and keyed by the entry it matches.
PROFANITY_WHITELIST_PATTERNS = {
    phrase: re.compile(rf"\b{re.escape(phrase)}\b", re.IGNORECASE)
    for phrase in PROFANITY_WHITELIST
}

def is_whitelisted(text: str) -> bool:
    """Return True when no profanity remains after removing whitelisted phrases.

    The text is lower-cased, every pattern in ``PROFANITY_WHITELIST_PATTERNS``
    is deleted from it, and the remainder is handed to the profanity filter.
    Text that contained no profanity in the first place also yields True.
    """
    remainder = text.lower()
    for mild_regex in PROFANITY_WHITELIST_PATTERNS.values():
        remainder = mild_regex.sub("", remainder)
    still_profane = profanity.contains_profanity(remainder)
    return not still_profane

# Keyword-based insult/threat detector to catch what the ML model misses.
#
# The apostrophe in "you're" is optional and may be the ASCII apostrophe (')
# or a typographic one (U+2018 / U+2019). The typographic quotes are written
# as \u escapes (which the ``re`` module understands in str patterns) so the
# character class cannot be corrupted by a file-encoding round trip; the
# previous literal characters had turned into mojibake and no longer matched
# curly apostrophes.
_YOURE = r"\byou['\u2018\u2019]?re"

# Each entry is one regex alternative; they are OR-ed together below.
INSULT_KEYWORDS = [
    # --- English insults / threats ---
    _YOURE + r" so dumb\b",
    r"\bwhat a loser\b",
    r"\bi will find you\b",
    r"\byou deserve to die\b",
    r"\bi hate you\b",
    _YOURE + r" disgusting\b",
    r"\bnobody likes you\b",
    _YOURE + r" pathetic\b",
    r"\bget lost\b",
    r"\bnobody asked\b",
    _YOURE + r" worthless\b",
    _YOURE + r" trash\b",
    r"\bkill yourself\b",
    r"\bgo kill yourself\b",
    _YOURE + r" ugly\b",
    r"\bshut up\b",
    _YOURE + r" annoying\b",
    r"\bgo to hell\b",
    r"\bstupid ga\b",
    r"\bwaste fellow\b",
    _YOURE + r" an idiot\b",
    r"\bthis is garbage\b",
    r"\byou are stupid\b",
    r"\byou are an idiot\b",
    _YOURE + r" dumb\b",
    r"\bstupid idiot\b",
    r"\bbloody fool\b",
    # --- Telugu-English compound insults: [insult word] + gadu/fellow/vaadu ---
    r"\b(?:buffalo|monkey|mental|psycho|cheap|nasty|dirty|useless|worst|scoundrel)"
    r"\s+(?:gadu|fellow|vaadu|ra)\b",
    r"\b(?:rascal|buffoon|loafer|fraud|basthi|chapri|local|rowdy|420|kothi|waste)"
    r"\s+(?:gadu|fellow|vaadu|ra)\b",
    r"\b(?:third\s+class|low\s+class|third-class|low-class)\s+(?:gadu|fellow|vaadu)\b",
    r"\b(?:buffalo|monkey|mental|psycho|cheap|nasty|dirty|useless|worst|scoundrel|rascal|buffoon|loafer|fraud)\s+fellow\b",
    # --- Telugu standalone insult suffixes ---
    r"\bkothi\s+vedhava\b",
]

# Single compiled alternation so one search covers every keyword.
INSULT_PATTERN = re.compile("|".join(INSULT_KEYWORDS), re.IGNORECASE | re.UNICODE)

def contains_insult_keyword(text: str) -> bool:
    """Return True when ``text`` matches any pattern in ``INSULT_PATTERN``."""
    first_hit = INSULT_PATTERN.search(text)
    return first_hit is not None

# Load Custom Telugu-English Bad Words (Secure)
# The word list is stored base64-encoded, one word per line once decoded, and
# is appended to the profanity filter's censor list. Any failure is reported
# as a warning and start-up continues without the extra words.
import base64
import os

try:
    secure_file_path = "data/secure_words.bin"
    if not os.path.exists(secure_file_path):
        print("Warning: Secure bad words file not found.")
    else:
        with open(secure_file_path, "rb") as f:
            encoded_data = f.read()
        decoded_data = base64.b64decode(encoded_data).decode("utf-8")
        # Trim every line and drop the ones that end up empty.
        custom_words = list(filter(None, map(str.strip, decoded_data.splitlines())))
        profanity.add_censor_words(custom_words)
        print(f"Loaded {len(custom_words)} custom bad words from secure storage.")
except Exception as e:
    print(f"Warning: Could not load custom bad words: {e}")

# Load Offensive Emojis
# The set is created up front so it exists (possibly empty) even when the
# file is missing or unreadable. Blank lines and lines starting with "#" in
# the file are skipped.
offensive_emojis = set()
try:
    emoji_file_path = "data/bad_emojis.txt"
    if not os.path.exists(emoji_file_path):
        print("Warning: Offensive emojis file not found.")
    else:
        with open(emoji_file_path, "r", encoding="utf-8") as f:
            for raw_line in f:
                entry = raw_line.strip()
                if not entry or entry.startswith("#"):
                    continue
                offensive_emojis.add(entry)
        print(f"Loaded {len(offensive_emojis)} offensive emojis.")
except Exception as e:
    print(f"Warning: Could not load offensive emojis: {e}")

def contains_offensive_emoji(text: str) -> bool:
    """Return True when any entry of ``offensive_emojis`` occurs in ``text``."""
    return any(marker in text for marker in offensive_emojis)


app = FastAPI(title="AI Comment Moderation API")

# CORS: the API is callable from any origin with any method and header.
# Credentialed requests (cookies / HTTP auth) are deliberately NOT enabled:
# FastAPI's CORS documentation states that allow_origins, allow_methods and
# allow_headers cannot be ["*"] when allow_credentials is True, and no endpoint
# in this module reads cookies or authentication headers. To support
# credentials later, list the permitted origins explicitly instead of "*".
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Device selection for the toxicity classification pipeline.
# For toxicity detection in Telugu-English code-mixed content, MuRIL
# (Multilingual Representations for Indian Languages) BERT is preferred over
# standard DistilBERT or toxic-bert because it is trained on Indian languages
# and handles code-switching much better.
# Current production model: google/muril-base-cased (fine-tuned)
import torch

# Optimizations to prevent PyTorch from lagging the entire OS when running on
# CPU: cap the number of threads instead of letting it use every core.
# The cap defaults to 4 and can be overridden with the MODERATION_CPU_THREADS
# environment variable. (The previous code read an undefined ``config`` object,
# so the resulting NameError was swallowed and the cap was never applied.)
try:
    if torch.cuda.is_available():
        device = 0  # Use GPU
        print("βœ“ CUDA GPU detected, running models on GPU for faster inference.")
    else:
        device = -1  # Use CPU
        cpu_threads = int(os.environ.get("MODERATION_CPU_THREADS", "4"))
        # set_num_threads rejects non-positive values, so clamp to at least 1.
        torch.set_num_threads(max(1, cpu_threads))
        print(f"βœ“ CPU detected, limited PyTorch to {torch.get_num_threads()} threads to prevent system lag.")
except Exception as e:
    # Best effort: fall back to CPU, but say why instead of failing silently.
    device = -1
    print(f"Warning: Could not configure PyTorch device/threads, using CPU defaults: {e}")

# Build the text-classification pipeline. A fine-tuned model in ./model_output
# (next to this file, produced by train_model.py) wins when it is present;
# otherwise the stock MuRIL checkpoint is used. On any failure ``classifier``
# is set to None.
try:
    fine_tuned_path = os.path.join(os.path.dirname(__file__), "model_output")
    fine_tuned_config = os.path.join(fine_tuned_path, "config.json")
    has_fine_tuned = os.path.exists(fine_tuned_path) and os.path.exists(fine_tuned_config)

    if has_fine_tuned:
        print(f"βœ“ Loading fine-tuned model from: {fine_tuned_path}")
        model_source = fine_tuned_path
    else:
        print("Loading default model: google/muril-base-cased (Fallback)")
        print("Note: MuRIL is highly recommended for Telugu-English code-mixed content.")
        model_source = "google/muril-base-cased"

    # top_k=None asks the pipeline for the score of every label.
    classifier = pipeline("text-classification", model=model_source, top_k=None, device=device)
except Exception as e:
    print(f"Error loading model: {e}")
    classifier = None


class CommentRequest(BaseModel):
    """Request body accepted by the /analyze and /submit endpoints."""
    # Raw comment text as sent by the client.
    text: str
    # Moderation level; analyze_comment uses it to choose the ML score threshold.
    strictness: str = "high" # "high" (Celeb) or "low" (Friend)

class Score(BaseModel):
    """A single label/score pair reported for an analysed comment."""
    # Name of the rule or classifier label that produced the score.
    label: str
    # Score attached to the label; rule-based checks report 1.0.
    score: float

class AnalysisResponse(BaseModel):
    """Response body returned by the /analyze endpoint."""
    # The comment text exactly as it was received in the request.
    text: str
    # Per-label scores; empty when no rule fired and the ML check was skipped.
    results: List[Score]
    # Overall verdict for the comment.
    is_toxic: bool

@app.get("/")
def read_root():
    """Health-check endpoint: report that the API process is up."""
    status_payload = {"message": "AI Comment Moderation API is running"}
    return status_payload

# Labels whose score counts towards a toxic verdict. Anything else the model
# emits (e.g. 'LABEL_0', 'non-toxic', 'neutral') is reported but ignored.
_TOXIC_LABELS = frozenset(
    {"toxic", "severe_toxic", "obscene", "threat", "insult", "identity_hate", "LABEL_1"}
)

# Token cap passed to the tokenizer so very long comments are truncated instead
# of crashing the model. 512 is the usual BERT-base position limit.
# NOTE(review): confirm this matches the deployed fine-tuned model.
_MAX_MODEL_TOKENS = 512


def _rule_hit(original_text: str, label: str) -> "AnalysisResponse":
    """Build the response for a comment flagged by a rule-based check."""
    return AnalysisResponse(
        text=original_text,
        results=[Score(label=label, score=1.0)],
        is_toxic=True,
    )


@app.post("/analyze", response_model=AnalysisResponse)
def analyze_comment(request: CommentRequest):
    """Classify a comment as toxic or not.

    Checks run from cheapest to most expensive and stop at the first hit:
    profanity filter, insult/threat keywords, offensive emojis, then the ML
    classifier. Comments shorter than five characters that pass the rule-based
    checks are accepted without consulting the model.

    Args:
        request: The comment text and the strictness level ("high" or "low").

    Returns:
        AnalysisResponse echoing the original text, the label scores that led
        to the verdict (empty when the ML check was skipped) and ``is_toxic``.

    Raises:
        HTTPException: 400 when the text is empty or whitespace only.
    """
    text = request.text.strip()
    if not text:
        raise HTTPException(status_code=400, detail="Text cannot be empty")

    # 1. Strict "Bad Word" check (rule-based).
    # Mild words are already excluded from the library's censor list; the
    # is_whitelisted() re-check additionally strips the safe multi-word
    # phrases, so only text that is still profane afterwards is rejected.
    if profanity.contains_profanity(text) and not is_whitelisted(text):
        return _rule_hit(request.text, "profanity_strict")

    # 2. Keyword-based insult/threat detector (catches ML model blind spots).
    if contains_insult_keyword(text):
        return _rule_hit(request.text, "insult_keyword")

    # 3. Offensive emoji check.
    if contains_offensive_emoji(text):
        return _rule_hit(request.text, "offensive_emoji")

    # 4. Short-text heuristic: the model is not consulted for tiny inputs.
    if len(text) < 5:
        return AnalysisResponse(text=request.text, results=[], is_toxic=False)

    # 5. ML model check (context-based).
    if not classifier:
        print("Classifier not loaded, skipping ML check.")
        return AnalysisResponse(text=request.text, results=[], is_toxic=False)

    # Truncate over-long comments; without this the model call fails on inputs
    # longer than its maximum sequence length.
    raw_output = classifier(text, truncation=True, max_length=_MAX_MODEL_TOKENS)
    # Accept both shapes: a list of label dicts, or that list wrapped in an
    # outer single-element list.
    if raw_output and isinstance(raw_output[0], list):
        scores = raw_output[0]
    else:
        scores = raw_output

    # Threshold by strictness: high (Celeb) = 0.4 (strict), anything else
    # (Friend) = 0.7 (balanced). Case and surrounding whitespace are ignored so
    # "High" is not silently downgraded to the lenient threshold.
    strictness = request.strictness.strip().lower()
    threshold = 0.4 if strictness == "high" else 0.7

    formatted_scores = [Score(label=item["label"], score=item["score"]) for item in scores]

    # Toxic only when a toxic label exceeds the threshold.
    is_toxic = any(
        entry.label in _TOXIC_LABELS and entry.score > threshold
        for entry in formatted_scores
    )

    return AnalysisResponse(
        text=request.text,
        results=formatted_scores,
        is_toxic=is_toxic,
    )

@app.post("/submit")
def submit_comment(request: CommentRequest):
    """Accept a comment for posting after re-checking it server-side.

    This is a mock endpoint; a real application would persist the comment.
    The toxicity verdict comes from ``analyze_comment`` so a client cannot
    bypass moderation by skipping /analyze, and so both endpoints agree.
    Previously this endpoint rejected a comment when ANY label scored above
    0.5, including non-toxic labels, and ignored the rule-based checks.

    Args:
        request: The comment text and strictness level.

    Returns:
        A confirmation message together with the submitted text.

    Raises:
        HTTPException: 500 when the model is not loaded; 400 when the text is
            empty (raised by ``analyze_comment``) or judged toxic.
    """
    # Refuse to post anything unmoderated when the model failed to load.
    if not classifier:
        raise HTTPException(status_code=500, detail="Model not loaded")

    verdict = analyze_comment(request)
    if verdict.is_toxic:
        raise HTTPException(status_code=400, detail="Comment rejected due to toxicity.")

    return {"message": "Comment posted successfully", "text": request.text}

if __name__ == "__main__":
    import os
    import uvicorn

    # Prefer certificates kept under data/, otherwise look in the working directory.
    key_file = "key.pem"
    if os.path.exists("data/key.pem"):
        key_file = "data/key.pem"
    cert_file = "cert.pem"
    if os.path.exists("data/cert.pem"):
        cert_file = "data/cert.pem"

    # Options shared by the HTTP and HTTPS launch modes.
    server_options = {"host": "0.0.0.0", "port": 8000, "reload": True}

    if not (os.path.exists(key_file) and os.path.exists(cert_file)):
        print("SSL certificates not found. Starting server in HTTP mode.")
    else:
        print(f"Starting server with SSL/HTTPS enabled using {cert_file} and {key_file}...")
        server_options.update(ssl_keyfile=key_file, ssl_certfile=cert_file)

    uvicorn.run("main:app", **server_options)