File size: 17,794 Bytes
8ad9255
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
from typing import Dict, Optional
from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer
import joblib
import os
import re
import torch
from deep_translator import GoogleTranslator

class HateSpeechClassifier:
    def __init__(self):
        """Prepare the translator, model registry, custom-model slots and lexicons.

        The pretrained pipelines are only registered here (their "pipeline"
        entry starts as None); the custom pickled models are loaded
        immediately through ``_load_custom_models``.
        """
        # Custom weights live under <parent of this file's directory>/models/...
        this_file = os.path.abspath(__file__)
        project_root = os.path.dirname(os.path.dirname(this_file))
        weights_dir = os.path.join(project_root, "models", "model_weights", "custom_models")

        # Bengali -> English translator used before running the pretrained models.
        self.translator = GoogleTranslator(source='bn', target='en')

        # Registry of pretrained models: (slot, model name, ensemble weight).
        self.pretrained_models = {
            slot: {"name": model_name, "pipeline": None, "weight": weight}
            for slot, model_name, weight in (
                ("primary", "facebook/roberta-hate-speech-dynabench-r4-target", 0.6),
                ("secondary", "cardiffnlp/twitter-roberta-base-hate-latest", 0.4),
            )
        }

        # English custom model: file locations and empty slots.
        self.english_model_path = os.path.join(weights_dir, "english_model.pkl")
        self.english_vectorizer_path = os.path.join(weights_dir, "english_vectorizer.pkl")
        self.english_model = None
        self.english_vectorizer = None
        self.english_model_loaded = False

        # Bengali custom model: file locations and empty slots.
        self.bengali_model_path = os.path.join(weights_dir, "bengali_model.pkl")
        self.bengali_vectorizer_path = os.path.join(weights_dir, "bengali_vectorizer.pkl")
        self.bengali_model = None
        self.bengali_vectorizer = None
        self.bengali_model_loaded = False

        # Fill the slots above from disk where the files exist.
        self._load_custom_models()

        # Keywords whose presence marks a text as hate speech.
        english_hate_terms = [
            "hate", "kill", "death", "violence", "murder", "attack", "destroy", "eliminate",
            "die", "dead", "shoot", "stab", "burn", "hang", "lynch",
            "terrorist", "racist", "sexist", "discrimination", "discriminate",
            "scheduled caste", "scheduled tribe", "dalit", "lower caste", "untouchable",
            "chamar", "bhangi", "sc/st", "reservation quota",
            "no right to live", "don't deserve", "shouldn't exist", "subhuman",
            "inferior", "worthless", "scum", "vermin", "parasite",
            "should be killed", "must die", "deserve to die", "need to be eliminated",
            "jihadi", "kafir", "infidel", "terrorist religion", "religious extremist",
            "nigger", "chink", "paki", "kike", "faggot", "tranny",
        ]
        bengali_hate_terms = [
            "শালা", "হালা", "মাগি", "কুত্তা", "হারামি", "চোদ", "বাল",
            "ঘৃণা", "মারো", "মৃত্যু", "সন্ত্রাসী", "বোকা", "মূর্খ",
            "বিদ্বেষ", "ভয়ঙ্কর", "জঘন্য", "হত্যা", "আক্রমণ",
            "দলিত", "নিম্নবর্ণ", "অস্পৃশ্য",
        ]
        self.hate_keywords = {
            "english": english_hate_terms,
            "bengali": bengali_hate_terms,
        }

        # Regular expressions describing hateful phrasings.
        english_hate_regexes = [
            r"no right to (live|exist|be here|survive)",
            r"(should|must|need to|ought to) (die|be killed|be eliminated|perish)",
            r"don'?t deserve (to live|life|existence|to exist)",
            r"(get rid of|eliminate|exterminate|wipe out) (them|these|those|the)",
            r"(scheduled caste|dalit|lower caste|sc/st).{0,50}(no right|shouldn't|don't deserve)",
            r"(religious|ethnic|caste|racial) (cleansing|purification|genocide)",
            r"(send|throw|kick|drive) (them|back) (out|away|home)",
            r"(all|these) .{0,30} (should die|must be killed|need to go)",
            r"(death to|kill all|eliminate all) .{0,30}",
            r"(inferior|subhuman|less than human|not human)",
        ]
        bengali_hate_regexes = [
            r"বাঁচার অধিকার নেই",
            r"মরে যাওয়া উচিত",
            r"নিশ্চিহ্ন করা উচিত",
        ]
        self.hate_patterns = {
            "english": english_hate_regexes,
            "bengali": bengali_hate_regexes,
        }

        # Milder keywords that mark a text as offensive rather than hateful.
        self.offensive_keywords = {
            "english": [
                "damn", "hell", "crap", "suck", "dumb", "loser", "trash",
                "stupid", "idiot", "moron", "pathetic", "bad", "ugly",
                "disgusting", "nasty", "filthy", "asshole", "bitch", "bastard",
            ],
            "bengali": ["বাজে", "খারাপ", "নোংরা", "বেকুব"],
        }
    
    def _translate_to_english(self, text: str) -> Optional[str]:
        """Translate Bengali to English using deep-translator"""
        try:
            print(f"🔄 Translating Bengali text to English...")
            
            # deep-translator has a 5000 character limit per request
            max_chars = 4500
            if len(text) > max_chars:
                text_to_translate = text[:max_chars]
                print(f"⚠️  Text truncated to {max_chars} characters for translation")
            else:
                text_to_translate = text
            
            # Translate using Google Translate
            translated_text = self.translator.translate(text_to_translate)
            
            print(f"✓ Translation successful")
            print(f"   Original (Bengali): {text_to_translate[:100]}...")
            print(f"   Translated (English): {translated_text[:100]}...")
            
            return translated_text
        except Exception as e:
            print(f"❌ Translation failed: {e}")
            # Try splitting into smaller chunks if it fails
            try:
                print("🔄 Retrying with smaller chunks...")
                words = text.split()
                chunks = []
                current_chunk = []
                current_length = 0
                
                for word in words:
                    if current_length + len(word) > 1000:  # Smaller chunks
                        if current_chunk:
                            chunks.append(' '.join(current_chunk))
                        current_chunk = [word]
                        current_length = len(word)
                    else:
                        current_chunk.append(word)
                        current_length += len(word) + 1
                
                if current_chunk:
                    chunks.append(' '.join(current_chunk))
                
                translated_chunks = []
                for chunk in chunks[:5]:  # Translate max 5 chunks
                    translated_chunk = self.translator.translate(chunk)
                    translated_chunks.append(translated_chunk)
                
                translated_text = ' '.join(translated_chunks)
                print(f"✓ Translation successful with chunking")
                return translated_text
            except Exception as e2:
                print(f"❌ Translation with chunking also failed: {e2}")
                return None
    
    def _load_custom_models(self):
        """Load language-specific custom models"""
        try:
            if os.path.exists(self.english_model_path) and os.path.exists(self.english_vectorizer_path):
                print("Loading English custom model...")
                self.english_model = joblib.load(self.english_model_path)
                self.english_vectorizer = joblib.load(self.english_vectorizer_path)
                self.english_model_loaded = True
                print("✓ English custom model loaded")
            else:
                print("❌ English custom model not found")
                self.english_model_loaded = False
        except Exception as e:
            print(f"❌ Error loading English model: {e}")
            self.english_model_loaded = False
        
        try:
            if os.path.exists(self.bengali_model_path) and os.path.exists(self.bengali_vectorizer_path):
                print("Loading Bengali custom model...")
                self.bengali_model = joblib.load(self.bengali_model_path)
                self.bengali_vectorizer = joblib.load(self.bengali_vectorizer_path)
                self.bengali_model_loaded = True
                print("✓ Bengali custom model loaded")
            else:
                print("❌ Bengali custom model not found")
                self.bengali_model_loaded = False
        except Exception as e:
            print(f"❌ Error loading Bengali model: {e}")
            self.bengali_model_loaded = False
    
    def _load_pretrained_model(self, model_key: str):
        """Lazy load pretrained model"""
        model_info = self.pretrained_models.get(model_key)
        if not model_info:
            return
        
        if model_info["pipeline"] is None:
            try:
                print(f"Loading {model_key} pretrained model: {model_info['name']}...")
                model_info["pipeline"] = pipeline(
                    "text-classification",
                    model=model_info["name"],
                    device=-1,
                    top_k=None,
                    truncation=True,
                    max_length=512
                )
                print(f"✓ {model_key} pretrained model loaded")
            except Exception as e:
                print(f"❌ Error loading {model_key} pretrained model: {e}")
                model_info["pipeline"] = None
    
    async def classify_with_custom_model(self, text: str, language: str) -> Dict:
        """Classify using language-specific custom model"""
        if language == "english":
            if not self.english_model_loaded:
                return None
            model = self.english_model
            vectorizer = self.english_vectorizer
        elif language == "bengali":
            if not self.bengali_model_loaded:
                return None
            model = self.bengali_model
            vectorizer = self.bengali_vectorizer
        else:
            return None
        
        try:
            X = vectorizer.transform([text])
            prediction = model.predict(X)[0]
            
            if hasattr(model, 'predict_proba'):
                probabilities = model.predict_proba(X)[0]
                confidence = float(max(probabilities))
            else:
                confidence = 0.75
            
            if language == "english":
                if prediction == 0:
                    category = "neutral"
                else:
                    category = "hate_speech"
            else:
                if prediction == 0:
                    category = "neutral"
                elif prediction == 1:
                    category = "offensive"
                else:
                    category = "hate_speech"
            
            return {
                "category": category,
                "confidence": confidence,
                "method": f"custom_model_{language}",
                "raw_prediction": int(prediction)
            }
        except Exception as e:
            print(f"❌ Custom model classification failed: {e}")
            return None
    
    async def classify_with_pretrained_model(self, text: str, language: str = "english") -> Dict:
        """Classify using ensemble of pretrained models with translation support"""
        
        # Translate Bengali text to English
        translated_text = None
        if language == "bengali":
            translated_text = self._translate_to_english(text)
            if not translated_text:
                print("❌ Translation failed, skipping pretrained models")
                return None
            text_to_analyze = translated_text
        else:
            text_to_analyze = text
        
        results = []
        
        # For long texts, analyze first 400 words
        words = text_to_analyze.split()
        if len(words) > 400:
            truncated_text = ' '.join(words[:400])
            print(f"⚠️  Text too long ({len(words)} words), analyzing first 400 words")
        else:
            truncated_text = text_to_analyze
        
        # Try primary model
        self._load_pretrained_model("primary")
        primary = self.pretrained_models["primary"]
        
        if primary["pipeline"] is not None:
            try:
                result = primary["pipeline"](truncated_text)[0]
                
                if isinstance(result, list):
                    result = result[0]
                
                label = result['label'].lower()
                confidence = float(result['score'])
                
                if 'hate' in label and 'not' not in label:
                    category = "hate_speech"
                elif 'not' in label or 'non' in label:
                    category = "neutral"
                else:
                    category = "offensive"
                
                results.append({
                    "category": category,
                    "confidence": confidence,
                    "weight": primary["weight"],
                    "model": "primary",
                    "raw_label": result['label']
                })
                
                print(f"[Primary Model] {result['label']} -> {category} ({confidence:.2%})")
            except Exception as e:
                print(f"❌ Primary model failed: {e}")
        
        # Try secondary model
        self._load_pretrained_model("secondary")
        secondary = self.pretrained_models["secondary"]
        
        if secondary["pipeline"] is not None:
            try:
                result = secondary["pipeline"](truncated_text)[0]
                
                if isinstance(result, list):
                    result = result[0]
                
                label = result['label'].lower()
                confidence = float(result['score'])
                
                if 'hate' in label:
                    category = "hate_speech"
                elif 'offensive' in label:
                    category = "offensive"
                else:
                    category = "neutral"
                
                results.append({
                    "category": category,
                    "confidence": confidence,
                    "weight": secondary["weight"],
                    "model": "secondary",
                    "raw_label": result['label']
                })
                
                print(f"[Secondary Model] {result['label']} -> {category} ({confidence:.2%})")
            except Exception as e:
                print(f"❌ Secondary model failed: {e}")
        
        if not results:
            return None
        
        # Ensemble voting
        category_scores = {}
        for result in results:
            cat = result["category"]
            score = result["confidence"] * result["weight"]
            category_scores[cat] = category_scores.get(cat, 0) + score
        
        final_category = max(category_scores, key=category_scores.get)
        total_weight = sum(r["weight"] for r in results)
        final_confidence = category_scores[final_category] / total_weight
        
        raw_labels = [r["raw_label"] for r in results]
        
        return {
            "category": final_category,
            "confidence": final_confidence,
            "method": "pretrained_ensemble",
            "raw_labels": raw_labels,
            "models_used": [r["model"] for r in results],
            "translated": language == "bengali",
            "translated_text": translated_text[:200] + "..." if translated_text and len(translated_text) > 200 else translated_text
        }
    
    def classify_with_keywords(self, text: str, language: str) -> Dict:
        """Classify using keyword and pattern matching"""
        text_lower = text.lower()
        
        hate_count = sum(1 for keyword in self.hate_keywords.get(language, []) 
                        if keyword.lower() in text_lower)
        offensive_count = sum(1 for keyword in self.offensive_keywords.get(language, []) 
                             if keyword.lower() in text_lower)
        
        pattern_matches = []
        matched_patterns = []
        for pattern in self.hate_patterns.get(language, []):
            match = re.search(pattern, text_lower, re.IGNORECASE)
            if match:
                pattern_matches.append(pattern)
                matched_patterns.append(match.group(0))
        
        if pattern_matches or hate_count > 0:
            category = "hate_speech"
            base_confidence = 0.90 if pattern_matches else 0.7
            confidence = min(base_confidence + (hate_count * 0.03), 0.98)
        elif offensive_count > 0:
            category = "offensive"
            confidence = min(0.6 + (offensive_count * 0.08), 0.88)
        else:
            category = "neutral"
            confidence = 0.7
        
        detected_keywords = []
        for keyword in self.hate_keywords.get(language, []):
            if keyword.lower() in text_lower:
                detected_keywords.append(keyword)
        for keyword in self.offensive_keywords.get(language, []):
            if keyword.lower() in text_lower:
                detected_keywords.append(keyword)
        
        return {
            "category": category,
            "confidence": confidence,
            "method": "keyword_matching",
            "detected_keywords": detected_keywords,
            "hate_count": hate_count,
            "offensive_count": offensive_count,
            "pattern_matches": len(pattern_matches),
            "matched_patterns": matched_patterns[:3]
        }