swiftops-backend / src /app /services /notification_delivery.py
kamau1's picture
feat: unified sync notification system, realtime timesheets and payroll, simplified project role compensation
95005e1
"""
Notification Delivery - Tier 2: Background Notification Delivery
Handles delivery of notifications via external channels (WhatsApp, Email, SMS, Push).
Works with FastAPI BackgroundTasks for non-blocking delivery.
Architecture:
- Tier 1 (notification_creator.py): Creates notification records synchronously
- Tier 2 (This file): Delivers notifications via external channels asynchronously
Design Principles:
1. Non-blocking - Doesn't slow down API responses
2. Resilient - Handles failures gracefully, marks status
3. Configurable - Easy to enable/disable channels for MVP
4. Future-proof - Easy to migrate to Celery when needed
Usage:
    from fastapi import BackgroundTasks
    from app.services.notification_delivery import NotificationDelivery

    # In endpoint
    @router.post("/tickets/assign")
    def assign_ticket(background_tasks: BackgroundTasks, ...):
        # Create notification (Tier 1 - sync)
        notification = NotificationCreator.create(
            db=db,
            user_id=agent.id,
            title="Ticket Assigned",
            message="You have been assigned...",
            channel="whatsapp"
        )
        db.commit()

        # Queue delivery (Tier 2 - async)
        NotificationDelivery.queue_delivery(
            background_tasks=background_tasks,
            notification_id=notification.id
        )

        return response
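
Configuration:
    Set in .env:
        ENABLE_WHATSAPP_NOTIFICATIONS=false  # Disable for MVP
        ENABLE_EMAIL_NOTIFICATIONS=false     # Disable for MVP
        ENABLE_SMS_NOTIFICATIONS=false       # Disable for MVP

    NOTE: NotificationDeliveryConfig currently hard-codes its ENABLE_* flags
    and does not read these variables; enabling a channel today means
    editing that class.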
"""
import logging
from uuid import UUID
from typing import Optional
from fastapi import BackgroundTasks
from app.core.database import SessionLocal
from app.models.notification import Notification, NotificationChannel, NotificationStatus
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class NotificationDeliveryConfig:
    """
    Per-channel on/off switches consulted before a notification is delivered.

    Every ``ENABLE_*`` attribute is a class-level boolean. Use
    ``is_channel_enabled`` to look up the switch for a given channel.
    """

    # In-app notifications need no external API, so they are always on.
    ENABLE_IN_APP = True

    # External channels are all switched off for the MVP.
    # NOTE(review): these values are fixed in code; nothing in this class
    # reads .env or the process environment, although the module docstring
    # lists ENABLE_*_NOTIFICATIONS variables. Confirm where the override is
    # meant to happen before relying on environment configuration.
    ENABLE_WHATSAPP = False
    ENABLE_EMAIL = False
    ENABLE_SMS = False
    ENABLE_PUSH = False

    @classmethod
    def is_channel_enabled(cls, channel: NotificationChannel) -> bool:
        """
        Report whether delivery over ``channel`` is switched on.

        Args:
            channel: The notification channel to look up.

        Returns:
            The value of the matching ``ENABLE_*`` attribute on ``cls``, or
            ``False`` when ``channel`` is not one of the known channels.
        """
        # Map each known channel to the name of the flag that governs it;
        # the flag is resolved on ``cls`` so subclass overrides are honoured.
        flag_name_by_channel = {
            NotificationChannel.IN_APP: "ENABLE_IN_APP",
            NotificationChannel.WHATSAPP: "ENABLE_WHATSAPP",
            NotificationChannel.EMAIL: "ENABLE_EMAIL",
            NotificationChannel.SMS: "ENABLE_SMS",
            NotificationChannel.PUSH: "ENABLE_PUSH",
        }
        flag_name = flag_name_by_channel.get(channel)
        if flag_name is None:
            # Unknown channels are treated as disabled.
            return False
        return getattr(cls, flag_name)
class NotificationDelivery:
    """
    Handles delivery of notifications via external channels.

    Designed to work with FastAPI BackgroundTasks (MVP) and Celery (future).
    Every method is static; the class only serves as a namespace.

    Public entry points are ``queue_delivery`` and ``queue_bulk_delivery``.
    The ``_deliver_*`` methods run inside the background task and record
    the outcome on the notification row (status, sent_at, error_message).
    """

    @staticmethod
    def queue_delivery(
        background_tasks: BackgroundTasks,
        notification_id: UUID
    ) -> None:
        """
        Queue a notification for background delivery.

        Args:
            background_tasks: FastAPI BackgroundTasks instance
            notification_id: ID of notification to deliver

        Example:
            NotificationDelivery.queue_delivery(
                background_tasks=background_tasks,
                notification_id=notification.id
            )
        """
        background_tasks.add_task(
            NotificationDelivery._deliver_notification,
            notification_id=notification_id
        )
        logger.debug(f"Queued notification {notification_id} for delivery")

    @staticmethod
    def queue_bulk_delivery(
        background_tasks: BackgroundTasks,
        notification_ids: list[UUID]
    ) -> None:
        """
        Queue multiple notifications for background delivery.

        Each ID is queued as its own background task via ``queue_delivery``.

        Args:
            background_tasks: FastAPI BackgroundTasks instance
            notification_ids: List of notification IDs to deliver

        Example:
            NotificationDelivery.queue_bulk_delivery(
                background_tasks=background_tasks,
                notification_ids=[notif.id for notif in notifications]
            )
        """
        for notification_id in notification_ids:
            NotificationDelivery.queue_delivery(
                background_tasks=background_tasks,
                notification_id=notification_id
            )
        logger.debug(f"Queued {len(notification_ids)} notifications for delivery")

    @staticmethod
    def _deliver_notification(notification_id: UUID) -> None:
        """
        Internal method to deliver a notification.
        Runs in background task.

        Opens its own database session, loads the notification, and then:
        1. marks it SENT with skip metadata if its channel is disabled,
        2. marks it SENT if it is an in-app notification,
        3. otherwise hands it to the channel-specific ``_deliver_*`` method.

        Exceptions are logged and swallowed so the background task never
        fails; the session is always closed.

        Args:
            notification_id: ID of notification to deliver
        """
        db = SessionLocal()
        try:
            # Get notification
            notification = (
                db.query(Notification)
                .filter(Notification.id == notification_id)
                .first()
            )
            if not notification:
                logger.error(f"Notification {notification_id} not found")
                return

            channel = notification.channel

            # Check if channel is enabled
            if not NotificationDeliveryConfig.is_channel_enabled(channel):
                # getattr keeps the log line from raising when the stored
                # channel is None or not an enum member; for enum members
                # the output is identical to ``channel.value``.
                channel_label = getattr(channel, "value", channel)
                logger.info(
                    f"Channel {channel_label} is disabled. "
                    f"Marking notification {notification_id} as skipped."
                )
                notification.status = NotificationStatus.SENT  # Mark as sent to avoid retry
                # Assign a NEW dict instead of mutating the existing one.
                # If the column is a plain JSON type without mutation
                # tracking, in-place changes to an already-loaded dict are
                # not detected by SQLAlchemy and would be silently dropped;
                # a fresh object is safe either way.
                notification.additional_metadata = {
                    **(notification.additional_metadata or {}),
                    "delivery_skipped": True,
                    "skip_reason": "Channel disabled for MVP",
                }
                db.commit()
                return

            # In-app notifications don't need external delivery
            if channel == NotificationChannel.IN_APP:
                notification.status = NotificationStatus.SENT
                db.commit()
                logger.debug(f"In-app notification {notification_id} marked as sent")
                return

            # Deliver via external channel
            handlers = {
                NotificationChannel.WHATSAPP: NotificationDelivery._deliver_whatsapp,
                NotificationChannel.EMAIL: NotificationDelivery._deliver_email,
                NotificationChannel.SMS: NotificationDelivery._deliver_sms,
                NotificationChannel.PUSH: NotificationDelivery._deliver_push,
            }
            handler = handlers.get(channel)
            if handler is not None:
                handler(db, notification)
            else:
                # Defensive: only reachable when the config reports a
                # channel as enabled that has no handler here.
                logger.warning(f"Unknown channel: {channel}")
                notification.status = NotificationStatus.FAILED
                notification.error_message = f"Unknown channel: {channel}"
                db.commit()
        except Exception as e:
            logger.error(
                f"Failed to deliver notification {notification_id}: {str(e)}",
                exc_info=True
            )
            # Don't raise - background task should not fail
        finally:
            db.close()

    @staticmethod
    def _mark_sent(db, notification: Notification) -> None:
        """
        Record a successful delivery and commit it.

        Args:
            db: Open database session that loaded ``notification``
            notification: Notification row to update
        """
        notification.status = NotificationStatus.SENT
        # Keeps an existing sent_at; otherwise falls back to created_at.
        # NOTE(review): this stores the creation time rather than the
        # actual send time - confirm that is intended once real delivery
        # is implemented.
        notification.sent_at = notification.sent_at or notification.created_at
        db.commit()

    @staticmethod
    def _mark_failed(db, notification: Notification, error: Exception) -> None:
        """
        Record a failed delivery and commit it.

        The session is rolled back first: if the failure came from a
        commit/flush, the session refuses further work until rollback()
        is called, so committing the FAILED status without it would raise
        and the status would never be stored.

        Args:
            db: Open database session that loaded ``notification``
            notification: Notification row to update
            error: The exception that caused the failure
        """
        db.rollback()
        notification.status = NotificationStatus.FAILED
        notification.error_message = str(error)
        db.commit()

    @staticmethod
    def _deliver_whatsapp(db, notification: Notification) -> None:
        """
        Deliver notification via WhatsApp.

        For MVP: This is a placeholder. When ready to integrate:
        1. Set ENABLE_WHATSAPP = True in config
        2. Implement actual WhatsApp API call (Twilio, Africa's Talking, etc.)
        3. Handle API errors and retries

        Args:
            db: Open database session that loaded ``notification``
            notification: Notification to deliver
        """
        try:
            # TODO: Implement WhatsApp delivery
            # Example:
            # from app.services.whatsapp_service import WhatsAppService
            # WhatsAppService.send_message(
            #     phone_number=notification.user.phone_number,
            #     message=notification.message
            # )
            # For now, just log
            logger.info(
                f"WhatsApp delivery placeholder for notification {notification.id}. "
                f"Implement WhatsAppService when ready."
            )
            NotificationDelivery._mark_sent(db, notification)
        except Exception as e:
            logger.error(f"WhatsApp delivery failed: {str(e)}", exc_info=True)
            NotificationDelivery._mark_failed(db, notification, e)

    @staticmethod
    def _deliver_email(db, notification: Notification) -> None:
        """
        Deliver notification via Email.

        For MVP: This is a placeholder. When ready to integrate:
        1. Set ENABLE_EMAIL = True in config
        2. Implement actual Email sending (SendGrid, AWS SES, etc.)
        3. Handle API errors and retries

        Args:
            db: Open database session that loaded ``notification``
            notification: Notification to deliver
        """
        try:
            # TODO: Implement Email delivery
            # Example:
            # from app.services.email_service import EmailService
            # EmailService.send_email(
            #     to_email=notification.user.email,
            #     subject=notification.title,
            #     body=notification.message
            # )
            logger.info(
                f"Email delivery placeholder for notification {notification.id}. "
                f"Implement EmailService when ready."
            )
            NotificationDelivery._mark_sent(db, notification)
        except Exception as e:
            logger.error(f"Email delivery failed: {str(e)}", exc_info=True)
            NotificationDelivery._mark_failed(db, notification, e)

    @staticmethod
    def _deliver_sms(db, notification: Notification) -> None:
        """
        Deliver notification via SMS.

        For MVP: This is a placeholder. When ready to integrate:
        1. Set ENABLE_SMS = True in config
        2. Implement actual SMS sending (Twilio, Africa's Talking, etc.)
        3. Handle API errors and retries

        Args:
            db: Open database session that loaded ``notification``
            notification: Notification to deliver
        """
        try:
            # TODO: Implement SMS delivery
            logger.info(
                f"SMS delivery placeholder for notification {notification.id}. "
                f"Implement SMSService when ready."
            )
            NotificationDelivery._mark_sent(db, notification)
        except Exception as e:
            logger.error(f"SMS delivery failed: {str(e)}", exc_info=True)
            NotificationDelivery._mark_failed(db, notification, e)

    @staticmethod
    def _deliver_push(db, notification: Notification) -> None:
        """
        Deliver notification via Push Notification.

        For MVP: This is a placeholder. When ready to integrate:
        1. Set ENABLE_PUSH = True in config
        2. Implement actual Push notification (Firebase, OneSignal, etc.)
        3. Handle API errors and retries

        Args:
            db: Open database session that loaded ``notification``
            notification: Notification to deliver
        """
        try:
            # TODO: Implement Push notification delivery
            logger.info(
                f"Push notification delivery placeholder for notification {notification.id}. "
                f"Implement PushService when ready."
            )
            NotificationDelivery._mark_sent(db, notification)
        except Exception as e:
            logger.error(f"Push notification delivery failed: {str(e)}", exc_info=True)
            NotificationDelivery._mark_failed(db, notification, e)