cuatrolabs-auth-ms / app /utils /notification_queue.py
MukeshKapoor25's picture
notfication ms integration
4e94087
"""
Notification queue utility for auth-ms.
Creates notification docs in MongoDB (notifications collection) and pushes
notification_id to Redis queue for the notification-ms worker to consume.
auth-ms does NOT handle WATI/SMTP directly β€” all delivery is delegated to
notification-ms which resolves per-merchant channel credentials.
"""
import json
import uuid
from datetime import datetime
from typing import Optional, Dict, Any, List
from enum import Enum
from app.core.logging import get_logger
from app.core.config import settings
from app.cache import cache_service
from app.nosql import get_database
logger = get_logger(__name__)
NOTIFICATION_QUEUE_NAME = "notifications:queue"
COLLECTION = "notifications"
class NotificationChannel(str, Enum):
"""Supported notification channels."""
WHATSAPP = "whatsapp"
SMS = "sms"
EMAIL = "email"
PUSH = "push"
class NotificationQueue:
"""
Enqueues notifications via MongoDB + Redis so notification-ms worker
dispatches them through the correct channel providers using
per-merchant credentials from scm_merchant_settings.
"""
@staticmethod
async def _enqueue(
recipient: str,
channels: List[str],
template_name: str,
template_data: Dict[str, Any],
merchant_id: str = "",
merchant_name: str = "",
priority: str = "normal",
source: str = "auth-ms",
metadata: Optional[Dict[str, Any]] = None,
) -> bool:
"""
Create a notification document in MongoDB and push its ID to Redis.
merchant_name is stored at doc level so the dispatcher can inject it
into template_data at render time.
"""
notification_id = str(uuid.uuid4())
now = datetime.utcnow()
doc = {
"notification_id": notification_id,
"recipient": recipient,
"channels": channels,
"template_name": template_name,
"template_data": template_data,
"priority": priority,
"status": "queued",
"source": source,
"merchant_id": merchant_id,
"merchant_name": merchant_name,
"metadata": metadata or {},
"channel_results": {},
"retry_count": 0,
"created_at": now,
"updated_at": now,
"sent_at": None,
}
try:
db = get_database()
await db[COLLECTION].insert_one(doc)
redis_client = cache_service.get_client()
redis_client.lpush(
NOTIFICATION_QUEUE_NAME,
json.dumps({"notification_id": notification_id}),
)
logger.info(
"Notification queued",
extra={
"notification_id": notification_id,
"template": template_name,
"channels": channels,
"recipient": recipient,
},
)
return True
except Exception:
logger.error(
"Notification queue failed",
exc_info=True,
extra={"template": template_name, "recipient": recipient},
)
return False
# ── Convenience helpers ──────────────────────────────────────────────
@staticmethod
async def send_otp(
recipient: str,
otp: str,
expiry_minutes: int = 5,
merchant_id: str = "",
merchant_name: str = "",
channels: Optional[List[NotificationChannel]] = None,
template_name: Optional[str] = None,
) -> bool:
"""Queue OTP notification. Delivery handled by notification-ms."""
if channels is None:
channels = [NotificationChannel.WHATSAPP, NotificationChannel.SMS]
return await NotificationQueue._enqueue(
recipient=recipient,
channels=[ch.value for ch in channels],
template_name=template_name or settings.NOTIFICATION_TEMPLATE_OTP_VERIFICATION,
template_data={
"otp": otp,
"expiry_minutes": str(expiry_minutes),
"merchant_name": merchant_name,
},
merchant_id=merchant_id,
merchant_name=merchant_name,
priority="critical",
)
@staticmethod
async def send_credentials(
recipient: str,
name: str,
username: str,
password: str,
merchant_id: str = "",
merchant_name: str = "",
channels: Optional[List[NotificationChannel]] = None,
template_name: Optional[str] = None,
) -> bool:
"""Queue welcome credentials notification. Delivery handled by notification-ms."""
if channels is None:
channels = [NotificationChannel.WHATSAPP, NotificationChannel.SMS]
return await NotificationQueue._enqueue(
recipient=recipient,
channels=[ch.value for ch in channels],
template_name=template_name or settings.NOTIFICATION_TEMPLATE_CREDENTIALS,
template_data={
"otp": password,
"expiry_minutes": "0",
"merchant_name": merchant_name,
},
merchant_id=merchant_id,
merchant_name=merchant_name,
priority="high",
)
@staticmethod
async def send_password_reset(
recipient: str,
name: str,
new_password: str,
merchant_id: str = "",
merchant_name: str = "",
channels: Optional[List[NotificationChannel]] = None,
template_name: Optional[str] = None,
) -> bool:
"""Queue password reset notification. Delivery handled by notification-ms."""
if channels is None:
channels = [NotificationChannel.WHATSAPP, NotificationChannel.SMS]
return await NotificationQueue._enqueue(
recipient=recipient,
channels=[ch.value for ch in channels],
template_name=template_name or settings.NOTIFICATION_TEMPLATE_PASSWORD_RESET,
template_data={
"otp": new_password,
"expiry_minutes": "0",
"merchant_name": merchant_name,
},
merchant_id=merchant_id,
merchant_name=merchant_name,
priority="high",
)