Spaces:
Runtime error
Runtime error
chore(config,notification_queue): externalize notification template names to environment variables
aa78675 | """ | |
| Notification queue utility. | |
| Creates notification docs in MongoDB (notifications collection) and pushes | |
| notification_id to Redis queue for the notification-ms worker to consume. | |
| Templates are company-level. merchant_name is passed as a variable | |
| in template_data for rendering (if applicable). | |
| """ | |
| import json | |
| import uuid | |
| from datetime import datetime | |
| from typing import Optional, Dict, Any, List | |
| from enum import Enum | |
| from app.core.logging import get_logger | |
| from app.core.config import settings | |
| from app.cache import get_redis | |
| from app.nosql import get_database | |
| logger = get_logger(__name__) | |
| NOTIFICATION_QUEUE_NAME = "notifications:queue" | |
| COLLECTION = "notifications" | |
| class NotificationChannel(str, Enum): | |
| """Supported notification channels.""" | |
| WHATSAPP = "whatsapp" | |
| SMS = "sms" | |
| EMAIL = "email" | |
| PUSH = "push" | |
| class NotificationQueue: | |
| """ | |
| Utility for queuing notifications via MongoDB + Redis. | |
| Creates a notification document in MongoDB and pushes its ID to Redis | |
| so the notification-ms worker picks it up. | |
| """ | |
| async def _enqueue( | |
| recipient: str, | |
| channels: List[str], | |
| template_name: str, | |
| template_data: Dict[str, Any], | |
| merchant_id: str = "", | |
| merchant_name: str = "", | |
| priority: str = "normal", | |
| source: str = "scm-ms", | |
| metadata: Optional[Dict[str, Any]] = None, | |
| ) -> bool: | |
| """ | |
| Create a notification document in MongoDB and push its ID to Redis. | |
| merchant_name stored at doc level for dispatcher to inject into template_data. | |
| Returns True if queued successfully. | |
| """ | |
| notification_id = str(uuid.uuid4()) | |
| now = datetime.utcnow() | |
| doc = { | |
| "notification_id": notification_id, | |
| "recipient": recipient, | |
| "channels": channels, | |
| "template_name": template_name, | |
| "template_data": template_data, | |
| "priority": priority, | |
| "status": "queued", | |
| "source": source, | |
| "merchant_id": merchant_id, | |
| "merchant_name": merchant_name, | |
| "metadata": metadata or {}, | |
| "channel_results": {}, | |
| "retry_count": 0, | |
| "created_at": now, | |
| "updated_at": now, | |
| "sent_at": None, | |
| } | |
| try: | |
| db = get_database() | |
| await db[COLLECTION].insert_one(doc) | |
| redis_client = get_redis() | |
| if redis_client: | |
| await redis_client.lpush( | |
| NOTIFICATION_QUEUE_NAME, | |
| json.dumps({"notification_id": notification_id}), | |
| ) | |
| logger.info( | |
| "Notification queued", | |
| extra={ | |
| "notification_id": notification_id, | |
| "template": template_name, | |
| "channels": channels, | |
| "recipient": recipient, | |
| }, | |
| ) | |
| else: | |
| logger.warning( | |
| f"Redis unavailable β notification {notification_id} saved but not queued" | |
| ) | |
| return True | |
| except Exception as e: | |
| logger.error("Failed to queue notification", exc_info=e) | |
| return False | |
| # ββ Convenience helpers ββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def send_otp( | |
| recipient: str, | |
| otp: str, | |
| merchant_id: str = "", | |
| merchant_name: str = "", | |
| channels: Optional[List[NotificationChannel]] = None, | |
| template_name: Optional[str] = None, | |
| ) -> bool: | |
| """Queue OTP notification via WhatsApp + SMS.""" | |
| if channels is None: | |
| channels = [NotificationChannel.WHATSAPP, NotificationChannel.SMS] | |
| return await NotificationQueue._enqueue( | |
| recipient=recipient, | |
| channels=[ch.value for ch in channels], | |
| template_name=template_name or settings.NOTIFICATION_TEMPLATE_OTP_VERIFICATION, | |
| template_data={ | |
| "otp": otp, | |
| "expiry_minutes": "5", | |
| "merchant_name": merchant_name, | |
| }, | |
| merchant_id=merchant_id, | |
| merchant_name=merchant_name, | |
| priority="critical", | |
| ) | |
| async def send_credentials( | |
| recipient: str, | |
| name: str, | |
| username: str, | |
| password: str, | |
| merchant_id: str = "", | |
| merchant_name: str = "", | |
| channels: Optional[List[NotificationChannel]] = None, | |
| template_name: Optional[str] = None, | |
| ) -> bool: | |
| """Queue welcome credentials notification via WhatsApp + SMS. | |
| Uses the credentials template (default: otp) with password mapped to the | |
| 'otp' parameter for reliable delivery. Template name is configurable via | |
| NOTIFICATION_TEMPLATE_CREDENTIALS env var. | |
| """ | |
| if channels is None: | |
| channels = [NotificationChannel.WHATSAPP, NotificationChannel.SMS] | |
| return await NotificationQueue._enqueue( | |
| recipient=recipient, | |
| channels=[ch.value for ch in channels], | |
| template_name=template_name or settings.NOTIFICATION_TEMPLATE_CREDENTIALS, | |
| template_data={ | |
| "otp": password, | |
| "expiry_minutes": "0", # not applicable for credentials | |
| "merchant_name": merchant_name, | |
| }, | |
| merchant_id=merchant_id, | |
| merchant_name=merchant_name, | |
| priority="high", | |
| ) | |
| async def send_password_reset( | |
| recipient: str, | |
| name: str, | |
| new_password: str, | |
| merchant_id: str = "", | |
| merchant_name: str = "", | |
| channels: Optional[List[NotificationChannel]] = None, | |
| template_name: Optional[str] = None, | |
| ) -> bool: | |
| """Queue password reset notification via WhatsApp + SMS. | |
| Template name is configurable via NOTIFICATION_TEMPLATE_PASSWORD_RESET env var. | |
| """ | |
| if channels is None: | |
| channels = [NotificationChannel.WHATSAPP, NotificationChannel.SMS] | |
| return await NotificationQueue._enqueue( | |
| recipient=recipient, | |
| channels=[ch.value for ch in channels], | |
| template_name=template_name or settings.NOTIFICATION_TEMPLATE_PASSWORD_RESET, | |
| template_data={ | |
| "otp": new_password, | |
| "expiry_minutes": "0", # not applicable for password reset | |
| "merchant_name": merchant_name, | |
| }, | |
| merchant_id=merchant_id, | |
| merchant_name=merchant_name, | |
| priority="high", | |
| ) | |
| async def send_custom( | |
| recipient: str, | |
| template_name: str, | |
| data: Dict[str, Any], | |
| merchant_id: str = "", | |
| merchant_name: str = "", | |
| channels: Optional[List[NotificationChannel]] = None, | |
| ) -> bool: | |
| """Queue custom notification with any template and data.""" | |
| if channels is None: | |
| channels = [NotificationChannel.WHATSAPP] | |
| return await NotificationQueue._enqueue( | |
| recipient=recipient, | |
| channels=[ch.value for ch in channels], | |
| template_name=template_name, | |
| template_data=data, | |
| merchant_id=merchant_id, | |
| merchant_name=merchant_name, | |
| ) | |
| async def get_queue_size() -> int: | |
| """Get current queue size for monitoring.""" | |
| redis_client = get_redis() | |
| if not redis_client: | |
| return 0 | |
| try: | |
| return await redis_client.llen(NOTIFICATION_QUEUE_NAME) | |
| except Exception as e: | |
| logger.error("Error getting notification queue size", exc_info=e) | |
| return 0 | |