"""
Advanced Content Moderation System for Helion-V2
Provides rule-based, multi-layer content filtering and safety checks.
"""
import re
import json
from typing import List, Dict, Tuple, Optional, Set
from dataclasses import dataclass, asdict
from datetime import datetime
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ModerationResult:
"""Detailed moderation result."""
timestamp: str
is_approved: bool
risk_level: str # low, medium, high, critical
violations: List[str]
confidence_scores: Dict[str, float]
recommended_action: str
sanitized_content: Optional[str] = None
metadata: Optional[Dict] = None
class ContentFilter:
"""Multi-layer content filtering system."""
def __init__(self, config_path: Optional[str] = None):
"""
Initialize content filter with optional custom configuration.
Args:
config_path: Path to custom filter configuration JSON
"""
self.config = self._load_config(config_path)
self._initialize_filters()
def _load_config(self, config_path: Optional[str]) -> Dict:
"""Load filter configuration."""
default_config = {
"enable_profanity_filter": True,
"enable_toxicity_detection": True,
"enable_bias_detection": True,
"enable_pii_detection": True,
"enable_spam_detection": True,
"strictness_level": "medium", # low, medium, high
"blocked_domains": ["example-spam.com"],
"allowed_code_patterns": True,
"max_repetition_ratio": 0.3
}
if config_path:
try:
with open(config_path, 'r') as f:
custom_config = json.load(f)
default_config.update(custom_config)
except Exception as e:
logger.warning(f"Could not load config from {config_path}: {e}")
return default_config
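    # A minimal sketch of a custom configuration file for `config_path`
    # (hypothetical name "filter_config.json"); any keys present override
    # the defaults above via dict.update():
    #
    #   {
    #     "strictness_level": "high",
    #     "blocked_domains": ["example-spam.com", "another-spam.example"],
    #     "max_repetition_ratio": 0.5
    #   }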
def _initialize_filters(self):
"""Initialize all filter components."""
# Profanity and offensive language
self.profanity_list = self._load_profanity_list()
# Toxic phrases
self.toxic_phrases = [
"you should kill yourself",
"i hope you die",
"you deserve to suffer",
"stupid idiot moron",
"worthless piece of",
]
        # Bias indicators. Bracketed entries such as "[race]" are literal
        # placeholder templates: substitute concrete terms before deployment,
        # or those entries will never match real text.
        self.bias_indicators = {
            "gender": ["all women are", "all men are", "females are", "males are"],
            "race": ["all [race] are", "typical [race]", "[race] people always"],
            "religion": ["all [religion] are", "[religion] believers are"],
            "age": ["all old people", "millennials are all", "boomers are"],
        }
# Spam patterns
self.spam_patterns = [
r'(?i)(buy now|click here|limited time|act now).{0,50}(http|www)',
r'(?i)(viagra|cialis|lottery|prince|inheritance)',
            r'https?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F]{2}))+',
]
# Dangerous instruction patterns
self.dangerous_instructions = [
            r'(?i)how\s+to\s+(make|build|create|construct)\s+(?:(?:a|an|the)\s+)?(bomb|explosive|poison|weapon)',
r'(?i)instructions?\s+(for|to)\s+(kill|murder|harm|torture)',
r'(?i)(recipe|guide|tutorial)\s+for\s+(meth|cocaine|heroin)',
r'(?i)how\s+to\s+(hack|crack|break\s+into|bypass)',
]
# Medical misinformation
self.medical_misinfo = [
r'(?i)(cancer|covid|hiv).+(cure|treat|prevent).+(bleach|hydrogen\s+peroxide|vitamin\s+c)',
r'(?i)vaccines?\s+(cause|lead\s+to|result\s+in)\s+(autism|death|infertility)',
r'(?i)essential\s+oils?\s+(cure|treat)\s+(cancer|diabetes|heart\s+disease)',
]
def _load_profanity_list(self) -> Set[str]:
"""Load profanity word list."""
# Basic profanity list (expand as needed)
return {
'fuck', 'shit', 'bitch', 'asshole', 'bastard', 'damn',
'cunt', 'piss', 'cock', 'dick', 'pussy', 'slut', 'whore'
}
def check_profanity(self, text: str) -> Tuple[bool, List[str]]:
"""
Check for profanity in text.
Args:
text: Text to check
Returns:
Tuple of (has_profanity, list of found words)
"""
if not self.config["enable_profanity_filter"]:
return False, []
text_lower = text.lower()
words = re.findall(r'\b\w+\b', text_lower)
found_profanity = [word for word in words if word in self.profanity_list]
return len(found_profanity) > 0, found_profanity
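    # Usage sketch (output is deterministic given the word list above):
    #   >>> ContentFilter().check_profanity("well, damn, that hurt")
    #   (True, ['damn'])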
def check_toxicity(self, text: str) -> Tuple[bool, float, List[str]]:
"""
Check for toxic content.
Args:
text: Text to check
Returns:
Tuple of (is_toxic, toxicity_score, matched_phrases)
"""
if not self.config["enable_toxicity_detection"]:
return False, 0.0, []
text_lower = text.lower()
matched_phrases = []
toxicity_score = 0.0
for phrase in self.toxic_phrases:
if phrase in text_lower:
matched_phrases.append(phrase)
toxicity_score += 0.3
# Check for aggressive language patterns
aggressive_patterns = [
r'\b(hate|despise|loathe)\s+you\b',
r'\byou\s+(are|re)\s+(stupid|dumb|idiot|moron)',
r'\bshut\s+up\b',
r'\bgo\s+to\s+hell\b',
]
for pattern in aggressive_patterns:
if re.search(pattern, text_lower):
toxicity_score += 0.2
is_toxic = toxicity_score > 0.5
return is_toxic, min(toxicity_score, 1.0), matched_phrases
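    # Scoring sketch: each matched toxic phrase adds 0.3 and each aggressive
    # pattern adds 0.2, with content flagged only above 0.5. For example,
    # "shut up, I hate you" trips two aggressive patterns (score 0.4, not
    # flagged), while adding the phrase "stupid idiot moron" contributes a
    # further 0.3 (score 0.7, flagged).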
def check_bias(self, text: str) -> Tuple[bool, Dict[str, List[str]]]:
"""
Check for biased language.
Args:
text: Text to check
Returns:
Tuple of (has_bias, dictionary of bias types and matched phrases)
"""
if not self.config["enable_bias_detection"]:
return False, {}
text_lower = text.lower()
bias_found = {}
for bias_type, indicators in self.bias_indicators.items():
matches = []
for indicator in indicators:
# Simple pattern matching (can be enhanced with ML)
if indicator in text_lower:
matches.append(indicator)
if matches:
bias_found[bias_type] = matches
return len(bias_found) > 0, bias_found
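    # Usage sketch: a literal indicator match is reported under its category,
    # e.g. check_bias("all women are bad drivers") returns
    # (True, {'gender': ['all women are']}).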
def check_pii(self, text: str) -> Tuple[bool, Dict[str, List[str]]]:
"""
Check for personally identifiable information.
Args:
text: Text to check
Returns:
Tuple of (has_pii, dictionary of PII types found)
"""
if not self.config["enable_pii_detection"]:
return False, {}
pii_found = {}
# Social Security Number
ssn_pattern = r'\b\d{3}-\d{2}-\d{4}\b'
ssns = re.findall(ssn_pattern, text)
if ssns:
pii_found['ssn'] = ssns
# Credit Card
cc_pattern = r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'
ccs = re.findall(cc_pattern, text)
if ccs:
pii_found['credit_card'] = ccs
# Email
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
emails = re.findall(email_pattern, text)
if emails:
pii_found['email'] = emails
# Phone
phone_pattern = r'\b(?:\+?1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b'
phones = re.findall(phone_pattern, text)
if phones:
pii_found['phone'] = phones
# Address (basic)
address_pattern = r'\b\d+\s+[A-Za-z]+\s+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd)\b'
addresses = re.findall(address_pattern, text, re.IGNORECASE)
if addresses:
pii_found['address'] = addresses
return len(pii_found) > 0, pii_found
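    # Usage sketch (deterministic given the regexes above):
    #   >>> ContentFilter().check_pii("reach me at jane.doe@example.com")
    #   (True, {'email': ['jane.doe@example.com']})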
def check_spam(self, text: str) -> Tuple[bool, List[str]]:
"""
Check for spam content.
Args:
text: Text to check
Returns:
Tuple of (is_spam, list of matched patterns)
"""
if not self.config["enable_spam_detection"]:
return False, []
matched_patterns = []
for pattern in self.spam_patterns:
if re.search(pattern, text):
matched_patterns.append(pattern)
# Check for blocked domains
for domain in self.config["blocked_domains"]:
if domain in text.lower():
matched_patterns.append(f"Blocked domain: {domain}")
return len(matched_patterns) > 0, matched_patterns
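    # Illustrative: "Buy now! Click here: http://example-spam.com" trips the
    # promotional-language pattern, the bare-URL pattern, and the blocked-domain
    # list, so check_spam returns (True, [...]) with three matched entries.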
def check_dangerous_content(self, text: str) -> Tuple[bool, List[str]]:
"""
Check for dangerous instructions or content.
Args:
text: Text to check
Returns:
Tuple of (is_dangerous, list of matched categories)
"""
text_lower = text.lower()
dangerous_categories = []
# Check dangerous instructions
for pattern in self.dangerous_instructions:
if re.search(pattern, text_lower):
dangerous_categories.append("dangerous_instructions")
break
# Check medical misinformation
for pattern in self.medical_misinfo:
if re.search(pattern, text_lower):
dangerous_categories.append("medical_misinformation")
break
return len(dangerous_categories) > 0, dangerous_categories
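    # Illustrative: "how to make a bomb" matches the first dangerous-instruction
    # pattern above, so check_dangerous_content returns
    # (True, ['dangerous_instructions']).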
def check_repetition(self, text: str) -> Tuple[bool, float]:
"""
Check for excessive repetition (potential spam or model failure).
Args:
text: Text to check
Returns:
Tuple of (is_repetitive, repetition_ratio)
"""
words = text.split()
if len(words) < 10:
return False, 0.0
unique_words = len(set(words))
total_words = len(words)
repetition_ratio = 1.0 - (unique_words / total_words)
is_repetitive = repetition_ratio > self.config["max_repetition_ratio"]
return is_repetitive, repetition_ratio
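    # Worked example: "spam spam spam spam buy spam spam spam spam now" has
    # 10 words but only 3 unique, giving a repetition ratio of 1 - 3/10 = 0.7,
    # which exceeds the default max_repetition_ratio of 0.3.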
def moderate_content(self, text: str, context: str = "general") -> ModerationResult:
"""
Perform comprehensive content moderation.
Args:
text: Text to moderate
context: Context of the content (general, chat, code, etc.)
Returns:
ModerationResult with detailed analysis
"""
violations = []
confidence_scores = {}
risk_level = "low"
# Run all checks
has_profanity, profanity_words = self.check_profanity(text)
if has_profanity:
violations.append(f"Profanity detected: {len(profanity_words)} words")
confidence_scores["profanity"] = 0.9
risk_level = "medium"
is_toxic, toxicity_score, toxic_phrases = self.check_toxicity(text)
if is_toxic:
violations.append(f"Toxic content detected (score: {toxicity_score:.2f})")
confidence_scores["toxicity"] = toxicity_score
risk_level = "high"
has_bias, bias_types = self.check_bias(text)
if has_bias:
violations.append(f"Potential bias detected: {', '.join(bias_types.keys())}")
confidence_scores["bias"] = 0.7
if risk_level == "low":
risk_level = "medium"
has_pii, pii_types = self.check_pii(text)
if has_pii:
violations.append(f"PII detected: {', '.join(pii_types.keys())}")
confidence_scores["pii"] = 1.0
risk_level = "high"
is_spam, spam_patterns = self.check_spam(text)
if is_spam:
violations.append(f"Spam indicators: {len(spam_patterns)}")
confidence_scores["spam"] = 0.8
if risk_level == "low":
risk_level = "medium"
is_dangerous, dangerous_categories = self.check_dangerous_content(text)
if is_dangerous:
violations.append(f"Dangerous content: {', '.join(dangerous_categories)}")
confidence_scores["dangerous"] = 0.95
risk_level = "critical"
is_repetitive, repetition_ratio = self.check_repetition(text)
if is_repetitive:
violations.append(f"Excessive repetition ({repetition_ratio:.2%})")
confidence_scores["repetition"] = repetition_ratio
        # Determine approval and recommended action. Only the repetition check
        # leaves risk_level at "low", so low-risk findings are still approved.
        is_approved = len(violations) == 0 or risk_level == "low"
if risk_level == "critical":
recommended_action = "block"
elif risk_level == "high":
recommended_action = "review"
elif risk_level == "medium":
recommended_action = "flag"
else:
recommended_action = "approve"
# Sanitize if needed
sanitized_content = None
if has_pii:
sanitized_content = self._sanitize_pii(text)
return ModerationResult(
timestamp=datetime.now().isoformat(),
is_approved=is_approved,
risk_level=risk_level,
violations=violations,
confidence_scores=confidence_scores,
recommended_action=recommended_action,
sanitized_content=sanitized_content,
metadata={
"text_length": len(text),
"word_count": len(text.split()),
"context": context
}
)
    def _sanitize_pii(self, text: str) -> str:
        """Sanitize text by redacting detected PII."""
        sanitized = text
        # Redact SSNs
        sanitized = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN-REDACTED]', sanitized)
        # Redact credit card numbers
        sanitized = re.sub(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '[CC-REDACTED]', sanitized)
        # Redact email addresses
        sanitized = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL-REDACTED]', sanitized)
        # Redact phone numbers
        sanitized = re.sub(r'\b(?:\+?1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b', '[PHONE-REDACTED]', sanitized)
        # Note: street addresses are flagged by check_pii but not redacted here.
return sanitized
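    # Illustrative: "Call 555-123-4567 or mail jane@example.com" becomes
    # "Call [PHONE-REDACTED] or mail [EMAIL-REDACTED]".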
def batch_moderate(self, texts: List[str]) -> List[ModerationResult]:
"""
Moderate multiple texts in batch.
Args:
texts: List of texts to moderate
Returns:
List of ModerationResults
"""
return [self.moderate_content(text) for text in texts]
def export_results(self, results: List[ModerationResult], filepath: str):
"""
Export moderation results to JSON file.
Args:
results: List of ModerationResults
filepath: Output file path
"""
with open(filepath, 'w') as f:
json.dump([asdict(r) for r in results], f, indent=2)
logger.info(f"Exported {len(results)} moderation results to {filepath}")
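    # The exported file is plain JSON (one object per result, via
    # dataclasses.asdict), so it can be reloaded for analysis with json.load();
    # note this yields dicts, not reconstructed ModerationResult instances.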
# Example usage
if __name__ == "__main__":
# Initialize filter
filter_system = ContentFilter()
# Test cases
test_texts = [
"What is the capital of France?", # Safe
"You are a stupid idiot!", # Toxic
"My SSN is 123-45-6789", # PII
"Buy now! Limited time offer! www.spam.com", # Spam
"How to make a bomb at home", # Dangerous
]
print("Content Moderation Results:\n")
print("=" * 80)
for i, text in enumerate(test_texts, 1):
result = filter_system.moderate_content(text)
print(f"\nTest {i}: {text[:50]}...")
print(f"Approved: {result.is_approved}")
print(f"Risk Level: {result.risk_level}")
print(f"Violations: {result.violations}")
print(f"Recommended Action: {result.recommended_action}")
if result.sanitized_content:
print(f"Sanitized: {result.sanitized_content[:100]}...")
print("-" * 80)
# Batch processing example
results = filter_system.batch_moderate(test_texts)
filter_system.export_results(results, "moderation_results.json")
print(f"\n✓ Exported {len(results)} results to moderation_results.json")