File size: 6,485 Bytes
2a952d3
 
 
 
 
4e94087
 
2a952d3
 
 
 
 
4e94087
2a952d3
4e94087
2a952d3
 
 
 
 
4e94087
2a952d3
 
 
4e94087
 
 
 
 
 
 
 
2a952d3
 
 
 
 
 
 
 
 
 
 
 
 
4e94087
0ece69a
4e94087
2a952d3
 
 
 
 
0ece69a
 
2a952d3
 
 
 
 
 
 
 
 
 
 
 
 
 
0ece69a
2a952d3
 
 
 
 
 
 
 
 
 
 
 
 
4e94087
 
 
 
2a952d3
 
17ba139
2a952d3
 
4e94087
2a952d3
 
 
 
 
 
17ba139
 
 
 
4e94087
17ba139
2a952d3
 
 
 
 
 
 
 
 
 
0ece69a
4e94087
 
2a952d3
4e94087
2a952d3
4e94087
2a952d3
 
 
4e94087
 
0ece69a
 
 
 
 
2a952d3
0ece69a
2a952d3
 
 
 
4e94087
2a952d3
4e94087
2a952d3
 
 
0ece69a
4e94087
 
2a952d3
4e94087
2a952d3
4e94087
2a952d3
 
 
4e94087
 
2a952d3
4e94087
 
0ece69a
2a952d3
 
0ece69a
2a952d3
 
 
 
 
 
 
 
 
0ece69a
4e94087
 
2a952d3
4e94087
2a952d3
4e94087
2a952d3
 
 
4e94087
 
2a952d3
e47349e
4e94087
0ece69a
2a952d3
 
0ece69a
2a952d3
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
"""
Notification queue utility for auth-ms.
Creates notification docs in MongoDB (notifications collection) and pushes
notification_id to Redis queue for the notification-ms worker to consume.

auth-ms does NOT handle WATI/SMTP directly — all delivery is delegated to
notification-ms which resolves per-merchant channel credentials.
"""
import inspect
import json
import uuid
from datetime import datetime
from enum import Enum
from typing import Optional, Dict, Any, List

from app.core.logging import get_logger
from app.core.config import settings
from app.cache import cache_service
from app.nosql import get_database

logger = get_logger(__name__)

NOTIFICATION_QUEUE_NAME = "notifications:queue"
COLLECTION = "notifications"


class NotificationChannel(str, Enum):
    """Supported notification channels.

    Mixes in ``str`` so every member is also a string equal to its value.
    The ``.value`` strings are what the queue helpers in this module write
    into the ``channels`` list of the notification document.
    """
    # Member values are persisted as-is; renaming one changes stored data.
    WHATSAPP = "whatsapp"
    SMS = "sms"
    EMAIL = "email"
    PUSH = "push"


class NotificationQueue:
    """
    Enqueue notifications for delivery by the notification-ms worker.

    Every request is stored as a document in the MongoDB ``notifications``
    collection and its ``notification_id`` is then pushed onto the Redis list
    ``notifications:queue``. Actual delivery is performed by notification-ms,
    which dispatches through the channel providers using per-merchant
    credentials from scm_merchant_settings.

    All public helpers return ``True`` when the notification was stored and
    queued, and ``False`` when storing or queuing raised (the error is logged,
    never propagated).
    """

    # Channels used by the convenience helpers when the caller passes ``None``.
    _DEFAULT_CHANNELS = (NotificationChannel.WHATSAPP, NotificationChannel.SMS)

    @staticmethod
    def _channel_values(
        channels: Optional[List[NotificationChannel]],
    ) -> List[str]:
        """Return the plain channel strings to store for *channels*.

        Args:
            channels: Channels requested by the caller, or ``None`` to use
                the default WhatsApp + SMS pair. Enum members are reduced to
                their ``.value``; plain strings such as ``"sms"`` are also
                accepted and validated against ``NotificationChannel``.

        Returns:
            The channel values in the order given.

        Raises:
            ValueError: If a plain string does not name a known channel.
        """
        if channels is None:
            channels = NotificationQueue._DEFAULT_CHANNELS
        return [
            ch.value if isinstance(ch, Enum) else NotificationChannel(ch).value
            for ch in channels
        ]

    @staticmethod
    async def _enqueue(
        recipient: str,
        channels: List[str],
        template_name: str,
        template_data: Dict[str, Any],
        merchant_id: str = "",
        merchant_name: str = "",
        priority: str = "normal",
        source: str = "auth-ms",
        metadata: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Create a notification document in MongoDB and push its ID to Redis.

        merchant_name is stored at doc level so the dispatcher can inject it
        into template_data at render time.

        Args:
            recipient: Destination address stored on the document.
            channels: Channel value strings (e.g. ``"whatsapp"``).
            template_name: Name of the template the worker should render.
            template_data: Variables stored for the template.
            merchant_id: Merchant identifier stored on the document.
            merchant_name: Merchant display name stored on the document.
            priority: Priority label stored on the document.
            source: Originating service label stored on the document.
            metadata: Optional extra data; stored as ``{}`` when omitted.

        Returns:
            ``True`` if the document was inserted and its ID pushed to the
            queue, ``False`` if any step raised. Failures are logged with the
            traceback and never re-raised.
        """
        notification_id = str(uuid.uuid4())
        now = datetime.utcnow()

        doc = {
            "notification_id": notification_id,
            "recipient": recipient,
            "channels": channels,
            "template_name": template_name,
            "template_data": template_data,
            "priority": priority,
            "status": "queued",
            "source": source,
            "merchant_id": merchant_id,
            "merchant_name": merchant_name,
            "metadata": metadata or {},
            "channel_results": {},
            "retry_count": 0,
            "created_at": now,
            "updated_at": now,
            "sent_at": None,
        }

        try:
            db = get_database()
            await db[COLLECTION].insert_one(doc)

            redis_client = cache_service.get_client()
            push_result = redis_client.lpush(
                NOTIFICATION_QUEUE_NAME,
                json.dumps({"notification_id": notification_id}),
            )
            # With an asyncio Redis client ``lpush`` returns a coroutine that
            # does nothing until awaited; a synchronous client has already
            # executed the command and returns a plain value. Support both so
            # the ID is really on the queue before success is reported.
            if inspect.isawaitable(push_result):
                await push_result

            logger.info(
                "Notification queued",
                extra={
                    "notification_id": notification_id,
                    "template": template_name,
                    "channels": channels,
                    "recipient": recipient,
                },
            )
            return True

        except Exception:
            # Deliberately best-effort: callers only get a boolean. If the
            # insert succeeded but the push failed, the document remains in
            # MongoDB with status "queued"; the ID is logged so that it can
            # be located.
            logger.error(
                "Notification queue failed",
                exc_info=True,
                extra={
                    "notification_id": notification_id,
                    "template": template_name,
                    "recipient": recipient,
                },
            )
            return False

    # ── Convenience helpers ──────────────────────────────────────────────

    @staticmethod
    async def send_otp(
        recipient: str,
        otp: str,
        expiry_minutes: int = 5,
        merchant_id: str = "",
        merchant_name: str = "",
        channels: Optional[List[NotificationChannel]] = None,
        template_name: Optional[str] = None,
    ) -> bool:
        """Queue OTP notification. Delivery handled by notification-ms.

        Args:
            recipient: Destination address for the OTP.
            otp: The one-time password to send.
            expiry_minutes: Validity period; stored as a string in the
                template data.
            merchant_id: Merchant identifier stored on the notification.
            merchant_name: Merchant display name, stored on the notification
                and in the template data.
            channels: Channels to use; defaults to WhatsApp and SMS.
            template_name: Template override; defaults to
                ``settings.NOTIFICATION_TEMPLATE_OTP_VERIFICATION``.

        Returns:
            ``True`` if the notification was queued, ``False`` otherwise.
        """
        return await NotificationQueue._enqueue(
            recipient=recipient,
            channels=NotificationQueue._channel_values(channels),
            template_name=template_name or settings.NOTIFICATION_TEMPLATE_OTP_VERIFICATION,
            template_data={
                "otp": otp,
                "expiry_minutes": str(expiry_minutes),
                "merchant_name": merchant_name,
            },
            merchant_id=merchant_id,
            merchant_name=merchant_name,
            priority="critical",
        )

    @staticmethod
    async def send_credentials(
        recipient: str,
        name: str,
        username: str,
        password: str,
        merchant_id: str = "",
        merchant_name: str = "",
        channels: Optional[List[NotificationChannel]] = None,
        template_name: Optional[str] = None,
    ) -> bool:
        """Queue welcome credentials notification. Delivery handled by notification-ms.

        Args:
            recipient: Destination address for the credentials.
            name: Accepted for interface compatibility; not forwarded.
            username: Accepted for interface compatibility; not forwarded.
            password: Sent in the template data under the ``"otp"`` key.
            merchant_id: Merchant identifier stored on the notification.
            merchant_name: Merchant display name, stored on the notification
                and in the template data.
            channels: Channels to use; defaults to WhatsApp and SMS.
            template_name: Template override; defaults to
                ``settings.NOTIFICATION_TEMPLATE_CREDENTIALS``.

        Returns:
            ``True`` if the notification was queued, ``False`` otherwise.
        """
        # NOTE(review): ``name`` and ``username`` never reach the template, so
        # the recipient only receives the password. Presumably the template
        # shares the OTP variable layout - confirm with notification-ms before
        # adding keys here.
        return await NotificationQueue._enqueue(
            recipient=recipient,
            channels=NotificationQueue._channel_values(channels),
            template_name=template_name or settings.NOTIFICATION_TEMPLATE_CREDENTIALS,
            template_data={
                "otp": password,
                "expiry_minutes": "0",
                "merchant_name": merchant_name,
            },
            merchant_id=merchant_id,
            merchant_name=merchant_name,
            priority="high",
        )

    @staticmethod
    async def send_password_reset(
        recipient: str,
        name: str,
        new_password: str,
        merchant_id: str = "",
        merchant_name: str = "",
        channels: Optional[List[NotificationChannel]] = None,
        template_name: Optional[str] = None,
    ) -> bool:
        """Queue password reset notification. Delivery handled by notification-ms.

        Args:
            recipient: Destination address for the new password.
            name: Accepted for interface compatibility; not forwarded.
            new_password: Sent in the template data under the ``"otp"`` key.
            merchant_id: Merchant identifier stored on the notification.
            merchant_name: Merchant display name, stored on the notification
                and in the template data.
            channels: Channels to use; defaults to WhatsApp and SMS.
            template_name: Template override; defaults to
                ``settings.NOTIFICATION_TEMPLATE_PASSWORD_RESET``.

        Returns:
            ``True`` if the notification was queued, ``False`` otherwise.
        """
        # NOTE(review): ``name`` is unused; see the template-layout question
        # raised in send_credentials.
        return await NotificationQueue._enqueue(
            recipient=recipient,
            channels=NotificationQueue._channel_values(channels),
            template_name=template_name or settings.NOTIFICATION_TEMPLATE_PASSWORD_RESET,
            template_data={
                "otp": new_password,
                "expiry_minutes": "0",
                "merchant_name": merchant_name,
            },
            merchant_id=merchant_id,
            merchant_name=merchant_name,
            priority="high",
        )