""" Advanced Content Moderation System for Helion-V2 Provides production-grade content filtering and safety checks. """ import re import json from typing import List, Dict, Tuple, Optional, Set from dataclasses import dataclass, asdict from datetime import datetime import logging # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class ModerationResult: """Detailed moderation result.""" timestamp: str is_approved: bool risk_level: str # low, medium, high, critical violations: List[str] confidence_scores: Dict[str, float] recommended_action: str sanitized_content: Optional[str] = None metadata: Optional[Dict] = None class ContentFilter: """Multi-layer content filtering system.""" def __init__(self, config_path: Optional[str] = None): """ Initialize content filter with optional custom configuration. Args: config_path: Path to custom filter configuration JSON """ self.config = self._load_config(config_path) self._initialize_filters() def _load_config(self, config_path: Optional[str]) -> Dict: """Load filter configuration.""" default_config = { "enable_profanity_filter": True, "enable_toxicity_detection": True, "enable_bias_detection": True, "enable_pii_detection": True, "enable_spam_detection": True, "strictness_level": "medium", # low, medium, high "blocked_domains": ["example-spam.com"], "allowed_code_patterns": True, "max_repetition_ratio": 0.3 } if config_path: try: with open(config_path, 'r') as f: custom_config = json.load(f) default_config.update(custom_config) except Exception as e: logger.warning(f"Could not load config from {config_path}: {e}") return default_config def _initialize_filters(self): """Initialize all filter components.""" # Profanity and offensive language self.profanity_list = self._load_profanity_list() # Toxic phrases self.toxic_phrases = [ "you should kill yourself", "i hope you die", "you deserve to suffer", "stupid idiot moron", "worthless piece of", ] # Bias indicators self.bias_indicators = { "gender": ["all women are", "all men are", "females are", "males are"], "race": ["all [race] are", "typical [race]", "[race] people always"], "religion": ["all [religion] are", "[religion] believers are"], "age": ["all old people", "millennials are all", "boomers are"], } # Spam patterns self.spam_patterns = [ r'(?i)(buy now|click here|limited time|act now).{0,50}(http|www)', r'(?i)(viagra|cialis|lottery|prince|inheritance)', r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', ] # Dangerous instruction patterns self.dangerous_instructions = [ r'(?i)how\s+to\s+(make|build|create|construct)\s+(bomb|explosive|poison|weapon)', r'(?i)instructions?\s+(for|to)\s+(kill|murder|harm|torture)', r'(?i)(recipe|guide|tutorial)\s+for\s+(meth|cocaine|heroin)', r'(?i)how\s+to\s+(hack|crack|break\s+into|bypass)', ] # Medical misinformation self.medical_misinfo = [ r'(?i)(cancer|covid|hiv).+(cure|treat|prevent).+(bleach|hydrogen\s+peroxide|vitamin\s+c)', r'(?i)vaccines?\s+(cause|lead\s+to|result\s+in)\s+(autism|death|infertility)', r'(?i)essential\s+oils?\s+(cure|treat)\s+(cancer|diabetes|heart\s+disease)', ] def _load_profanity_list(self) -> Set[str]: """Load profanity word list.""" # Basic profanity list (expand as needed) return { 'fuck', 'shit', 'bitch', 'asshole', 'bastard', 'damn', 'cunt', 'piss', 'cock', 'dick', 'pussy', 'slut', 'whore' } def check_profanity(self, text: str) -> Tuple[bool, List[str]]: """ Check for profanity in text. 
    def _initialize_filters(self):
        """Initialize all filter components."""
        # Profanity and offensive language
        self.profanity_list = self._load_profanity_list()

        # Toxic phrases
        self.toxic_phrases = [
            "you should kill yourself",
            "i hope you die",
            "you deserve to suffer",
            "stupid idiot moron",
            "worthless piece of",
        ]

        # Bias indicators (the bracketed [race]/[religion] entries are
        # placeholder templates, not live patterns)
        self.bias_indicators = {
            "gender": ["all women are", "all men are", "females are", "males are"],
            "race": ["all [race] are", "typical [race]", "[race] people always"],
            "religion": ["all [religion] are", "[religion] believers are"],
            "age": ["all old people", "millennials are all", "boomers are"],
        }

        # Spam patterns (note that the third pattern flags any http/https URL)
        self.spam_patterns = [
            r'(?i)(buy now|click here|limited time|act now).{0,50}(http|www)',
            r'(?i)(viagra|cialis|lottery|prince|inheritance)',
            r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+',
        ]

        # Dangerous instruction patterns
        self.dangerous_instructions = [
            r'(?i)how\s+to\s+(make|build|create|construct)\s+(bomb|explosive|poison|weapon)',
            r'(?i)instructions?\s+(for|to)\s+(kill|murder|harm|torture)',
            r'(?i)(recipe|guide|tutorial)\s+for\s+(meth|cocaine|heroin)',
            r'(?i)how\s+to\s+(hack|crack|break\s+into|bypass)',
        ]

        # Medical misinformation
        self.medical_misinfo = [
            r'(?i)(cancer|covid|hiv).+(cure|treat|prevent).+(bleach|hydrogen\s+peroxide|vitamin\s+c)',
            r'(?i)vaccines?\s+(cause|lead\s+to|result\s+in)\s+(autism|death|infertility)',
            r'(?i)essential\s+oils?\s+(cure|treat)\s+(cancer|diabetes|heart\s+disease)',
        ]

    def _load_profanity_list(self) -> Set[str]:
        """Load the profanity word list."""
        # Basic profanity list (expand as needed)
        return {
            'fuck', 'shit', 'bitch', 'asshole', 'bastard', 'damn',
            'cunt', 'piss', 'cock', 'dick', 'pussy', 'slut', 'whore'
        }

    def check_profanity(self, text: str) -> Tuple[bool, List[str]]:
        """
        Check for profanity in text.

        Args:
            text: Text to check.

        Returns:
            Tuple of (has_profanity, list of found words).
        """
        if not self.config["enable_profanity_filter"]:
            return False, []

        text_lower = text.lower()
        words = re.findall(r'\b\w+\b', text_lower)
        found_profanity = [word for word in words if word in self.profanity_list]

        return len(found_profanity) > 0, found_profanity

    def check_toxicity(self, text: str) -> Tuple[bool, float, List[str]]:
        """
        Check for toxic content.

        Args:
            text: Text to check.

        Returns:
            Tuple of (is_toxic, toxicity_score, matched_phrases).
        """
        if not self.config["enable_toxicity_detection"]:
            return False, 0.0, []

        text_lower = text.lower()
        matched_phrases = []
        toxicity_score = 0.0

        for phrase in self.toxic_phrases:
            if phrase in text_lower:
                matched_phrases.append(phrase)
                toxicity_score += 0.3

        # Check for aggressive language patterns
        aggressive_patterns = [
            r'\b(hate|despise|loathe)\s+you\b',
            r'\byou\s+(are|re)\s+(a\s+)?(stupid|dumb|idiot|moron)',
            r'\bshut\s+up\b',
            r'\bgo\s+to\s+hell\b',
        ]

        for pattern in aggressive_patterns:
            if re.search(pattern, text_lower):
                toxicity_score += 0.2

        is_toxic = toxicity_score > 0.5
        return is_toxic, min(toxicity_score, 1.0), matched_phrases

    def check_bias(self, text: str) -> Tuple[bool, Dict[str, List[str]]]:
        """
        Check for biased language.

        Args:
            text: Text to check.

        Returns:
            Tuple of (has_bias, dictionary of bias types and matched phrases).
        """
        if not self.config["enable_bias_detection"]:
            return False, {}

        text_lower = text.lower()
        bias_found = {}

        for bias_type, indicators in self.bias_indicators.items():
            matches = []
            for indicator in indicators:
                # Simple substring matching (can be enhanced with ML)
                if indicator in text_lower:
                    matches.append(indicator)
            if matches:
                bias_found[bias_type] = matches

        return len(bias_found) > 0, bias_found

    def check_pii(self, text: str) -> Tuple[bool, Dict[str, List[str]]]:
        """
        Check for personally identifiable information.

        Args:
            text: Text to check.

        Returns:
            Tuple of (has_pii, dictionary of PII types found).
        """
        if not self.config["enable_pii_detection"]:
            return False, {}

        pii_found = {}

        # Social Security Number
        ssn_pattern = r'\b\d{3}-\d{2}-\d{4}\b'
        ssns = re.findall(ssn_pattern, text)
        if ssns:
            pii_found['ssn'] = ssns

        # Credit card (matches any 16-digit group; see the Luhn note below)
        cc_pattern = r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'
        ccs = re.findall(cc_pattern, text)
        if ccs:
            pii_found['credit_card'] = ccs

        # Email address
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        emails = re.findall(email_pattern, text)
        if emails:
            pii_found['email'] = emails

        # Phone number
        phone_pattern = r'\b(?:\+?1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b'
        phones = re.findall(phone_pattern, text)
        if phones:
            pii_found['phone'] = phones

        # Street address (basic)
        address_pattern = r'\b\d+\s+[A-Za-z]+\s+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd)\b'
        addresses = re.findall(address_pattern, text, re.IGNORECASE)
        if addresses:
            pii_found['address'] = addresses

        return len(pii_found) > 0, pii_found
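    # The credit-card pattern in check_pii matches any 16-digit group, so order
    # numbers and tracking IDs can be flagged too. A Luhn checksum test is a
    # common way to cut such false positives. The helper below is a minimal
    # sketch of that idea (hypothetical name; check_pii does not call it by
    # default).
    @staticmethod
    def _luhn_valid(candidate: str) -> bool:
        """Return True if the digit string passes the Luhn checksum."""
        digits = [int(d) for d in re.sub(r'\D', '', candidate)]
        if len(digits) < 13:  # payment card numbers are 13-19 digits
            return False
        parity = len(digits) % 2
        total = 0
        for i, digit in enumerate(digits):
            if i % 2 == parity:  # double every second digit from the right
                digit *= 2
                if digit > 9:
                    digit -= 9
            total += digit
        return total % 10 == 0

    # Example use: [cc for cc in ccs if ContentFilter._luhn_valid(cc)] keeps
    # only checksum-valid candidates.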
    def check_spam(self, text: str) -> Tuple[bool, List[str]]:
        """
        Check for spam content.

        Args:
            text: Text to check.

        Returns:
            Tuple of (is_spam, list of matched patterns).
        """
        if not self.config["enable_spam_detection"]:
            return False, []

        matched_patterns = []
        for pattern in self.spam_patterns:
            if re.search(pattern, text):
                matched_patterns.append(pattern)

        # Check for blocked domains
        for domain in self.config["blocked_domains"]:
            if domain in text.lower():
                matched_patterns.append(f"Blocked domain: {domain}")

        return len(matched_patterns) > 0, matched_patterns

    def check_dangerous_content(self, text: str) -> Tuple[bool, List[str]]:
        """
        Check for dangerous instructions or content.

        Args:
            text: Text to check.

        Returns:
            Tuple of (is_dangerous, list of matched categories).
        """
        text_lower = text.lower()
        dangerous_categories = []

        # Check dangerous instructions
        for pattern in self.dangerous_instructions:
            if re.search(pattern, text_lower):
                dangerous_categories.append("dangerous_instructions")
                break

        # Check medical misinformation
        for pattern in self.medical_misinfo:
            if re.search(pattern, text_lower):
                dangerous_categories.append("medical_misinformation")
                break

        return len(dangerous_categories) > 0, dangerous_categories

    def check_repetition(self, text: str) -> Tuple[bool, float]:
        """
        Check for excessive repetition (potential spam or model failure).

        Args:
            text: Text to check.

        Returns:
            Tuple of (is_repetitive, repetition_ratio).
        """
        words = text.split()
        if len(words) < 10:
            return False, 0.0

        unique_words = len(set(words))
        total_words = len(words)
        repetition_ratio = 1.0 - (unique_words / total_words)

        is_repetitive = repetition_ratio > self.config["max_repetition_ratio"]
        return is_repetitive, repetition_ratio
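    # Worked example for check_repetition: "buy now buy now buy now buy now
    # buy now" is 10 words with 2 unique, so repetition_ratio = 1 - 2/10 = 0.8,
    # which exceeds the default max_repetition_ratio of 0.3 and is flagged.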
    def moderate_content(self, text: str, context: str = "general") -> ModerationResult:
        """
        Perform comprehensive content moderation.

        Args:
            text: Text to moderate.
            context: Context of the content (general, chat, code, etc.).

        Returns:
            ModerationResult with detailed analysis.
        """
        violations = []
        confidence_scores = {}
        risk_level = "low"

        # Run all checks
        has_profanity, profanity_words = self.check_profanity(text)
        if has_profanity:
            violations.append(f"Profanity detected: {len(profanity_words)} words")
            confidence_scores["profanity"] = 0.9
            risk_level = "medium"

        is_toxic, toxicity_score, toxic_phrases = self.check_toxicity(text)
        if is_toxic:
            violations.append(f"Toxic content detected (score: {toxicity_score:.2f})")
            confidence_scores["toxicity"] = toxicity_score
            risk_level = "high"

        has_bias, bias_types = self.check_bias(text)
        if has_bias:
            violations.append(f"Potential bias detected: {', '.join(bias_types.keys())}")
            confidence_scores["bias"] = 0.7
            if risk_level == "low":
                risk_level = "medium"

        has_pii, pii_types = self.check_pii(text)
        if has_pii:
            violations.append(f"PII detected: {', '.join(pii_types.keys())}")
            confidence_scores["pii"] = 1.0
            risk_level = "high"

        is_spam, spam_patterns = self.check_spam(text)
        if is_spam:
            violations.append(f"Spam indicators: {len(spam_patterns)}")
            confidence_scores["spam"] = 0.8
            if risk_level == "low":
                risk_level = "medium"

        is_dangerous, dangerous_categories = self.check_dangerous_content(text)
        if is_dangerous:
            violations.append(f"Dangerous content: {', '.join(dangerous_categories)}")
            confidence_scores["dangerous"] = 0.95
            risk_level = "critical"

        is_repetitive, repetition_ratio = self.check_repetition(text)
        if is_repetitive:
            violations.append(f"Excessive repetition ({repetition_ratio:.2%})")
            confidence_scores["repetition"] = repetition_ratio

        # Determine approval and recommended action
        is_approved = len(violations) == 0 or (risk_level == "low" and not is_dangerous)

        if risk_level == "critical":
            recommended_action = "block"
        elif risk_level == "high":
            recommended_action = "review"
        elif risk_level == "medium":
            recommended_action = "flag"
        else:
            recommended_action = "approve"

        # Sanitize if needed
        sanitized_content = None
        if has_pii:
            sanitized_content = self._sanitize_pii(text)

        return ModerationResult(
            timestamp=datetime.now().isoformat(),
            is_approved=is_approved,
            risk_level=risk_level,
            violations=violations,
            confidence_scores=confidence_scores,
            recommended_action=recommended_action,
            sanitized_content=sanitized_content,
            metadata={
                "text_length": len(text),
                "word_count": len(text.split()),
                "context": context,
            },
        )
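    # The unconditional risk_level assignments in moderate_content rely on
    # check order (the dangerous-content check runs last among the escalating
    # checks, so "critical" is never overwritten). A rank-based helper is one
    # order-independent alternative; this is a minimal sketch (hypothetical,
    # not called by moderate_content):
    #
    #   risk_level = self._escalate_risk(risk_level, "high")
    _RISK_ORDER = ("low", "medium", "high", "critical")

    @classmethod
    def _escalate_risk(cls, current: str, new: str) -> str:
        """Return whichever of the two risk levels ranks higher (both must be in _RISK_ORDER)."""
        return max(current, new, key=cls._RISK_ORDER.index)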
    def _sanitize_pii(self, text: str) -> str:
        """Sanitize text by redacting PII."""
        sanitized = text

        # Redact SSNs
        sanitized = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN-REDACTED]', sanitized)

        # Redact credit card numbers
        sanitized = re.sub(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '[CC-REDACTED]', sanitized)

        # Redact email addresses
        sanitized = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL-REDACTED]', sanitized)

        # Redact phone numbers
        sanitized = re.sub(r'\b(?:\+?1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b', '[PHONE-REDACTED]', sanitized)

        return sanitized

    def batch_moderate(self, texts: List[str]) -> List[ModerationResult]:
        """
        Moderate multiple texts in batch.

        Args:
            texts: List of texts to moderate.

        Returns:
            List of ModerationResults.
        """
        return [self.moderate_content(text) for text in texts]

    def export_results(self, results: List[ModerationResult], filepath: str):
        """
        Export moderation results to a JSON file.

        Args:
            results: List of ModerationResults.
            filepath: Output file path.
        """
        with open(filepath, 'w') as f:
            json.dump([asdict(r) for r in results], f, indent=2)
        logger.info(f"Exported {len(results)} moderation results to {filepath}")


# Example usage
if __name__ == "__main__":
    # Initialize filter
    filter_system = ContentFilter()

    # Test cases
    test_texts = [
        "What is the capital of France?",  # Safe
        "You are a stupid idiot!",  # Toxic
        "My SSN is 123-45-6789",  # PII
        "Buy now! Limited time offer! www.spam.com",  # Spam
        "How to make a bomb at home",  # Dangerous
    ]

    print("Content Moderation Results:\n")
    print("=" * 80)

    for i, text in enumerate(test_texts, 1):
        result = filter_system.moderate_content(text)
        print(f"\nTest {i}: {text[:50]}...")
        print(f"Approved: {result.is_approved}")
        print(f"Risk Level: {result.risk_level}")
        print(f"Violations: {result.violations}")
        print(f"Recommended Action: {result.recommended_action}")
        if result.sanitized_content:
            print(f"Sanitized: {result.sanitized_content[:100]}...")
        print("-" * 80)

    # Batch processing example
    results = filter_system.batch_moderate(test_texts)
    filter_system.export_results(results, "moderation_results.json")
    print(f"\n✓ Exported {len(results)} results to moderation_results.json")
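    # Round-trip sketch: export_results serializes each result with asdict(),
    # so json.load returns a list of plain dicts keyed by ModerationResult's
    # field names.
    with open("moderation_results.json") as f:
        loaded = json.load(f)
    not_approved = [r for r in loaded if not r["is_approved"]]
    print(f"{len(not_approved)} of {len(loaded)} test inputs were not approved")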