|
|
""" |
|
|
Advanced Content Moderation System for Helion-V2 |
|
|
Provides production-grade content filtering and safety checks. |
|
|
""" |
|
|
|
|
|
import re |
|
|
import json |
|
|
from typing import List, Dict, Tuple, Optional, Set |
|
|
from dataclasses import dataclass, asdict |
|
|
from datetime import datetime |
|
|
import logging |
|
|
|
|
|
|
|
|
|
|
|
# Configure the root logger at import time (INFO and above), then create the
# module-level logger used by the rest of this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
@dataclass
class ModerationResult:
    """Outcome of moderating a single piece of text.

    Attributes:
        timestamp: ISO-8601 string recording when the result was produced.
        is_approved: Whether the content passed moderation.
        risk_level: Overall risk label ("low", "medium", "high" or
            "critical" as assigned by ``ContentFilter.moderate_content``).
        violations: Human-readable description of every triggered check.
        confidence_scores: Per-check score keyed by check name.
        recommended_action: Suggested handling ("approve", "flag",
            "review" or "block").
        sanitized_content: Redacted copy of the text when PII was found,
            otherwise ``None``.
        metadata: Optional extra information about the moderated text.
    """

    # Required fields (positional order is part of the public interface).
    timestamp: str
    is_approved: bool
    risk_level: str
    violations: List[str]
    confidence_scores: Dict[str, float]
    recommended_action: str

    # Optional fields.
    sanitized_content: Optional[str] = None
    metadata: Optional[Dict] = None
|
|
|
|
|
|
|
|
class ContentFilter:
    """Multi-layer content filtering system.

    Runs keyword, phrase and regex based checks (profanity, toxicity, bias,
    PII, spam, dangerous content, repetition) and combines their outcome
    into a single ``ModerationResult`` via :meth:`moderate_content`.
    """

    # Single source of truth for PII detection AND redaction, as
    # (result key, compiled pattern, redaction placeholder).  The order is
    # significant: it is both the key order of the ``check_pii`` result and
    # the order in which redactions are applied (credit cards before phones).
    _PII_RULES = (
        ("ssn", re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), '[SSN-REDACTED]'),
        (
            "credit_card",
            re.compile(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'),
            '[CC-REDACTED]',
        ),
        (
            "email",
            # The TLD class is [A-Za-z]; a "|" inside [...] is a literal
            # pipe, not alternation, and must not be accepted here.
            re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
            '[EMAIL-REDACTED]',
        ),
        (
            "phone",
            re.compile(r'\b(?:\+?1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b'),
            '[PHONE-REDACTED]',
        ),
        (
            "address",
            re.compile(
                r'\b\d+\s+[A-Za-z]+\s+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd)\b',
                re.IGNORECASE,
            ),
            '[ADDRESS-REDACTED]',
        ),
    )

    # Regexes for directly aggressive wording; each match adds 0.2 to the
    # toxicity score.  They are applied to lower-cased text.
    _AGGRESSIVE_PATTERNS = (
        r'\b(hate|despise|loathe)\s+you\b',
        r'\byou\s+(are|re)\s+(stupid|dumb|idiot|moron)',
        r'\bshut\s+up\b',
        r'\bgo\s+to\s+hell\b',
    )

    # Risk level -> recommended action; anything else maps to "approve".
    _ACTION_BY_RISK = {"critical": "block", "high": "review", "medium": "flag"}

    def __init__(self, config_path: Optional[str] = None):
        """
        Initialize content filter with optional custom configuration.

        Args:
            config_path: Path to custom filter configuration JSON
        """
        self.config = self._load_config(config_path)
        self._initialize_filters()

    def _load_config(self, config_path: Optional[str]) -> Dict:
        """Load filter configuration.

        Starts from the built-in defaults and overlays the keys found in the
        JSON object stored at ``config_path``.  Loading is best-effort: if
        the file cannot be read, is not valid JSON, or does not contain a
        JSON object, a warning is logged and the defaults are returned.

        Args:
            config_path: Path to a JSON file, or ``None`` for defaults only

        Returns:
            The effective configuration dictionary
        """
        config = {
            "enable_profanity_filter": True,
            "enable_toxicity_detection": True,
            "enable_bias_detection": True,
            "enable_pii_detection": True,
            "enable_spam_detection": True,
            "strictness_level": "medium",
            "blocked_domains": ["example-spam.com"],
            "allowed_code_patterns": True,
            "max_repetition_ratio": 0.3,
        }

        if not config_path:
            return config

        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                custom_config = json.load(f)
            if not isinstance(custom_config, dict):
                raise TypeError(
                    f"expected a JSON object, got {type(custom_config).__name__}"
                )
            config.update(custom_config)
        except (OSError, ValueError, TypeError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            logger.warning("Could not load config from %s: %s", config_path, e)

        return config

    def _initialize_filters(self):
        """Initialize all filter components.

        Populates ``profanity_list``, ``toxic_phrases``, ``bias_indicators``,
        ``spam_patterns``, ``dangerous_instructions`` and ``medical_misinfo``.
        """
        self.profanity_list = self._load_profanity_list()

        # Phrases matched as plain substrings of the lower-cased text.
        self.toxic_phrases = [
            "you should kill yourself",
            "i hope you die",
            "you deserve to suffer",
            "stupid idiot moron",
            "worthless piece of",
        ]

        # NOTE(review): the "[race]" / "[religion]" entries are matched as
        # literal substrings by check_bias, so they only fire on text that
        # contains the bracketed placeholder itself.  They look like
        # templates that were meant to be expanded -- TODO confirm and
        # supply the term lists.
        self.bias_indicators = {
            "gender": ["all women are", "all men are", "females are", "males are"],
            "race": ["all [race] are", "typical [race]", "[race] people always"],
            "religion": ["all [religion] are", "[religion] believers are"],
            "age": ["all old people", "millennials are all", "boomers are"],
        }

        # Regexes searched against the original (not lower-cased) text.
        self.spam_patterns = [
            r'(?i)(buy now|click here|limited time|act now).{0,50}(http|www)',
            r'(?i)(viagra|cialis|lottery|prince|inheritance)',
            r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+',
        ]

        self.dangerous_instructions = [
            # An optional article/determiner is allowed between verb and
            # noun so that "how to make a bomb" is caught, not only
            # "how to make bomb".
            r'(?i)how\s+to\s+(make|build|create|construct)\s+(?:(?:a|an|the|some)\s+)?(bomb|explosive|poison|weapon)',
            r'(?i)instructions?\s+(for|to)\s+(kill|murder|harm|torture)',
            # Trailing \b keeps "meth" from matching inside "method(s)".
            r'(?i)(recipe|guide|tutorial)\s+for\s+(meth(?:amphetamine)?|cocaine|heroin)\b',
            r'(?i)how\s+to\s+(hack|crack|break\s+into|bypass)',
        ]

        self.medical_misinfo = [
            r'(?i)(cancer|covid|hiv).+(cure|treat|prevent).+(bleach|hydrogen\s+peroxide|vitamin\s+c)',
            r'(?i)vaccines?\s+(cause|lead\s+to|result\s+in)\s+(autism|death|infertility)',
            r'(?i)essential\s+oils?\s+(cure|treat)\s+(cancer|diabetes|heart\s+disease)',
        ]

    def _load_profanity_list(self) -> Set[str]:
        """Load profanity word list.

        Returns:
            Set of lower-case words treated as profanity
        """
        return {
            'fuck', 'shit', 'bitch', 'asshole', 'bastard', 'damn',
            'cunt', 'piss', 'cock', 'dick', 'pussy', 'slut', 'whore',
        }

    def check_profanity(self, text: str) -> Tuple[bool, List[str]]:
        """
        Check for profanity in text.

        The text is lower-cased and split into ``\\w+`` tokens; every token
        present in ``profanity_list`` is reported (duplicates included).

        Args:
            text: Text to check

        Returns:
            Tuple of (has_profanity, list of found words)
        """
        if not self.config["enable_profanity_filter"]:
            return False, []

        words = re.findall(r'\b\w+\b', text.lower())
        found_profanity = [word for word in words if word in self.profanity_list]

        return len(found_profanity) > 0, found_profanity

    def check_toxicity(self, text: str) -> Tuple[bool, float, List[str]]:
        """
        Check for toxic content.

        Each matched toxic phrase adds 0.3 and each matched aggressive
        pattern adds 0.2 to the score.  The text is toxic when at least one
        explicit toxic phrase matched, or when the accumulated score
        exceeds 0.5.  (A single explicit phrase such as "you should kill
        yourself" only scores 0.3, so the score threshold alone would let
        it through.)

        Args:
            text: Text to check

        Returns:
            Tuple of (is_toxic, toxicity_score, matched_phrases); the score
            is capped at 1.0
        """
        if not self.config["enable_toxicity_detection"]:
            return False, 0.0, []

        text_lower = text.lower()
        matched_phrases = []
        toxicity_score = 0.0

        for phrase in self.toxic_phrases:
            if phrase in text_lower:
                matched_phrases.append(phrase)
                toxicity_score += 0.3

        for pattern in self._AGGRESSIVE_PATTERNS:
            if re.search(pattern, text_lower):
                toxicity_score += 0.2

        is_toxic = bool(matched_phrases) or toxicity_score > 0.5
        return is_toxic, min(toxicity_score, 1.0), matched_phrases

    def check_bias(self, text: str) -> Tuple[bool, Dict[str, List[str]]]:
        """
        Check for biased language.

        Indicators are matched as plain substrings of the lower-cased text.

        Args:
            text: Text to check

        Returns:
            Tuple of (has_bias, dictionary of bias types and matched phrases)
        """
        if not self.config["enable_bias_detection"]:
            return False, {}

        text_lower = text.lower()
        bias_found = {}

        for bias_type, indicators in self.bias_indicators.items():
            matches = [ind for ind in indicators if ind in text_lower]
            if matches:
                bias_found[bias_type] = matches

        return len(bias_found) > 0, bias_found

    def check_pii(self, text: str) -> Tuple[bool, Dict[str, List[str]]]:
        """
        Check for personally identifiable information.

        Detects SSNs, credit card numbers, e-mail addresses, phone numbers
        and street addresses using the same patterns that
        ``_sanitize_pii`` redacts.

        Args:
            text: Text to check

        Returns:
            Tuple of (has_pii, dictionary of PII types found)
        """
        if not self.config["enable_pii_detection"]:
            return False, {}

        pii_found = {}
        for pii_type, pattern, _placeholder in self._PII_RULES:
            matches = pattern.findall(text)
            if matches:
                pii_found[pii_type] = matches

        return len(pii_found) > 0, pii_found

    def check_spam(self, text: str) -> Tuple[bool, List[str]]:
        """
        Check for spam content.

        Args:
            text: Text to check

        Returns:
            Tuple of (is_spam, list of matched patterns); regex hits are
            reported as the pattern string, blocked domains as
            ``"Blocked domain: <domain>"``
        """
        if not self.config["enable_spam_detection"]:
            return False, []

        matched_patterns = [
            pattern for pattern in self.spam_patterns if re.search(pattern, text)
        ]

        # Compare case-insensitively on both sides so a configured domain
        # containing upper-case letters still matches.
        text_lower = text.lower()
        for domain in self.config["blocked_domains"]:
            if domain.lower() in text_lower:
                matched_patterns.append(f"Blocked domain: {domain}")

        return len(matched_patterns) > 0, matched_patterns

    def check_dangerous_content(self, text: str) -> Tuple[bool, List[str]]:
        """
        Check for dangerous instructions or content.

        Each category is reported at most once, regardless of how many of
        its patterns match.

        Args:
            text: Text to check

        Returns:
            Tuple of (is_dangerous, list of matched categories)
        """
        text_lower = text.lower()
        dangerous_categories = []

        if any(re.search(p, text_lower) for p in self.dangerous_instructions):
            dangerous_categories.append("dangerous_instructions")

        if any(re.search(p, text_lower) for p in self.medical_misinfo):
            dangerous_categories.append("medical_misinformation")

        return len(dangerous_categories) > 0, dangerous_categories

    def check_repetition(self, text: str) -> Tuple[bool, float]:
        """
        Check for excessive repetition (potential spam or model failure).

        The ratio is ``1 - unique_words / total_words`` over the
        whitespace-separated words; texts shorter than 10 words are never
        considered repetitive.

        Args:
            text: Text to check

        Returns:
            Tuple of (is_repetitive, repetition_ratio)
        """
        words = text.split()
        if len(words) < 10:
            return False, 0.0

        repetition_ratio = 1.0 - (len(set(words)) / len(words))
        is_repetitive = repetition_ratio > self.config["max_repetition_ratio"]
        return is_repetitive, repetition_ratio

    def moderate_content(self, text: str, context: str = "general") -> "ModerationResult":
        """
        Perform comprehensive content moderation.

        Runs every check in turn, collecting violations and raising the
        risk level as needed ("low" < "medium" < "high" < "critical").

        Args:
            text: Text to moderate
            context: Context of the content (general, chat, code, etc.);
                it is only recorded in the result metadata

        Returns:
            ModerationResult with detailed analysis
        """
        violations = []
        confidence_scores = {}
        risk_level = "low"

        has_profanity, profanity_words = self.check_profanity(text)
        if has_profanity:
            violations.append(f"Profanity detected: {len(profanity_words)} words")
            confidence_scores["profanity"] = 0.9
            risk_level = "medium"

        is_toxic, toxicity_score, _toxic_phrases = self.check_toxicity(text)
        if is_toxic:
            violations.append(f"Toxic content detected (score: {toxicity_score:.2f})")
            confidence_scores["toxicity"] = toxicity_score
            risk_level = "high"

        has_bias, bias_types = self.check_bias(text)
        if has_bias:
            violations.append(f"Potential bias detected: {', '.join(bias_types.keys())}")
            confidence_scores["bias"] = 0.7
            if risk_level == "low":
                risk_level = "medium"

        has_pii, pii_types = self.check_pii(text)
        if has_pii:
            violations.append(f"PII detected: {', '.join(pii_types.keys())}")
            confidence_scores["pii"] = 1.0
            risk_level = "high"

        is_spam, spam_patterns = self.check_spam(text)
        if is_spam:
            violations.append(f"Spam indicators: {len(spam_patterns)}")
            confidence_scores["spam"] = 0.8
            if risk_level == "low":
                risk_level = "medium"

        is_dangerous, dangerous_categories = self.check_dangerous_content(text)
        if is_dangerous:
            violations.append(f"Dangerous content: {', '.join(dangerous_categories)}")
            confidence_scores["dangerous"] = 0.95
            risk_level = "critical"

        # Repetition is recorded as a violation but does not raise the risk
        # level, so repetition-only content is still approved below.
        is_repetitive, repetition_ratio = self.check_repetition(text)
        if is_repetitive:
            violations.append(f"Excessive repetition ({repetition_ratio:.2%})")
            confidence_scores["repetition"] = repetition_ratio

        is_approved = len(violations) == 0 or (risk_level == "low" and not is_dangerous)
        recommended_action = self._ACTION_BY_RISK.get(risk_level, "approve")

        # A redacted copy is only produced when PII was actually detected.
        sanitized_content = self._sanitize_pii(text) if has_pii else None

        return ModerationResult(
            timestamp=datetime.now().isoformat(),
            is_approved=is_approved,
            risk_level=risk_level,
            violations=violations,
            confidence_scores=confidence_scores,
            recommended_action=recommended_action,
            sanitized_content=sanitized_content,
            metadata={
                "text_length": len(text),
                "word_count": len(text.split()),
                "context": context,
            },
        )

    def _sanitize_pii(self, text: str) -> str:
        """Sanitize text by removing/redacting PII.

        Every PII type that ``check_pii`` detects is replaced by its
        placeholder (``[SSN-REDACTED]``, ``[CC-REDACTED]``,
        ``[EMAIL-REDACTED]``, ``[PHONE-REDACTED]``, ``[ADDRESS-REDACTED]``).

        Args:
            text: Text to redact

        Returns:
            The text with all detected PII replaced
        """
        sanitized = text
        for _pii_type, pattern, placeholder in self._PII_RULES:
            sanitized = pattern.sub(placeholder, sanitized)
        return sanitized

    def batch_moderate(self, texts: List[str]) -> List["ModerationResult"]:
        """
        Moderate multiple texts in batch.

        Args:
            texts: List of texts to moderate

        Returns:
            List of ModerationResults, in the same order as ``texts``
        """
        return [self.moderate_content(text) for text in texts]

    def export_results(self, results: List["ModerationResult"], filepath: str):
        """
        Export moderation results to JSON file.

        Args:
            results: List of ModerationResults
            filepath: Output file path
        """
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump([asdict(r) for r in results], f, indent=2)

        logger.info(f"Exported {len(results)} moderation results to {filepath}")
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Demo: moderate a handful of sample inputs, print a report for each,
    # then re-run them as a batch and export the batch results to JSON.
    moderator = ContentFilter()

    samples = [
        "What is the capital of France?",
        "You are a stupid idiot!",
        "My SSN is 123-45-6789",
        "Buy now! Limited time offer! www.spam.com",
        "How to make a bomb at home",
    ]
    divider = "-" * 80

    print("Content Moderation Results:\n")
    print("=" * 80)

    for number, sample in enumerate(samples, start=1):
        outcome = moderator.moderate_content(sample)

        report = [
            f"\nTest {number}: {sample[:50]}...",
            f"Approved: {outcome.is_approved}",
            f"Risk Level: {outcome.risk_level}",
            f"Violations: {outcome.violations}",
            f"Recommended Action: {outcome.recommended_action}",
        ]
        # The sanitized line is only shown when a redacted copy exists.
        if outcome.sanitized_content:
            report.append(f"Sanitized: {outcome.sanitized_content[:100]}...")
        report.append(divider)
        print("\n".join(report))

    batch_results = moderator.batch_moderate(samples)
    moderator.export_results(batch_results, "moderation_results.json")
    print(f"\n✓ Exported {len(batch_results)} results to moderation_results.json")