cuatrolabs-scm-ms / app /utils /notification_queue.py
MukeshKapoor25's picture
chore(config,notification_queue): externalize notification template names to environment variables
aa78675
"""
Notification queue utility.
Creates notification docs in MongoDB (notifications collection) and pushes
notification_id to Redis queue for the notification-ms worker to consume.
Templates are company-level. merchant_name is passed as a variable
in template_data for rendering (if applicable).
"""
import json
import uuid
from datetime import datetime
from typing import Optional, Dict, Any, List
from enum import Enum
from app.core.logging import get_logger
from app.core.config import settings
from app.cache import get_redis
from app.nosql import get_database
logger = get_logger(__name__)
NOTIFICATION_QUEUE_NAME = "notifications:queue"
COLLECTION = "notifications"
class NotificationChannel(str, Enum):
"""Supported notification channels."""
WHATSAPP = "whatsapp"
SMS = "sms"
EMAIL = "email"
PUSH = "push"
class NotificationQueue:
"""
Utility for queuing notifications via MongoDB + Redis.
Creates a notification document in MongoDB and pushes its ID to Redis
so the notification-ms worker picks it up.
"""
@staticmethod
async def _enqueue(
recipient: str,
channels: List[str],
template_name: str,
template_data: Dict[str, Any],
merchant_id: str = "",
merchant_name: str = "",
priority: str = "normal",
source: str = "scm-ms",
metadata: Optional[Dict[str, Any]] = None,
) -> bool:
"""
Create a notification document in MongoDB and push its ID to Redis.
merchant_name stored at doc level for dispatcher to inject into template_data.
Returns True if queued successfully.
"""
notification_id = str(uuid.uuid4())
now = datetime.utcnow()
doc = {
"notification_id": notification_id,
"recipient": recipient,
"channels": channels,
"template_name": template_name,
"template_data": template_data,
"priority": priority,
"status": "queued",
"source": source,
"merchant_id": merchant_id,
"merchant_name": merchant_name,
"metadata": metadata or {},
"channel_results": {},
"retry_count": 0,
"created_at": now,
"updated_at": now,
"sent_at": None,
}
try:
db = get_database()
await db[COLLECTION].insert_one(doc)
redis_client = get_redis()
if redis_client:
await redis_client.lpush(
NOTIFICATION_QUEUE_NAME,
json.dumps({"notification_id": notification_id}),
)
logger.info(
"Notification queued",
extra={
"notification_id": notification_id,
"template": template_name,
"channels": channels,
"recipient": recipient,
},
)
else:
logger.warning(
f"Redis unavailable β€” notification {notification_id} saved but not queued"
)
return True
except Exception as e:
logger.error("Failed to queue notification", exc_info=e)
return False
# ── Convenience helpers ──────────────────────────────────────────────
@staticmethod
async def send_otp(
recipient: str,
otp: str,
merchant_id: str = "",
merchant_name: str = "",
channels: Optional[List[NotificationChannel]] = None,
template_name: Optional[str] = None,
) -> bool:
"""Queue OTP notification via WhatsApp + SMS."""
if channels is None:
channels = [NotificationChannel.WHATSAPP, NotificationChannel.SMS]
return await NotificationQueue._enqueue(
recipient=recipient,
channels=[ch.value for ch in channels],
template_name=template_name or settings.NOTIFICATION_TEMPLATE_OTP_VERIFICATION,
template_data={
"otp": otp,
"expiry_minutes": "5",
"merchant_name": merchant_name,
},
merchant_id=merchant_id,
merchant_name=merchant_name,
priority="critical",
)
@staticmethod
async def send_credentials(
recipient: str,
name: str,
username: str,
password: str,
merchant_id: str = "",
merchant_name: str = "",
channels: Optional[List[NotificationChannel]] = None,
template_name: Optional[str] = None,
) -> bool:
"""Queue welcome credentials notification via WhatsApp + SMS.
Uses the credentials template (default: otp) with password mapped to the
'otp' parameter for reliable delivery. Template name is configurable via
NOTIFICATION_TEMPLATE_CREDENTIALS env var.
"""
if channels is None:
channels = [NotificationChannel.WHATSAPP, NotificationChannel.SMS]
return await NotificationQueue._enqueue(
recipient=recipient,
channels=[ch.value for ch in channels],
template_name=template_name or settings.NOTIFICATION_TEMPLATE_CREDENTIALS,
template_data={
"otp": password,
"expiry_minutes": "0", # not applicable for credentials
"merchant_name": merchant_name,
},
merchant_id=merchant_id,
merchant_name=merchant_name,
priority="high",
)
@staticmethod
async def send_password_reset(
recipient: str,
name: str,
new_password: str,
merchant_id: str = "",
merchant_name: str = "",
channels: Optional[List[NotificationChannel]] = None,
template_name: Optional[str] = None,
) -> bool:
"""Queue password reset notification via WhatsApp + SMS.
Template name is configurable via NOTIFICATION_TEMPLATE_PASSWORD_RESET env var.
"""
if channels is None:
channels = [NotificationChannel.WHATSAPP, NotificationChannel.SMS]
return await NotificationQueue._enqueue(
recipient=recipient,
channels=[ch.value for ch in channels],
template_name=template_name or settings.NOTIFICATION_TEMPLATE_PASSWORD_RESET,
template_data={
"otp": new_password,
"expiry_minutes": "0", # not applicable for password reset
"merchant_name": merchant_name,
},
merchant_id=merchant_id,
merchant_name=merchant_name,
priority="high",
)
@staticmethod
async def send_custom(
recipient: str,
template_name: str,
data: Dict[str, Any],
merchant_id: str = "",
merchant_name: str = "",
channels: Optional[List[NotificationChannel]] = None,
) -> bool:
"""Queue custom notification with any template and data."""
if channels is None:
channels = [NotificationChannel.WHATSAPP]
return await NotificationQueue._enqueue(
recipient=recipient,
channels=[ch.value for ch in channels],
template_name=template_name,
template_data=data,
merchant_id=merchant_id,
merchant_name=merchant_name,
)
@staticmethod
async def get_queue_size() -> int:
"""Get current queue size for monitoring."""
redis_client = get_redis()
if not redis_client:
return 0
try:
return await redis_client.llen(NOTIFICATION_QUEUE_NAME)
except Exception as e:
logger.error("Error getting notification queue size", exc_info=e)
return 0