MorphGuard / src /notification_system.py
juanquy's picture
Initial clean commit of modular MorphGuard
2978bba
Raw
History Blame Contribute Delete
79.4 kB
"""
Context-aware notification system with priority management for MorphGuard.
This module provides a sophisticated notification system that intelligently:
- Manages notification priorities based on content and context
- Adapts delivery channels to user context and preferences
- Groups related notifications to prevent overwhelming users
- Schedules notifications based on user engagement patterns
- Provides cognitive load analysis for optimal notification timing
- Implements neural attention models for content optimization
- Uses reinforcement learning for notification timing and channel selection
"""
import time
import json
import uuid
import logging
import threading
import queue
import random
import math
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, Any, List, Optional, Union, Callable, Set, Tuple
from datetime import datetime, timedelta
# Import from existing modules
from src.telemetry import get_telemetry, EventCategory
from src.error_handling import MGError, ErrorCode, ErrorSeverity, ErrorCategory
# Define notification priorities
class NotificationPriority(Enum):
    """Urgency of a notification; a smaller value is more urgent."""

    # Immediate attention required (e.g., security incidents).
    CRITICAL = 0
    # Important notifications (e.g., authentication required).
    HIGH = 1
    # Standard notifications (e.g., process completed).
    MEDIUM = 2
    # Non-urgent notifications (e.g., minor updates).
    LOW = 3
    # Informational notifications (e.g., usage statistics).
    BACKGROUND = 4
# Define notification channels
class NotificationChannel(Enum):
    """Transports through which a notification can be delivered."""

    # In-app UI notifications.
    UI = "ui"
    # Toast notifications.
    TOAST = "toast"
    # Modal dialogs.
    MODAL = "modal"
    # Email notifications.
    EMAIL = "email"
    # SMS notifications.
    SMS = "sms"
    # Push notifications.
    PUSH = "push"
    # Webhook callbacks.
    WEBHOOK = "webhook"
    # Console logs (for developers).
    CONSOLE = "console"
# Define notification states
class NotificationState(Enum):
    """Lifecycle stages a notification moves through."""

    # Waiting to be delivered.
    PENDING = "pending"
    # Scheduled for future delivery.
    SCHEDULED = "scheduled"
    # In the process of being delivered.
    DELIVERING = "delivering"
    # Successfully delivered.
    DELIVERED = "delivered"
    # Read/acknowledged by the recipient.
    READ = "read"
    # Expired before being read.
    EXPIRED = "expired"
    # Failed to deliver.
    FAILED = "failed"
    # Cancelled before delivery.
    CANCELLED = "cancelled"
# Define notification categories
class NotificationCategory(Enum):
    """Subject areas a notification can belong to."""

    # Security-related notifications.
    SECURITY = "security"
    # System notifications.
    SYSTEM = "system"
    # User-related notifications.
    USER = "user"
    # Process notifications.
    PROCESS = "process"
    # Analytics and metrics notifications.
    ANALYTICS = "analytics"
    # Detection results.
    DETECTION = "detection"
    # Verification results.
    VERIFICATION = "verification"
    # Training process notifications.
    TRAINING = "training"
    # System maintenance notifications.
    MAINTENANCE = "maintenance"
    # Billing and payment notifications.
    BILLING = "billing"
@dataclass
class NotificationContext:
    """Recipient and environment information accompanying a notification.

    ``to_dict`` and ``from_dict`` are symmetric: every field written by
    ``to_dict`` is read back by ``from_dict`` (including
    ``interaction_history``).
    """

    user_id: Optional[str] = None
    user_role: Optional[str] = None
    user_preferences: Dict[str, Any] = field(default_factory=dict)
    application_state: Dict[str, Any] = field(default_factory=dict)
    device_info: Dict[str, Any] = field(default_factory=dict)
    location: Optional[Dict[str, Any]] = None
    session_data: Dict[str, Any] = field(default_factory=dict)
    interaction_history: List[Dict[str, Any]] = field(default_factory=list)
    cognitive_load: float = 0.0  # Estimated cognitive load (0.0-1.0)
    attention_score: float = 1.0  # Estimated user attention (0.0-1.0)
    engagement_level: float = 1.0  # Current engagement level (0.0-1.0)

    def to_dict(self) -> Dict[str, Any]:
        """Return the context as a plain dictionary.

        The container values are returned by reference, not copied.
        """
        return {
            "user_id": self.user_id,
            "user_role": self.user_role,
            "user_preferences": self.user_preferences,
            "application_state": self.application_state,
            "device_info": self.device_info,
            "location": self.location,
            "session_data": self.session_data,
            # Previously omitted, which made to_dict()/from_dict() lossy.
            "interaction_history": self.interaction_history,
            "cognitive_load": self.cognitive_load,
            "attention_score": self.attention_score,
            "engagement_level": self.engagement_level,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'NotificationContext':
        """Build a context from a dictionary such as ``to_dict`` produces.

        Missing keys fall back to the field defaults. Container fields that
        are present but ``None`` are treated as empty so later ``.get`` calls
        on them do not fail.
        """
        return cls(
            user_id=data.get("user_id"),
            user_role=data.get("user_role"),
            user_preferences=data.get("user_preferences") or {},
            application_state=data.get("application_state") or {},
            device_info=data.get("device_info") or {},
            location=data.get("location"),
            session_data=data.get("session_data") or {},
            interaction_history=data.get("interaction_history") or [],
            cognitive_load=data.get("cognitive_load", 0.0),
            attention_score=data.get("attention_score", 1.0),
            engagement_level=data.get("engagement_level", 1.0),
        )
@dataclass
class NotificationTemplate:
    """Reusable definition from which notifications are created."""

    id: str
    title_template: str
    content_template: str
    category: NotificationCategory
    default_priority: NotificationPriority
    default_channels: List[NotificationChannel]
    icon: Optional[str] = None
    color: Optional[str] = None
    actions: List[Dict[str, Any]] = field(default_factory=list)
    expiration: Optional[timedelta] = None
    grouping_key: Optional[str] = None
    requires_acknowledgment: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Return the template as a dictionary of plain values.

        Enum fields are stored by their ``.value``. The expiration is written
        twice: ``"expiration"`` keeps the original human-readable
        ``str(timedelta)`` form, and ``"expiration_seconds"`` carries the
        numeric value that ``from_dict`` reads, so a round trip keeps it.
        """
        return {
            "id": self.id,
            "title_template": self.title_template,
            "content_template": self.content_template,
            "category": self.category.value,
            "default_priority": self.default_priority.value,
            "default_channels": [ch.value for ch in self.default_channels],
            "icon": self.icon,
            "color": self.color,
            "actions": self.actions,
            "expiration": str(self.expiration) if self.expiration else None,
            "expiration_seconds": (
                self.expiration.total_seconds() if self.expiration is not None else None
            ),
            "grouping_key": self.grouping_key,
            "requires_acknowledgment": self.requires_acknowledgment,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'NotificationTemplate':
        """Build a template from a dictionary.

        Required keys: ``id``, ``title_template``, ``content_template``,
        ``category``, ``default_priority``, ``default_channels``.

        Raises:
            KeyError: if a required key is missing.
            ValueError: if an enum value is not recognised.
        """
        # A missing or explicit-None "expiration_seconds" both mean "no expiration".
        seconds = data.get("expiration_seconds")
        expiration = timedelta(seconds=seconds) if seconds is not None else None
        return cls(
            id=data["id"],
            title_template=data["title_template"],
            content_template=data["content_template"],
            category=NotificationCategory(data["category"]),
            default_priority=NotificationPriority(data["default_priority"]),
            default_channels=[NotificationChannel(ch) for ch in data["default_channels"]],
            icon=data.get("icon"),
            color=data.get("color"),
            actions=data.get("actions", []),
            expiration=expiration,
            grouping_key=data.get("grouping_key"),
            requires_acknowledgment=data.get("requires_acknowledgment", False),
        )
@dataclass
class Notification:
    """A single notification instance together with its lifecycle state."""

    id: str
    template_id: str
    title: str
    content: str
    category: NotificationCategory
    priority: NotificationPriority
    channels: List[NotificationChannel]
    state: NotificationState
    created_at: datetime
    scheduled_for: Optional[datetime] = None
    delivered_at: Optional[datetime] = None
    read_at: Optional[datetime] = None
    expires_at: Optional[datetime] = None
    recipient_id: Optional[str] = None
    recipient_role: Optional[str] = None
    context_snapshot: Dict[str, Any] = field(default_factory=dict)
    data: Dict[str, Any] = field(default_factory=dict)
    actions: List[Dict[str, Any]] = field(default_factory=list)
    icon: Optional[str] = None
    color: Optional[str] = None
    grouping_key: Optional[str] = None
    group_id: Optional[str] = None
    requires_acknowledgment: bool = False
    acknowledgment_data: Optional[Dict[str, Any]] = None
    delivery_attempts: int = 0
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the notification as a dictionary of plain values.

        Enums are stored by ``.value`` and datetimes as ISO-8601 strings
        (``None`` when unset).
        """

        def iso(moment: Optional[datetime]) -> Optional[str]:
            return moment.isoformat() if moment else None

        return {
            "id": self.id,
            "template_id": self.template_id,
            "title": self.title,
            "content": self.content,
            "category": self.category.value,
            "priority": self.priority.value,
            "channels": [ch.value for ch in self.channels],
            "state": self.state.value,
            "created_at": self.created_at.isoformat(),
            "scheduled_for": iso(self.scheduled_for),
            "delivered_at": iso(self.delivered_at),
            "read_at": iso(self.read_at),
            "expires_at": iso(self.expires_at),
            "recipient_id": self.recipient_id,
            "recipient_role": self.recipient_role,
            "context_snapshot": self.context_snapshot,
            "data": self.data,
            "actions": self.actions,
            "icon": self.icon,
            "color": self.color,
            "grouping_key": self.grouping_key,
            "group_id": self.group_id,
            "requires_acknowledgment": self.requires_acknowledgment,
            "acknowledgment_data": self.acknowledgment_data,
            "delivery_attempts": self.delivery_attempts,
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Notification':
        """Build a notification from a dictionary such as ``to_dict`` produces.

        Raises:
            KeyError: if a required key (``id``, ``template_id``, ``title``,
                ``content``, ``category``, ``priority``, ``channels``,
                ``state``, ``created_at``) is missing.
            ValueError: if an enum value or ISO timestamp cannot be parsed.
        """

        def optional_datetime(key: str) -> Optional[datetime]:
            # Missing, None and empty-string timestamps all mean "unset".
            raw = data.get(key)
            return datetime.fromisoformat(raw) if raw else None

        return cls(
            id=data["id"],
            template_id=data["template_id"],
            title=data["title"],
            content=data["content"],
            category=NotificationCategory(data["category"]),
            priority=NotificationPriority(data["priority"]),
            channels=[NotificationChannel(ch) for ch in data["channels"]],
            state=NotificationState(data["state"]),
            created_at=datetime.fromisoformat(data["created_at"]),
            scheduled_for=optional_datetime("scheduled_for"),
            delivered_at=optional_datetime("delivered_at"),
            read_at=optional_datetime("read_at"),
            expires_at=optional_datetime("expires_at"),
            recipient_id=data.get("recipient_id"),
            recipient_role=data.get("recipient_role"),
            context_snapshot=data.get("context_snapshot", {}),
            data=data.get("data", {}),
            actions=data.get("actions", []),
            icon=data.get("icon"),
            color=data.get("color"),
            grouping_key=data.get("grouping_key"),
            group_id=data.get("group_id"),
            requires_acknowledgment=data.get("requires_acknowledgment", False),
            acknowledgment_data=data.get("acknowledgment_data"),
            delivery_attempts=data.get("delivery_attempts", 0),
            metadata=data.get("metadata", {}),
        )

    def is_expired(self) -> bool:
        """Return True when ``expires_at`` is set and already in the past."""
        if self.expires_at is None:
            return False
        # Build "now" with the same tzinfo as expires_at. For a naive
        # expires_at this is plain datetime.now(); for an aware one (e.g.
        # parsed from an ISO string with an offset) it avoids the TypeError
        # raised when comparing naive and aware datetimes.
        return datetime.now(self.expires_at.tzinfo) > self.expires_at

    def is_acknowledged(self) -> bool:
        """Return True when the notification is READ or has acknowledgment data."""
        return self.state == NotificationState.READ or self.acknowledgment_data is not None

    def mark_as_read(self) -> None:
        """Move to READ and stamp ``read_at`` with the current time."""
        self.state = NotificationState.READ
        self.read_at = datetime.now()

    def acknowledge(self, data: Optional[Dict[str, Any]] = None) -> None:
        """Move to READ and record acknowledgment data.

        Args:
            data: Acknowledgment payload. When omitted or empty, a payload
                containing only ``acknowledged_at`` (ISO timestamp) is stored.
        """
        now = datetime.now()
        self.state = NotificationState.READ
        self.read_at = now
        self.acknowledgment_data = data or {"acknowledged_at": now.isoformat()}

    def schedule(self, delivery_time: datetime) -> None:
        """Move to SCHEDULED and remember the requested delivery time."""
        self.state = NotificationState.SCHEDULED
        self.scheduled_for = delivery_time

    def cancel(self) -> None:
        """Move to CANCELLED."""
        self.state = NotificationState.CANCELLED

    def fail(self) -> None:
        """Move to FAILED. ``delivery_attempts`` is not changed here."""
        self.state = NotificationState.FAILED

    def deliver(self) -> None:
        """Move to DELIVERED, stamp ``delivered_at`` and count the attempt."""
        self.state = NotificationState.DELIVERED
        self.delivered_at = datetime.now()
        self.delivery_attempts += 1
class ChannelHandler:
    """Common base for the per-channel delivery handlers."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Store the handler configuration and obtain the telemetry object.

        Args:
            config: Handler settings; an empty dict is used when omitted.
        """
        self.config = config if config else {}
        self.telemetry = get_telemetry()

    def can_handle(self, notification: Notification, context: NotificationContext) -> bool:
        """Report whether this handler accepts the notification.

        The base implementation accepts everything.
        """
        return True

    def deliver(self, notification: Notification, context: NotificationContext) -> bool:
        """Deliver the notification through this channel.

        Args:
            notification: The notification to deliver
            context: The notification context
        Returns:
            Whether the delivery was successful
        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("Subclasses must implement deliver method")
class UIChannelHandler(ChannelHandler):
    """Delivers notifications to in-app UI callbacks registered per user."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Set up the handler with an empty callback registry."""
        super().__init__(config)
        # Maps a user id (or the literal "default") to the function that
        # receives the notification dictionary.
        self.ui_callbacks: Dict[str, Callable] = {}

    def can_handle(self, notification: Notification, context: NotificationContext) -> bool:
        """Accept when the UI channel is requested and the session is active."""
        if NotificationChannel.UI not in notification.channels:
            return False
        return context.session_data.get("is_active", False)

    def deliver(self, notification: Notification, context: NotificationContext) -> bool:
        """Pass the notification dictionary to the recipient's callback.

        Returns:
            True when a callback was found and ran without raising; False when
            no callback is registered or any exception occurred.
        """
        try:
            # NOTE(review): "default" is only consulted when the context has
            # no user_id; a user without a callback of their own does NOT
            # fall back to it. Confirm this matches the intended meaning of
            # a "global" callback.
            lookup_key = context.user_id or "default"
            callback = self.ui_callbacks.get(lookup_key)
            if not callback:
                # Nothing registered; persistence for later delivery is not
                # implemented here.
                self.telemetry.debug(
                    f"No UI callback registered for user {context.user_id}",
                    category=EventCategory.SYSTEM
                )
                return False
            callback(notification.to_dict())
            return True
        except Exception as exc:
            self.telemetry.error(
                f"Failed to deliver UI notification: {exc}",
                category=EventCategory.SYSTEM,
                exc_info=True
            )
            return False

    def register_callback(self, user_id: str, callback: Callable[[Dict[str, Any]], None]) -> None:
        """Register (or replace) the UI callback for a user.

        Args:
            user_id: User ID or "default" for global callback
            callback: Function to call with notification data
        """
        self.ui_callbacks[user_id] = callback

    def unregister_callback(self, user_id: str) -> None:
        """Remove the UI callback for a user; unknown ids are ignored.

        Args:
            user_id: User ID to unregister
        """
        self.ui_callbacks.pop(user_id, None)
class ToastChannelHandler(ChannelHandler):
    """Delivers notifications as toasts (currently only logged)."""

    def can_handle(self, notification: Notification, context: NotificationContext) -> bool:
        """Accept when the toast channel is requested and the session is active."""
        if NotificationChannel.TOAST not in notification.channels:
            return False
        return context.session_data.get("is_active", False)

    def deliver(self, notification: Notification, context: NotificationContext) -> bool:
        """Record the toast delivery in telemetry.

        No frontend integration exists yet; delivery is a debug log entry.

        Returns:
            True when the log call succeeded, False if it raised.
        """
        try:
            message = f"Toast notification delivered: {notification.title}"
            details = {"notification_id": notification.id}
            self.telemetry.debug(
                message,
                category=EventCategory.SYSTEM,
                context=details
            )
        except Exception as exc:
            self.telemetry.error(
                f"Failed to deliver toast notification: {exc}",
                category=EventCategory.SYSTEM,
                exc_info=True
            )
            return False
        return True
class ModalChannelHandler(ChannelHandler):
    """Delivers notifications as modal dialogs (currently only logged)."""

    def can_handle(self, notification: Notification, context: NotificationContext) -> bool:
        """Accept modal-channel notifications for active sessions.

        CRITICAL notifications are accepted even when the session is inactive.
        """
        if NotificationChannel.MODAL not in notification.channels:
            return False
        return (
            context.session_data.get("is_active", False)
            or notification.priority == NotificationPriority.CRITICAL
        )

    def deliver(self, notification: Notification, context: NotificationContext) -> bool:
        """Record the modal delivery in telemetry.

        No frontend integration exists yet; delivery is a debug log entry.

        Returns:
            True when the log call succeeded, False if it raised.
        """
        try:
            message = f"Modal notification delivered: {notification.title}"
            details = {"notification_id": notification.id}
            self.telemetry.debug(
                message,
                category=EventCategory.SYSTEM,
                context=details
            )
        except Exception as exc:
            self.telemetry.error(
                f"Failed to deliver modal notification: {exc}",
                category=EventCategory.SYSTEM,
                exc_info=True
            )
            return False
        return True
class EmailChannelHandler(ChannelHandler):
    """Delivers notifications by email (currently only logged)."""

    def can_handle(self, notification: Notification, context: NotificationContext) -> bool:
        """Accept when email is requested and the user has an address.

        The address is read from ``context.user_preferences["email"]``; a
        ``user_id`` must also be present.
        """
        has_address = (
            context.user_id is not None
            and context.user_preferences.get("email") is not None
        )
        wants_email = NotificationChannel.EMAIL in notification.channels
        return wants_email and has_address

    def deliver(self, notification: Notification, context: NotificationContext) -> bool:
        """Record the email delivery in telemetry.

        No mail service is integrated yet; delivery is a debug log entry.

        Returns:
            True when the log call succeeded, False if it raised.
        """
        try:
            address = context.user_preferences.get("email")
            message = f"Email notification delivered to {address}: {notification.title}"
            details = {"notification_id": notification.id, "recipient": address}
            self.telemetry.debug(
                message,
                category=EventCategory.SYSTEM,
                context=details
            )
        except Exception as exc:
            self.telemetry.error(
                f"Failed to deliver email notification: {exc}",
                category=EventCategory.SYSTEM,
                exc_info=True
            )
            return False
        return True
class SMSChannelHandler(ChannelHandler):
    """Delivers notifications by SMS (currently only logged)."""

    def can_handle(self, notification: Notification, context: NotificationContext) -> bool:
        """Accept when SMS is requested and the user has a phone number.

        The number is read from ``context.user_preferences["phone"]``; a
        ``user_id`` must also be present.
        """
        has_phone = (
            context.user_id is not None
            and context.user_preferences.get("phone") is not None
        )
        wants_sms = NotificationChannel.SMS in notification.channels
        return wants_sms and has_phone

    def deliver(self, notification: Notification, context: NotificationContext) -> bool:
        """Record the SMS delivery in telemetry.

        No SMS service is integrated yet; delivery is a debug log entry.

        Returns:
            True when the log call succeeded, False if it raised.
        """
        try:
            phone = context.user_preferences.get("phone")
            message = f"SMS notification delivered to {phone}: {notification.title}"
            details = {"notification_id": notification.id, "recipient": phone}
            self.telemetry.debug(
                message,
                category=EventCategory.SYSTEM,
                context=details
            )
        except Exception as exc:
            self.telemetry.error(
                f"Failed to deliver SMS notification: {exc}",
                category=EventCategory.SYSTEM,
                exc_info=True
            )
            return False
        return True
class PushChannelHandler(ChannelHandler):
    """Delivers notifications as push messages (currently only logged)."""

    def can_handle(self, notification: Notification, context: NotificationContext) -> bool:
        """Accept when push is requested and the user has a push token.

        The token is read from ``context.user_preferences["push_token"]``; a
        ``user_id`` must also be present.
        """
        has_token = (
            context.user_id is not None
            and context.user_preferences.get("push_token") is not None
        )
        wants_push = NotificationChannel.PUSH in notification.channels
        return wants_push and has_token

    def deliver(self, notification: Notification, context: NotificationContext) -> bool:
        """Record the push delivery in telemetry.

        No push service is integrated yet; delivery is a debug log entry.

        Returns:
            True when the log call succeeded, False if it raised.
        """
        try:
            token = context.user_preferences.get("push_token")
            # NOTE(review): the raw push token is written to telemetry here;
            # confirm that is acceptable for this log sink.
            details = {"notification_id": notification.id, "token": token}
            self.telemetry.debug(
                f"Push notification delivered: {notification.title}",
                category=EventCategory.SYSTEM,
                context=details
            )
        except Exception as exc:
            self.telemetry.error(
                f"Failed to deliver push notification: {exc}",
                category=EventCategory.SYSTEM,
                exc_info=True
            )
            return False
        return True
class WebhookChannelHandler(ChannelHandler):
    """Delivers notifications to a webhook URL (currently only logged)."""

    def can_handle(self, notification: Notification, context: NotificationContext) -> bool:
        """Accept when the webhook channel is requested and a URL is known.

        The URL may come from ``notification.data["webhook_url"]`` or from the
        handler's own ``config["webhook_url"]``.
        """
        if NotificationChannel.WEBHOOK not in notification.channels:
            return False
        return (
            notification.data.get("webhook_url") is not None
            or self.config.get("webhook_url") is not None
        )

    def deliver(self, notification: Notification, context: NotificationContext) -> bool:
        """Record the webhook delivery in telemetry.

        No HTTP request is made yet; delivery is a debug log entry. The
        notification's own URL takes precedence over the configured one.

        Returns:
            True when the log call succeeded, False if it raised.
        """
        try:
            target = notification.data.get("webhook_url") or self.config.get("webhook_url")
            message = f"Webhook notification delivered to {target}: {notification.title}"
            details = {"notification_id": notification.id, "webhook_url": target}
            self.telemetry.debug(
                message,
                category=EventCategory.SYSTEM,
                context=details
            )
        except Exception as exc:
            self.telemetry.error(
                f"Failed to deliver webhook notification: {exc}",
                category=EventCategory.SYSTEM,
                exc_info=True
            )
            return False
        return True
class ConsoleChannelHandler(ChannelHandler):
    """Prints notifications to standard output."""

    def can_handle(self, notification: Notification, context: NotificationContext) -> bool:
        """Accept whenever the console channel is requested."""
        requested = notification.channels
        return NotificationChannel.CONSOLE in requested

    def deliver(self, notification: Notification, context: NotificationContext) -> bool:
        """Print one line of the form ``[CATEGORY] title: content``.

        Returns:
            True when printing succeeded, False if it raised.
        """
        try:
            tag = notification.category.value.upper()
            print(f"[{tag}] {notification.title}: {notification.content}")
        except Exception as exc:
            self.telemetry.error(
                f"Failed to deliver console notification: {exc}",
                category=EventCategory.SYSTEM,
                exc_info=True
            )
            return False
        return True
class NotificationRouter:
    """Dispatches notifications to channel handlers by context and priority."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Create the router and install one default handler per channel."""
        self.config = config or {}
        self.telemetry = get_telemetry()
        self.channel_handlers: Dict[NotificationChannel, ChannelHandler] = {}
        self.register_default_handlers()

    def register_default_handlers(self) -> None:
        """Register a freshly constructed built-in handler for every channel."""
        defaults = (
            (NotificationChannel.UI, UIChannelHandler),
            (NotificationChannel.TOAST, ToastChannelHandler),
            (NotificationChannel.MODAL, ModalChannelHandler),
            (NotificationChannel.EMAIL, EmailChannelHandler),
            (NotificationChannel.SMS, SMSChannelHandler),
            (NotificationChannel.PUSH, PushChannelHandler),
            (NotificationChannel.WEBHOOK, WebhookChannelHandler),
            (NotificationChannel.CONSOLE, ConsoleChannelHandler),
        )
        for channel, handler_cls in defaults:
            self.register_handler(channel, handler_cls())

    def register_handler(self, channel: NotificationChannel, handler: ChannelHandler) -> None:
        """Register (or replace) the handler for a channel.

        Args:
            channel: Notification channel
            handler: Channel handler
        """
        self.channel_handlers[channel] = handler

    def get_handler(self, channel: NotificationChannel) -> Optional[ChannelHandler]:
        """Look up the handler for a channel.

        Args:
            channel: Notification channel
        Returns:
            Channel handler or None if not registered
        """
        return self.channel_handlers.get(channel)

    def _attempt(
        self,
        channel: NotificationChannel,
        notification: Notification,
        context: NotificationContext,
        check: bool = True,
    ) -> bool:
        """Try one channel; return True only when its handler delivered.

        With ``check`` False the handler's ``can_handle`` is not consulted.
        """
        handler = self.get_handler(channel)
        if not handler:
            return False
        if check and not handler.can_handle(notification, context):
            return False
        return bool(handler.deliver(notification, context))

    def route(self, notification: Notification, context: NotificationContext) -> List[NotificationChannel]:
        """Deliver a notification through every requested channel that accepts it.

        CRITICAL notifications are delegated to ``_route_critical``. Afterwards
        the notification is marked delivered if at least one channel
        succeeded, otherwise failed.

        Args:
            notification: Notification to route
            context: Notification context
        Returns:
            List of channels the notification was routed to
        """
        if notification.priority == NotificationPriority.CRITICAL:
            return self._route_critical(notification, context)

        delivered: List[NotificationChannel] = []
        for channel in notification.channels:
            if self._attempt(channel, notification, context):
                delivered.append(channel)

        if delivered:
            notification.deliver()
        else:
            notification.fail()
        return delivered

    def _route_critical(self, notification: Notification, context: NotificationContext) -> List[NotificationChannel]:
        """Deliver a critical notification with escalation.

        Order: modal and UI when the session is active; if neither succeeded,
        push, SMS and email; finally the console handler, which is invoked
        without a ``can_handle`` check.

        Args:
            notification: Critical notification to route
            context: Notification context
        Returns:
            List of channels the notification was routed to
        """
        delivered: List[NotificationChannel] = []

        # In-app channels are only attempted for an active session.
        if context.session_data.get("is_active", False):
            for channel in (NotificationChannel.MODAL, NotificationChannel.UI):
                if self._attempt(channel, notification, context):
                    delivered.append(channel)

        # Escalate to external channels when nothing in-app succeeded.
        if not delivered:
            external = (
                NotificationChannel.PUSH,
                NotificationChannel.SMS,
                NotificationChannel.EMAIL,
            )
            for channel in external:
                if self._attempt(channel, notification, context):
                    delivered.append(channel)

        # Console output is always attempted; a successful print counts as a
        # routed channel.
        if self._attempt(NotificationChannel.CONSOLE, notification, context, check=False):
            delivered.append(NotificationChannel.CONSOLE)

        if delivered:
            notification.deliver()
        else:
            notification.fail()
        return delivered
class NotificationScheduler:
    """Chooses a delivery time for a notification from priority and context."""

    # Fallback hourly engagement scores (index = hour 0-23, higher is better).
    _DEFAULT_ENGAGEMENT_PATTERN = [
        0, 0, 0, 0, 0, 0, 0, 1,  # 0-7 AM
        2, 3, 3, 2, 2, 2, 2, 2,  # 8-15 (8 AM - 3 PM)
        3, 3, 2, 2, 1, 1, 0, 0,  # 16-23 (4 PM - 11 PM)
    ]

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Read scheduling parameters from ``config`` (all keys optional).

        Recognised keys: ``min_interval`` (seconds), ``max_notifications_per_hour``,
        ``quiet_hours_start`` / ``quiet_hours_end`` (hours, 0-23),
        ``throttle_low_priority`` and ``engagement_windows`` (mapping of user
        type to a 24-entry list of hourly engagement scores).
        """
        self.config = config or {}
        self.telemetry = get_telemetry()
        self.min_interval = self.config.get("min_interval", 5)
        self.max_notifications_per_hour = self.config.get("max_notifications_per_hour", 10)
        self.quiet_hours_start = self.config.get("quiet_hours_start", 22)  # 10 PM
        self.quiet_hours_end = self.config.get("quiet_hours_end", 8)  # 8 AM
        self.throttle_low_priority = self.config.get("throttle_low_priority", True)
        self.engagement_windows = self.config.get(
            "engagement_windows",
            {"default": list(self._DEFAULT_ENGAGEMENT_PATTERN)},
        )

    def _in_quiet_hours(self, hour: int) -> bool:
        """Return True when ``hour`` lies inside the configured quiet window.

        Handles both a window inside one day (start < end, e.g. 1-6) and a
        window that wraps past midnight (start >= end, e.g. 22-8).
        """
        start, end = self.quiet_hours_start, self.quiet_hours_end
        if start < end:
            return start <= hour < end
        return hour >= start or hour < end

    def _quiet_hours_end_after(self, now: datetime) -> datetime:
        """Return the first occurrence of the quiet-hours end that is after ``now``."""
        end_today = now.replace(hour=self.quiet_hours_end, minute=0, second=0, microsecond=0)
        if end_today > now:
            return end_today
        return end_today + timedelta(days=1)

    def schedule(self, notification: Notification, context: NotificationContext) -> datetime:
        """Compute when a notification should be delivered.

        Rules, applied in order:
        1. CRITICAL: now.
        2. Cognitive load above 0.7 (priority other than HIGH): 30-60 minute delay.
        3. Attention below 0.3 (priority other than HIGH): 15-28 minute delay.
        4. Inside quiet hours (priority other than HIGH): end of quiet hours.
        5. LOW with throttling on: next hour whose engagement score is >= 2,
           or 5-15 minutes from now when already in such an hour.
        6. MEDIUM: 1..``min_interval`` seconds from now.
        7. Otherwise (HIGH, BACKGROUND, unthrottled LOW): 0-3 seconds from now.

        Args:
            notification: Notification to schedule
            context: Notification context
        Returns:
            Scheduled delivery time (naive, local clock)
        """
        if notification.priority == NotificationPriority.CRITICAL:
            return datetime.now()

        # NOTE: the user's "timezone" preference is not applied yet; every
        # calculation uses the naive local clock.
        now = datetime.now()
        is_high = notification.priority == NotificationPriority.HIGH

        if context.cognitive_load > 0.7 and not is_high:
            delay = int(30 + (context.cognitive_load - 0.7) * 100)  # 30-60 min
            self.telemetry.debug(
                f"Delaying notification due to high cognitive load: {delay} minutes",
                category=EventCategory.SYSTEM,
                context={"notification_id": notification.id, "cognitive_load": context.cognitive_load}
            )
            return now + timedelta(minutes=delay)

        if context.attention_score < 0.3 and not is_high:
            delay = int(15 + (0.3 - context.attention_score) * 45)  # 15-28 min
            self.telemetry.debug(
                f"Delaying notification due to low attention: {delay} minutes",
                category=EventCategory.SYSTEM,
                context={"notification_id": notification.id, "attention_score": context.attention_score}
            )
            return now + timedelta(minutes=delay)

        if self._in_quiet_hours(now.hour) and not is_high:
            delivery_time = self._quiet_hours_end_after(now)
            self.telemetry.debug(
                f"Scheduling notification outside quiet hours: {delivery_time}",
                category=EventCategory.SYSTEM,
                context={"notification_id": notification.id}
            )
            return delivery_time

        if notification.priority == NotificationPriority.LOW and self.throttle_low_priority:
            user_type = context.user_preferences.get("user_type", "default")
            # Fall back step by step so a custom config without a "default"
            # entry does not raise KeyError.
            engagement_pattern = (
                self.engagement_windows.get(user_type)
                or self.engagement_windows.get("default")
                or self._DEFAULT_ENGAGEMENT_PATTERN
            )
            current_hour = now.hour
            if engagement_pattern[current_hour] < 2:  # Not a good window now
                for offset in range(1, 24):
                    next_hour = (current_hour + offset) % 24
                    if engagement_pattern[next_hour] >= 2:
                        delivery_time = now.replace(
                            hour=next_hour, minute=0, second=0, microsecond=0
                        )
                        if delivery_time < now:  # The hour has passed today
                            delivery_time += timedelta(days=1)
                        self.telemetry.debug(
                            f"Scheduling low priority notification for better engagement window: {delivery_time}",
                            category=EventCategory.SYSTEM,
                            context={"notification_id": notification.id}
                        )
                        return delivery_time
            # Already in a good window, or no good window exists at all.
            return now + timedelta(minutes=random.randint(5, 15))

        if notification.priority == NotificationPriority.MEDIUM:
            # randint needs an integer upper bound >= 1; guard against a
            # configured min_interval of 0, a negative or a float value.
            upper = max(1, int(self.min_interval))
            return now + timedelta(seconds=random.randint(1, upper))

        return now + timedelta(seconds=random.randint(0, 3))
class NotificationGrouper:
    """Groups related notifications to prevent overwhelming users."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Read grouping parameters from ``config`` (all keys optional).

        Recognised keys: ``max_group_size`` (default 5), ``group_time_window``
        in seconds (default 300) and ``grouping_enabled`` (default True).
        """
        self.config = config or {}
        self.telemetry = get_telemetry()
        self.max_group_size = self.config.get("max_group_size", 5)
        self.group_time_window = self.config.get("group_time_window", 300)  # 5 minutes
        self.grouping_enabled = self.config.get("grouping_enabled", True)
        # Active groups by grouping key
        self.active_groups: Dict[str, List[Notification]] = {}
        # Re-entrant lock available to callers that mutate active_groups
        self.lock = threading.RLock()

    def should_group(self, notification: Notification, active_notifications: List[Notification]) -> Tuple[bool, Optional[str]]:
        """Decide whether a notification joins an existing group.

        A notification is grouped with the most recently created active
        notification that shares its ``grouping_key``, was created within
        ``group_time_window`` seconds, and whose group (if any) has fewer than
        ``max_group_size`` members. CRITICAL notifications and notifications
        without a grouping key are never grouped.

        Args:
            notification: Notification to check
            active_notifications: Active notifications for the recipient
        Returns:
            Tuple of (should_group, group_id); group_id is the matched
            notification's group id, or its own id when it has no group yet.
        """
        if not self.grouping_enabled:
            return False, None
        if notification.priority == NotificationPriority.CRITICAL:
            return False, None
        if not notification.grouping_key:
            return False, None

        cutoff = datetime.now() - timedelta(seconds=self.group_time_window)

        # Tally group sizes once instead of rescanning the list per candidate.
        group_sizes: Dict[str, int] = {}
        for active in active_notifications:
            if active.group_id:
                group_sizes[active.group_id] = group_sizes.get(active.group_id, 0) + 1

        candidates = []
        for active in active_notifications:
            if not active.grouping_key or active.grouping_key != notification.grouping_key:
                continue
            if active.created_at < cutoff:
                continue
            group_id = active.group_id
            if group_id and group_sizes.get(group_id, 0) >= self.max_group_size:
                continue
            candidates.append(active)

        if not candidates:
            return False, None
        most_recent = max(candidates, key=lambda n: n.created_at)
        return True, most_recent.group_id or most_recent.id

    def create_summary_notification(self, group_id: str, notifications: List[Notification]) -> Notification:
        """Create a summary notification for a group.

        With a single notification that notification is returned unchanged.
        Otherwise a new PENDING notification is built whose title counts the
        members, whose content lists up to five member titles, and whose
        priority is the most urgent priority among the members.

        Args:
            group_id: Group ID
            notifications: Notifications in the group (must be non-empty)
        Returns:
            Summary notification
        Raises:
            IndexError: if ``notifications`` is empty.
        """
        # The first notification supplies the shared presentation fields.
        template = notifications[0]
        count = len(notifications)
        if count <= 1:
            return template

        category_name = template.category.value.capitalize()
        if all(n.template_id == template.template_id for n in notifications):
            title = f"{count} {category_name} Notifications"
        else:
            title = f"{count} {category_name} Updates"

        content_items = [f"• {n.title}" for n in notifications[:5]]  # At most 5 lines
        if count > 5:
            content_items.append(f"• {count - 5} more...")
        content = "\n".join(content_items)

        # Plain Enum members do not support "<", so compare by value; the
        # smallest value is the most urgent priority.
        top_priority = min(
            (n.priority for n in notifications),
            key=lambda p: p.value,
        )

        return Notification(
            id=str(uuid.uuid4()),
            template_id=template.template_id,
            title=title,
            content=content,
            category=template.category,
            priority=top_priority,
            # Copy so later edits to the summary's channels do not leak into
            # the first member notification.
            channels=list(template.channels),
            state=NotificationState.PENDING,
            created_at=datetime.now(),
            recipient_id=template.recipient_id,
            recipient_role=template.recipient_role,
            context_snapshot=template.context_snapshot,
            icon=template.icon,
            color=template.color,
            grouping_key=template.grouping_key,
            group_id=group_id,
            requires_acknowledgment=any(n.requires_acknowledgment for n in notifications),
            metadata={"is_summary": True, "notification_count": count},
        )
class NotificationTemplateRenderer:
    """Renders notification templates with dynamic data."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the template renderer.

        Args:
            config: Optional renderer settings (stored, not otherwise read here)
        """
        self.config = config or {}
        self.telemetry = get_telemetry()

    def render(
        self,
        template: NotificationTemplate,
        data: Dict[str, Any],
        context: NotificationContext
    ) -> Tuple[str, str]:
        """
        Render a notification template.

        Placeholders can reference ``user.*``, ``app.*``, ``session.*``,
        ``device.*`` and every top-level key of ``data``. Keys in ``data``
        take precedence over the built-in namespaces.

        Args:
            template: Notification template
            data: Template data
            context: Notification context

        Returns:
            Tuple of (title, content). If rendering fails, the error is
            logged and the unrendered template strings are returned.
        """
        try:
            # Simple template rendering with format string
            render_context = {
                "user": {
                    "id": context.user_id,
                    "role": context.user_role,
                    **context.user_preferences
                },
                "app": context.application_state,
                "session": context.session_data,
                "device": context.device_info,
                **data
            }
            # Render the title and content
            title = self._render_string(template.title_template, render_context)
            content = self._render_string(template.content_template, render_context)
            return title, content
        except Exception as e:
            # Deliberate best-effort: a broken template must not block delivery.
            self.telemetry.error(
                f"Failed to render notification template: {e}",
                category=EventCategory.SYSTEM,
                exc_info=True
            )
            # Return fallback values
            return template.title_template, template.content_template

    def _render_string(self, template_str: str, context: Dict[str, Any]) -> str:
        """
        Render a string template.

        Every ``{dotted.path}`` placeholder is replaced by ``str()`` of the
        value found by walking ``context`` along the path. Unknown paths, and
        paths that run through a non-mapping value, render as an empty string.

        Args:
            template_str: Template string
            context: Rendering context

        Returns:
            Rendered string
        """
        import re
        from collections.abc import Mapping

        def lookup(path: str, data: Any) -> Any:
            current = data
            for part in path.split("."):
                # Stop at anything that is not a mapping instead of raising.
                if not isinstance(current, Mapping) or part not in current:
                    return ""
                current = current[part]
            return current

        # Substitute in a single pass so that placeholder-looking text inside
        # a substituted value is never expanded a second time.
        return re.sub(
            r"\{([^}]+)\}",
            lambda match: str(lookup(match.group(1), context)),
            template_str
        )
class NotificationManager:
    """Main manager for the notification system."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the notification manager.

        Builds the router, scheduler, grouper and renderer from the
        matching sub-sections of ``config``, starts the background delivery
        thread and registers the default templates.

        Args:
            config: Optional configuration with "router", "scheduler",
                "grouper" and "renderer" sub-dictionaries
        """
        self.config = config or {}
        self.telemetry = get_telemetry()
        # Create notification storage
        self.notifications: Dict[str, Notification] = {}
        self.templates: Dict[str, NotificationTemplate] = {}
        self.user_notifications: Dict[str, List[str]] = {}  # User ID -> Notification IDs
        # Create components
        self.router = NotificationRouter(self.config.get("router", {}))
        self.scheduler = NotificationScheduler(self.config.get("scheduler", {}))
        self.grouper = NotificationGrouper(self.config.get("grouper", {}))
        self.renderer = NotificationTemplateRenderer(self.config.get("renderer", {}))
        # Create processing queues
        self.delivery_queue = queue.PriorityQueue()
        # Thread for processing scheduled notifications
        self.processing_thread = None
        self.should_stop = threading.Event()
        # Lock for thread safety
        self.lock = threading.RLock()
        # Start the processing thread
        self._start_processing_thread()
        # Register default templates
        self._register_default_templates()

    def _start_processing_thread(self) -> None:
        """Start the background processing thread."""
        self.processing_thread = threading.Thread(
            target=self._processing_worker,
            daemon=True
        )
        self.processing_thread.start()

    def _processing_worker(self) -> None:
        """Background worker to process scheduled notifications."""
        while not self.should_stop.is_set():
            try:
                try:
                    # Queue entries are (timestamp, scheduled_time, notification_id),
                    # so the earliest delivery time always comes out first.
                    _, scheduled_time, notification_id = self.delivery_queue.get(timeout=1.0)
                except queue.Empty:
                    continue
                # Check if it's time to deliver
                if scheduled_time > datetime.now():
                    # Not time yet, put it back in the queue
                    self.delivery_queue.put((scheduled_time.timestamp(), scheduled_time, notification_id))
                    # Wait on the stop event rather than sleeping so that a
                    # shutdown request is noticed immediately.
                    self.should_stop.wait(0.1)
                    continue
                # Validate and claim the notification in one critical section
                # so a concurrent cancel() cannot be overwritten between the
                # state check and the transition to PENDING.
                with self.lock:
                    notification = self.notifications.get(notification_id)
                    if not notification:
                        continue
                    # Check if notification is still valid
                    if notification.state != NotificationState.SCHEDULED:
                        continue
                    # Check if notification is expired
                    if notification.is_expired():
                        notification.state = NotificationState.EXPIRED
                        continue
                    # Set state to pending for delivery
                    notification.state = NotificationState.PENDING
                # Deliver the notification
                self._deliver_notification(notification)
            except Exception as e:
                self.telemetry.error(
                    f"Error in notification processing worker: {e}",
                    category=EventCategory.SYSTEM,
                    exc_info=True
                )
                # Back off after an error, but stay responsive to shutdown.
                self.should_stop.wait(1.0)

    def _register_default_templates(self) -> None:
        """Register default notification templates."""
        # System notification template
        self.register_template(NotificationTemplate(
            id="system_notification",
            title_template="System Notification",
            content_template="{message}",
            category=NotificationCategory.SYSTEM,
            default_priority=NotificationPriority.MEDIUM,
            default_channels=[NotificationChannel.UI, NotificationChannel.CONSOLE],
            icon="info-circle",
            color="#3498db"
        ))
        # Security alert template
        self.register_template(NotificationTemplate(
            id="security_alert",
            title_template="Security Alert",
            content_template="{message}",
            category=NotificationCategory.SECURITY,
            default_priority=NotificationPriority.HIGH,
            default_channels=[NotificationChannel.UI, NotificationChannel.EMAIL, NotificationChannel.CONSOLE],
            icon="shield-alt",
            color="#e74c3c",
            requires_acknowledgment=True
        ))
        # Detection result template
        self.register_template(NotificationTemplate(
            id="detection_result",
            title_template="Detection Result",
            content_template="Detection complete: {result}",
            category=NotificationCategory.DETECTION,
            default_priority=NotificationPriority.MEDIUM,
            default_channels=[NotificationChannel.UI, NotificationChannel.CONSOLE],
            icon="search",
            color="#2ecc71",
            grouping_key="detection_results"
        ))
        # Process complete template
        self.register_template(NotificationTemplate(
            id="process_complete",
            title_template="Process Complete",
            content_template="Process {process_name} completed successfully.",
            category=NotificationCategory.PROCESS,
            default_priority=NotificationPriority.MEDIUM,
            default_channels=[NotificationChannel.UI, NotificationChannel.TOAST, NotificationChannel.CONSOLE],
            icon="check-circle",
            color="#2ecc71",
            grouping_key="process_updates"
        ))
        # Process failed template
        self.register_template(NotificationTemplate(
            id="process_failed",
            title_template="Process Failed",
            content_template="Process {process_name} failed: {error}",
            category=NotificationCategory.PROCESS,
            default_priority=NotificationPriority.HIGH,
            default_channels=[NotificationChannel.UI, NotificationChannel.MODAL, NotificationChannel.CONSOLE],
            icon="exclamation-circle",
            color="#e74c3c",
            grouping_key="process_updates"
        ))

    def register_template(self, template: NotificationTemplate) -> None:
        """
        Register a notification template, replacing any template with the same ID.

        Args:
            template: Notification template
        """
        with self.lock:
            self.templates[template.id] = template

    def unregister_template(self, template_id: str) -> None:
        """
        Unregister a notification template; unknown IDs are ignored.

        Args:
            template_id: Template ID
        """
        with self.lock:
            self.templates.pop(template_id, None)

    def get_template(self, template_id: str) -> Optional[NotificationTemplate]:
        """
        Get a notification template.

        Args:
            template_id: Template ID

        Returns:
            Notification template or None if not found
        """
        with self.lock:
            return self.templates.get(template_id)

    def create_notification(
        self,
        template_id: str,
        data: Dict[str, Any],
        recipient_id: Optional[str] = None,
        context: Optional[NotificationContext] = None,
        priority: Optional[NotificationPriority] = None,
        channels: Optional[List[NotificationChannel]] = None,
        scheduled_for: Optional[datetime] = None,
        expires_at: Optional[datetime] = None
    ) -> Optional[Notification]:
        """
        Create a notification from a template and store it.

        The notification is created in the PENDING state; it is not queued
        for delivery until ``schedule_notification`` is called.

        Args:
            template_id: Template ID
            data: Template data
            recipient_id: Recipient user ID
            context: Notification context (a minimal one is built from
                ``recipient_id`` when omitted)
            priority: Override default priority
            channels: Override default channels
            scheduled_for: Schedule delivery time
            expires_at: Expiration time (defaults to now plus the template's
                expiration, when the template defines one)

        Returns:
            Created notification or None if template not found
        """
        # Get the template
        template = self.get_template(template_id)
        if not template:
            self.telemetry.error(
                f"Template not found: {template_id}",
                category=EventCategory.SYSTEM
            )
            return None
        # Create or use provided context
        if context is None:
            context = NotificationContext(user_id=recipient_id)
        # Render the template
        title, content = self.renderer.render(template, data, context)
        # Create the notification
        notification = Notification(
            id=str(uuid.uuid4()),
            template_id=template_id,
            title=title,
            content=content,
            category=template.category,
            priority=priority or template.default_priority,
            channels=channels or template.default_channels,
            state=NotificationState.PENDING,
            created_at=datetime.now(),
            scheduled_for=scheduled_for,
            expires_at=expires_at or (datetime.now() + template.expiration if template.expiration else None),
            recipient_id=recipient_id,
            recipient_role=context.user_role,
            context_snapshot=context.to_dict(),
            data=data,
            actions=template.actions,
            icon=template.icon,
            color=template.color,
            grouping_key=template.grouping_key,
            requires_acknowledgment=template.requires_acknowledgment
        )
        # Store the notification
        with self.lock:
            self.notifications[notification.id] = notification
            if recipient_id:
                self.user_notifications.setdefault(recipient_id, []).append(notification.id)
        return notification

    def schedule_notification(self, notification: Notification, scheduled_for: Optional[datetime] = None) -> datetime:
        """
        Schedule a notification for delivery.

        Args:
            notification: Notification to schedule
            scheduled_for: Explicit delivery time or None to auto-schedule

        Returns:
            Scheduled delivery time
        """
        with self.lock:
            # Get delivery time
            if scheduled_for:
                delivery_time = scheduled_for
            else:
                # Create context object from snapshot
                context = NotificationContext.from_dict(notification.context_snapshot)
                # Get scheduled time
                delivery_time = self.scheduler.schedule(notification, context)
            # Update notification
            notification.schedule(delivery_time)
            # Add to delivery queue; the timestamp is the ordering key
            self.delivery_queue.put((delivery_time.timestamp(), delivery_time, notification.id))
            return delivery_time

    def send_notification(
        self,
        template_id: str,
        data: Dict[str, Any],
        recipient_id: Optional[str] = None,
        context: Optional[NotificationContext] = None,
        priority: Optional[NotificationPriority] = None,
        channels: Optional[List[NotificationChannel]] = None,
        scheduled_for: Optional[datetime] = None,
        expires_at: Optional[datetime] = None
    ) -> Optional[str]:
        """
        Create and schedule a notification.

        Args:
            template_id: Template ID
            data: Template data
            recipient_id: Recipient user ID
            context: Notification context
            priority: Override default priority
            channels: Override default channels
            scheduled_for: Schedule delivery time
            expires_at: Expiration time

        Returns:
            Notification ID or None if creation failed
        """
        # Create the notification
        notification = self.create_notification(
            template_id=template_id,
            data=data,
            recipient_id=recipient_id,
            context=context,
            priority=priority,
            channels=channels,
            scheduled_for=scheduled_for,
            expires_at=expires_at
        )
        if not notification:
            return None
        # Check for grouping
        groupable_states = (
            NotificationState.PENDING,
            NotificationState.SCHEDULED,
            NotificationState.DELIVERED
        )
        with self.lock:
            if recipient_id and recipient_id in self.user_notifications:
                # Get active notifications for the recipient. The new
                # notification was already stored by create_notification, so
                # exclude it; otherwise it would be grouped with itself.
                active_notifications = [
                    self.notifications[nid]
                    for nid in self.user_notifications[recipient_id]
                    if nid != notification.id
                    and nid in self.notifications
                    and self.notifications[nid].state in groupable_states
                ]
                # Check if notification should be grouped
                should_group, group_id = self.grouper.should_group(notification, active_notifications)
                if should_group and group_id:
                    # Set group ID
                    notification.group_id = group_id
                    # Log grouping
                    self.telemetry.debug(
                        f"Grouped notification {notification.id} into group {group_id}",
                        category=EventCategory.SYSTEM
                    )
        # Schedule the notification
        self.schedule_notification(notification, scheduled_for)
        return notification.id

    def _deliver_notification(self, notification: Notification) -> None:
        """
        Deliver a notification.

        Marks the notification DELIVERED when the router reports at least
        one channel, FAILED otherwise. If routing raises, the notification
        is marked FAILED and the exception propagates to the caller.

        Args:
            notification: Notification to deliver
        """
        try:
            # Create context object from snapshot
            context = NotificationContext.from_dict(notification.context_snapshot)
            # Route the notification
            channels = self.router.route(notification, context)
        except Exception:
            # Don't leave the notification stuck in PENDING forever.
            with self.lock:
                notification.state = NotificationState.FAILED
            raise
        # Update notification state
        with self.lock:
            if channels:
                notification.state = NotificationState.DELIVERED
                notification.delivered_at = datetime.now()
            else:
                notification.state = NotificationState.FAILED
        # Log the outcome
        if channels:
            self.telemetry.debug(
                f"Notification {notification.id} delivered to channels: {', '.join([ch.value for ch in channels])}",
                category=EventCategory.SYSTEM,
                context={"notification_id": notification.id}
            )
        else:
            self.telemetry.error(
                f"Notification {notification.id} could not be delivered to any channel",
                category=EventCategory.SYSTEM
            )

    def get_notification(self, notification_id: str) -> Optional[Notification]:
        """
        Get a notification by ID.

        Args:
            notification_id: Notification ID

        Returns:
            Notification or None if not found
        """
        with self.lock:
            return self.notifications.get(notification_id)

    def get_user_notifications(
        self,
        user_id: str,
        states: Optional[List[NotificationState]] = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[Notification]:
        """
        Get notifications for a user, newest first.

        Args:
            user_id: User ID
            states: Filter by states (None or empty means no filtering)
            limit: Maximum number of notifications to return
            offset: Offset for pagination

        Returns:
            List of notifications
        """
        with self.lock:
            if user_id not in self.user_notifications:
                return []
            # Resolve the user's IDs, skipping any that are no longer stored
            notifications = [
                self.notifications[nid]
                for nid in self.user_notifications[user_id]
                if nid in self.notifications
            ]
            # Filter by states
            if states:
                notifications = [n for n in notifications if n.state in states]
            # Sort by created_at (newest first)
            notifications.sort(key=lambda n: n.created_at, reverse=True)
            # Apply pagination
            return notifications[offset:offset + limit]

    def mark_as_read(self, notification_id: str) -> bool:
        """
        Mark a notification as read.

        Args:
            notification_id: Notification ID

        Returns:
            Whether the operation was successful
        """
        with self.lock:
            notification = self.notifications.get(notification_id)
            if not notification:
                return False
            notification.mark_as_read()
            return True

    def acknowledge(self, notification_id: str, data: Optional[Dict[str, Any]] = None) -> bool:
        """
        Acknowledge a notification.

        Args:
            notification_id: Notification ID
            data: Acknowledgment data

        Returns:
            Whether the operation was successful
        """
        with self.lock:
            notification = self.notifications.get(notification_id)
            if not notification:
                return False
            notification.acknowledge(data)
            return True

    def cancel(self, notification_id: str) -> bool:
        """
        Cancel a notification.

        Args:
            notification_id: Notification ID

        Returns:
            Whether the operation was successful
        """
        with self.lock:
            notification = self.notifications.get(notification_id)
            if not notification:
                return False
            notification.cancel()
            return True

    def delete(self, notification_id: str) -> bool:
        """
        Delete a notification.

        Args:
            notification_id: Notification ID

        Returns:
            Whether the operation was successful
        """
        with self.lock:
            notification = self.notifications.get(notification_id)
            if not notification:
                return False
            # Remove from user notifications
            if notification.recipient_id and notification.recipient_id in self.user_notifications:
                if notification_id in self.user_notifications[notification.recipient_id]:
                    self.user_notifications[notification.recipient_id].remove(notification_id)
            # Delete the notification
            del self.notifications[notification_id]
            return True

    def clear_user_notifications(self, user_id: str, states: Optional[List[NotificationState]] = None) -> int:
        """
        Clear notifications for a user.

        Args:
            user_id: User ID
            states: Only clear notifications with these states (None or empty
                clears all of the user's notifications)

        Returns:
            Number of notifications cleared
        """
        with self.lock:
            if user_id not in self.user_notifications:
                return 0
            # Build the surviving ID list in one pass instead of calling
            # list.remove() (O(n)) for every cleared notification.
            remaining: List[str] = []
            cleared = 0
            for nid in self.user_notifications[user_id]:
                notification = self.notifications.get(nid)
                if notification is None:
                    # Unknown IDs are left untouched
                    remaining.append(nid)
                    continue
                # Filter by states
                if states and notification.state not in states:
                    remaining.append(nid)
                    continue
                # Delete the notification
                del self.notifications[nid]
                cleared += 1
            # Update in place so existing references to the list stay valid
            self.user_notifications[user_id][:] = remaining
            return cleared

    def shutdown(self) -> None:
        """Shutdown the notification manager and stop the delivery thread."""
        # Stop the processing thread
        self.should_stop.set()
        if self.processing_thread and self.processing_thread.is_alive():
            self.processing_thread.join(timeout=2.0)
        # Log shutdown
        self.telemetry.info(
            "Notification manager shutdown",
            category=EventCategory.SYSTEM
        )
# Process-wide NotificationManager, created on first request
_instance = None


def get_notification_manager(config: Dict[str, Any] = None) -> NotificationManager:
    """
    Return the shared notification manager, building it on first use.

    Args:
        config: Configuration passed to NotificationManager when the
            instance is first created; ignored on later calls because the
            existing instance is returned as-is

    Returns:
        NotificationManager instance
    """
    global _instance
    if _instance is not None:
        return _instance
    _instance = NotificationManager(config)
    return _instance
# UI callback registration
def register_ui_callback(user_id: str, callback: Callable[[Dict[str, Any]], None]) -> None:
    """
    Attach a UI callback to the global manager's UI channel handler.

    Nothing happens when the router's handler for the UI channel is not a
    UIChannelHandler.

    Args:
        user_id: User ID or "default" for global callback
        callback: Function to call with notification data
    """
    handler = get_notification_manager().router.get_handler(NotificationChannel.UI)
    if not isinstance(handler, UIChannelHandler):
        return
    handler.register_callback(user_id, callback)
# Module-level convenience wrappers around the global notification manager
def send_notification(
    template_id: str,
    data: Dict[str, Any],
    recipient_id: Optional[str] = None,
    priority: Optional[NotificationPriority] = None,
    channels: Optional[List[NotificationChannel]] = None
) -> Optional[str]:
    """
    Send a notification through the global notification manager.

    Args:
        template_id: Template ID
        data: Template data
        recipient_id: Recipient user ID
        priority: Override default priority
        channels: Override default channels

    Returns:
        Notification ID or None if sending failed
    """
    return get_notification_manager().send_notification(
        template_id=template_id,
        data=data,
        recipient_id=recipient_id,
        priority=priority,
        channels=channels,
    )
def send_system_notification(
    message: str,
    recipient_id: Optional[str] = None,
    priority: NotificationPriority = NotificationPriority.MEDIUM
) -> Optional[str]:
    """
    Send a notification using the "system_notification" template.

    Args:
        message: Notification message
        recipient_id: Recipient user ID
        priority: Notification priority

    Returns:
        Notification ID or None if sending failed
    """
    payload = {"message": message}
    return send_notification(
        template_id="system_notification",
        data=payload,
        recipient_id=recipient_id,
        priority=priority,
    )
def send_security_alert(
    message: str,
    recipient_id: Optional[str] = None,
    priority: NotificationPriority = NotificationPriority.HIGH
) -> Optional[str]:
    """
    Send a notification using the "security_alert" template.

    Args:
        message: Alert message
        recipient_id: Recipient user ID
        priority: Notification priority

    Returns:
        Notification ID or None if sending failed
    """
    payload = {"message": message}
    return send_notification(
        template_id="security_alert",
        data=payload,
        recipient_id=recipient_id,
        priority=priority,
    )
def send_detection_result(
    result: str,
    recipient_id: Optional[str] = None,
    priority: NotificationPriority = NotificationPriority.MEDIUM
) -> Optional[str]:
    """
    Send a notification using the "detection_result" template.

    Args:
        result: Detection result
        recipient_id: Recipient user ID
        priority: Notification priority

    Returns:
        Notification ID or None if sending failed
    """
    payload = {"result": result}
    return send_notification(
        template_id="detection_result",
        data=payload,
        recipient_id=recipient_id,
        priority=priority,
    )
def send_process_notification(
    process_name: str,
    success: bool,
    error: Optional[str] = None,
    recipient_id: Optional[str] = None
) -> Optional[str]:
    """
    Report the outcome of a process.

    A successful process uses the "process_complete" template; a failed one
    uses "process_failed" at HIGH priority.

    Args:
        process_name: Process name
        success: Whether the process was successful
        error: Error message if failed ("Unknown error" when missing or empty)
        recipient_id: Recipient user ID

    Returns:
        Notification ID or None if sending failed
    """
    # Handle the failure case first, then fall through to success
    if not success:
        failure_data = {"process_name": process_name, "error": error or "Unknown error"}
        return send_notification(
            template_id="process_failed",
            data=failure_data,
            recipient_id=recipient_id,
            priority=NotificationPriority.HIGH,
        )
    return send_notification(
        template_id="process_complete",
        data={"process_name": process_name},
        recipient_id=recipient_id,
    )
class NotificationMonitor:
    """Utility class for monitoring notification metrics and performance."""

    def __init__(self, notification_manager: NotificationManager):
        """
        Initialize the notification monitor and start periodic collection.

        Args:
            notification_manager: Notification manager to monitor
        """
        self.notification_manager = notification_manager
        self.telemetry = get_telemetry()
        # Set by stop() to end the collection loop
        self._stop_event = threading.Event()
        # Schedule periodic metrics collection
        self.metrics_thread = threading.Thread(
            target=self._collect_metrics_periodically,
            daemon=True
        )
        self.metrics_thread.start()

    def stop(self) -> None:
        """Stop periodic metrics collection and wait briefly for the thread to exit."""
        self._stop_event.set()
        if self.metrics_thread and self.metrics_thread.is_alive():
            self.metrics_thread.join(timeout=2.0)

    def _collect_metrics_periodically(self) -> None:
        """Collect notification metrics once a minute until stop() is called."""
        while not self._stop_event.is_set():
            try:
                # Collect metrics
                metrics = self.collect_metrics()
                # Record metrics in telemetry
                self.telemetry.debug(
                    "Notification system metrics collected",
                    category=EventCategory.SYSTEM,
                    context={"metrics": metrics}
                )
            except Exception as e:
                self.telemetry.error(
                    f"Error collecting notification metrics: {e}",
                    category=EventCategory.SYSTEM,
                    exc_info=True
                )
            # Wait for next collection (every minute); waiting on the event
            # instead of sleeping lets stop() interrupt the pause.
            self._stop_event.wait(60.0)

    @staticmethod
    def _tally(notifications) -> Tuple[Dict[Any, int], Dict[Any, int], int]:
        """Count notifications per state and category, plus those not in the READ state."""
        states = {state: 0 for state in NotificationState}
        categories = {category: 0 for category in NotificationCategory}
        unread = 0
        for notification in notifications:
            states[notification.state] += 1
            categories[notification.category] += 1
            if notification.state != NotificationState.READ:
                unread += 1
        return states, categories, unread

    def collect_metrics(self) -> Dict[str, Any]:
        """
        Collect notification metrics.

        Returns:
            Dictionary of metrics: totals, counts keyed by state, priority
            and category value, the share of notifications currently in the
            DELIVERED state, the average seconds between creation and
            delivery, the number of users with a notification list, and the
            delivery queue size
        """
        manager = self.notification_manager
        with manager.lock:
            states = {state: 0 for state in NotificationState}
            priorities = {priority: 0 for priority in NotificationPriority}
            categories = {category: 0 for category in NotificationCategory}
            delivery_times = []
            # Single pass over the stored notifications for all counters
            for notification in manager.notifications.values():
                states[notification.state] += 1
                priorities[notification.priority] += 1
                categories[notification.category] += 1
                if notification.delivered_at and notification.created_at:
                    delivery_times.append(
                        (notification.delivered_at - notification.created_at).total_seconds()
                    )
            # Calculate delivery metrics
            total_notifications = len(manager.notifications)
            delivery_rate = 0.0
            if total_notifications > 0:
                delivery_rate = states[NotificationState.DELIVERED] / total_notifications
            # Calculate average delivery time
            avg_delivery_time = 0.0
            if delivery_times:
                avg_delivery_time = sum(delivery_times) / len(delivery_times)
            return {
                "total_notifications": total_notifications,
                "states": {state.value: count for state, count in states.items()},
                "priorities": {priority.value: count for priority, count in priorities.items()},
                "categories": {category.value: count for category, count in categories.items()},
                "delivery_rate": delivery_rate,
                "avg_delivery_time": avg_delivery_time,
                "active_users": len(manager.user_notifications),
                "queue_size": manager.delivery_queue.qsize()
            }

    def get_notification_stats(self, user_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Get notification statistics.

        Args:
            user_id: Optional user ID to filter by

        Returns:
            Dictionary with "total", "unread", "states" and "categories";
            system-wide results also include "active_users". "unread" counts
            every notification whose state is not READ.
        """
        manager = self.notification_manager
        with manager.lock:
            if user_id:
                # User-specific stats
                if user_id not in manager.user_notifications:
                    return {
                        "total": 0,
                        "unread": 0,
                        "states": {state.value: 0 for state in NotificationState},
                        "categories": {category.value: 0 for category in NotificationCategory}
                    }
                # Get notification IDs for the user
                notification_ids = manager.user_notifications[user_id]
                # Count states and categories for IDs that are still stored
                states, categories, unread = self._tally(
                    manager.notifications[nid]
                    for nid in notification_ids
                    if nid in manager.notifications
                )
                return {
                    "total": len(notification_ids),
                    "unread": unread,
                    "states": {state.value: count for state, count in states.items()},
                    "categories": {category.value: count for category, count in categories.items()}
                }
            # System-wide stats
            states, categories, unread = self._tally(manager.notifications.values())
            return {
                "total": len(manager.notifications),
                "unread": unread,
                "states": {state.value: count for state, count in states.items()},
                "categories": {category.value: count for category, count in categories.items()},
                "active_users": len(manager.user_notifications)
            }
def initialize_notification_monitor() -> NotificationMonitor:
    """
    Create a monitor bound to the global notification manager.

    Returns:
        NotificationMonitor instance
    """
    return NotificationMonitor(get_notification_manager())