Spaces:
Sleeping
Sleeping
| """ | |
| Bad Word Sentinel | |
| Core monitoring and alerting logic for OFP content moderation | |
| """ | |
| from typing import Dict, List, Optional | |
| import logging | |
| from datetime import datetime, timezone | |
| from .ofp_client import OFPClient | |
| from .profanity_detector import ProfanityDetector | |
| from .models import Envelope | |
| logger = logging.getLogger(__name__) | |
| class BadWordSentinel: | |
| """Sentinel agent for monitoring OFP conversations for profanity""" | |
| def __init__( | |
| self, | |
| speaker_uri: str, | |
| service_url: str, | |
| profanity_detector: ProfanityDetector, | |
| convener_uri: str, | |
| convener_url: str | |
| ): | |
| """ | |
| Initialize sentinel agent | |
| Args: | |
| speaker_uri: Sentinel's unique speaker URI | |
| service_url: Sentinel's service endpoint URL | |
| profanity_detector: Configured profanity detector instance | |
| convener_uri: Convener's speaker URI | |
| convener_url: Convener's service endpoint URL | |
| """ | |
| self.speaker_uri = speaker_uri | |
| self.service_url = service_url | |
| self.convener_uri = convener_uri | |
| self.convener_url = convener_url | |
| # Initialize OFP client | |
| manifest = self._create_manifest() | |
| self.ofp_client = OFPClient(speaker_uri, service_url, manifest) | |
| # Initialize profanity detector | |
| self.detector = profanity_detector | |
| # Statistics tracking | |
| self.violations_detected = 0 | |
| self.alerts_sent = 0 | |
| self.messages_processed = 0 | |
| self.activity_log = [] | |
| self.connection_status = "Initializing..." | |
| self.is_monitoring = False | |
| logger.info(f"Bad Word Sentinel initialized: {speaker_uri}") | |
| def _create_manifest(self) -> Dict: | |
| """Create assistant manifest for sentinel""" | |
| return { | |
| "identification": { | |
| "speakerUri": self.speaker_uri, | |
| "serviceUrl": self.service_url, | |
| "conversationalName": "Content Moderator Sentinel", | |
| "role": "Monitoring Agent", | |
| "synopsis": "Automated content moderation and profanity detection for OFP conversations" | |
| }, | |
| "capabilities": [{ | |
| "keyphrases": ["content moderation", "safety monitoring", "profanity detection"], | |
| "supportedLayers": ["text"], | |
| "descriptions": ["Monitors conversations for policy violations and alerts conveners"] | |
| }] | |
| } | |
| def process_envelope(self, envelope: Envelope): | |
| """ | |
| Process incoming OFP envelope and check for profanity | |
| Args: | |
| envelope: OFP envelope to process | |
| """ | |
| try: | |
| self.messages_processed += 1 | |
| for event in envelope.events: | |
| # Only process utterance events | |
| if event.get('eventType') != 'utterance': | |
| continue | |
| # Extract text from dialog event | |
| params = event.get('parameters', {}) | |
| dialog_event = params.get('dialogEvent', {}) | |
| features = dialog_event.get('features', {}) | |
| text_feature = features.get('text', {}) | |
| tokens = text_feature.get('tokens', []) | |
| # Combine all token values into text | |
| text = ' '.join(token.get('value', '') for token in tokens) | |
| if not text: | |
| continue | |
| # Check for profanity | |
| violation = self.detector.detect_violations(text) | |
| if violation: | |
| self._handle_violation( | |
| envelope=envelope, | |
| event=event, | |
| dialog_event=dialog_event, | |
| violation=violation | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error processing envelope: {e}") | |
| self._log_activity(f"ERROR: Failed to process envelope - {str(e)}") | |
| def _handle_violation( | |
| self, | |
| envelope: Envelope, | |
| event: Dict, | |
| dialog_event: Dict, | |
| violation: Dict | |
| ): | |
| """ | |
| Handle detected profanity violation | |
| Args: | |
| envelope: Original envelope | |
| event: Event containing violation | |
| dialog_event: Dialog event with text | |
| violation: Violation details from detector | |
| """ | |
| self.violations_detected += 1 | |
| # Extract speaker information | |
| violating_speaker = dialog_event.get('speakerUri', 'unknown') | |
| # Create alert data | |
| alert_data = { | |
| "alertType": "content_violation", | |
| "severity": violation['severity'], | |
| "violatingMessage": { | |
| "messageId": dialog_event.get('id'), | |
| "speakerUri": violating_speaker, | |
| "timestamp": dialog_event.get('span', {}).get('startTime'), | |
| "excerpt": violation['censored_text'] | |
| }, | |
| "detectedPatterns": violation['violations'], | |
| "violationCount": violation['violation_count'], | |
| "recommendedAction": self._recommend_action(violation['severity']), | |
| "context": { | |
| "conversationId": envelope.conversation.get('id'), | |
| "totalViolations": self.violations_detected, | |
| "detectionTime": datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'), | |
| "sentinelUri": self.speaker_uri | |
| } | |
| } | |
| # Send private alert to convener | |
| logger.warning( | |
| f"VIOLATION DETECTED: {violation['severity'].upper()} severity - " | |
| f"{len(violation['violations'])} violations by {violating_speaker}" | |
| ) | |
| success = self.ofp_client.send_private_alert( | |
| convener_uri=self.convener_uri, | |
| convener_url=self.convener_url, | |
| conversation_id=envelope.conversation.get('id'), | |
| alert_data=alert_data | |
| ) | |
| if success: | |
| self.alerts_sent += 1 | |
| log_msg = ( | |
| f"ALERT: {violation['severity'].upper()} severity - " | |
| f"{len(violation['violations'])} violation(s) detected from {violating_speaker}" | |
| ) | |
| self._log_activity(log_msg) | |
| logger.info(f"Alert sent successfully to convener") | |
| else: | |
| self._log_activity("ERROR: Failed to send alert to convener") | |
| logger.error("Failed to send alert to convener") | |
| def _recommend_action(self, severity: str) -> str: | |
| """ | |
| Recommend enforcement action based on severity | |
| Args: | |
| severity: Violation severity level | |
| Returns: | |
| Recommended action for convener | |
| """ | |
| actions = { | |
| "low": "warn_user", | |
| "medium": "revoke_floor_temporary", | |
| "high": "uninvite_user" | |
| } | |
| return actions.get(severity, "warn_user") | |
| def _log_activity(self, message: str): | |
| """ | |
| Log activity with timestamp | |
| Args: | |
| message: Activity message to log | |
| """ | |
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| log_entry = f"[{timestamp}] {message}" | |
| self.activity_log.append(log_entry) | |
| # Keep only last 100 entries | |
| if len(self.activity_log) > 100: | |
| self.activity_log = self.activity_log[-100:] | |
| def get_status(self) -> Dict: | |
| """ | |
| Get current sentinel status | |
| Returns: | |
| Dictionary with status information | |
| """ | |
| return { | |
| "connection_status": self.connection_status, | |
| "is_monitoring": self.is_monitoring, | |
| "violations_detected": self.violations_detected, | |
| "alerts_sent": self.alerts_sent, | |
| "messages_processed": self.messages_processed, | |
| "recent_logs": self.activity_log[-10:] if self.activity_log else [], | |
| "speaker_uri": self.speaker_uri, | |
| "convener_uri": self.convener_uri | |
| } | |
| def get_full_log(self) -> List[str]: | |
| """Get complete activity log""" | |
| return self.activity_log.copy() | |
| def start_monitoring(self): | |
| """Start the sentinel monitoring service""" | |
| self.is_monitoring = True | |
| self.connection_status = "✅ Monitoring Active" | |
| self._log_activity("Sentinel monitoring started") | |
| logger.info("Bad word sentinel started successfully") | |
| def stop_monitoring(self): | |
| """Stop the sentinel monitoring service""" | |
| self.is_monitoring = False | |
| self.connection_status = "⏸️ Monitoring Paused" | |
| self._log_activity("Sentinel monitoring stopped") | |
| logger.info("Bad word sentinel stopped") | |
| def reset_statistics(self): | |
| """Reset violation statistics""" | |
| self.violations_detected = 0 | |
| self.alerts_sent = 0 | |
| self.messages_processed = 0 | |
| self._log_activity("Statistics reset") | |
| logger.info("Sentinel statistics reset") | |
| def get_manifest(self) -> Dict: | |
| """Get assistant manifest""" | |
| return self.ofp_client.get_manifest() | |