Spaces:
Sleeping
Sleeping
| """ | |
| Notification queue utility for auth-ms. | |
| Creates notification docs in MongoDB (notifications collection) and pushes | |
| notification_id to Redis queue for the notification-ms worker to consume. | |
| auth-ms does NOT handle WATI/SMTP directly — all delivery is delegated to | |
| notification-ms which resolves per-merchant channel credentials. | |
| """ | |
| import json | |
| import uuid | |
| from datetime import datetime | |
| from typing import Optional, Dict, Any, List | |
| from enum import Enum | |
| from app.core.logging import get_logger | |
| from app.core.config import settings | |
| from app.cache import cache_service | |
| from app.nosql import get_database | |
# Module-level logger, named after this module.
logger = get_logger(__name__)
# Redis list that receives one JSON payload ({"notification_id": ...}) per
# queued notification.
NOTIFICATION_QUEUE_NAME = "notifications:queue"
# MongoDB collection the notification documents are inserted into.
COLLECTION = "notifications"
class NotificationChannel(str, Enum):
    """Delivery channels a notification can be routed through."""

    # Members also subclass ``str``, so each one compares equal to its raw
    # value (e.g. ``NotificationChannel.SMS == "sms"``).
    WHATSAPP = "whatsapp"
    SMS = "sms"
    EMAIL = "email"
    PUSH = "push"
class NotificationQueue:
    """
    Enqueues notifications via MongoDB + Redis so notification-ms worker
    dispatches them through the correct channel providers using
    per-merchant credentials from scm_merchant_settings.

    The class holds no state and acts purely as a namespace; every method is
    a static method, so it may be called on the class or on an instance.
    """

    @staticmethod
    def _channel_values(
        channels: Optional[List[NotificationChannel]],
    ) -> List[str]:
        """
        Convert a channel list to the plain strings stored on the document.

        Args:
            channels: Channels requested by the caller, or None to use the
                default pair (WhatsApp, then SMS). An explicitly empty list
                is kept empty.

        Returns:
            The channel values in the order given.

        Raises:
            ValueError: If an entry is not a valid NotificationChannel value.
        """
        if channels is None:
            channels = [NotificationChannel.WHATSAPP, NotificationChannel.SMS]
        # Lookup by value accepts both enum members and their raw strings.
        return [NotificationChannel(ch).value for ch in channels]

    @staticmethod
    async def _enqueue(
        recipient: str,
        channels: List[str],
        template_name: str,
        template_data: Dict[str, Any],
        merchant_id: str = "",
        merchant_name: str = "",
        priority: str = "normal",
        source: str = "auth-ms",
        metadata: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Create a notification document in MongoDB and push its ID to Redis.

        merchant_name is stored at doc level so the dispatcher can inject it
        into template_data at render time.

        Args:
            recipient: Destination address stored on the document.
            channels: Channel names (strings) to deliver through.
            template_name: Name of the template to render.
            template_data: Values made available to the template.
            merchant_id: Merchant identifier stored on the document.
            merchant_name: Merchant display name stored on the document.
            priority: Priority label stored on the document.
            source: Name of the originating service.
            metadata: Optional extra data; stored as {} when omitted.

        Returns:
            True when the document was inserted and its ID pushed to the
            queue; False when either step raised (the error is logged).
        """
        # Function-scope imports keep this block self-contained.
        import inspect
        from datetime import timezone

        notification_id = str(uuid.uuid4())
        # Timezone-aware replacement for the deprecated datetime.utcnow().
        now = datetime.now(timezone.utc)
        doc = {
            "notification_id": notification_id,
            "recipient": recipient,
            "channels": channels,
            "template_name": template_name,
            "template_data": template_data,
            "priority": priority,
            "status": "queued",
            "source": source,
            "merchant_id": merchant_id,
            "merchant_name": merchant_name,
            "metadata": metadata or {},
            "channel_results": {},
            "retry_count": 0,
            "created_at": now,
            "updated_at": now,
            "sent_at": None,
        }
        try:
            db = get_database()
            await db[COLLECTION].insert_one(doc)

            redis_client = cache_service.get_client()
            pushed = redis_client.lpush(
                NOTIFICATION_QUEUE_NAME,
                json.dumps({"notification_id": notification_id}),
            )
            # An asyncio Redis client returns a coroutine here; without the
            # await the push would never run. A sync client returns a plain
            # value, which is left untouched.
            if inspect.isawaitable(pushed):
                await pushed
        except Exception:
            # NOTE(review): if the insert succeeded but the push failed, the
            # document stays in "queued" status with no queue entry --
            # confirm whether notification-ms sweeps such documents.
            logger.error(
                "Notification queue failed",
                exc_info=True,
                extra={"template": template_name, "recipient": recipient},
            )
            return False

        logger.info(
            "Notification queued",
            extra={
                "notification_id": notification_id,
                "template": template_name,
                "channels": channels,
                "recipient": recipient,
            },
        )
        return True

    # -- Convenience helpers ------------------------------------------------

    @staticmethod
    async def send_otp(
        recipient: str,
        otp: str,
        expiry_minutes: int = 5,
        merchant_id: str = "",
        merchant_name: str = "",
        channels: Optional[List[NotificationChannel]] = None,
        template_name: Optional[str] = None,
    ) -> bool:
        """
        Queue OTP notification. Delivery handled by notification-ms.

        Args:
            recipient: Destination address for the OTP.
            otp: The one-time password to send.
            expiry_minutes: OTP validity; passed to the template as a string.
            merchant_id: Merchant identifier stored on the document.
            merchant_name: Merchant display name.
            channels: Channels to use; defaults to WhatsApp and SMS.
            template_name: Template override; falls back to
                settings.NOTIFICATION_TEMPLATE_OTP_VERIFICATION.

        Returns:
            True if the notification was queued, False otherwise.
        """
        return await NotificationQueue._enqueue(
            recipient=recipient,
            channels=NotificationQueue._channel_values(channels),
            template_name=template_name or settings.NOTIFICATION_TEMPLATE_OTP_VERIFICATION,
            template_data={
                "otp": otp,
                "expiry_minutes": str(expiry_minutes),
                "merchant_name": merchant_name,
            },
            merchant_id=merchant_id,
            merchant_name=merchant_name,
            priority="critical",
        )

    @staticmethod
    async def send_credentials(
        recipient: str,
        name: str,
        username: str,
        password: str,
        merchant_id: str = "",
        merchant_name: str = "",
        channels: Optional[List[NotificationChannel]] = None,
        template_name: Optional[str] = None,
    ) -> bool:
        """
        Queue welcome credentials notification. Delivery handled by notification-ms.

        Args:
            recipient: Destination address for the credentials.
            name: Accepted for interface compatibility; not currently
                included in the template data.
            username: Accepted for interface compatibility; not currently
                included in the template data.
            password: Sent to the template under the "otp" key.
            merchant_id: Merchant identifier stored on the document.
            merchant_name: Merchant display name.
            channels: Channels to use; defaults to WhatsApp and SMS.
            template_name: Template override; falls back to
                settings.NOTIFICATION_TEMPLATE_CREDENTIALS.

        Returns:
            True if the notification was queued, False otherwise.
        """
        return await NotificationQueue._enqueue(
            recipient=recipient,
            channels=NotificationQueue._channel_values(channels),
            template_name=template_name or settings.NOTIFICATION_TEMPLATE_CREDENTIALS,
            template_data={
                # The template payload reuses the OTP field names.
                "otp": password,
                "expiry_minutes": "0",
                "merchant_name": merchant_name,
            },
            merchant_id=merchant_id,
            merchant_name=merchant_name,
            priority="high",
        )

    @staticmethod
    async def send_password_reset(
        recipient: str,
        name: str,
        new_password: str,
        merchant_id: str = "",
        merchant_name: str = "",
        channels: Optional[List[NotificationChannel]] = None,
        template_name: Optional[str] = None,
    ) -> bool:
        """
        Queue password reset notification. Delivery handled by notification-ms.

        Args:
            recipient: Destination address for the new password.
            name: Accepted for interface compatibility; not currently
                included in the template data.
            new_password: Sent to the template under the "otp" key.
            merchant_id: Merchant identifier stored on the document.
            merchant_name: Merchant display name.
            channels: Channels to use; defaults to WhatsApp and SMS.
            template_name: Template override; falls back to
                settings.NOTIFICATION_TEMPLATE_PASSWORD_RESET.

        Returns:
            True if the notification was queued, False otherwise.
        """
        return await NotificationQueue._enqueue(
            recipient=recipient,
            channels=NotificationQueue._channel_values(channels),
            template_name=template_name or settings.NOTIFICATION_TEMPLATE_PASSWORD_RESET,
            template_data={
                # The template payload reuses the OTP field names.
                "otp": new_password,
                "expiry_minutes": "0",
                "merchant_name": merchant_name,
            },
            merchant_id=merchant_id,
            merchant_name=merchant_name,
            priority="high",
        )