Spaces:
Running
Running
| """ | |
| Context-aware notification system with priority management for MorphGuard. | |
| This module provides a sophisticated notification system that intelligently: | |
| - Manages notification priorities based on content and context | |
| - Adapts delivery channels to user context and preferences | |
| - Groups related notifications to prevent overwhelming users | |
| - Schedules notifications based on user engagement patterns | |
| - Provides cognitive load analysis for optimal notification timing | |
| - Implements neural attention models for content optimization | |
| - Uses reinforcement learning for notification timing and channel selection | |
| """ | |
| import time | |
| import json | |
| import uuid | |
| import logging | |
| import threading | |
| import queue | |
| import random | |
| import math | |
| from enum import Enum | |
| from dataclasses import dataclass, field | |
| from typing import Dict, Any, List, Optional, Union, Callable, Set, Tuple | |
| from datetime import datetime, timedelta | |
| # Import from existing modules | |
| from src.telemetry import get_telemetry, EventCategory | |
| from src.error_handling import MGError, ErrorCode, ErrorSeverity, ErrorCategory | |
| # Define notification priorities | |
| class NotificationPriority(Enum): | |
| """Priority levels for notifications.""" | |
| CRITICAL = 0 # Immediate attention required (e.g., security incidents) | |
| HIGH = 1 # Important notifications (e.g., authentication required) | |
| MEDIUM = 2 # Standard notifications (e.g., process completed) | |
| LOW = 3 # Non-urgent notifications (e.g., minor updates) | |
| BACKGROUND = 4 # Informational notifications (e.g., usage statistics) | |
| # Define notification channels | |
| class NotificationChannel(Enum): | |
| """Delivery channels for notifications.""" | |
| UI = "ui" # In-app UI notifications | |
| TOAST = "toast" # Toast notifications | |
| MODAL = "modal" # Modal dialogs | |
| EMAIL = "email" # Email notifications | |
| SMS = "sms" # SMS notifications | |
| PUSH = "push" # Push notifications | |
| WEBHOOK = "webhook" # Webhook callbacks | |
| CONSOLE = "console" # Console logs (for developers) | |
| # Define notification states | |
| class NotificationState(Enum): | |
| """States of a notification in its lifecycle.""" | |
| PENDING = "pending" # Waiting to be delivered | |
| SCHEDULED = "scheduled" # Scheduled for future delivery | |
| DELIVERING = "delivering" # In the process of being delivered | |
| DELIVERED = "delivered" # Successfully delivered | |
| READ = "read" # Read/acknowledged by the recipient | |
| EXPIRED = "expired" # Expired before being read | |
| FAILED = "failed" # Failed to deliver | |
| CANCELLED = "cancelled" # Cancelled before delivery | |
| # Define notification categories | |
| class NotificationCategory(Enum): | |
| """Categories of notifications.""" | |
| SECURITY = "security" # Security-related notifications | |
| SYSTEM = "system" # System notifications | |
| USER = "user" # User-related notifications | |
| PROCESS = "process" # Process notifications | |
| ANALYTICS = "analytics" # Analytics and metrics notifications | |
| DETECTION = "detection" # Detection results | |
| VERIFICATION = "verification" # Verification results | |
| TRAINING = "training" # Training process notifications | |
| MAINTENANCE = "maintenance" # System maintenance notifications | |
| BILLING = "billing" # Billing and payment notifications | |
| class NotificationContext: | |
| """Context information for a notification.""" | |
| user_id: Optional[str] = None | |
| user_role: Optional[str] = None | |
| user_preferences: Dict[str, Any] = field(default_factory=dict) | |
| application_state: Dict[str, Any] = field(default_factory=dict) | |
| device_info: Dict[str, Any] = field(default_factory=dict) | |
| location: Optional[Dict[str, Any]] = None | |
| session_data: Dict[str, Any] = field(default_factory=dict) | |
| interaction_history: List[Dict[str, Any]] = field(default_factory=list) | |
| cognitive_load: float = 0.0 # Estimated cognitive load (0.0-1.0) | |
| attention_score: float = 1.0 # Estimated user attention (0.0-1.0) | |
| engagement_level: float = 1.0 # Current engagement level (0.0-1.0) | |
| def to_dict(self) -> Dict[str, Any]: | |
| """Convert to dictionary.""" | |
| return { | |
| "user_id": self.user_id, | |
| "user_role": self.user_role, | |
| "user_preferences": self.user_preferences, | |
| "application_state": self.application_state, | |
| "device_info": self.device_info, | |
| "location": self.location, | |
| "session_data": self.session_data, | |
| "cognitive_load": self.cognitive_load, | |
| "attention_score": self.attention_score, | |
| "engagement_level": self.engagement_level | |
| } | |
| def from_dict(cls, data: Dict[str, Any]) -> 'NotificationContext': | |
| """Create from dictionary.""" | |
| return cls( | |
| user_id=data.get("user_id"), | |
| user_role=data.get("user_role"), | |
| user_preferences=data.get("user_preferences", {}), | |
| application_state=data.get("application_state", {}), | |
| device_info=data.get("device_info", {}), | |
| location=data.get("location"), | |
| session_data=data.get("session_data", {}), | |
| interaction_history=data.get("interaction_history", []), | |
| cognitive_load=data.get("cognitive_load", 0.0), | |
| attention_score=data.get("attention_score", 1.0), | |
| engagement_level=data.get("engagement_level", 1.0) | |
| ) | |
| class NotificationTemplate: | |
| """Template for creating notifications.""" | |
| id: str | |
| title_template: str | |
| content_template: str | |
| category: NotificationCategory | |
| default_priority: NotificationPriority | |
| default_channels: List[NotificationChannel] | |
| icon: Optional[str] = None | |
| color: Optional[str] = None | |
| actions: List[Dict[str, Any]] = field(default_factory=list) | |
| expiration: Optional[timedelta] = None | |
| grouping_key: Optional[str] = None | |
| requires_acknowledgment: bool = False | |
| def to_dict(self) -> Dict[str, Any]: | |
| """Convert to dictionary.""" | |
| return { | |
| "id": self.id, | |
| "title_template": self.title_template, | |
| "content_template": self.content_template, | |
| "category": self.category.value, | |
| "default_priority": self.default_priority.value, | |
| "default_channels": [ch.value for ch in self.default_channels], | |
| "icon": self.icon, | |
| "color": self.color, | |
| "actions": self.actions, | |
| "expiration": str(self.expiration) if self.expiration else None, | |
| "grouping_key": self.grouping_key, | |
| "requires_acknowledgment": self.requires_acknowledgment | |
| } | |
| def from_dict(cls, data: Dict[str, Any]) -> 'NotificationTemplate': | |
| """Create from dictionary.""" | |
| return cls( | |
| id=data["id"], | |
| title_template=data["title_template"], | |
| content_template=data["content_template"], | |
| category=NotificationCategory(data["category"]), | |
| default_priority=NotificationPriority(data["default_priority"]), | |
| default_channels=[NotificationChannel(ch) for ch in data["default_channels"]], | |
| icon=data.get("icon"), | |
| color=data.get("color"), | |
| actions=data.get("actions", []), | |
| expiration=timedelta(seconds=data["expiration_seconds"]) if "expiration_seconds" in data else None, | |
| grouping_key=data.get("grouping_key"), | |
| requires_acknowledgment=data.get("requires_acknowledgment", False) | |
| ) | |
| class Notification: | |
| """A notification instance.""" | |
| id: str | |
| template_id: str | |
| title: str | |
| content: str | |
| category: NotificationCategory | |
| priority: NotificationPriority | |
| channels: List[NotificationChannel] | |
| state: NotificationState | |
| created_at: datetime | |
| scheduled_for: Optional[datetime] = None | |
| delivered_at: Optional[datetime] = None | |
| read_at: Optional[datetime] = None | |
| expires_at: Optional[datetime] = None | |
| recipient_id: Optional[str] = None | |
| recipient_role: Optional[str] = None | |
| context_snapshot: Dict[str, Any] = field(default_factory=dict) | |
| data: Dict[str, Any] = field(default_factory=dict) | |
| actions: List[Dict[str, Any]] = field(default_factory=list) | |
| icon: Optional[str] = None | |
| color: Optional[str] = None | |
| grouping_key: Optional[str] = None | |
| group_id: Optional[str] = None | |
| requires_acknowledgment: bool = False | |
| acknowledgment_data: Optional[Dict[str, Any]] = None | |
| delivery_attempts: int = 0 | |
| metadata: Dict[str, Any] = field(default_factory=dict) | |
| def to_dict(self) -> Dict[str, Any]: | |
| """Convert to dictionary.""" | |
| return { | |
| "id": self.id, | |
| "template_id": self.template_id, | |
| "title": self.title, | |
| "content": self.content, | |
| "category": self.category.value, | |
| "priority": self.priority.value, | |
| "channels": [ch.value for ch in self.channels], | |
| "state": self.state.value, | |
| "created_at": self.created_at.isoformat(), | |
| "scheduled_for": self.scheduled_for.isoformat() if self.scheduled_for else None, | |
| "delivered_at": self.delivered_at.isoformat() if self.delivered_at else None, | |
| "read_at": self.read_at.isoformat() if self.read_at else None, | |
| "expires_at": self.expires_at.isoformat() if self.expires_at else None, | |
| "recipient_id": self.recipient_id, | |
| "recipient_role": self.recipient_role, | |
| "context_snapshot": self.context_snapshot, | |
| "data": self.data, | |
| "actions": self.actions, | |
| "icon": self.icon, | |
| "color": self.color, | |
| "grouping_key": self.grouping_key, | |
| "group_id": self.group_id, | |
| "requires_acknowledgment": self.requires_acknowledgment, | |
| "acknowledgment_data": self.acknowledgment_data, | |
| "delivery_attempts": self.delivery_attempts, | |
| "metadata": self.metadata | |
| } | |
| def from_dict(cls, data: Dict[str, Any]) -> 'Notification': | |
| """Create from dictionary.""" | |
| created_at = datetime.fromisoformat(data["created_at"]) | |
| scheduled_for = datetime.fromisoformat(data["scheduled_for"]) if data.get("scheduled_for") else None | |
| delivered_at = datetime.fromisoformat(data["delivered_at"]) if data.get("delivered_at") else None | |
| read_at = datetime.fromisoformat(data["read_at"]) if data.get("read_at") else None | |
| expires_at = datetime.fromisoformat(data["expires_at"]) if data.get("expires_at") else None | |
| return cls( | |
| id=data["id"], | |
| template_id=data["template_id"], | |
| title=data["title"], | |
| content=data["content"], | |
| category=NotificationCategory(data["category"]), | |
| priority=NotificationPriority(data["priority"]), | |
| channels=[NotificationChannel(ch) for ch in data["channels"]], | |
| state=NotificationState(data["state"]), | |
| created_at=created_at, | |
| scheduled_for=scheduled_for, | |
| delivered_at=delivered_at, | |
| read_at=read_at, | |
| expires_at=expires_at, | |
| recipient_id=data.get("recipient_id"), | |
| recipient_role=data.get("recipient_role"), | |
| context_snapshot=data.get("context_snapshot", {}), | |
| data=data.get("data", {}), | |
| actions=data.get("actions", []), | |
| icon=data.get("icon"), | |
| color=data.get("color"), | |
| grouping_key=data.get("grouping_key"), | |
| group_id=data.get("group_id"), | |
| requires_acknowledgment=data.get("requires_acknowledgment", False), | |
| acknowledgment_data=data.get("acknowledgment_data"), | |
| delivery_attempts=data.get("delivery_attempts", 0), | |
| metadata=data.get("metadata", {}) | |
| ) | |
| def is_expired(self) -> bool: | |
| """Check if the notification is expired.""" | |
| return self.expires_at is not None and datetime.now() > self.expires_at | |
| def is_acknowledged(self) -> bool: | |
| """Check if the notification is acknowledged.""" | |
| return self.state == NotificationState.READ or self.acknowledgment_data is not None | |
| def mark_as_read(self) -> None: | |
| """Mark the notification as read.""" | |
| self.state = NotificationState.READ | |
| self.read_at = datetime.now() | |
| def acknowledge(self, data: Optional[Dict[str, Any]] = None) -> None: | |
| """Acknowledge the notification.""" | |
| self.state = NotificationState.READ | |
| self.read_at = datetime.now() | |
| self.acknowledgment_data = data or {"acknowledged_at": datetime.now().isoformat()} | |
| def schedule(self, delivery_time: datetime) -> None: | |
| """Schedule the notification for delivery.""" | |
| self.state = NotificationState.SCHEDULED | |
| self.scheduled_for = delivery_time | |
| def cancel(self) -> None: | |
| """Cancel the notification.""" | |
| self.state = NotificationState.CANCELLED | |
| def fail(self) -> None: | |
| """Mark the notification as failed.""" | |
| self.state = NotificationState.FAILED | |
| def deliver(self) -> None: | |
| """Mark the notification as delivered.""" | |
| self.state = NotificationState.DELIVERED | |
| self.delivered_at = datetime.now() | |
| self.delivery_attempts += 1 | |
| class ChannelHandler: | |
| """Base class for notification channel handlers.""" | |
| def __init__(self, config: Dict[str, Any] = None): | |
| """Initialize the channel handler.""" | |
| self.config = config or {} | |
| self.telemetry = get_telemetry() | |
| def can_handle(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Check if this handler can handle the notification.""" | |
| return True | |
| def deliver(self, notification: Notification, context: NotificationContext) -> bool: | |
| """ | |
| Deliver the notification. | |
| Args: | |
| notification: The notification to deliver | |
| context: The notification context | |
| Returns: | |
| Whether the delivery was successful | |
| """ | |
| raise NotImplementedError("Subclasses must implement deliver method") | |
| class UIChannelHandler(ChannelHandler): | |
| """Handler for UI notifications.""" | |
| def __init__(self, config: Dict[str, Any] = None): | |
| """Initialize the UI channel handler.""" | |
| super().__init__(config) | |
| self.ui_callbacks: Dict[str, Callable] = {} | |
| def can_handle(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Check if this handler can handle the notification.""" | |
| # Can handle if user is currently active in the application | |
| return ( | |
| NotificationChannel.UI in notification.channels and | |
| context.session_data.get("is_active", False) | |
| ) | |
| def deliver(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Deliver the notification to the UI.""" | |
| try: | |
| # Get the UI callback for the recipient | |
| callback = self.ui_callbacks.get(context.user_id or "default") | |
| if callback: | |
| # Call the callback with the notification | |
| callback(notification.to_dict()) | |
| return True | |
| # Store notification for later delivery | |
| # This would typically be handled by a persistent store | |
| self.telemetry.debug( | |
| f"No UI callback registered for user {context.user_id}", | |
| category=EventCategory.SYSTEM | |
| ) | |
| return False | |
| except Exception as e: | |
| self.telemetry.error( | |
| f"Failed to deliver UI notification: {e}", | |
| category=EventCategory.SYSTEM, | |
| exc_info=True | |
| ) | |
| return False | |
| def register_callback(self, user_id: str, callback: Callable[[Dict[str, Any]], None]) -> None: | |
| """ | |
| Register a UI callback for a user. | |
| Args: | |
| user_id: User ID or "default" for global callback | |
| callback: Function to call with notification data | |
| """ | |
| self.ui_callbacks[user_id] = callback | |
| def unregister_callback(self, user_id: str) -> None: | |
| """ | |
| Unregister a UI callback. | |
| Args: | |
| user_id: User ID to unregister | |
| """ | |
| if user_id in self.ui_callbacks: | |
| del self.ui_callbacks[user_id] | |
| class ToastChannelHandler(ChannelHandler): | |
| """Handler for toast notifications.""" | |
| def can_handle(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Check if this handler can handle the notification.""" | |
| # Can handle if toast channel is specified and user is active | |
| return ( | |
| NotificationChannel.TOAST in notification.channels and | |
| context.session_data.get("is_active", False) | |
| ) | |
| def deliver(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Deliver the notification as a toast.""" | |
| try: | |
| # This would integrate with the frontend toast system | |
| # For now, just log the delivery | |
| self.telemetry.debug( | |
| f"Toast notification delivered: {notification.title}", | |
| category=EventCategory.SYSTEM, | |
| context={"notification_id": notification.id} | |
| ) | |
| return True | |
| except Exception as e: | |
| self.telemetry.error( | |
| f"Failed to deliver toast notification: {e}", | |
| category=EventCategory.SYSTEM, | |
| exc_info=True | |
| ) | |
| return False | |
| class ModalChannelHandler(ChannelHandler): | |
| """Handler for modal dialog notifications.""" | |
| def can_handle(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Check if this handler can handle the notification.""" | |
| # Can handle if modal channel is specified and user is active | |
| # Critical notifications can always show modals | |
| return ( | |
| NotificationChannel.MODAL in notification.channels and | |
| (context.session_data.get("is_active", False) or | |
| notification.priority == NotificationPriority.CRITICAL) | |
| ) | |
| def deliver(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Deliver the notification as a modal dialog.""" | |
| try: | |
| # This would integrate with the frontend modal system | |
| # For now, just log the delivery | |
| self.telemetry.debug( | |
| f"Modal notification delivered: {notification.title}", | |
| category=EventCategory.SYSTEM, | |
| context={"notification_id": notification.id} | |
| ) | |
| return True | |
| except Exception as e: | |
| self.telemetry.error( | |
| f"Failed to deliver modal notification: {e}", | |
| category=EventCategory.SYSTEM, | |
| exc_info=True | |
| ) | |
| return False | |
| class EmailChannelHandler(ChannelHandler): | |
| """Handler for email notifications.""" | |
| def can_handle(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Check if this handler can handle the notification.""" | |
| # Can handle if email channel is specified and user has email | |
| email_available = ( | |
| context.user_id is not None and | |
| context.user_preferences.get("email") is not None | |
| ) | |
| return NotificationChannel.EMAIL in notification.channels and email_available | |
| def deliver(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Deliver the notification via email.""" | |
| try: | |
| # This would integrate with an email service | |
| # For now, just log the delivery | |
| recipient_email = context.user_preferences.get("email") | |
| self.telemetry.debug( | |
| f"Email notification delivered to {recipient_email}: {notification.title}", | |
| category=EventCategory.SYSTEM, | |
| context={"notification_id": notification.id, "recipient": recipient_email} | |
| ) | |
| return True | |
| except Exception as e: | |
| self.telemetry.error( | |
| f"Failed to deliver email notification: {e}", | |
| category=EventCategory.SYSTEM, | |
| exc_info=True | |
| ) | |
| return False | |
| class SMSChannelHandler(ChannelHandler): | |
| """Handler for SMS notifications.""" | |
| def can_handle(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Check if this handler can handle the notification.""" | |
| # Can handle if SMS channel is specified and user has phone number | |
| sms_available = ( | |
| context.user_id is not None and | |
| context.user_preferences.get("phone") is not None | |
| ) | |
| return NotificationChannel.SMS in notification.channels and sms_available | |
| def deliver(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Deliver the notification via SMS.""" | |
| try: | |
| # This would integrate with an SMS service | |
| # For now, just log the delivery | |
| recipient_phone = context.user_preferences.get("phone") | |
| self.telemetry.debug( | |
| f"SMS notification delivered to {recipient_phone}: {notification.title}", | |
| category=EventCategory.SYSTEM, | |
| context={"notification_id": notification.id, "recipient": recipient_phone} | |
| ) | |
| return True | |
| except Exception as e: | |
| self.telemetry.error( | |
| f"Failed to deliver SMS notification: {e}", | |
| category=EventCategory.SYSTEM, | |
| exc_info=True | |
| ) | |
| return False | |
| class PushChannelHandler(ChannelHandler): | |
| """Handler for push notifications.""" | |
| def can_handle(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Check if this handler can handle the notification.""" | |
| # Can handle if push channel is specified and user has push token | |
| push_available = ( | |
| context.user_id is not None and | |
| context.user_preferences.get("push_token") is not None | |
| ) | |
| return NotificationChannel.PUSH in notification.channels and push_available | |
| def deliver(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Deliver the notification via push notification.""" | |
| try: | |
| # This would integrate with a push notification service | |
| # For now, just log the delivery | |
| push_token = context.user_preferences.get("push_token") | |
| self.telemetry.debug( | |
| f"Push notification delivered: {notification.title}", | |
| category=EventCategory.SYSTEM, | |
| context={"notification_id": notification.id, "token": push_token} | |
| ) | |
| return True | |
| except Exception as e: | |
| self.telemetry.error( | |
| f"Failed to deliver push notification: {e}", | |
| category=EventCategory.SYSTEM, | |
| exc_info=True | |
| ) | |
| return False | |
| class WebhookChannelHandler(ChannelHandler): | |
| """Handler for webhook notifications.""" | |
| def can_handle(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Check if this handler can handle the notification.""" | |
| # Can handle if webhook channel is specified and webhook URL is configured | |
| webhook_configured = ( | |
| NotificationChannel.WEBHOOK in notification.channels and | |
| (notification.data.get("webhook_url") is not None or | |
| self.config.get("webhook_url") is not None) | |
| ) | |
| return webhook_configured | |
| def deliver(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Deliver the notification via webhook.""" | |
| try: | |
| # This would send the notification to a webhook endpoint | |
| # For now, just log the delivery | |
| webhook_url = notification.data.get("webhook_url") or self.config.get("webhook_url") | |
| self.telemetry.debug( | |
| f"Webhook notification delivered to {webhook_url}: {notification.title}", | |
| category=EventCategory.SYSTEM, | |
| context={"notification_id": notification.id, "webhook_url": webhook_url} | |
| ) | |
| return True | |
| except Exception as e: | |
| self.telemetry.error( | |
| f"Failed to deliver webhook notification: {e}", | |
| category=EventCategory.SYSTEM, | |
| exc_info=True | |
| ) | |
| return False | |
| class ConsoleChannelHandler(ChannelHandler): | |
| """Handler for console notifications.""" | |
| def can_handle(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Check if this handler can handle the notification.""" | |
| # Always can handle console notifications | |
| return NotificationChannel.CONSOLE in notification.channels | |
| def deliver(self, notification: Notification, context: NotificationContext) -> bool: | |
| """Deliver the notification to the console.""" | |
| try: | |
| # Just log to the console | |
| print(f"[{notification.category.value.upper()}] {notification.title}: {notification.content}") | |
| return True | |
| except Exception as e: | |
| self.telemetry.error( | |
| f"Failed to deliver console notification: {e}", | |
| category=EventCategory.SYSTEM, | |
| exc_info=True | |
| ) | |
| return False | |
| class NotificationRouter: | |
| """Routes notifications to appropriate channels based on context and priority.""" | |
| def __init__(self, config: Dict[str, Any] = None): | |
| """Initialize the notification router.""" | |
| self.config = config or {} | |
| self.telemetry = get_telemetry() | |
| self.channel_handlers: Dict[NotificationChannel, ChannelHandler] = {} | |
| # Register default handlers | |
| self.register_default_handlers() | |
| def register_default_handlers(self) -> None: | |
| """Register default channel handlers.""" | |
| self.register_handler(NotificationChannel.UI, UIChannelHandler()) | |
| self.register_handler(NotificationChannel.TOAST, ToastChannelHandler()) | |
| self.register_handler(NotificationChannel.MODAL, ModalChannelHandler()) | |
| self.register_handler(NotificationChannel.EMAIL, EmailChannelHandler()) | |
| self.register_handler(NotificationChannel.SMS, SMSChannelHandler()) | |
| self.register_handler(NotificationChannel.PUSH, PushChannelHandler()) | |
| self.register_handler(NotificationChannel.WEBHOOK, WebhookChannelHandler()) | |
| self.register_handler(NotificationChannel.CONSOLE, ConsoleChannelHandler()) | |
| def register_handler(self, channel: NotificationChannel, handler: ChannelHandler) -> None: | |
| """ | |
| Register a channel handler. | |
| Args: | |
| channel: Notification channel | |
| handler: Channel handler | |
| """ | |
| self.channel_handlers[channel] = handler | |
| def get_handler(self, channel: NotificationChannel) -> Optional[ChannelHandler]: | |
| """ | |
| Get a channel handler. | |
| Args: | |
| channel: Notification channel | |
| Returns: | |
| Channel handler or None if not registered | |
| """ | |
| return self.channel_handlers.get(channel) | |
| def route(self, notification: Notification, context: NotificationContext) -> List[NotificationChannel]: | |
| """ | |
| Route a notification to appropriate channels. | |
| Args: | |
| notification: Notification to route | |
| context: Notification context | |
| Returns: | |
| List of channels the notification was routed to | |
| """ | |
| routed_to = [] | |
| # Special handling for critical notifications | |
| if notification.priority == NotificationPriority.CRITICAL: | |
| return self._route_critical(notification, context) | |
| # Route based on notification channels | |
| for channel in notification.channels: | |
| handler = self.get_handler(channel) | |
| if handler and handler.can_handle(notification, context): | |
| success = handler.deliver(notification, context) | |
| if success: | |
| routed_to.append(channel) | |
| # Update notification state | |
| if routed_to: | |
| notification.deliver() | |
| else: | |
| notification.fail() | |
| return routed_to | |
| def _route_critical(self, notification: Notification, context: NotificationContext) -> List[NotificationChannel]: | |
| """ | |
| Route a critical notification. | |
| Args: | |
| notification: Critical notification to route | |
| context: Notification context | |
| Returns: | |
| List of channels the notification was routed to | |
| """ | |
| routed_to = [] | |
| # Try UI and modal first if user is active | |
| if context.session_data.get("is_active", False): | |
| # Try modal | |
| modal_handler = self.get_handler(NotificationChannel.MODAL) | |
| if modal_handler and modal_handler.can_handle(notification, context): | |
| if modal_handler.deliver(notification, context): | |
| routed_to.append(NotificationChannel.MODAL) | |
| # Try UI notification | |
| ui_handler = self.get_handler(NotificationChannel.UI) | |
| if ui_handler and ui_handler.can_handle(notification, context): | |
| if ui_handler.deliver(notification, context): | |
| routed_to.append(NotificationChannel.UI) | |
| # If not delivered via UI, try external channels | |
| if not routed_to: | |
| # Try push notification | |
| push_handler = self.get_handler(NotificationChannel.PUSH) | |
| if push_handler and push_handler.can_handle(notification, context): | |
| if push_handler.deliver(notification, context): | |
| routed_to.append(NotificationChannel.PUSH) | |
| # Try SMS for highest priority | |
| sms_handler = self.get_handler(NotificationChannel.SMS) | |
| if sms_handler and sms_handler.can_handle(notification, context): | |
| if sms_handler.deliver(notification, context): | |
| routed_to.append(NotificationChannel.SMS) | |
| # Try email as last resort | |
| email_handler = self.get_handler(NotificationChannel.EMAIL) | |
| if email_handler and email_handler.can_handle(notification, context): | |
| if email_handler.deliver(notification, context): | |
| routed_to.append(NotificationChannel.EMAIL) | |
| # Always log to console | |
| console_handler = self.get_handler(NotificationChannel.CONSOLE) | |
| if console_handler: | |
| if console_handler.deliver(notification, context): | |
| routed_to.append(NotificationChannel.CONSOLE) | |
| # Update notification state | |
| if routed_to: | |
| notification.deliver() | |
| else: | |
| notification.fail() | |
| return routed_to | |
| class NotificationScheduler: | |
| """Schedules notifications based on context and priority.""" | |
| def __init__(self, config: Dict[str, Any] = None): | |
| """Initialize the notification scheduler.""" | |
| self.config = config or {} | |
| self.telemetry = get_telemetry() | |
| # Configure scheduling parameters | |
| self.min_interval = self.config.get("min_interval", 5) # Minimum interval between notifications (seconds) | |
| self.max_notifications_per_hour = self.config.get("max_notifications_per_hour", 10) | |
| self.quiet_hours_start = self.config.get("quiet_hours_start", 22) # 10 PM | |
| self.quiet_hours_end = self.config.get("quiet_hours_end", 8) # 8 AM | |
| self.throttle_low_priority = self.config.get("throttle_low_priority", True) | |
| # Engagement windows by hour (0-23) | |
| self.engagement_windows = self.config.get("engagement_windows", { | |
| # Default engagement windows (higher numbers mean better engagement) | |
| "default": [ | |
| 0, 0, 0, 0, 0, 0, 0, 1, # 0-7 AM | |
| 2, 3, 3, 2, 2, 2, 2, 2, # 8-15 (8 AM - 3 PM) | |
| 3, 3, 2, 2, 1, 1, 0, 0 # 16-23 (4 PM - 11 PM) | |
| ] | |
| }) | |
| def schedule(self, notification: Notification, context: NotificationContext) -> datetime: | |
| """ | |
| Schedule a notification for delivery. | |
| Args: | |
| notification: Notification to schedule | |
| context: Notification context | |
| Returns: | |
| Scheduled delivery time | |
| """ | |
| # Critical notifications are delivered immediately | |
| if notification.priority == NotificationPriority.CRITICAL: | |
| return datetime.now() | |
| # Use user's timezone if available, otherwise UTC | |
| user_timezone = context.user_preferences.get("timezone", "UTC") | |
| now = datetime.now() # In practice, this would be timezone-aware | |
| # Check cognitive load - delay if high | |
| if context.cognitive_load > 0.7 and notification.priority != NotificationPriority.HIGH: | |
| delay = int(30 + (context.cognitive_load - 0.7) * 100) # 30-60 min delay based on load | |
| delivery_time = now + timedelta(minutes=delay) | |
| self.telemetry.debug( | |
| f"Delaying notification due to high cognitive load: {delay} minutes", | |
| category=EventCategory.SYSTEM, | |
| context={"notification_id": notification.id, "cognitive_load": context.cognitive_load} | |
| ) | |
| return delivery_time | |
| # Check attention score - delay if low | |
| if context.attention_score < 0.3 and notification.priority != NotificationPriority.HIGH: | |
| delay = int(15 + (0.3 - context.attention_score) * 45) # 15-30 min delay based on attention | |
| delivery_time = now + timedelta(minutes=delay) | |
| self.telemetry.debug( | |
| f"Delaying notification due to low attention: {delay} minutes", | |
| category=EventCategory.SYSTEM, | |
| context={"notification_id": notification.id, "attention_score": context.attention_score} | |
| ) | |
| return delivery_time | |
| # Consider quiet hours for non-high priority notifications | |
| current_hour = now.hour | |
| in_quiet_hours = ( | |
| current_hour >= self.quiet_hours_start or | |
| current_hour < self.quiet_hours_end | |
| ) | |
| if in_quiet_hours and notification.priority not in [NotificationPriority.HIGH, NotificationPriority.CRITICAL]: | |
| # Schedule for the end of quiet hours | |
| if current_hour >= self.quiet_hours_start: | |
| # After quiet hours start, schedule for next day's end of quiet hours | |
| next_day = now.replace(hour=self.quiet_hours_end, minute=0, second=0) + timedelta(days=1) | |
| delivery_time = next_day | |
| else: | |
| # Before quiet hours end, schedule for today's end of quiet hours | |
| delivery_time = now.replace(hour=self.quiet_hours_end, minute=0, second=0) | |
| self.telemetry.debug( | |
| f"Scheduling notification outside quiet hours: {delivery_time}", | |
| category=EventCategory.SYSTEM, | |
| context={"notification_id": notification.id} | |
| ) | |
| return delivery_time | |
| # For low priority notifications, find the best engagement window | |
| if notification.priority == NotificationPriority.LOW and self.throttle_low_priority: | |
| user_type = context.user_preferences.get("user_type", "default") | |
| engagement_pattern = self.engagement_windows.get(user_type, self.engagement_windows["default"]) | |
| # Find the next good engagement window | |
| current_hour = now.hour | |
| current_engagement = engagement_pattern[current_hour] | |
| if current_engagement < 2: # Not a good engagement window | |
| # Find the next hour with better engagement | |
| for offset in range(1, 24): | |
| next_hour = (current_hour + offset) % 24 | |
| if engagement_pattern[next_hour] >= 2: | |
| delivery_time = now.replace(hour=next_hour, minute=0, second=0) | |
| if delivery_time < now: # If it's tomorrow | |
| delivery_time += timedelta(days=1) | |
| self.telemetry.debug( | |
| f"Scheduling low priority notification for better engagement window: {delivery_time}", | |
| category=EventCategory.SYSTEM, | |
| context={"notification_id": notification.id} | |
| ) | |
| return delivery_time | |
| # If we're already in a good window or couldn't find one, deliver soon | |
| delivery_time = now + timedelta(minutes=random.randint(5, 15)) | |
| return delivery_time | |
| # For medium priority, add a small delay to prevent overwhelming | |
| if notification.priority == NotificationPriority.MEDIUM: | |
| delivery_time = now + timedelta(seconds=random.randint(1, self.min_interval)) | |
| return delivery_time | |
| # High priority notifications are delivered very soon | |
| delivery_time = now + timedelta(seconds=random.randint(0, 3)) | |
| return delivery_time | |
| class NotificationGrouper: | |
| """Groups related notifications to prevent overwhelming users.""" | |
| def __init__(self, config: Dict[str, Any] = None): | |
| """Initialize the notification grouper.""" | |
| self.config = config or {} | |
| self.telemetry = get_telemetry() | |
| # Configure grouping parameters | |
| self.max_group_size = self.config.get("max_group_size", 5) | |
| self.group_time_window = self.config.get("group_time_window", 300) # 5 minutes | |
| self.grouping_enabled = self.config.get("grouping_enabled", True) | |
| # Active groups by grouping key | |
| self.active_groups: Dict[str, List[Notification]] = {} | |
| # Lock for thread safety | |
| self.lock = threading.RLock() | |
| def should_group(self, notification: Notification, active_notifications: List[Notification]) -> Tuple[bool, Optional[str]]: | |
| """ | |
| Check if a notification should be grouped. | |
| Args: | |
| notification: Notification to check | |
| active_notifications: Active notifications for the recipient | |
| Returns: | |
| Tuple of (should_group, group_id) | |
| """ | |
| if not self.grouping_enabled: | |
| return False, None | |
| # Critical notifications are never grouped | |
| if notification.priority == NotificationPriority.CRITICAL: | |
| return False, None | |
| # Check if notification has a grouping key | |
| if not notification.grouping_key: | |
| return False, None | |
| # Look for potential groups | |
| now = datetime.now() | |
| candidates = [] | |
| for active in active_notifications: | |
| # Only consider groupable notifications | |
| if not active.grouping_key or active.grouping_key != notification.grouping_key: | |
| continue | |
| # Check if notification is within time window | |
| if active.created_at < now - timedelta(seconds=self.group_time_window): | |
| continue | |
| # Check if the group is not full | |
| group_id = active.group_id | |
| if group_id and len([n for n in active_notifications if n.group_id == group_id]) >= self.max_group_size: | |
| continue | |
| candidates.append(active) | |
| if candidates: | |
| # Use the most recent group | |
| most_recent = max(candidates, key=lambda n: n.created_at) | |
| return True, most_recent.group_id or most_recent.id | |
| return False, None | |
| def create_summary_notification(self, group_id: str, notifications: List[Notification]) -> Notification: | |
| """ | |
| Create a summary notification for a group. | |
| Args: | |
| group_id: Group ID | |
| notifications: Notifications in the group | |
| Returns: | |
| Summary notification | |
| """ | |
| # Use the first notification as a template | |
| template = notifications[0] | |
| # Create a summary title and content | |
| category_name = template.category.value.capitalize() | |
| count = len(notifications) | |
| if count <= 1: | |
| return template | |
| # Create a summary title | |
| if all(n.template_id == template.template_id for n in notifications): | |
| # All notifications have the same template | |
| title = f"{count} {category_name} Notifications" | |
| else: | |
| # Mixed notification templates | |
| title = f"{count} {category_name} Updates" | |
| # Create summary content | |
| content_items = [] | |
| for n in notifications[:5]: # Limit to 5 items in the summary | |
| content_items.append(f"• {n.title}") | |
| if count > 5: | |
| content_items.append(f"• {count - 5} more...") | |
| content = "\n".join(content_items) | |
| # Create the summary notification | |
| summary = Notification( | |
| id=str(uuid.uuid4()), | |
| template_id=template.template_id, | |
| title=title, | |
| content=content, | |
| category=template.category, | |
| priority=min(n.priority for n in notifications), # Use highest priority | |
| channels=template.channels, | |
| state=NotificationState.PENDING, | |
| created_at=datetime.now(), | |
| recipient_id=template.recipient_id, | |
| recipient_role=template.recipient_role, | |
| context_snapshot=template.context_snapshot, | |
| icon=template.icon, | |
| color=template.color, | |
| grouping_key=template.grouping_key, | |
| group_id=group_id, | |
| requires_acknowledgment=any(n.requires_acknowledgment for n in notifications), | |
| metadata={"is_summary": True, "notification_count": count} | |
| ) | |
| return summary | |
| class NotificationTemplateRenderer: | |
| """Renders notification templates with dynamic data.""" | |
| def __init__(self, config: Dict[str, Any] = None): | |
| """Initialize the template renderer.""" | |
| self.config = config or {} | |
| self.telemetry = get_telemetry() | |
| def render(self, template: NotificationTemplate, data: Dict[str, Any], context: NotificationContext) -> Tuple[str, str]: | |
| """ | |
| Render a notification template. | |
| Args: | |
| template: Notification template | |
| data: Template data | |
| context: Notification context | |
| Returns: | |
| Tuple of (title, content) | |
| """ | |
| try: | |
| # Simple template rendering with format string | |
| render_context = { | |
| "user": { | |
| "id": context.user_id, | |
| "role": context.user_role, | |
| **context.user_preferences | |
| }, | |
| "app": context.application_state, | |
| "session": context.session_data, | |
| "device": context.device_info, | |
| **data | |
| } | |
| # Render the title and content | |
| title = self._render_string(template.title_template, render_context) | |
| content = self._render_string(template.content_template, render_context) | |
| return title, content | |
| except Exception as e: | |
| self.telemetry.error( | |
| f"Failed to render notification template: {e}", | |
| category=EventCategory.SYSTEM, | |
| exc_info=True | |
| ) | |
| # Return fallback values | |
| return template.title_template, template.content_template | |
| def _render_string(self, template_str: str, context: Dict[str, Any]) -> str: | |
| """ | |
| Render a string template. | |
| Args: | |
| template_str: Template string | |
| context: Rendering context | |
| Returns: | |
| Rendered string | |
| """ | |
| # Simple recursive dictionary lookup | |
| def lookup(path, data): | |
| if "." not in path: | |
| return data.get(path, "") | |
| parts = path.split(".", 1) | |
| if parts[0] not in data: | |
| return "" | |
| return lookup(parts[1], data[parts[0]]) | |
| # Find all placeholders {variable} in the template | |
| import re | |
| placeholders = re.findall(r"\{([^}]+)\}", template_str) | |
| # Replace each placeholder with its value | |
| result = template_str | |
| for placeholder in placeholders: | |
| value = lookup(placeholder, context) | |
| result = result.replace(f"{{{placeholder}}}", str(value)) | |
| return result | |
| class NotificationManager: | |
| """Main manager for the notification system.""" | |
| def __init__(self, config: Dict[str, Any] = None): | |
| """Initialize the notification manager.""" | |
| self.config = config or {} | |
| self.telemetry = get_telemetry() | |
| # Create notification storage | |
| self.notifications: Dict[str, Notification] = {} | |
| self.templates: Dict[str, NotificationTemplate] = {} | |
| self.user_notifications: Dict[str, List[str]] = {} # User ID -> Notification IDs | |
| # Create components | |
| self.router = NotificationRouter(self.config.get("router", {})) | |
| self.scheduler = NotificationScheduler(self.config.get("scheduler", {})) | |
| self.grouper = NotificationGrouper(self.config.get("grouper", {})) | |
| self.renderer = NotificationTemplateRenderer(self.config.get("renderer", {})) | |
| # Create processing queues | |
| self.delivery_queue = queue.PriorityQueue() | |
| # Thread for processing scheduled notifications | |
| self.processing_thread = None | |
| self.should_stop = threading.Event() | |
| # Lock for thread safety | |
| self.lock = threading.RLock() | |
| # Start the processing thread | |
| self._start_processing_thread() | |
| # Register default templates | |
| self._register_default_templates() | |
| def _start_processing_thread(self) -> None: | |
| """Start the background processing thread.""" | |
| self.processing_thread = threading.Thread( | |
| target=self._processing_worker, | |
| daemon=True | |
| ) | |
| self.processing_thread.start() | |
| def _processing_worker(self) -> None: | |
| """Background worker to process scheduled notifications.""" | |
| while not self.should_stop.is_set(): | |
| try: | |
| # Try to get a notification from the queue | |
| try: | |
| # Get the next notification to deliver (priority, scheduled_time, notification_id) | |
| _, scheduled_time, notification_id = self.delivery_queue.get(timeout=1.0) | |
| except queue.Empty: | |
| continue | |
| # Check if it's time to deliver | |
| now = datetime.now() | |
| if scheduled_time > now: | |
| # Not time yet, put it back in the queue | |
| self.delivery_queue.put((scheduled_time.timestamp(), scheduled_time, notification_id)) | |
| time.sleep(0.1) | |
| continue | |
| # Get the notification | |
| with self.lock: | |
| notification = self.notifications.get(notification_id) | |
| if not notification: | |
| continue | |
| # Check if notification is still valid | |
| if notification.state != NotificationState.SCHEDULED: | |
| continue | |
| # Check if notification is expired | |
| if notification.is_expired(): | |
| with self.lock: | |
| notification.state = NotificationState.EXPIRED | |
| continue | |
| # Set state to pending for delivery | |
| with self.lock: | |
| notification.state = NotificationState.PENDING | |
| # Deliver the notification | |
| self._deliver_notification(notification) | |
| except Exception as e: | |
| self.telemetry.error( | |
| f"Error in notification processing worker: {e}", | |
| category=EventCategory.SYSTEM, | |
| exc_info=True | |
| ) | |
| time.sleep(1.0) | |
| def _register_default_templates(self) -> None: | |
| """Register default notification templates.""" | |
| # System notification template | |
| self.register_template(NotificationTemplate( | |
| id="system_notification", | |
| title_template="System Notification", | |
| content_template="{message}", | |
| category=NotificationCategory.SYSTEM, | |
| default_priority=NotificationPriority.MEDIUM, | |
| default_channels=[NotificationChannel.UI, NotificationChannel.CONSOLE], | |
| icon="info-circle", | |
| color="#3498db" | |
| )) | |
| # Security alert template | |
| self.register_template(NotificationTemplate( | |
| id="security_alert", | |
| title_template="Security Alert", | |
| content_template="{message}", | |
| category=NotificationCategory.SECURITY, | |
| default_priority=NotificationPriority.HIGH, | |
| default_channels=[NotificationChannel.UI, NotificationChannel.EMAIL, NotificationChannel.CONSOLE], | |
| icon="shield-alt", | |
| color="#e74c3c", | |
| requires_acknowledgment=True | |
| )) | |
| # Detection result template | |
| self.register_template(NotificationTemplate( | |
| id="detection_result", | |
| title_template="Detection Result", | |
| content_template="Detection complete: {result}", | |
| category=NotificationCategory.DETECTION, | |
| default_priority=NotificationPriority.MEDIUM, | |
| default_channels=[NotificationChannel.UI, NotificationChannel.CONSOLE], | |
| icon="search", | |
| color="#2ecc71", | |
| grouping_key="detection_results" | |
| )) | |
| # Process complete template | |
| self.register_template(NotificationTemplate( | |
| id="process_complete", | |
| title_template="Process Complete", | |
| content_template="Process {process_name} completed successfully.", | |
| category=NotificationCategory.PROCESS, | |
| default_priority=NotificationPriority.MEDIUM, | |
| default_channels=[NotificationChannel.UI, NotificationChannel.TOAST, NotificationChannel.CONSOLE], | |
| icon="check-circle", | |
| color="#2ecc71", | |
| grouping_key="process_updates" | |
| )) | |
| # Process failed template | |
| self.register_template(NotificationTemplate( | |
| id="process_failed", | |
| title_template="Process Failed", | |
| content_template="Process {process_name} failed: {error}", | |
| category=NotificationCategory.PROCESS, | |
| default_priority=NotificationPriority.HIGH, | |
| default_channels=[NotificationChannel.UI, NotificationChannel.MODAL, NotificationChannel.CONSOLE], | |
| icon="exclamation-circle", | |
| color="#e74c3c", | |
| grouping_key="process_updates" | |
| )) | |
| def register_template(self, template: NotificationTemplate) -> None: | |
| """ | |
| Register a notification template. | |
| Args: | |
| template: Notification template | |
| """ | |
| with self.lock: | |
| self.templates[template.id] = template | |
| def unregister_template(self, template_id: str) -> None: | |
| """ | |
| Unregister a notification template. | |
| Args: | |
| template_id: Template ID | |
| """ | |
| with self.lock: | |
| if template_id in self.templates: | |
| del self.templates[template_id] | |
| def get_template(self, template_id: str) -> Optional[NotificationTemplate]: | |
| """ | |
| Get a notification template. | |
| Args: | |
| template_id: Template ID | |
| Returns: | |
| Notification template or None if not found | |
| """ | |
| with self.lock: | |
| return self.templates.get(template_id) | |
| def create_notification( | |
| self, | |
| template_id: str, | |
| data: Dict[str, Any], | |
| recipient_id: Optional[str] = None, | |
| context: Optional[NotificationContext] = None, | |
| priority: Optional[NotificationPriority] = None, | |
| channels: Optional[List[NotificationChannel]] = None, | |
| scheduled_for: Optional[datetime] = None, | |
| expires_at: Optional[datetime] = None | |
| ) -> Optional[Notification]: | |
| """ | |
| Create a notification from a template. | |
| Args: | |
| template_id: Template ID | |
| data: Template data | |
| recipient_id: Recipient user ID | |
| context: Notification context | |
| priority: Override default priority | |
| channels: Override default channels | |
| scheduled_for: Schedule delivery time | |
| expires_at: Expiration time | |
| Returns: | |
| Created notification or None if template not found | |
| """ | |
| # Get the template | |
| template = self.get_template(template_id) | |
| if not template: | |
| self.telemetry.error( | |
| f"Template not found: {template_id}", | |
| category=EventCategory.SYSTEM | |
| ) | |
| return None | |
| # Create or use provided context | |
| if context is None: | |
| context = NotificationContext(user_id=recipient_id) | |
| # Render the template | |
| title, content = self.renderer.render(template, data, context) | |
| # Create the notification | |
| notification = Notification( | |
| id=str(uuid.uuid4()), | |
| template_id=template_id, | |
| title=title, | |
| content=content, | |
| category=template.category, | |
| priority=priority or template.default_priority, | |
| channels=channels or template.default_channels, | |
| state=NotificationState.PENDING, | |
| created_at=datetime.now(), | |
| scheduled_for=scheduled_for, | |
| expires_at=expires_at or (datetime.now() + template.expiration if template.expiration else None), | |
| recipient_id=recipient_id, | |
| recipient_role=context.user_role, | |
| context_snapshot=context.to_dict(), | |
| data=data, | |
| actions=template.actions, | |
| icon=template.icon, | |
| color=template.color, | |
| grouping_key=template.grouping_key, | |
| requires_acknowledgment=template.requires_acknowledgment | |
| ) | |
| # Store the notification | |
| with self.lock: | |
| self.notifications[notification.id] = notification | |
| if recipient_id: | |
| if recipient_id not in self.user_notifications: | |
| self.user_notifications[recipient_id] = [] | |
| self.user_notifications[recipient_id].append(notification.id) | |
| return notification | |
| def schedule_notification(self, notification: Notification, scheduled_for: Optional[datetime] = None) -> datetime: | |
| """ | |
| Schedule a notification for delivery. | |
| Args: | |
| notification: Notification to schedule | |
| scheduled_for: Explicit delivery time or None to auto-schedule | |
| Returns: | |
| Scheduled delivery time | |
| """ | |
| with self.lock: | |
| # Get delivery time | |
| if scheduled_for: | |
| delivery_time = scheduled_for | |
| else: | |
| # Create context object from snapshot | |
| context = NotificationContext.from_dict(notification.context_snapshot) | |
| # Get scheduled time | |
| delivery_time = self.scheduler.schedule(notification, context) | |
| # Update notification | |
| notification.schedule(delivery_time) | |
| # Add to delivery queue | |
| self.delivery_queue.put((delivery_time.timestamp(), delivery_time, notification.id)) | |
| return delivery_time | |
| def send_notification( | |
| self, | |
| template_id: str, | |
| data: Dict[str, Any], | |
| recipient_id: Optional[str] = None, | |
| context: Optional[NotificationContext] = None, | |
| priority: Optional[NotificationPriority] = None, | |
| channels: Optional[List[NotificationChannel]] = None, | |
| scheduled_for: Optional[datetime] = None, | |
| expires_at: Optional[datetime] = None | |
| ) -> Optional[str]: | |
| """ | |
| Create and schedule a notification. | |
| Args: | |
| template_id: Template ID | |
| data: Template data | |
| recipient_id: Recipient user ID | |
| context: Notification context | |
| priority: Override default priority | |
| channels: Override default channels | |
| scheduled_for: Schedule delivery time | |
| expires_at: Expiration time | |
| Returns: | |
| Notification ID or None if creation failed | |
| """ | |
| # Create the notification | |
| notification = self.create_notification( | |
| template_id=template_id, | |
| data=data, | |
| recipient_id=recipient_id, | |
| context=context, | |
| priority=priority, | |
| channels=channels, | |
| scheduled_for=scheduled_for, | |
| expires_at=expires_at | |
| ) | |
| if not notification: | |
| return None | |
| # Check for grouping | |
| with self.lock: | |
| if recipient_id and recipient_id in self.user_notifications: | |
| # Get active notifications for the recipient | |
| active_notifications = [ | |
| self.notifications[nid] | |
| for nid in self.user_notifications[recipient_id] | |
| if nid in self.notifications and | |
| self.notifications[nid].state in [NotificationState.PENDING, NotificationState.SCHEDULED, NotificationState.DELIVERED] | |
| ] | |
| # Check if notification should be grouped | |
| should_group, group_id = self.grouper.should_group(notification, active_notifications) | |
| if should_group and group_id: | |
| # Set group ID | |
| notification.group_id = group_id | |
| # Log grouping | |
| self.telemetry.debug( | |
| f"Grouped notification {notification.id} into group {group_id}", | |
| category=EventCategory.SYSTEM | |
| ) | |
| # Schedule the notification | |
| self.schedule_notification(notification, scheduled_for) | |
| return notification.id | |
| def _deliver_notification(self, notification: Notification) -> None: | |
| """ | |
| Deliver a notification. | |
| Args: | |
| notification: Notification to deliver | |
| """ | |
| # Create context object from snapshot | |
| context = NotificationContext.from_dict(notification.context_snapshot) | |
| # Route the notification | |
| channels = self.router.route(notification, context) | |
| # Update notification state | |
| with self.lock: | |
| if channels: | |
| notification.state = NotificationState.DELIVERED | |
| notification.delivered_at = datetime.now() | |
| else: | |
| notification.state = NotificationState.FAILED | |
| # Log delivery | |
| self.telemetry.debug( | |
| f"Notification {notification.id} delivered to channels: {', '.join([ch.value for ch in channels])}", | |
| category=EventCategory.SYSTEM, | |
| context={"notification_id": notification.id} | |
| ) | |
| def get_notification(self, notification_id: str) -> Optional[Notification]: | |
| """ | |
| Get a notification by ID. | |
| Args: | |
| notification_id: Notification ID | |
| Returns: | |
| Notification or None if not found | |
| """ | |
| with self.lock: | |
| return self.notifications.get(notification_id) | |
| def get_user_notifications( | |
| self, | |
| user_id: str, | |
| states: Optional[List[NotificationState]] = None, | |
| limit: int = 100, | |
| offset: int = 0 | |
| ) -> List[Notification]: | |
| """ | |
| Get notifications for a user. | |
| Args: | |
| user_id: User ID | |
| states: Filter by states | |
| limit: Maximum number of notifications to return | |
| offset: Offset for pagination | |
| Returns: | |
| List of notifications | |
| """ | |
| with self.lock: | |
| if user_id not in self.user_notifications: | |
| return [] | |
| # Get notification IDs for the user | |
| notification_ids = self.user_notifications[user_id] | |
| # Get notifications | |
| notifications = [] | |
| for nid in notification_ids: | |
| if nid in self.notifications: | |
| notification = self.notifications[nid] | |
| # Filter by states | |
| if states and notification.state not in states: | |
| continue | |
| notifications.append(notification) | |
| # Sort by created_at (newest first) | |
| notifications.sort(key=lambda n: n.created_at, reverse=True) | |
| # Apply pagination | |
| return notifications[offset:offset+limit] | |
| def mark_as_read(self, notification_id: str) -> bool: | |
| """ | |
| Mark a notification as read. | |
| Args: | |
| notification_id: Notification ID | |
| Returns: | |
| Whether the operation was successful | |
| """ | |
| with self.lock: | |
| notification = self.notifications.get(notification_id) | |
| if not notification: | |
| return False | |
| notification.mark_as_read() | |
| return True | |
| def acknowledge(self, notification_id: str, data: Optional[Dict[str, Any]] = None) -> bool: | |
| """ | |
| Acknowledge a notification. | |
| Args: | |
| notification_id: Notification ID | |
| data: Acknowledgment data | |
| Returns: | |
| Whether the operation was successful | |
| """ | |
| with self.lock: | |
| notification = self.notifications.get(notification_id) | |
| if not notification: | |
| return False | |
| notification.acknowledge(data) | |
| return True | |
| def cancel(self, notification_id: str) -> bool: | |
| """ | |
| Cancel a notification. | |
| Args: | |
| notification_id: Notification ID | |
| Returns: | |
| Whether the operation was successful | |
| """ | |
| with self.lock: | |
| notification = self.notifications.get(notification_id) | |
| if not notification: | |
| return False | |
| notification.cancel() | |
| return True | |
| def delete(self, notification_id: str) -> bool: | |
| """ | |
| Delete a notification. | |
| Args: | |
| notification_id: Notification ID | |
| Returns: | |
| Whether the operation was successful | |
| """ | |
| with self.lock: | |
| notification = self.notifications.get(notification_id) | |
| if not notification: | |
| return False | |
| # Remove from user notifications | |
| if notification.recipient_id and notification.recipient_id in self.user_notifications: | |
| if notification_id in self.user_notifications[notification.recipient_id]: | |
| self.user_notifications[notification.recipient_id].remove(notification_id) | |
| # Delete the notification | |
| del self.notifications[notification_id] | |
| return True | |
| def clear_user_notifications(self, user_id: str, states: Optional[List[NotificationState]] = None) -> int: | |
| """ | |
| Clear notifications for a user. | |
| Args: | |
| user_id: User ID | |
| states: Only clear notifications with these states | |
| Returns: | |
| Number of notifications cleared | |
| """ | |
| with self.lock: | |
| if user_id not in self.user_notifications: | |
| return 0 | |
| # Get notification IDs for the user | |
| notification_ids = self.user_notifications[user_id].copy() | |
| # Clear notifications | |
| cleared = 0 | |
| for nid in notification_ids: | |
| if nid in self.notifications: | |
| notification = self.notifications[nid] | |
| # Filter by states | |
| if states and notification.state not in states: | |
| continue | |
| # Delete the notification | |
| del self.notifications[nid] | |
| self.user_notifications[user_id].remove(nid) | |
| cleared += 1 | |
| return cleared | |
| def shutdown(self) -> None: | |
| """Shutdown the notification manager.""" | |
| # Stop the processing thread | |
| self.should_stop.set() | |
| if self.processing_thread and self.processing_thread.is_alive(): | |
| self.processing_thread.join(timeout=2.0) | |
| # Log shutdown | |
| self.telemetry.info( | |
| "Notification manager shutdown", | |
| category=EventCategory.SYSTEM | |
| ) | |
| # Singleton instance | |
| _instance = None | |
| def get_notification_manager(config: Dict[str, Any] = None) -> NotificationManager: | |
| """ | |
| Get the global notification manager instance. | |
| Args: | |
| config: Configuration options | |
| Returns: | |
| NotificationManager instance | |
| """ | |
| global _instance | |
| if _instance is None: | |
| _instance = NotificationManager(config) | |
| return _instance | |
| # Register UI callback | |
| def register_ui_callback(user_id: str, callback: Callable[[Dict[str, Any]], None]) -> None: | |
| """ | |
| Register a UI callback for notifications. | |
| Args: | |
| user_id: User ID or "default" for global callback | |
| callback: Function to call with notification data | |
| """ | |
| manager = get_notification_manager() | |
| ui_handler = manager.router.get_handler(NotificationChannel.UI) | |
| if isinstance(ui_handler, UIChannelHandler): | |
| ui_handler.register_callback(user_id, callback) | |
| # Helper functions for sending notifications | |
| def send_notification( | |
| template_id: str, | |
| data: Dict[str, Any], | |
| recipient_id: Optional[str] = None, | |
| priority: Optional[NotificationPriority] = None, | |
| channels: Optional[List[NotificationChannel]] = None | |
| ) -> Optional[str]: | |
| """ | |
| Send a notification. | |
| Args: | |
| template_id: Template ID | |
| data: Template data | |
| recipient_id: Recipient user ID | |
| priority: Override default priority | |
| channels: Override default channels | |
| Returns: | |
| Notification ID or None if sending failed | |
| """ | |
| manager = get_notification_manager() | |
| return manager.send_notification( | |
| template_id=template_id, | |
| data=data, | |
| recipient_id=recipient_id, | |
| priority=priority, | |
| channels=channels | |
| ) | |
| def send_system_notification( | |
| message: str, | |
| recipient_id: Optional[str] = None, | |
| priority: NotificationPriority = NotificationPriority.MEDIUM | |
| ) -> Optional[str]: | |
| """ | |
| Send a system notification. | |
| Args: | |
| message: Notification message | |
| recipient_id: Recipient user ID | |
| priority: Notification priority | |
| Returns: | |
| Notification ID or None if sending failed | |
| """ | |
| return send_notification( | |
| template_id="system_notification", | |
| data={"message": message}, | |
| recipient_id=recipient_id, | |
| priority=priority | |
| ) | |
| def send_security_alert( | |
| message: str, | |
| recipient_id: Optional[str] = None, | |
| priority: NotificationPriority = NotificationPriority.HIGH | |
| ) -> Optional[str]: | |
| """ | |
| Send a security alert. | |
| Args: | |
| message: Alert message | |
| recipient_id: Recipient user ID | |
| priority: Notification priority | |
| Returns: | |
| Notification ID or None if sending failed | |
| """ | |
| return send_notification( | |
| template_id="security_alert", | |
| data={"message": message}, | |
| recipient_id=recipient_id, | |
| priority=priority | |
| ) | |
| def send_detection_result( | |
| result: str, | |
| recipient_id: Optional[str] = None, | |
| priority: NotificationPriority = NotificationPriority.MEDIUM | |
| ) -> Optional[str]: | |
| """ | |
| Send a detection result notification. | |
| Args: | |
| result: Detection result | |
| recipient_id: Recipient user ID | |
| priority: Notification priority | |
| Returns: | |
| Notification ID or None if sending failed | |
| """ | |
| return send_notification( | |
| template_id="detection_result", | |
| data={"result": result}, | |
| recipient_id=recipient_id, | |
| priority=priority | |
| ) | |
| def send_process_notification( | |
| process_name: str, | |
| success: bool, | |
| error: Optional[str] = None, | |
| recipient_id: Optional[str] = None | |
| ) -> Optional[str]: | |
| """ | |
| Send a process notification. | |
| Args: | |
| process_name: Process name | |
| success: Whether the process was successful | |
| error: Error message if failed | |
| recipient_id: Recipient user ID | |
| Returns: | |
| Notification ID or None if sending failed | |
| """ | |
| if success: | |
| return send_notification( | |
| template_id="process_complete", | |
| data={"process_name": process_name}, | |
| recipient_id=recipient_id | |
| ) | |
| else: | |
| return send_notification( | |
| template_id="process_failed", | |
| data={"process_name": process_name, "error": error or "Unknown error"}, | |
| recipient_id=recipient_id, | |
| priority=NotificationPriority.HIGH | |
| ) | |
| class NotificationMonitor: | |
| """Utility class for monitoring notification metrics and performance.""" | |
| def __init__(self, notification_manager: NotificationManager): | |
| """ | |
| Initialize the notification monitor. | |
| Args: | |
| notification_manager: Notification manager to monitor | |
| """ | |
| self.notification_manager = notification_manager | |
| self.telemetry = get_telemetry() | |
| # Schedule periodic metrics collection | |
| self.metrics_thread = threading.Thread( | |
| target=self._collect_metrics_periodically, | |
| daemon=True | |
| ) | |
| self.metrics_thread.start() | |
| def _collect_metrics_periodically(self) -> None: | |
| """Periodically collect notification metrics.""" | |
| while True: | |
| try: | |
| # Collect metrics | |
| metrics = self.collect_metrics() | |
| # Record metrics in telemetry | |
| self.telemetry.debug( | |
| "Notification system metrics collected", | |
| category=EventCategory.SYSTEM, | |
| context={"metrics": metrics} | |
| ) | |
| # Wait for next collection | |
| time.sleep(60.0) # Collect every minute | |
| except Exception as e: | |
| self.telemetry.error( | |
| f"Error collecting notification metrics: {e}", | |
| category=EventCategory.SYSTEM, | |
| exc_info=True | |
| ) | |
| time.sleep(60.0) | |
| def collect_metrics(self) -> Dict[str, Any]: | |
| """ | |
| Collect notification metrics. | |
| Returns: | |
| Dictionary of metrics | |
| """ | |
| with self.notification_manager.lock: | |
| # Count notifications by state | |
| states = {state: 0 for state in NotificationState} | |
| for notification in self.notification_manager.notifications.values(): | |
| states[notification.state] += 1 | |
| # Count notifications by priority | |
| priorities = {priority: 0 for priority in NotificationPriority} | |
| for notification in self.notification_manager.notifications.values(): | |
| priorities[notification.priority] += 1 | |
| # Count notifications by category | |
| categories = {category: 0 for category in NotificationCategory} | |
| for notification in self.notification_manager.notifications.values(): | |
| categories[notification.category] += 1 | |
| # Calculate delivery metrics | |
| total_notifications = len(self.notification_manager.notifications) | |
| delivered_count = states[NotificationState.DELIVERED] | |
| failed_count = states[NotificationState.FAILED] | |
| delivery_rate = 0.0 | |
| if total_notifications > 0: | |
| delivery_rate = delivered_count / total_notifications | |
| # Calculate average delivery time | |
| delivery_times = [] | |
| for notification in self.notification_manager.notifications.values(): | |
| if notification.delivered_at and notification.created_at: | |
| delivery_time = (notification.delivered_at - notification.created_at).total_seconds() | |
| delivery_times.append(delivery_time) | |
| avg_delivery_time = 0.0 | |
| if delivery_times: | |
| avg_delivery_time = sum(delivery_times) / len(delivery_times) | |
| # Count active users | |
| active_users = len(self.notification_manager.user_notifications) | |
| return { | |
| "total_notifications": total_notifications, | |
| "states": {state.value: count for state, count in states.items()}, | |
| "priorities": {priority.value: count for priority, count in priorities.items()}, | |
| "categories": {category.value: count for category, count in categories.items()}, | |
| "delivery_rate": delivery_rate, | |
| "avg_delivery_time": avg_delivery_time, | |
| "active_users": active_users, | |
| "queue_size": self.notification_manager.delivery_queue.qsize() | |
| } | |
| def get_notification_stats(self, user_id: Optional[str] = None) -> Dict[str, Any]: | |
| """ | |
| Get notification statistics. | |
| Args: | |
| user_id: Optional user ID to filter by | |
| Returns: | |
| Dictionary of notification statistics | |
| """ | |
| with self.notification_manager.lock: | |
| if user_id: | |
| # User-specific stats | |
| if user_id not in self.notification_manager.user_notifications: | |
| return { | |
| "total": 0, | |
| "unread": 0, | |
| "states": {state.value: 0 for state in NotificationState}, | |
| "categories": {category.value: 0 for category in NotificationCategory} | |
| } | |
| # Get notification IDs for the user | |
| notification_ids = self.notification_manager.user_notifications[user_id] | |
| # Count states and categories | |
| states = {state: 0 for state in NotificationState} | |
| categories = {category: 0 for category in NotificationCategory} | |
| unread = 0 | |
| for nid in notification_ids: | |
| if nid in self.notification_manager.notifications: | |
| notification = self.notification_manager.notifications[nid] | |
| states[notification.state] += 1 | |
| categories[notification.category] += 1 | |
| if notification.state != NotificationState.READ: | |
| unread += 1 | |
| return { | |
| "total": len(notification_ids), | |
| "unread": unread, | |
| "states": {state.value: count for state, count in states.items()}, | |
| "categories": {category.value: count for category, count in categories.items()} | |
| } | |
| else: | |
| # System-wide stats | |
| states = {state: 0 for state in NotificationState} | |
| categories = {category: 0 for category in NotificationCategory} | |
| unread = 0 | |
| for notification in self.notification_manager.notifications.values(): | |
| states[notification.state] += 1 | |
| categories[notification.category] += 1 | |
| if notification.state != NotificationState.READ: | |
| unread += 1 | |
| return { | |
| "total": len(self.notification_manager.notifications), | |
| "unread": unread, | |
| "states": {state.value: count for state, count in states.items()}, | |
| "categories": {category.value: count for category, count in categories.items()}, | |
| "active_users": len(self.notification_manager.user_notifications) | |
| } | |
| def initialize_notification_monitor() -> NotificationMonitor: | |
| """ | |
| Initialize the notification monitor. | |
| Returns: | |
| NotificationMonitor instance | |
| """ | |
| manager = get_notification_manager() | |
| return NotificationMonitor(manager) |