""" Bad Word Sentinel Core monitoring and alerting logic for OFP content moderation """ from typing import Dict, List, Optional import logging from datetime import datetime, timezone from .ofp_client import OFPClient from .profanity_detector import ProfanityDetector from .models import Envelope logger = logging.getLogger(__name__) class BadWordSentinel: """Sentinel agent for monitoring OFP conversations for profanity""" def __init__( self, speaker_uri: str, service_url: str, profanity_detector: ProfanityDetector, convener_uri: str, convener_url: str ): """ Initialize sentinel agent Args: speaker_uri: Sentinel's unique speaker URI service_url: Sentinel's service endpoint URL profanity_detector: Configured profanity detector instance convener_uri: Convener's speaker URI convener_url: Convener's service endpoint URL """ self.speaker_uri = speaker_uri self.service_url = service_url self.convener_uri = convener_uri self.convener_url = convener_url # Initialize OFP client manifest = self._create_manifest() self.ofp_client = OFPClient(speaker_uri, service_url, manifest) # Initialize profanity detector self.detector = profanity_detector # Statistics tracking self.violations_detected = 0 self.alerts_sent = 0 self.messages_processed = 0 self.activity_log = [] self.connection_status = "Initializing..." self.is_monitoring = False logger.info(f"Bad Word Sentinel initialized: {speaker_uri}") def _create_manifest(self) -> Dict: """Create assistant manifest for sentinel""" return { "identification": { "speakerUri": self.speaker_uri, "serviceUrl": self.service_url, "conversationalName": "Content Moderator Sentinel", "role": "Monitoring Agent", "synopsis": "Automated content moderation and profanity detection for OFP conversations" }, "capabilities": [{ "keyphrases": ["content moderation", "safety monitoring", "profanity detection"], "supportedLayers": ["text"], "descriptions": ["Monitors conversations for policy violations and alerts conveners"] }] } def process_envelope(self, envelope: Envelope): """ Process incoming OFP envelope and check for profanity Args: envelope: OFP envelope to process """ try: self.messages_processed += 1 for event in envelope.events: # Only process utterance events if event.get('eventType') != 'utterance': continue # Extract text from dialog event params = event.get('parameters', {}) dialog_event = params.get('dialogEvent', {}) features = dialog_event.get('features', {}) text_feature = features.get('text', {}) tokens = text_feature.get('tokens', []) # Combine all token values into text text = ' '.join(token.get('value', '') for token in tokens) if not text: continue # Check for profanity violation = self.detector.detect_violations(text) if violation: self._handle_violation( envelope=envelope, event=event, dialog_event=dialog_event, violation=violation ) except Exception as e: logger.error(f"Error processing envelope: {e}") self._log_activity(f"ERROR: Failed to process envelope - {str(e)}") def _handle_violation( self, envelope: Envelope, event: Dict, dialog_event: Dict, violation: Dict ): """ Handle detected profanity violation Args: envelope: Original envelope event: Event containing violation dialog_event: Dialog event with text violation: Violation details from detector """ self.violations_detected += 1 # Extract speaker information violating_speaker = dialog_event.get('speakerUri', 'unknown') # Create alert data alert_data = { "alertType": "content_violation", "severity": violation['severity'], "violatingMessage": { "messageId": dialog_event.get('id'), "speakerUri": violating_speaker, "timestamp": dialog_event.get('span', {}).get('startTime'), "excerpt": violation['censored_text'] }, "detectedPatterns": violation['violations'], "violationCount": violation['violation_count'], "recommendedAction": self._recommend_action(violation['severity']), "context": { "conversationId": envelope.conversation.get('id'), "totalViolations": self.violations_detected, "detectionTime": datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'), "sentinelUri": self.speaker_uri } } # Send private alert to convener logger.warning( f"VIOLATION DETECTED: {violation['severity'].upper()} severity - " f"{len(violation['violations'])} violations by {violating_speaker}" ) success = self.ofp_client.send_private_alert( convener_uri=self.convener_uri, convener_url=self.convener_url, conversation_id=envelope.conversation.get('id'), alert_data=alert_data ) if success: self.alerts_sent += 1 log_msg = ( f"ALERT: {violation['severity'].upper()} severity - " f"{len(violation['violations'])} violation(s) detected from {violating_speaker}" ) self._log_activity(log_msg) logger.info(f"Alert sent successfully to convener") else: self._log_activity("ERROR: Failed to send alert to convener") logger.error("Failed to send alert to convener") def _recommend_action(self, severity: str) -> str: """ Recommend enforcement action based on severity Args: severity: Violation severity level Returns: Recommended action for convener """ actions = { "low": "warn_user", "medium": "revoke_floor_temporary", "high": "uninvite_user" } return actions.get(severity, "warn_user") def _log_activity(self, message: str): """ Log activity with timestamp Args: message: Activity message to log """ timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_entry = f"[{timestamp}] {message}" self.activity_log.append(log_entry) # Keep only last 100 entries if len(self.activity_log) > 100: self.activity_log = self.activity_log[-100:] def get_status(self) -> Dict: """ Get current sentinel status Returns: Dictionary with status information """ return { "connection_status": self.connection_status, "is_monitoring": self.is_monitoring, "violations_detected": self.violations_detected, "alerts_sent": self.alerts_sent, "messages_processed": self.messages_processed, "recent_logs": self.activity_log[-10:] if self.activity_log else [], "speaker_uri": self.speaker_uri, "convener_uri": self.convener_uri } def get_full_log(self) -> List[str]: """Get complete activity log""" return self.activity_log.copy() def start_monitoring(self): """Start the sentinel monitoring service""" self.is_monitoring = True self.connection_status = "✅ Monitoring Active" self._log_activity("Sentinel monitoring started") logger.info("Bad word sentinel started successfully") def stop_monitoring(self): """Stop the sentinel monitoring service""" self.is_monitoring = False self.connection_status = "⏸️ Monitoring Paused" self._log_activity("Sentinel monitoring stopped") logger.info("Bad word sentinel stopped") def reset_statistics(self): """Reset violation statistics""" self.violations_detected = 0 self.alerts_sent = 0 self.messages_processed = 0 self._log_activity("Statistics reset") logger.info("Sentinel statistics reset") def get_manifest(self) -> Dict: """Get assistant manifest""" return self.ofp_client.get_manifest()