""" Notification queue utility. Creates notification docs in MongoDB (notifications collection) and pushes notification_id to Redis queue for the notification-ms worker to consume. Templates are company-level. merchant_name is passed as a variable in template_data for rendering (if applicable). """ import json import uuid from datetime import datetime from typing import Optional, Dict, Any, List from enum import Enum from app.core.logging import get_logger from app.core.config import settings from app.cache import get_redis from app.nosql import get_database logger = get_logger(__name__) NOTIFICATION_QUEUE_NAME = "notifications:queue" COLLECTION = "notifications" class NotificationChannel(str, Enum): """Supported notification channels.""" WHATSAPP = "whatsapp" SMS = "sms" EMAIL = "email" PUSH = "push" class NotificationQueue: """ Utility for queuing notifications via MongoDB + Redis. Creates a notification document in MongoDB and pushes its ID to Redis so the notification-ms worker picks it up. """ @staticmethod async def _enqueue( recipient: str, channels: List[str], template_name: str, template_data: Dict[str, Any], merchant_id: str = "", merchant_name: str = "", priority: str = "normal", source: str = "scm-ms", metadata: Optional[Dict[str, Any]] = None, ) -> bool: """ Create a notification document in MongoDB and push its ID to Redis. merchant_name stored at doc level for dispatcher to inject into template_data. Returns True if queued successfully. """ notification_id = str(uuid.uuid4()) now = datetime.utcnow() doc = { "notification_id": notification_id, "recipient": recipient, "channels": channels, "template_name": template_name, "template_data": template_data, "priority": priority, "status": "queued", "source": source, "merchant_id": merchant_id, "merchant_name": merchant_name, "metadata": metadata or {}, "channel_results": {}, "retry_count": 0, "created_at": now, "updated_at": now, "sent_at": None, } try: db = get_database() await db[COLLECTION].insert_one(doc) redis_client = get_redis() if redis_client: await redis_client.lpush( NOTIFICATION_QUEUE_NAME, json.dumps({"notification_id": notification_id}), ) logger.info( "Notification queued", extra={ "notification_id": notification_id, "template": template_name, "channels": channels, "recipient": recipient, }, ) else: logger.warning( f"Redis unavailable — notification {notification_id} saved but not queued" ) return True except Exception as e: logger.error("Failed to queue notification", exc_info=e) return False # ── Convenience helpers ────────────────────────────────────────────── @staticmethod async def send_otp( recipient: str, otp: str, merchant_id: str = "", merchant_name: str = "", channels: Optional[List[NotificationChannel]] = None, template_name: Optional[str] = None, ) -> bool: """Queue OTP notification via WhatsApp + SMS.""" if channels is None: channels = [NotificationChannel.WHATSAPP, NotificationChannel.SMS] return await NotificationQueue._enqueue( recipient=recipient, channels=[ch.value for ch in channels], template_name=template_name or settings.NOTIFICATION_TEMPLATE_OTP_VERIFICATION, template_data={ "otp": otp, "expiry_minutes": "5", "merchant_name": merchant_name, }, merchant_id=merchant_id, merchant_name=merchant_name, priority="critical", ) @staticmethod async def send_credentials( recipient: str, name: str, username: str, password: str, merchant_id: str = "", merchant_name: str = "", channels: Optional[List[NotificationChannel]] = None, template_name: Optional[str] = None, ) -> bool: """Queue welcome credentials notification via WhatsApp + SMS. Uses the credentials template (default: otp) with password mapped to the 'otp' parameter for reliable delivery. Template name is configurable via NOTIFICATION_TEMPLATE_CREDENTIALS env var. """ if channels is None: channels = [NotificationChannel.WHATSAPP, NotificationChannel.SMS] return await NotificationQueue._enqueue( recipient=recipient, channels=[ch.value for ch in channels], template_name=template_name or settings.NOTIFICATION_TEMPLATE_CREDENTIALS, template_data={ "otp": password, "expiry_minutes": "0", # not applicable for credentials "merchant_name": merchant_name, }, merchant_id=merchant_id, merchant_name=merchant_name, priority="high", ) @staticmethod async def send_password_reset( recipient: str, name: str, new_password: str, merchant_id: str = "", merchant_name: str = "", channels: Optional[List[NotificationChannel]] = None, template_name: Optional[str] = None, ) -> bool: """Queue password reset notification via WhatsApp + SMS. Template name is configurable via NOTIFICATION_TEMPLATE_PASSWORD_RESET env var. """ if channels is None: channels = [NotificationChannel.WHATSAPP, NotificationChannel.SMS] return await NotificationQueue._enqueue( recipient=recipient, channels=[ch.value for ch in channels], template_name=template_name or settings.NOTIFICATION_TEMPLATE_PASSWORD_RESET, template_data={ "otp": new_password, "expiry_minutes": "0", # not applicable for password reset "merchant_name": merchant_name, }, merchant_id=merchant_id, merchant_name=merchant_name, priority="high", ) @staticmethod async def send_custom( recipient: str, template_name: str, data: Dict[str, Any], merchant_id: str = "", merchant_name: str = "", channels: Optional[List[NotificationChannel]] = None, ) -> bool: """Queue custom notification with any template and data.""" if channels is None: channels = [NotificationChannel.WHATSAPP] return await NotificationQueue._enqueue( recipient=recipient, channels=[ch.value for ch in channels], template_name=template_name, template_data=data, merchant_id=merchant_id, merchant_name=merchant_name, ) @staticmethod async def get_queue_size() -> int: """Get current queue size for monitoring.""" redis_client = get_redis() if not redis_client: return 0 try: return await redis_client.llen(NOTIFICATION_QUEUE_NAME) except Exception as e: logger.error("Error getting notification queue size", exc_info=e) return 0