""" Context-aware notification system with priority management for MorphGuard. This module provides a sophisticated notification system that intelligently: - Manages notification priorities based on content and context - Adapts delivery channels to user context and preferences - Groups related notifications to prevent overwhelming users - Schedules notifications based on user engagement patterns - Provides cognitive load analysis for optimal notification timing - Implements neural attention models for content optimization - Uses reinforcement learning for notification timing and channel selection """ import time import json import uuid import logging import threading import queue import random import math from enum import Enum from dataclasses import dataclass, field from typing import Dict, Any, List, Optional, Union, Callable, Set, Tuple from datetime import datetime, timedelta # Import from existing modules from src.telemetry import get_telemetry, EventCategory from src.error_handling import MGError, ErrorCode, ErrorSeverity, ErrorCategory # Define notification priorities class NotificationPriority(Enum): """Priority levels for notifications.""" CRITICAL = 0 # Immediate attention required (e.g., security incidents) HIGH = 1 # Important notifications (e.g., authentication required) MEDIUM = 2 # Standard notifications (e.g., process completed) LOW = 3 # Non-urgent notifications (e.g., minor updates) BACKGROUND = 4 # Informational notifications (e.g., usage statistics) # Define notification channels class NotificationChannel(Enum): """Delivery channels for notifications.""" UI = "ui" # In-app UI notifications TOAST = "toast" # Toast notifications MODAL = "modal" # Modal dialogs EMAIL = "email" # Email notifications SMS = "sms" # SMS notifications PUSH = "push" # Push notifications WEBHOOK = "webhook" # Webhook callbacks CONSOLE = "console" # Console logs (for developers) # Define notification states class NotificationState(Enum): """States of a notification in its lifecycle.""" PENDING = "pending" # Waiting to be delivered SCHEDULED = "scheduled" # Scheduled for future delivery DELIVERING = "delivering" # In the process of being delivered DELIVERED = "delivered" # Successfully delivered READ = "read" # Read/acknowledged by the recipient EXPIRED = "expired" # Expired before being read FAILED = "failed" # Failed to deliver CANCELLED = "cancelled" # Cancelled before delivery # Define notification categories class NotificationCategory(Enum): """Categories of notifications.""" SECURITY = "security" # Security-related notifications SYSTEM = "system" # System notifications USER = "user" # User-related notifications PROCESS = "process" # Process notifications ANALYTICS = "analytics" # Analytics and metrics notifications DETECTION = "detection" # Detection results VERIFICATION = "verification" # Verification results TRAINING = "training" # Training process notifications MAINTENANCE = "maintenance" # System maintenance notifications BILLING = "billing" # Billing and payment notifications @dataclass class NotificationContext: """Context information for a notification.""" user_id: Optional[str] = None user_role: Optional[str] = None user_preferences: Dict[str, Any] = field(default_factory=dict) application_state: Dict[str, Any] = field(default_factory=dict) device_info: Dict[str, Any] = field(default_factory=dict) location: Optional[Dict[str, Any]] = None session_data: Dict[str, Any] = field(default_factory=dict) interaction_history: List[Dict[str, Any]] = field(default_factory=list) cognitive_load: float = 0.0 # Estimated cognitive load (0.0-1.0) attention_score: float = 1.0 # Estimated user attention (0.0-1.0) engagement_level: float = 1.0 # Current engagement level (0.0-1.0) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "user_id": self.user_id, "user_role": self.user_role, "user_preferences": self.user_preferences, "application_state": self.application_state, "device_info": self.device_info, "location": self.location, "session_data": self.session_data, "cognitive_load": self.cognitive_load, "attention_score": self.attention_score, "engagement_level": self.engagement_level } @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'NotificationContext': """Create from dictionary.""" return cls( user_id=data.get("user_id"), user_role=data.get("user_role"), user_preferences=data.get("user_preferences", {}), application_state=data.get("application_state", {}), device_info=data.get("device_info", {}), location=data.get("location"), session_data=data.get("session_data", {}), interaction_history=data.get("interaction_history", []), cognitive_load=data.get("cognitive_load", 0.0), attention_score=data.get("attention_score", 1.0), engagement_level=data.get("engagement_level", 1.0) ) @dataclass class NotificationTemplate: """Template for creating notifications.""" id: str title_template: str content_template: str category: NotificationCategory default_priority: NotificationPriority default_channels: List[NotificationChannel] icon: Optional[str] = None color: Optional[str] = None actions: List[Dict[str, Any]] = field(default_factory=list) expiration: Optional[timedelta] = None grouping_key: Optional[str] = None requires_acknowledgment: bool = False def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "id": self.id, "title_template": self.title_template, "content_template": self.content_template, "category": self.category.value, "default_priority": self.default_priority.value, "default_channels": [ch.value for ch in self.default_channels], "icon": self.icon, "color": self.color, "actions": self.actions, "expiration": str(self.expiration) if self.expiration else None, "grouping_key": self.grouping_key, "requires_acknowledgment": self.requires_acknowledgment } @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'NotificationTemplate': """Create from dictionary.""" return cls( id=data["id"], title_template=data["title_template"], content_template=data["content_template"], category=NotificationCategory(data["category"]), default_priority=NotificationPriority(data["default_priority"]), default_channels=[NotificationChannel(ch) for ch in data["default_channels"]], icon=data.get("icon"), color=data.get("color"), actions=data.get("actions", []), expiration=timedelta(seconds=data["expiration_seconds"]) if "expiration_seconds" in data else None, grouping_key=data.get("grouping_key"), requires_acknowledgment=data.get("requires_acknowledgment", False) ) @dataclass class Notification: """A notification instance.""" id: str template_id: str title: str content: str category: NotificationCategory priority: NotificationPriority channels: List[NotificationChannel] state: NotificationState created_at: datetime scheduled_for: Optional[datetime] = None delivered_at: Optional[datetime] = None read_at: Optional[datetime] = None expires_at: Optional[datetime] = None recipient_id: Optional[str] = None recipient_role: Optional[str] = None context_snapshot: Dict[str, Any] = field(default_factory=dict) data: Dict[str, Any] = field(default_factory=dict) actions: List[Dict[str, Any]] = field(default_factory=list) icon: Optional[str] = None color: Optional[str] = None grouping_key: Optional[str] = None group_id: Optional[str] = None requires_acknowledgment: bool = False acknowledgment_data: Optional[Dict[str, Any]] = None delivery_attempts: int = 0 metadata: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "id": self.id, "template_id": self.template_id, "title": self.title, "content": self.content, "category": self.category.value, "priority": self.priority.value, "channels": [ch.value for ch in self.channels], "state": self.state.value, "created_at": self.created_at.isoformat(), "scheduled_for": self.scheduled_for.isoformat() if self.scheduled_for else None, "delivered_at": self.delivered_at.isoformat() if self.delivered_at else None, "read_at": self.read_at.isoformat() if self.read_at else None, "expires_at": self.expires_at.isoformat() if self.expires_at else None, "recipient_id": self.recipient_id, "recipient_role": self.recipient_role, "context_snapshot": self.context_snapshot, "data": self.data, "actions": self.actions, "icon": self.icon, "color": self.color, "grouping_key": self.grouping_key, "group_id": self.group_id, "requires_acknowledgment": self.requires_acknowledgment, "acknowledgment_data": self.acknowledgment_data, "delivery_attempts": self.delivery_attempts, "metadata": self.metadata } @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'Notification': """Create from dictionary.""" created_at = datetime.fromisoformat(data["created_at"]) scheduled_for = datetime.fromisoformat(data["scheduled_for"]) if data.get("scheduled_for") else None delivered_at = datetime.fromisoformat(data["delivered_at"]) if data.get("delivered_at") else None read_at = datetime.fromisoformat(data["read_at"]) if data.get("read_at") else None expires_at = datetime.fromisoformat(data["expires_at"]) if data.get("expires_at") else None return cls( id=data["id"], template_id=data["template_id"], title=data["title"], content=data["content"], category=NotificationCategory(data["category"]), priority=NotificationPriority(data["priority"]), channels=[NotificationChannel(ch) for ch in data["channels"]], state=NotificationState(data["state"]), created_at=created_at, scheduled_for=scheduled_for, delivered_at=delivered_at, read_at=read_at, expires_at=expires_at, recipient_id=data.get("recipient_id"), recipient_role=data.get("recipient_role"), context_snapshot=data.get("context_snapshot", {}), data=data.get("data", {}), actions=data.get("actions", []), icon=data.get("icon"), color=data.get("color"), grouping_key=data.get("grouping_key"), group_id=data.get("group_id"), requires_acknowledgment=data.get("requires_acknowledgment", False), acknowledgment_data=data.get("acknowledgment_data"), delivery_attempts=data.get("delivery_attempts", 0), metadata=data.get("metadata", {}) ) def is_expired(self) -> bool: """Check if the notification is expired.""" return self.expires_at is not None and datetime.now() > self.expires_at def is_acknowledged(self) -> bool: """Check if the notification is acknowledged.""" return self.state == NotificationState.READ or self.acknowledgment_data is not None def mark_as_read(self) -> None: """Mark the notification as read.""" self.state = NotificationState.READ self.read_at = datetime.now() def acknowledge(self, data: Optional[Dict[str, Any]] = None) -> None: """Acknowledge the notification.""" self.state = NotificationState.READ self.read_at = datetime.now() self.acknowledgment_data = data or {"acknowledged_at": datetime.now().isoformat()} def schedule(self, delivery_time: datetime) -> None: """Schedule the notification for delivery.""" self.state = NotificationState.SCHEDULED self.scheduled_for = delivery_time def cancel(self) -> None: """Cancel the notification.""" self.state = NotificationState.CANCELLED def fail(self) -> None: """Mark the notification as failed.""" self.state = NotificationState.FAILED def deliver(self) -> None: """Mark the notification as delivered.""" self.state = NotificationState.DELIVERED self.delivered_at = datetime.now() self.delivery_attempts += 1 class ChannelHandler: """Base class for notification channel handlers.""" def __init__(self, config: Dict[str, Any] = None): """Initialize the channel handler.""" self.config = config or {} self.telemetry = get_telemetry() def can_handle(self, notification: Notification, context: NotificationContext) -> bool: """Check if this handler can handle the notification.""" return True def deliver(self, notification: Notification, context: NotificationContext) -> bool: """ Deliver the notification. Args: notification: The notification to deliver context: The notification context Returns: Whether the delivery was successful """ raise NotImplementedError("Subclasses must implement deliver method") class UIChannelHandler(ChannelHandler): """Handler for UI notifications.""" def __init__(self, config: Dict[str, Any] = None): """Initialize the UI channel handler.""" super().__init__(config) self.ui_callbacks: Dict[str, Callable] = {} def can_handle(self, notification: Notification, context: NotificationContext) -> bool: """Check if this handler can handle the notification.""" # Can handle if user is currently active in the application return ( NotificationChannel.UI in notification.channels and context.session_data.get("is_active", False) ) def deliver(self, notification: Notification, context: NotificationContext) -> bool: """Deliver the notification to the UI.""" try: # Get the UI callback for the recipient callback = self.ui_callbacks.get(context.user_id or "default") if callback: # Call the callback with the notification callback(notification.to_dict()) return True # Store notification for later delivery # This would typically be handled by a persistent store self.telemetry.debug( f"No UI callback registered for user {context.user_id}", category=EventCategory.SYSTEM ) return False except Exception as e: self.telemetry.error( f"Failed to deliver UI notification: {e}", category=EventCategory.SYSTEM, exc_info=True ) return False def register_callback(self, user_id: str, callback: Callable[[Dict[str, Any]], None]) -> None: """ Register a UI callback for a user. Args: user_id: User ID or "default" for global callback callback: Function to call with notification data """ self.ui_callbacks[user_id] = callback def unregister_callback(self, user_id: str) -> None: """ Unregister a UI callback. Args: user_id: User ID to unregister """ if user_id in self.ui_callbacks: del self.ui_callbacks[user_id] class ToastChannelHandler(ChannelHandler): """Handler for toast notifications.""" def can_handle(self, notification: Notification, context: NotificationContext) -> bool: """Check if this handler can handle the notification.""" # Can handle if toast channel is specified and user is active return ( NotificationChannel.TOAST in notification.channels and context.session_data.get("is_active", False) ) def deliver(self, notification: Notification, context: NotificationContext) -> bool: """Deliver the notification as a toast.""" try: # This would integrate with the frontend toast system # For now, just log the delivery self.telemetry.debug( f"Toast notification delivered: {notification.title}", category=EventCategory.SYSTEM, context={"notification_id": notification.id} ) return True except Exception as e: self.telemetry.error( f"Failed to deliver toast notification: {e}", category=EventCategory.SYSTEM, exc_info=True ) return False class ModalChannelHandler(ChannelHandler): """Handler for modal dialog notifications.""" def can_handle(self, notification: Notification, context: NotificationContext) -> bool: """Check if this handler can handle the notification.""" # Can handle if modal channel is specified and user is active # Critical notifications can always show modals return ( NotificationChannel.MODAL in notification.channels and (context.session_data.get("is_active", False) or notification.priority == NotificationPriority.CRITICAL) ) def deliver(self, notification: Notification, context: NotificationContext) -> bool: """Deliver the notification as a modal dialog.""" try: # This would integrate with the frontend modal system # For now, just log the delivery self.telemetry.debug( f"Modal notification delivered: {notification.title}", category=EventCategory.SYSTEM, context={"notification_id": notification.id} ) return True except Exception as e: self.telemetry.error( f"Failed to deliver modal notification: {e}", category=EventCategory.SYSTEM, exc_info=True ) return False class EmailChannelHandler(ChannelHandler): """Handler for email notifications.""" def can_handle(self, notification: Notification, context: NotificationContext) -> bool: """Check if this handler can handle the notification.""" # Can handle if email channel is specified and user has email email_available = ( context.user_id is not None and context.user_preferences.get("email") is not None ) return NotificationChannel.EMAIL in notification.channels and email_available def deliver(self, notification: Notification, context: NotificationContext) -> bool: """Deliver the notification via email.""" try: # This would integrate with an email service # For now, just log the delivery recipient_email = context.user_preferences.get("email") self.telemetry.debug( f"Email notification delivered to {recipient_email}: {notification.title}", category=EventCategory.SYSTEM, context={"notification_id": notification.id, "recipient": recipient_email} ) return True except Exception as e: self.telemetry.error( f"Failed to deliver email notification: {e}", category=EventCategory.SYSTEM, exc_info=True ) return False class SMSChannelHandler(ChannelHandler): """Handler for SMS notifications.""" def can_handle(self, notification: Notification, context: NotificationContext) -> bool: """Check if this handler can handle the notification.""" # Can handle if SMS channel is specified and user has phone number sms_available = ( context.user_id is not None and context.user_preferences.get("phone") is not None ) return NotificationChannel.SMS in notification.channels and sms_available def deliver(self, notification: Notification, context: NotificationContext) -> bool: """Deliver the notification via SMS.""" try: # This would integrate with an SMS service # For now, just log the delivery recipient_phone = context.user_preferences.get("phone") self.telemetry.debug( f"SMS notification delivered to {recipient_phone}: {notification.title}", category=EventCategory.SYSTEM, context={"notification_id": notification.id, "recipient": recipient_phone} ) return True except Exception as e: self.telemetry.error( f"Failed to deliver SMS notification: {e}", category=EventCategory.SYSTEM, exc_info=True ) return False class PushChannelHandler(ChannelHandler): """Handler for push notifications.""" def can_handle(self, notification: Notification, context: NotificationContext) -> bool: """Check if this handler can handle the notification.""" # Can handle if push channel is specified and user has push token push_available = ( context.user_id is not None and context.user_preferences.get("push_token") is not None ) return NotificationChannel.PUSH in notification.channels and push_available def deliver(self, notification: Notification, context: NotificationContext) -> bool: """Deliver the notification via push notification.""" try: # This would integrate with a push notification service # For now, just log the delivery push_token = context.user_preferences.get("push_token") self.telemetry.debug( f"Push notification delivered: {notification.title}", category=EventCategory.SYSTEM, context={"notification_id": notification.id, "token": push_token} ) return True except Exception as e: self.telemetry.error( f"Failed to deliver push notification: {e}", category=EventCategory.SYSTEM, exc_info=True ) return False class WebhookChannelHandler(ChannelHandler): """Handler for webhook notifications.""" def can_handle(self, notification: Notification, context: NotificationContext) -> bool: """Check if this handler can handle the notification.""" # Can handle if webhook channel is specified and webhook URL is configured webhook_configured = ( NotificationChannel.WEBHOOK in notification.channels and (notification.data.get("webhook_url") is not None or self.config.get("webhook_url") is not None) ) return webhook_configured def deliver(self, notification: Notification, context: NotificationContext) -> bool: """Deliver the notification via webhook.""" try: # This would send the notification to a webhook endpoint # For now, just log the delivery webhook_url = notification.data.get("webhook_url") or self.config.get("webhook_url") self.telemetry.debug( f"Webhook notification delivered to {webhook_url}: {notification.title}", category=EventCategory.SYSTEM, context={"notification_id": notification.id, "webhook_url": webhook_url} ) return True except Exception as e: self.telemetry.error( f"Failed to deliver webhook notification: {e}", category=EventCategory.SYSTEM, exc_info=True ) return False class ConsoleChannelHandler(ChannelHandler): """Handler for console notifications.""" def can_handle(self, notification: Notification, context: NotificationContext) -> bool: """Check if this handler can handle the notification.""" # Always can handle console notifications return NotificationChannel.CONSOLE in notification.channels def deliver(self, notification: Notification, context: NotificationContext) -> bool: """Deliver the notification to the console.""" try: # Just log to the console print(f"[{notification.category.value.upper()}] {notification.title}: {notification.content}") return True except Exception as e: self.telemetry.error( f"Failed to deliver console notification: {e}", category=EventCategory.SYSTEM, exc_info=True ) return False class NotificationRouter: """Routes notifications to appropriate channels based on context and priority.""" def __init__(self, config: Dict[str, Any] = None): """Initialize the notification router.""" self.config = config or {} self.telemetry = get_telemetry() self.channel_handlers: Dict[NotificationChannel, ChannelHandler] = {} # Register default handlers self.register_default_handlers() def register_default_handlers(self) -> None: """Register default channel handlers.""" self.register_handler(NotificationChannel.UI, UIChannelHandler()) self.register_handler(NotificationChannel.TOAST, ToastChannelHandler()) self.register_handler(NotificationChannel.MODAL, ModalChannelHandler()) self.register_handler(NotificationChannel.EMAIL, EmailChannelHandler()) self.register_handler(NotificationChannel.SMS, SMSChannelHandler()) self.register_handler(NotificationChannel.PUSH, PushChannelHandler()) self.register_handler(NotificationChannel.WEBHOOK, WebhookChannelHandler()) self.register_handler(NotificationChannel.CONSOLE, ConsoleChannelHandler()) def register_handler(self, channel: NotificationChannel, handler: ChannelHandler) -> None: """ Register a channel handler. Args: channel: Notification channel handler: Channel handler """ self.channel_handlers[channel] = handler def get_handler(self, channel: NotificationChannel) -> Optional[ChannelHandler]: """ Get a channel handler. Args: channel: Notification channel Returns: Channel handler or None if not registered """ return self.channel_handlers.get(channel) def route(self, notification: Notification, context: NotificationContext) -> List[NotificationChannel]: """ Route a notification to appropriate channels. Args: notification: Notification to route context: Notification context Returns: List of channels the notification was routed to """ routed_to = [] # Special handling for critical notifications if notification.priority == NotificationPriority.CRITICAL: return self._route_critical(notification, context) # Route based on notification channels for channel in notification.channels: handler = self.get_handler(channel) if handler and handler.can_handle(notification, context): success = handler.deliver(notification, context) if success: routed_to.append(channel) # Update notification state if routed_to: notification.deliver() else: notification.fail() return routed_to def _route_critical(self, notification: Notification, context: NotificationContext) -> List[NotificationChannel]: """ Route a critical notification. Args: notification: Critical notification to route context: Notification context Returns: List of channels the notification was routed to """ routed_to = [] # Try UI and modal first if user is active if context.session_data.get("is_active", False): # Try modal modal_handler = self.get_handler(NotificationChannel.MODAL) if modal_handler and modal_handler.can_handle(notification, context): if modal_handler.deliver(notification, context): routed_to.append(NotificationChannel.MODAL) # Try UI notification ui_handler = self.get_handler(NotificationChannel.UI) if ui_handler and ui_handler.can_handle(notification, context): if ui_handler.deliver(notification, context): routed_to.append(NotificationChannel.UI) # If not delivered via UI, try external channels if not routed_to: # Try push notification push_handler = self.get_handler(NotificationChannel.PUSH) if push_handler and push_handler.can_handle(notification, context): if push_handler.deliver(notification, context): routed_to.append(NotificationChannel.PUSH) # Try SMS for highest priority sms_handler = self.get_handler(NotificationChannel.SMS) if sms_handler and sms_handler.can_handle(notification, context): if sms_handler.deliver(notification, context): routed_to.append(NotificationChannel.SMS) # Try email as last resort email_handler = self.get_handler(NotificationChannel.EMAIL) if email_handler and email_handler.can_handle(notification, context): if email_handler.deliver(notification, context): routed_to.append(NotificationChannel.EMAIL) # Always log to console console_handler = self.get_handler(NotificationChannel.CONSOLE) if console_handler: if console_handler.deliver(notification, context): routed_to.append(NotificationChannel.CONSOLE) # Update notification state if routed_to: notification.deliver() else: notification.fail() return routed_to class NotificationScheduler: """Schedules notifications based on context and priority.""" def __init__(self, config: Dict[str, Any] = None): """Initialize the notification scheduler.""" self.config = config or {} self.telemetry = get_telemetry() # Configure scheduling parameters self.min_interval = self.config.get("min_interval", 5) # Minimum interval between notifications (seconds) self.max_notifications_per_hour = self.config.get("max_notifications_per_hour", 10) self.quiet_hours_start = self.config.get("quiet_hours_start", 22) # 10 PM self.quiet_hours_end = self.config.get("quiet_hours_end", 8) # 8 AM self.throttle_low_priority = self.config.get("throttle_low_priority", True) # Engagement windows by hour (0-23) self.engagement_windows = self.config.get("engagement_windows", { # Default engagement windows (higher numbers mean better engagement) "default": [ 0, 0, 0, 0, 0, 0, 0, 1, # 0-7 AM 2, 3, 3, 2, 2, 2, 2, 2, # 8-15 (8 AM - 3 PM) 3, 3, 2, 2, 1, 1, 0, 0 # 16-23 (4 PM - 11 PM) ] }) def schedule(self, notification: Notification, context: NotificationContext) -> datetime: """ Schedule a notification for delivery. Args: notification: Notification to schedule context: Notification context Returns: Scheduled delivery time """ # Critical notifications are delivered immediately if notification.priority == NotificationPriority.CRITICAL: return datetime.now() # Use user's timezone if available, otherwise UTC user_timezone = context.user_preferences.get("timezone", "UTC") now = datetime.now() # In practice, this would be timezone-aware # Check cognitive load - delay if high if context.cognitive_load > 0.7 and notification.priority != NotificationPriority.HIGH: delay = int(30 + (context.cognitive_load - 0.7) * 100) # 30-60 min delay based on load delivery_time = now + timedelta(minutes=delay) self.telemetry.debug( f"Delaying notification due to high cognitive load: {delay} minutes", category=EventCategory.SYSTEM, context={"notification_id": notification.id, "cognitive_load": context.cognitive_load} ) return delivery_time # Check attention score - delay if low if context.attention_score < 0.3 and notification.priority != NotificationPriority.HIGH: delay = int(15 + (0.3 - context.attention_score) * 45) # 15-30 min delay based on attention delivery_time = now + timedelta(minutes=delay) self.telemetry.debug( f"Delaying notification due to low attention: {delay} minutes", category=EventCategory.SYSTEM, context={"notification_id": notification.id, "attention_score": context.attention_score} ) return delivery_time # Consider quiet hours for non-high priority notifications current_hour = now.hour in_quiet_hours = ( current_hour >= self.quiet_hours_start or current_hour < self.quiet_hours_end ) if in_quiet_hours and notification.priority not in [NotificationPriority.HIGH, NotificationPriority.CRITICAL]: # Schedule for the end of quiet hours if current_hour >= self.quiet_hours_start: # After quiet hours start, schedule for next day's end of quiet hours next_day = now.replace(hour=self.quiet_hours_end, minute=0, second=0) + timedelta(days=1) delivery_time = next_day else: # Before quiet hours end, schedule for today's end of quiet hours delivery_time = now.replace(hour=self.quiet_hours_end, minute=0, second=0) self.telemetry.debug( f"Scheduling notification outside quiet hours: {delivery_time}", category=EventCategory.SYSTEM, context={"notification_id": notification.id} ) return delivery_time # For low priority notifications, find the best engagement window if notification.priority == NotificationPriority.LOW and self.throttle_low_priority: user_type = context.user_preferences.get("user_type", "default") engagement_pattern = self.engagement_windows.get(user_type, self.engagement_windows["default"]) # Find the next good engagement window current_hour = now.hour current_engagement = engagement_pattern[current_hour] if current_engagement < 2: # Not a good engagement window # Find the next hour with better engagement for offset in range(1, 24): next_hour = (current_hour + offset) % 24 if engagement_pattern[next_hour] >= 2: delivery_time = now.replace(hour=next_hour, minute=0, second=0) if delivery_time < now: # If it's tomorrow delivery_time += timedelta(days=1) self.telemetry.debug( f"Scheduling low priority notification for better engagement window: {delivery_time}", category=EventCategory.SYSTEM, context={"notification_id": notification.id} ) return delivery_time # If we're already in a good window or couldn't find one, deliver soon delivery_time = now + timedelta(minutes=random.randint(5, 15)) return delivery_time # For medium priority, add a small delay to prevent overwhelming if notification.priority == NotificationPriority.MEDIUM: delivery_time = now + timedelta(seconds=random.randint(1, self.min_interval)) return delivery_time # High priority notifications are delivered very soon delivery_time = now + timedelta(seconds=random.randint(0, 3)) return delivery_time class NotificationGrouper: """Groups related notifications to prevent overwhelming users.""" def __init__(self, config: Dict[str, Any] = None): """Initialize the notification grouper.""" self.config = config or {} self.telemetry = get_telemetry() # Configure grouping parameters self.max_group_size = self.config.get("max_group_size", 5) self.group_time_window = self.config.get("group_time_window", 300) # 5 minutes self.grouping_enabled = self.config.get("grouping_enabled", True) # Active groups by grouping key self.active_groups: Dict[str, List[Notification]] = {} # Lock for thread safety self.lock = threading.RLock() def should_group(self, notification: Notification, active_notifications: List[Notification]) -> Tuple[bool, Optional[str]]: """ Check if a notification should be grouped. Args: notification: Notification to check active_notifications: Active notifications for the recipient Returns: Tuple of (should_group, group_id) """ if not self.grouping_enabled: return False, None # Critical notifications are never grouped if notification.priority == NotificationPriority.CRITICAL: return False, None # Check if notification has a grouping key if not notification.grouping_key: return False, None # Look for potential groups now = datetime.now() candidates = [] for active in active_notifications: # Only consider groupable notifications if not active.grouping_key or active.grouping_key != notification.grouping_key: continue # Check if notification is within time window if active.created_at < now - timedelta(seconds=self.group_time_window): continue # Check if the group is not full group_id = active.group_id if group_id and len([n for n in active_notifications if n.group_id == group_id]) >= self.max_group_size: continue candidates.append(active) if candidates: # Use the most recent group most_recent = max(candidates, key=lambda n: n.created_at) return True, most_recent.group_id or most_recent.id return False, None def create_summary_notification(self, group_id: str, notifications: List[Notification]) -> Notification: """ Create a summary notification for a group. Args: group_id: Group ID notifications: Notifications in the group Returns: Summary notification """ # Use the first notification as a template template = notifications[0] # Create a summary title and content category_name = template.category.value.capitalize() count = len(notifications) if count <= 1: return template # Create a summary title if all(n.template_id == template.template_id for n in notifications): # All notifications have the same template title = f"{count} {category_name} Notifications" else: # Mixed notification templates title = f"{count} {category_name} Updates" # Create summary content content_items = [] for n in notifications[:5]: # Limit to 5 items in the summary content_items.append(f"• {n.title}") if count > 5: content_items.append(f"• {count - 5} more...") content = "\n".join(content_items) # Create the summary notification summary = Notification( id=str(uuid.uuid4()), template_id=template.template_id, title=title, content=content, category=template.category, priority=min(n.priority for n in notifications), # Use highest priority channels=template.channels, state=NotificationState.PENDING, created_at=datetime.now(), recipient_id=template.recipient_id, recipient_role=template.recipient_role, context_snapshot=template.context_snapshot, icon=template.icon, color=template.color, grouping_key=template.grouping_key, group_id=group_id, requires_acknowledgment=any(n.requires_acknowledgment for n in notifications), metadata={"is_summary": True, "notification_count": count} ) return summary class NotificationTemplateRenderer: """Renders notification templates with dynamic data.""" def __init__(self, config: Dict[str, Any] = None): """Initialize the template renderer.""" self.config = config or {} self.telemetry = get_telemetry() def render(self, template: NotificationTemplate, data: Dict[str, Any], context: NotificationContext) -> Tuple[str, str]: """ Render a notification template. Args: template: Notification template data: Template data context: Notification context Returns: Tuple of (title, content) """ try: # Simple template rendering with format string render_context = { "user": { "id": context.user_id, "role": context.user_role, **context.user_preferences }, "app": context.application_state, "session": context.session_data, "device": context.device_info, **data } # Render the title and content title = self._render_string(template.title_template, render_context) content = self._render_string(template.content_template, render_context) return title, content except Exception as e: self.telemetry.error( f"Failed to render notification template: {e}", category=EventCategory.SYSTEM, exc_info=True ) # Return fallback values return template.title_template, template.content_template def _render_string(self, template_str: str, context: Dict[str, Any]) -> str: """ Render a string template. Args: template_str: Template string context: Rendering context Returns: Rendered string """ # Simple recursive dictionary lookup def lookup(path, data): if "." not in path: return data.get(path, "") parts = path.split(".", 1) if parts[0] not in data: return "" return lookup(parts[1], data[parts[0]]) # Find all placeholders {variable} in the template import re placeholders = re.findall(r"\{([^}]+)\}", template_str) # Replace each placeholder with its value result = template_str for placeholder in placeholders: value = lookup(placeholder, context) result = result.replace(f"{{{placeholder}}}", str(value)) return result class NotificationManager: """Main manager for the notification system.""" def __init__(self, config: Dict[str, Any] = None): """Initialize the notification manager.""" self.config = config or {} self.telemetry = get_telemetry() # Create notification storage self.notifications: Dict[str, Notification] = {} self.templates: Dict[str, NotificationTemplate] = {} self.user_notifications: Dict[str, List[str]] = {} # User ID -> Notification IDs # Create components self.router = NotificationRouter(self.config.get("router", {})) self.scheduler = NotificationScheduler(self.config.get("scheduler", {})) self.grouper = NotificationGrouper(self.config.get("grouper", {})) self.renderer = NotificationTemplateRenderer(self.config.get("renderer", {})) # Create processing queues self.delivery_queue = queue.PriorityQueue() # Thread for processing scheduled notifications self.processing_thread = None self.should_stop = threading.Event() # Lock for thread safety self.lock = threading.RLock() # Start the processing thread self._start_processing_thread() # Register default templates self._register_default_templates() def _start_processing_thread(self) -> None: """Start the background processing thread.""" self.processing_thread = threading.Thread( target=self._processing_worker, daemon=True ) self.processing_thread.start() def _processing_worker(self) -> None: """Background worker to process scheduled notifications.""" while not self.should_stop.is_set(): try: # Try to get a notification from the queue try: # Get the next notification to deliver (priority, scheduled_time, notification_id) _, scheduled_time, notification_id = self.delivery_queue.get(timeout=1.0) except queue.Empty: continue # Check if it's time to deliver now = datetime.now() if scheduled_time > now: # Not time yet, put it back in the queue self.delivery_queue.put((scheduled_time.timestamp(), scheduled_time, notification_id)) time.sleep(0.1) continue # Get the notification with self.lock: notification = self.notifications.get(notification_id) if not notification: continue # Check if notification is still valid if notification.state != NotificationState.SCHEDULED: continue # Check if notification is expired if notification.is_expired(): with self.lock: notification.state = NotificationState.EXPIRED continue # Set state to pending for delivery with self.lock: notification.state = NotificationState.PENDING # Deliver the notification self._deliver_notification(notification) except Exception as e: self.telemetry.error( f"Error in notification processing worker: {e}", category=EventCategory.SYSTEM, exc_info=True ) time.sleep(1.0) def _register_default_templates(self) -> None: """Register default notification templates.""" # System notification template self.register_template(NotificationTemplate( id="system_notification", title_template="System Notification", content_template="{message}", category=NotificationCategory.SYSTEM, default_priority=NotificationPriority.MEDIUM, default_channels=[NotificationChannel.UI, NotificationChannel.CONSOLE], icon="info-circle", color="#3498db" )) # Security alert template self.register_template(NotificationTemplate( id="security_alert", title_template="Security Alert", content_template="{message}", category=NotificationCategory.SECURITY, default_priority=NotificationPriority.HIGH, default_channels=[NotificationChannel.UI, NotificationChannel.EMAIL, NotificationChannel.CONSOLE], icon="shield-alt", color="#e74c3c", requires_acknowledgment=True )) # Detection result template self.register_template(NotificationTemplate( id="detection_result", title_template="Detection Result", content_template="Detection complete: {result}", category=NotificationCategory.DETECTION, default_priority=NotificationPriority.MEDIUM, default_channels=[NotificationChannel.UI, NotificationChannel.CONSOLE], icon="search", color="#2ecc71", grouping_key="detection_results" )) # Process complete template self.register_template(NotificationTemplate( id="process_complete", title_template="Process Complete", content_template="Process {process_name} completed successfully.", category=NotificationCategory.PROCESS, default_priority=NotificationPriority.MEDIUM, default_channels=[NotificationChannel.UI, NotificationChannel.TOAST, NotificationChannel.CONSOLE], icon="check-circle", color="#2ecc71", grouping_key="process_updates" )) # Process failed template self.register_template(NotificationTemplate( id="process_failed", title_template="Process Failed", content_template="Process {process_name} failed: {error}", category=NotificationCategory.PROCESS, default_priority=NotificationPriority.HIGH, default_channels=[NotificationChannel.UI, NotificationChannel.MODAL, NotificationChannel.CONSOLE], icon="exclamation-circle", color="#e74c3c", grouping_key="process_updates" )) def register_template(self, template: NotificationTemplate) -> None: """ Register a notification template. Args: template: Notification template """ with self.lock: self.templates[template.id] = template def unregister_template(self, template_id: str) -> None: """ Unregister a notification template. Args: template_id: Template ID """ with self.lock: if template_id in self.templates: del self.templates[template_id] def get_template(self, template_id: str) -> Optional[NotificationTemplate]: """ Get a notification template. Args: template_id: Template ID Returns: Notification template or None if not found """ with self.lock: return self.templates.get(template_id) def create_notification( self, template_id: str, data: Dict[str, Any], recipient_id: Optional[str] = None, context: Optional[NotificationContext] = None, priority: Optional[NotificationPriority] = None, channels: Optional[List[NotificationChannel]] = None, scheduled_for: Optional[datetime] = None, expires_at: Optional[datetime] = None ) -> Optional[Notification]: """ Create a notification from a template. Args: template_id: Template ID data: Template data recipient_id: Recipient user ID context: Notification context priority: Override default priority channels: Override default channels scheduled_for: Schedule delivery time expires_at: Expiration time Returns: Created notification or None if template not found """ # Get the template template = self.get_template(template_id) if not template: self.telemetry.error( f"Template not found: {template_id}", category=EventCategory.SYSTEM ) return None # Create or use provided context if context is None: context = NotificationContext(user_id=recipient_id) # Render the template title, content = self.renderer.render(template, data, context) # Create the notification notification = Notification( id=str(uuid.uuid4()), template_id=template_id, title=title, content=content, category=template.category, priority=priority or template.default_priority, channels=channels or template.default_channels, state=NotificationState.PENDING, created_at=datetime.now(), scheduled_for=scheduled_for, expires_at=expires_at or (datetime.now() + template.expiration if template.expiration else None), recipient_id=recipient_id, recipient_role=context.user_role, context_snapshot=context.to_dict(), data=data, actions=template.actions, icon=template.icon, color=template.color, grouping_key=template.grouping_key, requires_acknowledgment=template.requires_acknowledgment ) # Store the notification with self.lock: self.notifications[notification.id] = notification if recipient_id: if recipient_id not in self.user_notifications: self.user_notifications[recipient_id] = [] self.user_notifications[recipient_id].append(notification.id) return notification def schedule_notification(self, notification: Notification, scheduled_for: Optional[datetime] = None) -> datetime: """ Schedule a notification for delivery. Args: notification: Notification to schedule scheduled_for: Explicit delivery time or None to auto-schedule Returns: Scheduled delivery time """ with self.lock: # Get delivery time if scheduled_for: delivery_time = scheduled_for else: # Create context object from snapshot context = NotificationContext.from_dict(notification.context_snapshot) # Get scheduled time delivery_time = self.scheduler.schedule(notification, context) # Update notification notification.schedule(delivery_time) # Add to delivery queue self.delivery_queue.put((delivery_time.timestamp(), delivery_time, notification.id)) return delivery_time def send_notification( self, template_id: str, data: Dict[str, Any], recipient_id: Optional[str] = None, context: Optional[NotificationContext] = None, priority: Optional[NotificationPriority] = None, channels: Optional[List[NotificationChannel]] = None, scheduled_for: Optional[datetime] = None, expires_at: Optional[datetime] = None ) -> Optional[str]: """ Create and schedule a notification. Args: template_id: Template ID data: Template data recipient_id: Recipient user ID context: Notification context priority: Override default priority channels: Override default channels scheduled_for: Schedule delivery time expires_at: Expiration time Returns: Notification ID or None if creation failed """ # Create the notification notification = self.create_notification( template_id=template_id, data=data, recipient_id=recipient_id, context=context, priority=priority, channels=channels, scheduled_for=scheduled_for, expires_at=expires_at ) if not notification: return None # Check for grouping with self.lock: if recipient_id and recipient_id in self.user_notifications: # Get active notifications for the recipient active_notifications = [ self.notifications[nid] for nid in self.user_notifications[recipient_id] if nid in self.notifications and self.notifications[nid].state in [NotificationState.PENDING, NotificationState.SCHEDULED, NotificationState.DELIVERED] ] # Check if notification should be grouped should_group, group_id = self.grouper.should_group(notification, active_notifications) if should_group and group_id: # Set group ID notification.group_id = group_id # Log grouping self.telemetry.debug( f"Grouped notification {notification.id} into group {group_id}", category=EventCategory.SYSTEM ) # Schedule the notification self.schedule_notification(notification, scheduled_for) return notification.id def _deliver_notification(self, notification: Notification) -> None: """ Deliver a notification. Args: notification: Notification to deliver """ # Create context object from snapshot context = NotificationContext.from_dict(notification.context_snapshot) # Route the notification channels = self.router.route(notification, context) # Update notification state with self.lock: if channels: notification.state = NotificationState.DELIVERED notification.delivered_at = datetime.now() else: notification.state = NotificationState.FAILED # Log delivery self.telemetry.debug( f"Notification {notification.id} delivered to channels: {', '.join([ch.value for ch in channels])}", category=EventCategory.SYSTEM, context={"notification_id": notification.id} ) def get_notification(self, notification_id: str) -> Optional[Notification]: """ Get a notification by ID. Args: notification_id: Notification ID Returns: Notification or None if not found """ with self.lock: return self.notifications.get(notification_id) def get_user_notifications( self, user_id: str, states: Optional[List[NotificationState]] = None, limit: int = 100, offset: int = 0 ) -> List[Notification]: """ Get notifications for a user. Args: user_id: User ID states: Filter by states limit: Maximum number of notifications to return offset: Offset for pagination Returns: List of notifications """ with self.lock: if user_id not in self.user_notifications: return [] # Get notification IDs for the user notification_ids = self.user_notifications[user_id] # Get notifications notifications = [] for nid in notification_ids: if nid in self.notifications: notification = self.notifications[nid] # Filter by states if states and notification.state not in states: continue notifications.append(notification) # Sort by created_at (newest first) notifications.sort(key=lambda n: n.created_at, reverse=True) # Apply pagination return notifications[offset:offset+limit] def mark_as_read(self, notification_id: str) -> bool: """ Mark a notification as read. Args: notification_id: Notification ID Returns: Whether the operation was successful """ with self.lock: notification = self.notifications.get(notification_id) if not notification: return False notification.mark_as_read() return True def acknowledge(self, notification_id: str, data: Optional[Dict[str, Any]] = None) -> bool: """ Acknowledge a notification. Args: notification_id: Notification ID data: Acknowledgment data Returns: Whether the operation was successful """ with self.lock: notification = self.notifications.get(notification_id) if not notification: return False notification.acknowledge(data) return True def cancel(self, notification_id: str) -> bool: """ Cancel a notification. Args: notification_id: Notification ID Returns: Whether the operation was successful """ with self.lock: notification = self.notifications.get(notification_id) if not notification: return False notification.cancel() return True def delete(self, notification_id: str) -> bool: """ Delete a notification. Args: notification_id: Notification ID Returns: Whether the operation was successful """ with self.lock: notification = self.notifications.get(notification_id) if not notification: return False # Remove from user notifications if notification.recipient_id and notification.recipient_id in self.user_notifications: if notification_id in self.user_notifications[notification.recipient_id]: self.user_notifications[notification.recipient_id].remove(notification_id) # Delete the notification del self.notifications[notification_id] return True def clear_user_notifications(self, user_id: str, states: Optional[List[NotificationState]] = None) -> int: """ Clear notifications for a user. Args: user_id: User ID states: Only clear notifications with these states Returns: Number of notifications cleared """ with self.lock: if user_id not in self.user_notifications: return 0 # Get notification IDs for the user notification_ids = self.user_notifications[user_id].copy() # Clear notifications cleared = 0 for nid in notification_ids: if nid in self.notifications: notification = self.notifications[nid] # Filter by states if states and notification.state not in states: continue # Delete the notification del self.notifications[nid] self.user_notifications[user_id].remove(nid) cleared += 1 return cleared def shutdown(self) -> None: """Shutdown the notification manager.""" # Stop the processing thread self.should_stop.set() if self.processing_thread and self.processing_thread.is_alive(): self.processing_thread.join(timeout=2.0) # Log shutdown self.telemetry.info( "Notification manager shutdown", category=EventCategory.SYSTEM ) # Singleton instance _instance = None def get_notification_manager(config: Dict[str, Any] = None) -> NotificationManager: """ Get the global notification manager instance. Args: config: Configuration options Returns: NotificationManager instance """ global _instance if _instance is None: _instance = NotificationManager(config) return _instance # Register UI callback def register_ui_callback(user_id: str, callback: Callable[[Dict[str, Any]], None]) -> None: """ Register a UI callback for notifications. Args: user_id: User ID or "default" for global callback callback: Function to call with notification data """ manager = get_notification_manager() ui_handler = manager.router.get_handler(NotificationChannel.UI) if isinstance(ui_handler, UIChannelHandler): ui_handler.register_callback(user_id, callback) # Helper functions for sending notifications def send_notification( template_id: str, data: Dict[str, Any], recipient_id: Optional[str] = None, priority: Optional[NotificationPriority] = None, channels: Optional[List[NotificationChannel]] = None ) -> Optional[str]: """ Send a notification. Args: template_id: Template ID data: Template data recipient_id: Recipient user ID priority: Override default priority channels: Override default channels Returns: Notification ID or None if sending failed """ manager = get_notification_manager() return manager.send_notification( template_id=template_id, data=data, recipient_id=recipient_id, priority=priority, channels=channels ) def send_system_notification( message: str, recipient_id: Optional[str] = None, priority: NotificationPriority = NotificationPriority.MEDIUM ) -> Optional[str]: """ Send a system notification. Args: message: Notification message recipient_id: Recipient user ID priority: Notification priority Returns: Notification ID or None if sending failed """ return send_notification( template_id="system_notification", data={"message": message}, recipient_id=recipient_id, priority=priority ) def send_security_alert( message: str, recipient_id: Optional[str] = None, priority: NotificationPriority = NotificationPriority.HIGH ) -> Optional[str]: """ Send a security alert. Args: message: Alert message recipient_id: Recipient user ID priority: Notification priority Returns: Notification ID or None if sending failed """ return send_notification( template_id="security_alert", data={"message": message}, recipient_id=recipient_id, priority=priority ) def send_detection_result( result: str, recipient_id: Optional[str] = None, priority: NotificationPriority = NotificationPriority.MEDIUM ) -> Optional[str]: """ Send a detection result notification. Args: result: Detection result recipient_id: Recipient user ID priority: Notification priority Returns: Notification ID or None if sending failed """ return send_notification( template_id="detection_result", data={"result": result}, recipient_id=recipient_id, priority=priority ) def send_process_notification( process_name: str, success: bool, error: Optional[str] = None, recipient_id: Optional[str] = None ) -> Optional[str]: """ Send a process notification. Args: process_name: Process name success: Whether the process was successful error: Error message if failed recipient_id: Recipient user ID Returns: Notification ID or None if sending failed """ if success: return send_notification( template_id="process_complete", data={"process_name": process_name}, recipient_id=recipient_id ) else: return send_notification( template_id="process_failed", data={"process_name": process_name, "error": error or "Unknown error"}, recipient_id=recipient_id, priority=NotificationPriority.HIGH ) class NotificationMonitor: """Utility class for monitoring notification metrics and performance.""" def __init__(self, notification_manager: NotificationManager): """ Initialize the notification monitor. Args: notification_manager: Notification manager to monitor """ self.notification_manager = notification_manager self.telemetry = get_telemetry() # Schedule periodic metrics collection self.metrics_thread = threading.Thread( target=self._collect_metrics_periodically, daemon=True ) self.metrics_thread.start() def _collect_metrics_periodically(self) -> None: """Periodically collect notification metrics.""" while True: try: # Collect metrics metrics = self.collect_metrics() # Record metrics in telemetry self.telemetry.debug( "Notification system metrics collected", category=EventCategory.SYSTEM, context={"metrics": metrics} ) # Wait for next collection time.sleep(60.0) # Collect every minute except Exception as e: self.telemetry.error( f"Error collecting notification metrics: {e}", category=EventCategory.SYSTEM, exc_info=True ) time.sleep(60.0) def collect_metrics(self) -> Dict[str, Any]: """ Collect notification metrics. Returns: Dictionary of metrics """ with self.notification_manager.lock: # Count notifications by state states = {state: 0 for state in NotificationState} for notification in self.notification_manager.notifications.values(): states[notification.state] += 1 # Count notifications by priority priorities = {priority: 0 for priority in NotificationPriority} for notification in self.notification_manager.notifications.values(): priorities[notification.priority] += 1 # Count notifications by category categories = {category: 0 for category in NotificationCategory} for notification in self.notification_manager.notifications.values(): categories[notification.category] += 1 # Calculate delivery metrics total_notifications = len(self.notification_manager.notifications) delivered_count = states[NotificationState.DELIVERED] failed_count = states[NotificationState.FAILED] delivery_rate = 0.0 if total_notifications > 0: delivery_rate = delivered_count / total_notifications # Calculate average delivery time delivery_times = [] for notification in self.notification_manager.notifications.values(): if notification.delivered_at and notification.created_at: delivery_time = (notification.delivered_at - notification.created_at).total_seconds() delivery_times.append(delivery_time) avg_delivery_time = 0.0 if delivery_times: avg_delivery_time = sum(delivery_times) / len(delivery_times) # Count active users active_users = len(self.notification_manager.user_notifications) return { "total_notifications": total_notifications, "states": {state.value: count for state, count in states.items()}, "priorities": {priority.value: count for priority, count in priorities.items()}, "categories": {category.value: count for category, count in categories.items()}, "delivery_rate": delivery_rate, "avg_delivery_time": avg_delivery_time, "active_users": active_users, "queue_size": self.notification_manager.delivery_queue.qsize() } def get_notification_stats(self, user_id: Optional[str] = None) -> Dict[str, Any]: """ Get notification statistics. Args: user_id: Optional user ID to filter by Returns: Dictionary of notification statistics """ with self.notification_manager.lock: if user_id: # User-specific stats if user_id not in self.notification_manager.user_notifications: return { "total": 0, "unread": 0, "states": {state.value: 0 for state in NotificationState}, "categories": {category.value: 0 for category in NotificationCategory} } # Get notification IDs for the user notification_ids = self.notification_manager.user_notifications[user_id] # Count states and categories states = {state: 0 for state in NotificationState} categories = {category: 0 for category in NotificationCategory} unread = 0 for nid in notification_ids: if nid in self.notification_manager.notifications: notification = self.notification_manager.notifications[nid] states[notification.state] += 1 categories[notification.category] += 1 if notification.state != NotificationState.READ: unread += 1 return { "total": len(notification_ids), "unread": unread, "states": {state.value: count for state, count in states.items()}, "categories": {category.value: count for category, count in categories.items()} } else: # System-wide stats states = {state: 0 for state in NotificationState} categories = {category: 0 for category in NotificationCategory} unread = 0 for notification in self.notification_manager.notifications.values(): states[notification.state] += 1 categories[notification.category] += 1 if notification.state != NotificationState.READ: unread += 1 return { "total": len(self.notification_manager.notifications), "unread": unread, "states": {state.value: count for state, count in states.items()}, "categories": {category.value: count for category, count in categories.items()}, "active_users": len(self.notification_manager.user_notifications) } def initialize_notification_monitor() -> NotificationMonitor: """ Initialize the notification monitor. Returns: NotificationMonitor instance """ manager = get_notification_manager() return NotificationMonitor(manager)