Trouter-Library committed on
Commit 4290200 · verified · 1 Parent(s): ce03ed3

Create content_moderation.py

Files changed (1)
  1. content_moderation.py +494 -0
content_moderation.py ADDED
@@ -0,0 +1,494 @@
"""
Advanced Content Moderation System for Helion-V2
Provides production-grade content filtering and safety checks.
"""

import re
import json
from typing import List, Dict, Tuple, Optional, Set
from dataclasses import dataclass, asdict
from datetime import datetime
import logging


# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class ModerationResult:
    """Detailed moderation result."""
    timestamp: str
    is_approved: bool
    risk_level: str  # low, medium, high, critical
    violations: List[str]
    confidence_scores: Dict[str, float]
    recommended_action: str
    sanitized_content: Optional[str] = None
    metadata: Optional[Dict] = None

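# Illustrative only (hypothetical values, not produced at import time): for the input
# "My SSN is 123-45-6789" under the default configuration, moderate_content() below
# would yield a ModerationResult along these lines once serialized:
# {
#     "timestamp": "<ISO-8601 timestamp>",
#     "is_approved": false,
#     "risk_level": "high",
#     "violations": ["PII detected: ssn"],
#     "confidence_scores": {"pii": 1.0},
#     "recommended_action": "review",
#     "sanitized_content": "My SSN is [SSN-REDACTED]",
#     "metadata": {"text_length": 21, "word_count": 4, "context": "general"}
# }
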
class ContentFilter:
    """Multi-layer content filtering system."""

    def __init__(self, config_path: Optional[str] = None):
        """
        Initialize content filter with optional custom configuration.

        Args:
            config_path: Path to custom filter configuration JSON
        """
        self.config = self._load_config(config_path)
        self._initialize_filters()

    def _load_config(self, config_path: Optional[str]) -> Dict:
        """Load filter configuration."""
        default_config = {
            "enable_profanity_filter": True,
            "enable_toxicity_detection": True,
            "enable_bias_detection": True,
            "enable_pii_detection": True,
            "enable_spam_detection": True,
            "strictness_level": "medium",  # low, medium, high
            "blocked_domains": ["example-spam.com"],
            "allowed_code_patterns": True,
            "max_repetition_ratio": 0.3
        }

        if config_path:
            try:
                with open(config_path, 'r') as f:
                    custom_config = json.load(f)
                default_config.update(custom_config)
            except Exception as e:
                logger.warning(f"Could not load config from {config_path}: {e}")

        return default_config

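    # A custom configuration file passed as config_path (hypothetical
    # "moderation_config.json") only needs the keys it overrides; everything
    # else keeps the defaults above, e.g.:
    # {
    #     "strictness_level": "high",
    #     "blocked_domains": ["example-spam.com", "spam.example.net"],
    #     "max_repetition_ratio": 0.25
    # }
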
69
+ def _initialize_filters(self):
70
+ """Initialize all filter components."""
71
+
72
+ # Profanity and offensive language
73
+ self.profanity_list = self._load_profanity_list()
74
+
75
+ # Toxic phrases
76
+ self.toxic_phrases = [
77
+ "you should kill yourself",
78
+ "i hope you die",
79
+ "you deserve to suffer",
80
+ "stupid idiot moron",
81
+ "worthless piece of",
82
+ ]
83
+
84
+ # Bias indicators
85
+ self.bias_indicators = {
86
+ "gender": ["all women are", "all men are", "females are", "males are"],
87
+ "race": ["all [race] are", "typical [race]", "[race] people always"],
88
+ "religion": ["all [religion] are", "[religion] believers are"],
89
+ "age": ["all old people", "millennials are all", "boomers are"],
90
+ }
91
+
92
+ # Spam patterns
93
+ self.spam_patterns = [
94
+ r'(?i)(buy now|click here|limited time|act now).{0,50}(http|www)',
95
+ r'(?i)(viagra|cialis|lottery|prince|inheritance)',
96
+ r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+',
97
+ ]
98
+
99
+ # Dangerous instruction patterns
100
+ self.dangerous_instructions = [
101
+ r'(?i)how\s+to\s+(make|build|create|construct)\s+(bomb|explosive|poison|weapon)',
102
+ r'(?i)instructions?\s+(for|to)\s+(kill|murder|harm|torture)',
103
+ r'(?i)(recipe|guide|tutorial)\s+for\s+(meth|cocaine|heroin)',
104
+ r'(?i)how\s+to\s+(hack|crack|break\s+into|bypass)',
105
+ ]
106
+
107
+ # Medical misinformation
108
+ self.medical_misinfo = [
109
+ r'(?i)(cancer|covid|hiv).+(cure|treat|prevent).+(bleach|hydrogen\s+peroxide|vitamin\s+c)',
110
+ r'(?i)vaccines?\s+(cause|lead\s+to|result\s+in)\s+(autism|death|infertility)',
111
+ r'(?i)essential\s+oils?\s+(cure|treat)\s+(cancer|diabetes|heart\s+disease)',
112
+ ]
113
+
114
+ def _load_profanity_list(self) -> Set[str]:
115
+ """Load profanity word list."""
116
+ # Basic profanity list (expand as needed)
117
+ return {
118
+ 'fuck', 'shit', 'bitch', 'asshole', 'bastard', 'damn',
119
+ 'cunt', 'piss', 'cock', 'dick', 'pussy', 'slut', 'whore'
120
+ }
121
+
    def check_profanity(self, text: str) -> Tuple[bool, List[str]]:
        """
        Check for profanity in text.

        Args:
            text: Text to check

        Returns:
            Tuple of (has_profanity, list of found words)
        """
        if not self.config["enable_profanity_filter"]:
            return False, []

        text_lower = text.lower()
        words = re.findall(r'\b\w+\b', text_lower)
        found_profanity = [word for word in words if word in self.profanity_list]

        return len(found_profanity) > 0, found_profanity

    def check_toxicity(self, text: str) -> Tuple[bool, float, List[str]]:
        """
        Check for toxic content.

        Args:
            text: Text to check

        Returns:
            Tuple of (is_toxic, toxicity_score, matched_phrases)
        """
        if not self.config["enable_toxicity_detection"]:
            return False, 0.0, []

        text_lower = text.lower()
        matched_phrases = []
        toxicity_score = 0.0

        for phrase in self.toxic_phrases:
            if phrase in text_lower:
                matched_phrases.append(phrase)
                toxicity_score += 0.3

        # Check for aggressive language patterns
        aggressive_patterns = [
            r'\b(hate|despise|loathe)\s+you\b',
            r'\byou\s+(are|re)\s+(stupid|dumb|idiot|moron)',
            r'\bshut\s+up\b',
            r'\bgo\s+to\s+hell\b',
        ]

        for pattern in aggressive_patterns:
            if re.search(pattern, text_lower):
                toxicity_score += 0.2

        is_toxic = toxicity_score > 0.5
        return is_toxic, min(toxicity_score, 1.0), matched_phrases

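    # Illustrative scoring, assuming the phrase and pattern lists above: the text
    # "i hope you die, you are stupid" matches one toxic phrase (+0.3) and one
    # aggressive pattern (+0.2) for a score of 0.5, which does not exceed the 0.5
    # threshold, so it is not flagged as toxic by this check alone.
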
    def check_bias(self, text: str) -> Tuple[bool, Dict[str, List[str]]]:
        """
        Check for biased language.

        Args:
            text: Text to check

        Returns:
            Tuple of (has_bias, dictionary of bias types and matched phrases)
        """
        if not self.config["enable_bias_detection"]:
            return False, {}

        text_lower = text.lower()
        bias_found = {}

        for bias_type, indicators in self.bias_indicators.items():
            matches = []
            for indicator in indicators:
                # Simple pattern matching (can be enhanced with ML)
                if indicator in text_lower:
                    matches.append(indicator)

            if matches:
                bias_found[bias_type] = matches

        return len(bias_found) > 0, bias_found

    def check_pii(self, text: str) -> Tuple[bool, Dict[str, List[str]]]:
        """
        Check for personally identifiable information.

        Args:
            text: Text to check

        Returns:
            Tuple of (has_pii, dictionary of PII types found)
        """
        if not self.config["enable_pii_detection"]:
            return False, {}

        pii_found = {}

        # Social Security Number
        ssn_pattern = r'\b\d{3}-\d{2}-\d{4}\b'
        ssns = re.findall(ssn_pattern, text)
        if ssns:
            pii_found['ssn'] = ssns

        # Credit Card
        cc_pattern = r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'
        ccs = re.findall(cc_pattern, text)
        if ccs:
            pii_found['credit_card'] = ccs

        # Email
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        emails = re.findall(email_pattern, text)
        if emails:
            pii_found['email'] = emails

        # Phone
        phone_pattern = r'\b(?:\+?1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b'
        phones = re.findall(phone_pattern, text)
        if phones:
            pii_found['phone'] = phones

        # Address (basic)
        address_pattern = r'\b\d+\s+[A-Za-z]+\s+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd)\b'
        addresses = re.findall(address_pattern, text, re.IGNORECASE)
        if addresses:
            pii_found['address'] = addresses

        return len(pii_found) > 0, pii_found

    def check_spam(self, text: str) -> Tuple[bool, List[str]]:
        """
        Check for spam content.

        Args:
            text: Text to check

        Returns:
            Tuple of (is_spam, list of matched patterns)
        """
        if not self.config["enable_spam_detection"]:
            return False, []

        matched_patterns = []

        for pattern in self.spam_patterns:
            if re.search(pattern, text):
                matched_patterns.append(pattern)

        # Check for blocked domains
        for domain in self.config["blocked_domains"]:
            if domain in text.lower():
                matched_patterns.append(f"Blocked domain: {domain}")

        return len(matched_patterns) > 0, matched_patterns

    def check_dangerous_content(self, text: str) -> Tuple[bool, List[str]]:
        """
        Check for dangerous instructions or content.

        Args:
            text: Text to check

        Returns:
            Tuple of (is_dangerous, list of matched categories)
        """
        text_lower = text.lower()
        dangerous_categories = []

        # Check dangerous instructions
        for pattern in self.dangerous_instructions:
            if re.search(pattern, text_lower):
                dangerous_categories.append("dangerous_instructions")
                break

        # Check medical misinformation
        for pattern in self.medical_misinfo:
            if re.search(pattern, text_lower):
                dangerous_categories.append("medical_misinformation")
                break

        return len(dangerous_categories) > 0, dangerous_categories

    def check_repetition(self, text: str) -> Tuple[bool, float]:
        """
        Check for excessive repetition (potential spam or model failure).

        Args:
            text: Text to check

        Returns:
            Tuple of (is_repetitive, repetition_ratio)
        """
        words = text.split()
        if len(words) < 10:
            return False, 0.0

        unique_words = len(set(words))
        total_words = len(words)
        repetition_ratio = 1.0 - (unique_words / total_words)

        is_repetitive = repetition_ratio > self.config["max_repetition_ratio"]
        return is_repetitive, repetition_ratio

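    # Worked example, using the default max_repetition_ratio of 0.3: the 12-word text
    # "spam spam spam spam spam buy buy buy buy buy now now" has 3 unique words, so its
    # repetition ratio is 1 - 3/12 = 0.75, which exceeds 0.3 and is flagged as repetitive.
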
    def moderate_content(self, text: str, context: str = "general") -> ModerationResult:
        """
        Perform comprehensive content moderation.

        Args:
            text: Text to moderate
            context: Context of the content (general, chat, code, etc.)

        Returns:
            ModerationResult with detailed analysis
        """
        violations = []
        confidence_scores = {}
        risk_level = "low"

        # Run all checks
        has_profanity, profanity_words = self.check_profanity(text)
        if has_profanity:
            violations.append(f"Profanity detected: {len(profanity_words)} words")
            confidence_scores["profanity"] = 0.9
            risk_level = "medium"

        is_toxic, toxicity_score, toxic_phrases = self.check_toxicity(text)
        if is_toxic:
            violations.append(f"Toxic content detected (score: {toxicity_score:.2f})")
            confidence_scores["toxicity"] = toxicity_score
            risk_level = "high"

        has_bias, bias_types = self.check_bias(text)
        if has_bias:
            violations.append(f"Potential bias detected: {', '.join(bias_types.keys())}")
            confidence_scores["bias"] = 0.7
            if risk_level == "low":
                risk_level = "medium"

        has_pii, pii_types = self.check_pii(text)
        if has_pii:
            violations.append(f"PII detected: {', '.join(pii_types.keys())}")
            confidence_scores["pii"] = 1.0
            risk_level = "high"

        is_spam, spam_patterns = self.check_spam(text)
        if is_spam:
            violations.append(f"Spam indicators: {len(spam_patterns)}")
            confidence_scores["spam"] = 0.8
            if risk_level == "low":
                risk_level = "medium"

        is_dangerous, dangerous_categories = self.check_dangerous_content(text)
        if is_dangerous:
            violations.append(f"Dangerous content: {', '.join(dangerous_categories)}")
            confidence_scores["dangerous"] = 0.95
            risk_level = "critical"

        is_repetitive, repetition_ratio = self.check_repetition(text)
        if is_repetitive:
            violations.append(f"Excessive repetition ({repetition_ratio:.2%})")
            confidence_scores["repetition"] = repetition_ratio

        # Determine approval and recommended action
        is_approved = len(violations) == 0 or (risk_level == "low" and not is_dangerous)

        if risk_level == "critical":
            recommended_action = "block"
        elif risk_level == "high":
            recommended_action = "review"
        elif risk_level == "medium":
            recommended_action = "flag"
        else:
            recommended_action = "approve"

        # Sanitize if needed
        sanitized_content = None
        if has_pii:
            sanitized_content = self._sanitize_pii(text)

        return ModerationResult(
            timestamp=datetime.now().isoformat(),
            is_approved=is_approved,
            risk_level=risk_level,
            violations=violations,
            confidence_scores=confidence_scores,
            recommended_action=recommended_action,
            sanitized_content=sanitized_content,
            metadata={
                "text_length": len(text),
                "word_count": len(text.split()),
                "context": context
            }
        )

    def _sanitize_pii(self, text: str) -> str:
        """Sanitize text by removing/redacting PII."""
        sanitized = text

        # Redact SSN
        sanitized = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN-REDACTED]', sanitized)

        # Redact credit cards
        sanitized = re.sub(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '[CC-REDACTED]', sanitized)

        # Redact emails
        sanitized = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL-REDACTED]', sanitized)

        # Redact phones
        sanitized = re.sub(r'\b(?:\+?1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b', '[PHONE-REDACTED]', sanitized)

        return sanitized

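    # Illustrative only, assuming the default patterns above:
    # _sanitize_pii("Reach me at jane.doe@example.com or 555-123-4567") returns
    # "Reach me at [EMAIL-REDACTED] or [PHONE-REDACTED]".
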
    def batch_moderate(self, texts: List[str]) -> List[ModerationResult]:
        """
        Moderate multiple texts in batch.

        Args:
            texts: List of texts to moderate

        Returns:
            List of ModerationResults
        """
        return [self.moderate_content(text) for text in texts]

    def export_results(self, results: List[ModerationResult], filepath: str):
        """
        Export moderation results to JSON file.

        Args:
            results: List of ModerationResults
            filepath: Output file path
        """
        with open(filepath, 'w') as f:
            json.dump([asdict(r) for r in results], f, indent=2)

        logger.info(f"Exported {len(results)} moderation results to {filepath}")


# Example usage
if __name__ == "__main__":
    # Initialize filter
    filter_system = ContentFilter()

    # Test cases
    test_texts = [
        "What is the capital of France?",  # Safe
        "You are a stupid idiot!",  # Toxic
        "My SSN is 123-45-6789",  # PII
        "Buy now! Limited time offer! www.spam.com",  # Spam
        "How to make a bomb at home",  # Dangerous
    ]

    print("Content Moderation Results:\n")
    print("=" * 80)

    for i, text in enumerate(test_texts, 1):
        result = filter_system.moderate_content(text)

        print(f"\nTest {i}: {text[:50]}...")
        print(f"Approved: {result.is_approved}")
        print(f"Risk Level: {result.risk_level}")
        print(f"Violations: {result.violations}")
        print(f"Recommended Action: {result.recommended_action}")
        if result.sanitized_content:
            print(f"Sanitized: {result.sanitized_content[:100]}...")
        print("-" * 80)

    # Batch processing example
    results = filter_system.batch_moderate(test_texts)
    filter_system.export_results(results, "moderation_results.json")
    print(f"\n✓ Exported {len(results)} results to moderation_results.json")
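
# A minimal integration sketch (hypothetical caller code, not part of this commit):
# gate a model response on the moderation verdict before returning it to the user.
#
#     moderator = ContentFilter()
#     result = moderator.moderate_content(model_output, context="chat")
#     if result.recommended_action == "block":
#         final_output = "[response withheld by content filter]"
#     elif result.sanitized_content is not None:
#         final_output = result.sanitized_content
#     else:
#         final_output = model_output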