MukeshKapoor25 commited on
Commit
47fd0cd
·
1 Parent(s): 6879d73

refactor(notification-ms): enhance structured logging across all channels

Browse files

- Migrate from f-string logging to structured logging with extra context fields
- Add event identifiers to all log statements for better observability
- Include contextual metadata (merchant_id, template_name, recipient, etc.) in logs
- Replace exception string interpolation with exc_info=True for proper stack traces
- Update cache service logging to use structured format with operation-specific events
- Enhance email channel logging with recipient and template context
- Improve push notification logging with provider message IDs and merchant tracking
- Update SMS channel logging with provider request IDs and error details
- Add WhatsApp channel structured logging with message IDs and status tracking
- Update worker and dispatcher service logging for consistency
- Add logging configuration options to .env.example (LOG_FORMAT, LOG_DIR, LOG_MAX_BYTES, LOG_BACKUP_COUNT)
- Improves log parsing, filtering, and correlation for production monitoring

.env.example CHANGED
@@ -3,6 +3,10 @@ APP_NAME=Notification Microservice
3
  APP_VERSION=1.0.0
4
  DEBUG=false
5
  LOG_LEVEL=INFO
 
 
 
 
6
  ROOT_PATH=
7
 
8
  # MongoDB (shared DB — merchant settings are read from scm_merchant_settings collection)
 
3
  APP_VERSION=1.0.0
4
  DEBUG=false
5
  LOG_LEVEL=INFO
6
+ # LOG_FORMAT=json
7
+ # LOG_DIR=logs
8
+ # LOG_MAX_BYTES=52428800
9
+ # LOG_BACKUP_COUNT=10
10
  ROOT_PATH=
11
 
12
  # MongoDB (shared DB — merchant settings are read from scm_merchant_settings collection)
app/cache.py CHANGED
@@ -31,7 +31,15 @@ def get_redis() -> Optional[redis.Redis]:
31
  pool_params["password"] = settings.REDIS_PASSWORD
32
  pool = redis.ConnectionPool(**pool_params)
33
  _redis_client = redis.Redis(connection_pool=pool)
34
- logger.info(f"Redis connected ({settings.REDIS_HOST}:{settings.REDIS_PORT})")
 
 
 
 
 
 
 
 
35
  return _redis_client
36
 
37
 
@@ -49,8 +57,8 @@ class CacheService:
49
  try:
50
  v = json.dumps(value) if not isinstance(value, str) else value
51
  return self.client.setex(key, ttl, v)
52
- except Exception as e:
53
- logger.error(f"Cache set error: {e}")
54
  return False
55
 
56
  async def get(self, key: str) -> Optional[Any]:
@@ -62,15 +70,19 @@ class CacheService:
62
  except json.JSONDecodeError:
63
  return value
64
  return None
65
- except Exception as e:
66
- logger.error(f"Cache get error: {e}")
67
  return None
68
 
69
  async def delete(self, key: str) -> bool:
70
  try:
71
  return self.client.delete(key) > 0
72
- except Exception as e:
73
- logger.error(f"Cache delete error: {e}")
 
 
 
 
74
  return False
75
 
76
 
 
31
  pool_params["password"] = settings.REDIS_PASSWORD
32
  pool = redis.ConnectionPool(**pool_params)
33
  _redis_client = redis.Redis(connection_pool=pool)
34
+ logger.info(
35
+ "Redis connected",
36
+ extra={
37
+ "event": "redis_connected",
38
+ "redis_host": settings.REDIS_HOST,
39
+ "redis_port": settings.REDIS_PORT,
40
+ "redis_db": settings.REDIS_DB,
41
+ },
42
+ )
43
  return _redis_client
44
 
45
 
 
57
  try:
58
  v = json.dumps(value) if not isinstance(value, str) else value
59
  return self.client.setex(key, ttl, v)
60
+ except Exception:
61
+ logger.error("Cache set failed", exc_info=True, extra={"event": "cache_set_failure", "key": key})
62
  return False
63
 
64
  async def get(self, key: str) -> Optional[Any]:
 
70
  except json.JSONDecodeError:
71
  return value
72
  return None
73
+ except Exception:
74
+ logger.error("Cache get failed", exc_info=True, extra={"event": "cache_get_failure", "key": key})
75
  return None
76
 
77
  async def delete(self, key: str) -> bool:
78
  try:
79
  return self.client.delete(key) > 0
80
+ except Exception:
81
+ logger.error(
82
+ "Cache delete failed",
83
+ exc_info=True,
84
+ extra={"event": "cache_delete_failure", "key": key},
85
+ )
86
  return False
87
 
88
 
app/channels/email.py CHANGED
@@ -81,11 +81,35 @@ class EmailChannel:
81
  start_tls=use_tls,
82
  timeout=30,
83
  )
84
- logger.info(f"Email sent to {recipient}, subject={subject}")
 
 
 
 
 
 
 
 
85
  return True, "Email sent", None
86
  except aiosmtplib.SMTPException as e:
87
- logger.error(f"SMTP error sending to {recipient}: {e}")
 
 
 
 
 
 
 
 
88
  return False, f"SMTP error: {e}", None
89
  except Exception as e:
90
- logger.error(f"Email error: {e}", exc_info=True)
 
 
 
 
 
 
 
 
91
  return False, f"Email error: {e}", None
 
81
  start_tls=use_tls,
82
  timeout=30,
83
  )
84
+ logger.info(
85
+ "Email sent",
86
+ extra={
87
+ "event": "email_send_success",
88
+ "recipient": recipient,
89
+ "template_name": template_name,
90
+ "subject": subject,
91
+ },
92
+ )
93
  return True, "Email sent", None
94
  except aiosmtplib.SMTPException as e:
95
+ logger.error(
96
+ "Email SMTP error",
97
+ extra={
98
+ "event": "email_send_failure",
99
+ "recipient": recipient,
100
+ "template_name": template_name,
101
+ "error": str(e),
102
+ },
103
+ )
104
  return False, f"SMTP error: {e}", None
105
  except Exception as e:
106
+ logger.error(
107
+ "Email send failed",
108
+ exc_info=True,
109
+ extra={
110
+ "event": "email_send_failure",
111
+ "recipient": recipient,
112
+ "template_name": template_name,
113
+ },
114
+ )
115
  return False, f"Email error: {e}", None
app/channels/push.py CHANGED
@@ -26,10 +26,17 @@ def _get_firebase_app(merchant_id: str, service_account_json: str):
26
  cred = credentials.Certificate(cred_dict)
27
  app = firebase_admin.initialize_app(cred, name=f"merchant_{merchant_id}")
28
  _firebase_apps[merchant_id] = app
29
- logger.info(f"Firebase app initialized for merchant {merchant_id}")
 
 
 
30
  return app
31
- except Exception as e:
32
- logger.error(f"Firebase init error for merchant {merchant_id}: {e}", exc_info=True)
 
 
 
 
33
  return None
34
 
35
 
@@ -82,9 +89,26 @@ class PushChannel:
82
  response = await loop.run_in_executor(
83
  None, lambda: messaging.send(message, app=app)
84
  )
85
- logger.info(f"Push sent to token {recipient[:20]}..., response={response}")
 
 
 
 
 
 
 
 
 
86
  return True, "Push notification sent", response
87
 
88
  except Exception as e:
89
- logger.error(f"Push error: {e}", exc_info=True)
 
 
 
 
 
 
 
 
90
  return False, f"Push error: {e}", None
 
26
  cred = credentials.Certificate(cred_dict)
27
  app = firebase_admin.initialize_app(cred, name=f"merchant_{merchant_id}")
28
  _firebase_apps[merchant_id] = app
29
+ logger.info(
30
+ "Firebase app initialized",
31
+ extra={"event": "firebase_init_success", "merchant_id": merchant_id},
32
+ )
33
  return app
34
+ except Exception:
35
+ logger.error(
36
+ "Firebase init failed",
37
+ exc_info=True,
38
+ extra={"event": "firebase_init_failure", "merchant_id": merchant_id},
39
+ )
40
  return None
41
 
42
 
 
89
  response = await loop.run_in_executor(
90
  None, lambda: messaging.send(message, app=app)
91
  )
92
+ logger.info(
93
+ "Push sent",
94
+ extra={
95
+ "event": "push_send_success",
96
+ "recipient_prefix": recipient[:20],
97
+ "template_name": template_name,
98
+ "provider_message_id": response,
99
+ "merchant_id": merchant_id,
100
+ },
101
+ )
102
  return True, "Push notification sent", response
103
 
104
  except Exception as e:
105
+ logger.error(
106
+ "Push send failed",
107
+ exc_info=True,
108
+ extra={
109
+ "event": "push_send_failure",
110
+ "template_name": template_name,
111
+ "merchant_id": merchant_id,
112
+ },
113
+ )
114
  return False, f"Push error: {e}", None
app/channels/sms.py CHANGED
@@ -85,12 +85,26 @@ class SMSChannel:
85
  if resp.status_code == 200 and data.get("return"):
86
  request_id = data.get("request_id", "")
87
  logger.info(
88
- f"SMS sent to {numbers}, request_id={request_id}"
 
 
 
 
 
 
89
  )
90
  return True, "SMS sent via Fast2SMS", request_id
91
 
92
  error_msg = data.get("message", "Unknown Fast2SMS error")
93
- logger.error(f"Fast2SMS error for {numbers}: {error_msg}")
 
 
 
 
 
 
 
 
94
  return False, f"Fast2SMS error: {error_msg}", None
95
 
96
  except httpx.TimeoutException:
@@ -98,5 +112,13 @@ class SMSChannel:
98
  except httpx.RequestError as e:
99
  return False, f"Fast2SMS network error: {e}", None
100
  except Exception as e:
101
- logger.error(f"SMS error to {numbers}: {e}", exc_info=True)
 
 
 
 
 
 
 
 
102
  return False, f"SMS error: {e}", None
 
85
  if resp.status_code == 200 and data.get("return"):
86
  request_id = data.get("request_id", "")
87
  logger.info(
88
+ "SMS sent",
89
+ extra={
90
+ "event": "sms_send_success",
91
+ "recipient": numbers,
92
+ "provider_request_id": request_id,
93
+ "template_name": template_name,
94
+ },
95
  )
96
  return True, "SMS sent via Fast2SMS", request_id
97
 
98
  error_msg = data.get("message", "Unknown Fast2SMS error")
99
+ logger.error(
100
+ "SMS provider returned error",
101
+ extra={
102
+ "event": "sms_send_failure",
103
+ "recipient": numbers,
104
+ "template_name": template_name,
105
+ "provider_error": error_msg,
106
+ },
107
+ )
108
  return False, f"Fast2SMS error: {error_msg}", None
109
 
110
  except httpx.TimeoutException:
 
112
  except httpx.RequestError as e:
113
  return False, f"Fast2SMS network error: {e}", None
114
  except Exception as e:
115
+ logger.error(
116
+ "SMS send failed",
117
+ exc_info=True,
118
+ extra={
119
+ "event": "sms_send_failure",
120
+ "recipient": numbers,
121
+ "template_name": template_name,
122
+ },
123
+ )
124
  return False, f"SMS error: {e}", None
app/channels/whatsapp.py CHANGED
@@ -83,14 +83,31 @@ class WhatsAppChannel:
83
  r = receivers[0]
84
  mid = r.get("localMessageId")
85
  if r.get("isValidWhatsAppNumber") and not r.get("errors"):
86
- logger.info(f"WhatsApp sent to {whatsapp_number}, mid={mid}")
 
 
 
 
 
 
 
 
87
  return True, "Sent via WhatsApp", mid
88
  err = (r.get("errors") or ["Invalid WhatsApp number"])[0]
89
  return False, f"WhatsApp error: {err}", None
90
  return False, "No receivers in WATI response", None
91
  return False, f"WATI error: {data.get('error', 'Unknown')}", None
92
 
93
- logger.error(f"WATI HTTP {resp.status_code}: {resp.text[:200]}")
 
 
 
 
 
 
 
 
 
94
  if resp.status_code == 401:
95
  return False, "WATI credentials invalid or expired", None
96
  if resp.status_code == 403:
@@ -101,8 +118,16 @@ class WhatsAppChannel:
101
  return False, "WhatsApp request timeout", None
102
  except httpx.RequestError as e:
103
  return False, f"WhatsApp network error: {e}", None
104
- except Exception as e:
105
- logger.error(f"WhatsApp unexpected error: {e}", exc_info=True)
 
 
 
 
 
 
 
 
106
  return False, "WhatsApp internal error", None
107
 
108
  @staticmethod
@@ -134,6 +159,10 @@ class WhatsAppChannel:
134
  d = resp.json()
135
  return True, d.get("optedIn", False), d.get("id")
136
  return False, False, None
137
- except Exception as e:
138
- logger.error(f"Opt-in check error: {e}")
 
 
 
 
139
  return False, False, None
 
83
  r = receivers[0]
84
  mid = r.get("localMessageId")
85
  if r.get("isValidWhatsAppNumber") and not r.get("errors"):
86
+ logger.info(
87
+ "WhatsApp sent",
88
+ extra={
89
+ "event": "whatsapp_send_success",
90
+ "recipient": whatsapp_number,
91
+ "template_name": template_name,
92
+ "provider_message_id": mid,
93
+ },
94
+ )
95
  return True, "Sent via WhatsApp", mid
96
  err = (r.get("errors") or ["Invalid WhatsApp number"])[0]
97
  return False, f"WhatsApp error: {err}", None
98
  return False, "No receivers in WATI response", None
99
  return False, f"WATI error: {data.get('error', 'Unknown')}", None
100
 
101
+ logger.error(
102
+ "WhatsApp provider HTTP error",
103
+ extra={
104
+ "event": "whatsapp_send_failure",
105
+ "status_code": resp.status_code,
106
+ "response_excerpt": resp.text[:200],
107
+ "recipient": whatsapp_number,
108
+ "template_name": template_name,
109
+ },
110
+ )
111
  if resp.status_code == 401:
112
  return False, "WATI credentials invalid or expired", None
113
  if resp.status_code == 403:
 
118
  return False, "WhatsApp request timeout", None
119
  except httpx.RequestError as e:
120
  return False, f"WhatsApp network error: {e}", None
121
+ except Exception:
122
+ logger.error(
123
+ "WhatsApp send failed",
124
+ exc_info=True,
125
+ extra={
126
+ "event": "whatsapp_send_failure",
127
+ "recipient": whatsapp_number,
128
+ "template_name": template_name,
129
+ },
130
+ )
131
  return False, "WhatsApp internal error", None
132
 
133
  @staticmethod
 
159
  d = resp.json()
160
  return True, d.get("optedIn", False), d.get("id")
161
  return False, False, None
162
+ except Exception:
163
+ logger.error(
164
+ "WhatsApp opt-in check failed",
165
+ exc_info=True,
166
+ extra={"event": "whatsapp_optin_check_failure", "recipient": number},
167
+ )
168
  return False, False, None
app/controllers/router.py CHANGED
@@ -40,7 +40,15 @@ async def send_notification(payload: SendNotificationRequest):
40
  channels=doc["channels"],
41
  )
42
  except Exception as e:
43
- logger.error(f"Send error: {e}", exc_info=True)
 
 
 
 
 
 
 
 
44
  raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
45
 
46
 
 
40
  channels=doc["channels"],
41
  )
42
  except Exception as e:
43
+ logger.error(
44
+ "Send notification failed",
45
+ exc_info=True,
46
+ extra={
47
+ "event": "notification_create_failure",
48
+ "merchant_id": payload.merchant_id,
49
+ "template_name": payload.template_name,
50
+ },
51
+ )
52
  raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
53
 
54
 
app/core/logging.py CHANGED
@@ -1,95 +1,151 @@
1
- """
2
- Structured logging for Notification Microservice.
3
- """
4
  import logging
5
  import logging.handlers
6
- import json
7
  import sys
 
 
8
  from pathlib import Path
9
- from datetime import datetime
 
 
 
 
 
 
10
 
11
 
12
  class JSONFormatter(logging.Formatter):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
  def format(self, record: logging.LogRecord) -> str:
14
- log_data = {
15
- "timestamp": datetime.utcnow().isoformat(),
16
  "level": record.levelname,
17
  "logger": record.name,
18
  "message": record.getMessage(),
19
- "module": record.module,
20
- "function": record.funcName,
21
- "line": record.lineno,
22
  }
23
- if hasattr(record, "correlation_id") and record.correlation_id:
24
- log_data["correlation_id"] = record.correlation_id
25
- if record.exc_info:
26
- log_data["exception"] = self.formatException(record.exc_info)
27
- return json.dumps(log_data, default=str)
28
 
 
 
29
 
30
- class StructuredLogger:
31
- def __init__(self, name: str):
32
- self.logger = logging.getLogger(name)
33
 
34
- def _log(self, level: int, message: str, extra=None, exc_info=False):
35
- if extra:
36
- self.logger.log(level, message, extra={"extra": extra}, exc_info=exc_info)
37
- else:
38
- self.logger.log(level, message, exc_info=exc_info)
 
 
39
 
40
- def debug(self, message, extra=None):
41
- self._log(logging.DEBUG, message, extra)
42
 
43
- def info(self, message, extra=None):
44
- self._log(logging.INFO, message, extra)
45
 
46
- def warning(self, message, extra=None):
47
- self._log(logging.WARNING, message, extra)
48
 
49
- def error(self, message, extra=None, exc_info=False):
50
- self._log(logging.ERROR, message, extra, exc_info)
51
 
52
- def critical(self, message, extra=None, exc_info=False):
53
- self._log(logging.CRITICAL, message, extra, exc_info)
 
 
 
 
 
 
 
 
 
54
 
55
 
56
- def setup_logging(log_level: str = "INFO", log_dir: str = "logs"):
57
- log_path = Path(log_dir)
58
- log_path.mkdir(exist_ok=True)
 
59
 
60
- root_logger = logging.getLogger()
61
- root_logger.setLevel(getattr(logging, log_level.upper()))
62
- root_logger.handlers = []
63
 
64
- console_fmt = logging.Formatter(
65
- fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
66
- datefmt="%Y-%m-%d %H:%M:%S",
67
- )
68
- json_fmt = JSONFormatter()
69
 
70
- console = logging.StreamHandler(sys.stderr)
71
- console.setLevel(getattr(logging, log_level.upper()))
72
- console.setFormatter(console_fmt)
73
- root_logger.addHandler(console)
74
 
75
- file_all = logging.handlers.RotatingFileHandler(
76
- filename=log_path / "app.log", maxBytes=10 * 1024 * 1024, backupCount=10
 
 
 
 
77
  )
78
- file_all.setLevel(logging.DEBUG)
79
- file_all.setFormatter(json_fmt)
80
- root_logger.addHandler(file_all)
81
-
82
- file_err = logging.handlers.RotatingFileHandler(
83
- filename=log_path / "app_errors.log", maxBytes=10 * 1024 * 1024, backupCount=10
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84
  )
85
- file_err.setLevel(logging.ERROR)
86
- file_err.setFormatter(json_fmt)
87
- root_logger.addHandler(file_err)
88
-
89
- logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
90
- logging.getLogger("motor").setLevel(logging.WARNING)
91
- logging.getLogger("httpx").setLevel(logging.WARNING)
92
 
93
 
94
- def get_logger(name: str) -> StructuredLogger:
95
- return StructuredLogger(name)
 
 
1
+ """Production-grade logging configuration for Notification Microservice."""
2
+ import json
 
3
  import logging
4
  import logging.handlers
5
+ import os
6
  import sys
7
+ import traceback
8
+ from datetime import datetime, timezone
9
  from pathlib import Path
10
+ from typing import Any, Dict
11
+
12
+
13
+ SERVICE_NAME = "notification-ms"
14
+ LOG_DIR = Path(os.getenv("LOG_DIR", "logs"))
15
+ LOG_MAX_BYTES = int(os.getenv("LOG_MAX_BYTES", 50 * 1024 * 1024))
16
+ LOG_BACKUP_COUNT = int(os.getenv("LOG_BACKUP_COUNT", "10"))
17
 
18
 
19
  class JSONFormatter(logging.Formatter):
20
+ """Emit log records as single-line JSON objects."""
21
+
22
+ RESERVED = frozenset(
23
+ {
24
+ "args",
25
+ "created",
26
+ "exc_info",
27
+ "exc_text",
28
+ "filename",
29
+ "funcName",
30
+ "levelname",
31
+ "levelno",
32
+ "lineno",
33
+ "message",
34
+ "module",
35
+ "msecs",
36
+ "msg",
37
+ "name",
38
+ "pathname",
39
+ "process",
40
+ "processName",
41
+ "relativeCreated",
42
+ "stack_info",
43
+ "thread",
44
+ "threadName",
45
+ }
46
+ )
47
+
48
  def format(self, record: logging.LogRecord) -> str:
49
+ payload: Dict[str, Any] = {
50
+ "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
51
  "level": record.levelname,
52
  "logger": record.name,
53
  "message": record.getMessage(),
54
+ "service": SERVICE_NAME,
55
+ "pid": record.process,
 
56
  }
 
 
 
 
 
57
 
58
+ if record.levelno >= logging.WARNING:
59
+ payload["caller"] = f"{record.pathname}:{record.lineno}"
60
 
61
+ for key, val in record.__dict__.items():
62
+ if key not in self.RESERVED and not key.startswith("_"):
63
+ payload[key] = val
64
 
65
+ if record.exc_info and record.exc_info[0] is not None:
66
+ exc_type, exc_value, exc_tb = record.exc_info
67
+ payload["exception"] = {
68
+ "type": exc_type.__name__,
69
+ "message": str(exc_value),
70
+ "stacktrace": traceback.format_exception(exc_type, exc_value, exc_tb),
71
+ }
72
 
73
+ return json.dumps(payload, default=str)
 
74
 
 
 
75
 
76
+ class ConsoleFormatter(logging.Formatter):
77
+ """Readable formatter for local development."""
78
 
79
+ FMT = "%(asctime)s %(levelname)-8s %(name)s — %(message)s"
 
80
 
81
+ def format(self, record: logging.LogRecord) -> str:
82
+ formatter = logging.Formatter(self.FMT, datefmt="%Y-%m-%d %H:%M:%S")
83
+ result = formatter.format(record)
84
+ extras = {
85
+ k: v
86
+ for k, v in record.__dict__.items()
87
+ if k not in JSONFormatter.RESERVED and not k.startswith("_") and k != "color_message"
88
+ }
89
+ if extras:
90
+ result += f" {extras}"
91
+ return result
92
 
93
 
94
+ def setup_logging(log_level: str = "INFO") -> None:
95
+ """Configure root logger with JSON/console and rotating file handlers."""
96
+ numeric_level = getattr(logging, log_level.upper(), logging.INFO)
97
+ log_format = os.getenv("LOG_FORMAT", "json").lower()
98
 
99
+ LOG_DIR.mkdir(parents=True, exist_ok=True)
 
 
100
 
101
+ root = logging.getLogger()
102
+ root.setLevel(numeric_level)
103
+ root.handlers.clear()
 
 
104
 
105
+ stdout_handler = logging.StreamHandler(sys.stdout)
106
+ stdout_handler.setLevel(numeric_level)
107
+ stdout_handler.setFormatter(ConsoleFormatter() if log_format == "console" else JSONFormatter())
108
+ root.addHandler(stdout_handler)
109
 
110
+ app_log_path = LOG_DIR / "app.log"
111
+ file_handler = logging.handlers.RotatingFileHandler(
112
+ filename=app_log_path,
113
+ maxBytes=LOG_MAX_BYTES,
114
+ backupCount=LOG_BACKUP_COUNT,
115
+ encoding="utf-8",
116
  )
117
+ file_handler.setLevel(numeric_level)
118
+ file_handler.setFormatter(JSONFormatter())
119
+ root.addHandler(file_handler)
120
+
121
+ error_log_path = LOG_DIR / "error.log"
122
+ error_handler = logging.handlers.RotatingFileHandler(
123
+ filename=error_log_path,
124
+ maxBytes=LOG_MAX_BYTES,
125
+ backupCount=LOG_BACKUP_COUNT,
126
+ encoding="utf-8",
127
+ )
128
+ error_handler.setLevel(logging.ERROR)
129
+ error_handler.setFormatter(JSONFormatter())
130
+ root.addHandler(error_handler)
131
+
132
+ for noisy in ("motor", "pymongo", "httpx", "uvicorn.access", "aioredis"):
133
+ logging.getLogger(noisy).setLevel(logging.WARNING)
134
+
135
+ startup_logger = logging.getLogger(SERVICE_NAME)
136
+ startup_logger.info(
137
+ "Logging initialised",
138
+ extra={
139
+ "event": "logging_init",
140
+ "log_level": log_level.upper(),
141
+ "log_format": log_format,
142
+ "log_dir": str(LOG_DIR.resolve()),
143
+ "app_log": str(app_log_path.resolve()),
144
+ "error_log": str(error_log_path.resolve()),
145
+ },
146
  )
 
 
 
 
 
 
 
147
 
148
 
149
+ def get_logger(name: str) -> logging.Logger:
150
+ """Return a standard logger for the given module name."""
151
+ return logging.getLogger(name)
app/main.py CHANGED
@@ -25,15 +25,15 @@ _worker = NotificationWorker()
25
 
26
  @asynccontextmanager
27
  async def lifespan(app: FastAPI):
28
- logger.info("Starting Notification Microservice")
29
  await connect_to_mongo()
30
  await _worker.start()
31
- logger.info("Notification Microservice ready")
32
  yield
33
- logger.info("Shutting down Notification Microservice")
34
  await _worker.stop()
35
  await close_mongo_connection()
36
- logger.info("Notification Microservice stopped")
37
 
38
 
39
  app = FastAPI(
@@ -53,18 +53,33 @@ app.add_middleware(CorrelationIdMiddleware)
53
  async def log_requests(request: Request, call_next):
54
  correlation_id = get_correlation_id()
55
  start = time.time()
56
- logger.info(f"→ {request.method} {request.url.path}", extra={"correlation_id": correlation_id})
57
  try:
58
  response = await call_next(request)
59
- elapsed = time.time() - start
60
  logger.info(
61
- f" {request.method} {request.url.path} {response.status_code} ({elapsed:.3f}s)",
62
- extra={"correlation_id": correlation_id},
 
 
 
 
 
 
 
63
  )
64
- response.headers["X-Process-Time"] = f"{elapsed:.3f}"
65
  return response
66
- except Exception as e:
67
- logger.error(f"Request failed: {e}", exc_info=True)
 
 
 
 
 
 
 
 
 
68
  raise
69
 
70
 
@@ -92,7 +107,16 @@ async def validation_handler(request: Request, exc: RequestValidationError):
92
  @app.exception_handler(Exception)
93
  async def general_handler(request: Request, exc: Exception):
94
  cid = get_correlation_id()
95
- logger.error(f"Unhandled: {exc}", exc_info=True, extra={"correlation_id": cid})
 
 
 
 
 
 
 
 
 
96
  return JSONResponse(
97
  status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
98
  content={"success": False, "error": "Internal Server Error", "correlation_id": cid},
 
25
 
26
  @asynccontextmanager
27
  async def lifespan(app: FastAPI):
28
+ logger.info("Service starting", extra={"event": "service_starting"})
29
  await connect_to_mongo()
30
  await _worker.start()
31
+ logger.info("Service ready", extra={"event": "service_ready"})
32
  yield
33
+ logger.info("Service stopping", extra={"event": "service_stopping"})
34
  await _worker.stop()
35
  await close_mongo_connection()
36
+ logger.info("Service stopped", extra={"event": "service_stopped"})
37
 
38
 
39
  app = FastAPI(
 
53
  async def log_requests(request: Request, call_next):
54
  correlation_id = get_correlation_id()
55
  start = time.time()
 
56
  try:
57
  response = await call_next(request)
58
+ duration_ms = round((time.time() - start) * 1000, 2)
59
  logger.info(
60
+ "HTTP request",
61
+ extra={
62
+ "event": "http_request",
63
+ "correlation_id": correlation_id,
64
+ "method": request.method,
65
+ "path": request.url.path,
66
+ "status_code": response.status_code,
67
+ "duration_ms": duration_ms,
68
+ },
69
  )
70
+ response.headers["X-Process-Time"] = str(round(duration_ms / 1000, 3))
71
  return response
72
+ except Exception:
73
+ logger.error(
74
+ "HTTP request failed",
75
+ exc_info=True,
76
+ extra={
77
+ "event": "http_request_failure",
78
+ "correlation_id": correlation_id,
79
+ "method": request.method,
80
+ "path": request.url.path,
81
+ },
82
+ )
83
  raise
84
 
85
 
 
107
  @app.exception_handler(Exception)
108
  async def general_handler(request: Request, exc: Exception):
109
  cid = get_correlation_id()
110
+ logger.error(
111
+ "Unhandled exception",
112
+ exc_info=True,
113
+ extra={
114
+ "event": "unhandled_exception",
115
+ "correlation_id": cid,
116
+ "method": request.method,
117
+ "path": request.url.path,
118
+ },
119
+ )
120
  return JSONResponse(
121
  status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
122
  content={"success": False, "error": "Internal Server Error", "correlation_id": cid},
app/nosql.py CHANGED
@@ -13,19 +13,32 @@ _database: AsyncIOMotorDatabase = None
13
 
14
  async def connect_to_mongo():
15
  global _client, _database
16
- logger.info(f"Connecting to MongoDB: {settings.MONGODB_DB_NAME}")
17
- _client = AsyncIOMotorClient(settings.MONGODB_URI)
18
- _database = _client[settings.MONGODB_DB_NAME]
19
- # Verify connection
20
- await _client.admin.command("ping")
21
- logger.info("MongoDB connected successfully")
 
 
 
 
 
 
 
 
 
 
 
 
 
22
 
23
 
24
  async def close_mongo_connection():
25
  global _client
26
  if _client:
27
  _client.close()
28
- logger.info("MongoDB connection closed")
29
 
30
 
31
  def get_database() -> AsyncIOMotorDatabase:
 
13
 
14
  async def connect_to_mongo():
15
  global _client, _database
16
+ logger.info(
17
+ "Connecting to MongoDB",
18
+ extra={"event": "mongodb_connect_start", "db_name": settings.MONGODB_DB_NAME},
19
+ )
20
+ try:
21
+ _client = AsyncIOMotorClient(settings.MONGODB_URI)
22
+ _database = _client[settings.MONGODB_DB_NAME]
23
+ await _client.admin.command("ping")
24
+ logger.info(
25
+ "MongoDB connected",
26
+ extra={"event": "mongodb_connected", "db_name": settings.MONGODB_DB_NAME},
27
+ )
28
+ except Exception:
29
+ logger.error(
30
+ "MongoDB connection failed",
31
+ exc_info=True,
32
+ extra={"event": "mongodb_connect_failure", "db_name": settings.MONGODB_DB_NAME},
33
+ )
34
+ raise
35
 
36
 
37
  async def close_mongo_connection():
38
  global _client
39
  if _client:
40
  _client.close()
41
+ logger.info("MongoDB connection closed", extra={"event": "mongodb_disconnected"})
42
 
43
 
44
  def get_database() -> AsyncIOMotorDatabase:
app/services/dispatcher.py CHANGED
@@ -117,7 +117,10 @@ class NotificationDispatcher:
117
  }
118
  },
119
  )
120
- logger.error(f"Notification {nid} failed: missing merchant_id")
 
 
 
121
  doc["status"] = NotificationStatus.FAILED.value
122
  return doc
123
 
@@ -136,7 +139,14 @@ class NotificationDispatcher:
136
  }
137
  },
138
  )
139
- logger.error(f"Notification {nid} failed: no merchant settings for {merchant_id}")
 
 
 
 
 
 
 
140
  doc["status"] = NotificationStatus.FAILED.value
141
  return doc
142
 
@@ -206,7 +216,16 @@ class NotificationDispatcher:
206
  elif channel_enum == NotificationChannel.WHATSAPP:
207
  # Use merchant-mapped WATI template name if available, else fall back to doc template_name
208
  wa_template_name = tpl_mapping.get("whatsapp_template_name") or doc["template_name"]
209
- logger.info(f"WhatsApp template resolved: {doc['template_name']} → {wa_template_name}")
 
 
 
 
 
 
 
 
 
210
  success, message, provider_id = await WhatsAppChannel.send(
211
  recipient=doc["recipient"],
212
  template_name=wa_template_name,
@@ -227,7 +246,16 @@ class NotificationDispatcher:
227
  elif channel_enum == NotificationChannel.SMS:
228
  # sms_template_id is the name of the template in notification_templates collection
229
  sms_tpl_name = tpl_mapping.get("sms_template_id") or doc["template_name"]
230
- logger.info(f"SMS template resolved: {doc['template_name']} → {sms_tpl_name}")
 
 
 
 
 
 
 
 
 
231
  resolved_data = await NotificationDispatcher._resolve_template(
232
  sms_tpl_name, "sms", raw_data
233
  )
@@ -267,7 +295,15 @@ class NotificationDispatcher:
267
 
268
  doc["status"] = final_status
269
  doc["channel_results"] = channel_results
270
- logger.info(f"Notification {nid} dispatched: status={final_status}")
 
 
 
 
 
 
 
 
271
  return doc
272
 
273
  @staticmethod
 
117
  }
118
  },
119
  )
120
+ logger.error(
121
+ "Dispatch failed: missing merchant_id",
122
+ extra={"event": "notification_dispatch_failure", "notification_id": nid},
123
+ )
124
  doc["status"] = NotificationStatus.FAILED.value
125
  return doc
126
 
 
139
  }
140
  },
141
  )
142
+ logger.error(
143
+ "Dispatch failed: merchant settings not found",
144
+ extra={
145
+ "event": "notification_dispatch_failure",
146
+ "notification_id": nid,
147
+ "merchant_id": merchant_id,
148
+ },
149
+ )
150
  doc["status"] = NotificationStatus.FAILED.value
151
  return doc
152
 
 
216
  elif channel_enum == NotificationChannel.WHATSAPP:
217
  # Use merchant-mapped WATI template name if available, else fall back to doc template_name
218
  wa_template_name = tpl_mapping.get("whatsapp_template_name") or doc["template_name"]
219
+ logger.info(
220
+ "WhatsApp template resolved",
221
+ extra={
222
+ "event": "template_resolve_success",
223
+ "notification_id": nid,
224
+ "channel": "whatsapp",
225
+ "template_name": doc["template_name"],
226
+ "resolved_template_name": wa_template_name,
227
+ },
228
+ )
229
  success, message, provider_id = await WhatsAppChannel.send(
230
  recipient=doc["recipient"],
231
  template_name=wa_template_name,
 
246
  elif channel_enum == NotificationChannel.SMS:
247
  # sms_template_id is the name of the template in notification_templates collection
248
  sms_tpl_name = tpl_mapping.get("sms_template_id") or doc["template_name"]
249
+ logger.info(
250
+ "SMS template resolved",
251
+ extra={
252
+ "event": "template_resolve_success",
253
+ "notification_id": nid,
254
+ "channel": "sms",
255
+ "template_name": doc["template_name"],
256
+ "resolved_template_name": sms_tpl_name,
257
+ },
258
+ )
259
  resolved_data = await NotificationDispatcher._resolve_template(
260
  sms_tpl_name, "sms", raw_data
261
  )
 
295
 
296
  doc["status"] = final_status
297
  doc["channel_results"] = channel_results
298
+ logger.info(
299
+ "Notification dispatched",
300
+ extra={
301
+ "event": "notification_dispatch_success" if any_success else "notification_dispatch_failure",
302
+ "notification_id": nid,
303
+ "merchant_id": merchant_id,
304
+ "status": final_status,
305
+ },
306
+ )
307
  return doc
308
 
309
  @staticmethod
app/services/notification_service.py CHANGED
@@ -35,9 +35,23 @@ class NotificationService:
35
  r = get_redis()
36
  if r:
37
  r.lpush(QUEUE_NAME, json.dumps({"notification_id": doc["notification_id"]}))
38
- logger.info(f"Notification {doc['notification_id']} queued")
 
 
 
 
 
 
 
39
  else:
40
- logger.warning(f"Redis unavailable — notification {doc['notification_id']} saved but not queued")
 
 
 
 
 
 
 
41
 
42
  return doc
43
 
@@ -55,7 +69,7 @@ class NotificationService:
55
  results.append(doc)
56
  queued += 1
57
  except Exception as e:
58
- logger.error(f"Batch queue error: {e}")
59
  failed += 1
60
  results.append({"error": str(e), "recipient": req.recipient})
61
  return queued, failed, results
@@ -122,5 +136,14 @@ class NotificationService:
122
  r = get_redis()
123
  if r:
124
  r.lpush(QUEUE_NAME, json.dumps({"notification_id": notification_id}))
 
 
 
 
 
 
 
 
 
125
 
126
  return await NotificationService.get_notification(notification_id)
 
35
  r = get_redis()
36
  if r:
37
  r.lpush(QUEUE_NAME, json.dumps({"notification_id": doc["notification_id"]}))
38
+ logger.info(
39
+ "Notification queued",
40
+ extra={
41
+ "event": "enqueued",
42
+ "notification_id": doc["notification_id"],
43
+ "queue": QUEUE_NAME,
44
+ },
45
+ )
46
  else:
47
+ logger.warning(
48
+ "Notification saved but queue unavailable",
49
+ extra={
50
+ "event": "enqueue_skipped",
51
+ "notification_id": doc["notification_id"],
52
+ "queue": QUEUE_NAME,
53
+ },
54
+ )
55
 
56
  return doc
57
 
 
69
  results.append(doc)
70
  queued += 1
71
  except Exception as e:
72
+ logger.error("Batch queue failed", exc_info=True, extra={"event": "batch_enqueue_failure"})
73
  failed += 1
74
  results.append({"error": str(e), "recipient": req.recipient})
75
  return queued, failed, results
 
136
  r = get_redis()
137
  if r:
138
  r.lpush(QUEUE_NAME, json.dumps({"notification_id": notification_id}))
139
+ logger.info(
140
+ "Notification re-queued",
141
+ extra={
142
+ "event": "enqueued",
143
+ "notification_id": notification_id,
144
+ "queue": QUEUE_NAME,
145
+ "operation": "retry",
146
+ },
147
+ )
148
 
149
  return await NotificationService.get_notification(notification_id)
app/services/worker.py CHANGED
@@ -30,7 +30,7 @@ class NotificationWorker:
30
  """Start the worker loop as a background task."""
31
  self._running = True
32
  self._task = asyncio.create_task(self._run())
33
- logger.info("Notification worker started")
34
 
35
  async def stop(self):
36
  """Gracefully stop the worker."""
@@ -41,7 +41,7 @@ class NotificationWorker:
41
  await self._task
42
  except asyncio.CancelledError:
43
  pass
44
- logger.info("Notification worker stopped")
45
 
46
  async def _run(self):
47
  """Main worker loop — poll Redis, dispatch each notification."""
@@ -49,22 +49,35 @@ class NotificationWorker:
49
  batch_size = settings.WORKER_BATCH_SIZE
50
 
51
  logger.info(
52
- f"Worker loop starting (poll_interval={poll_interval}s, batch_size={batch_size})"
 
 
 
 
 
 
53
  )
54
 
55
  while self._running:
56
  try:
57
  r = get_redis()
58
  if not r:
59
- logger.warning("Redis client not available, retrying in 5s")
 
 
 
60
  await asyncio.sleep(5)
61
  continue
62
 
63
  # Verify Redis connectivity on first iteration
64
  try:
65
  r.ping()
66
- except Exception as ping_err:
67
- logger.error(f"Redis ping failed: {ping_err}")
 
 
 
 
68
  await asyncio.sleep(5)
69
  continue
70
 
@@ -79,14 +92,25 @@ class NotificationWorker:
79
  payload = json.loads(raw)
80
  nid = payload.get("notification_id")
81
  if not nid:
82
- logger.warning(f"Queue item missing notification_id: {raw}")
 
 
 
83
  continue
84
 
 
 
 
 
 
85
  # Fetch full doc from MongoDB
86
  db = get_database()
87
  doc = await db[COLLECTION].find_one({"notification_id": nid})
88
  if not doc:
89
- logger.warning(f"Notification {nid} not found in DB, skipping")
 
 
 
90
  continue
91
 
92
  # Dispatch through channels
@@ -94,18 +118,24 @@ class NotificationWorker:
94
  processed += 1
95
 
96
  except json.JSONDecodeError:
97
- logger.error(f"Invalid JSON in queue: {raw}")
98
- except Exception as e:
99
- logger.error(f"Worker dispatch error: {e}", exc_info=True)
 
 
 
100
 
101
  if processed > 0:
102
- logger.info(f"Processed {processed} notification(s)")
 
 
 
103
 
104
  # Sleep between polls
105
  await asyncio.sleep(poll_interval)
106
 
107
  except asyncio.CancelledError:
108
  raise
109
- except Exception as e:
110
- logger.error(f"Worker loop error: {e}", exc_info=True)
111
  await asyncio.sleep(5)
 
30
  """Start the worker loop as a background task."""
31
  self._running = True
32
  self._task = asyncio.create_task(self._run())
33
+ logger.info("Worker started", extra={"event": "worker_started", "queue": QUEUE_NAME})
34
 
35
  async def stop(self):
36
  """Gracefully stop the worker."""
 
41
  await self._task
42
  except asyncio.CancelledError:
43
  pass
44
+ logger.info("Worker stopped", extra={"event": "worker_stopped", "queue": QUEUE_NAME})
45
 
46
  async def _run(self):
47
  """Main worker loop — poll Redis, dispatch each notification."""
 
49
  batch_size = settings.WORKER_BATCH_SIZE
50
 
51
  logger.info(
52
+ "Worker loop starting",
53
+ extra={
54
+ "event": "worker_loop_start",
55
+ "queue": QUEUE_NAME,
56
+ "poll_interval_seconds": poll_interval,
57
+ "batch_size": batch_size,
58
+ },
59
  )
60
 
61
  while self._running:
62
  try:
63
  r = get_redis()
64
  if not r:
65
+ logger.warning(
66
+ "Redis client unavailable",
67
+ extra={"event": "redis_unavailable", "retry_delay_seconds": 5},
68
+ )
69
  await asyncio.sleep(5)
70
  continue
71
 
72
  # Verify Redis connectivity on first iteration
73
  try:
74
  r.ping()
75
+ except Exception:
76
+ logger.error(
77
+ "Redis ping failed",
78
+ exc_info=True,
79
+ extra={"event": "redis_ping_failure", "retry_delay_seconds": 5},
80
+ )
81
  await asyncio.sleep(5)
82
  continue
83
 
 
92
  payload = json.loads(raw)
93
  nid = payload.get("notification_id")
94
  if not nid:
95
+ logger.warning(
96
+ "Queue item missing notification_id",
97
+ extra={"event": "dequeue_invalid_payload", "queue": QUEUE_NAME, "raw": raw},
98
+ )
99
  continue
100
 
101
+ logger.info(
102
+ "Queue item dequeued",
103
+ extra={"event": "dequeued", "queue": QUEUE_NAME, "notification_id": nid},
104
+ )
105
+
106
  # Fetch full doc from MongoDB
107
  db = get_database()
108
  doc = await db[COLLECTION].find_one({"notification_id": nid})
109
  if not doc:
110
+ logger.warning(
111
+ "Notification not found",
112
+ extra={"event": "notification_not_found", "notification_id": nid},
113
+ )
114
  continue
115
 
116
  # Dispatch through channels
 
118
  processed += 1
119
 
120
  except json.JSONDecodeError:
121
+ logger.error(
122
+ "Invalid JSON in queue",
123
+ extra={"event": "dequeue_invalid_json", "queue": QUEUE_NAME, "raw": raw},
124
+ )
125
+ except Exception:
126
+ logger.error("Worker dispatch error", exc_info=True, extra={"event": "dispatch_failure"})
127
 
128
  if processed > 0:
129
+ logger.info(
130
+ "Worker heartbeat",
131
+ extra={"event": "worker_heartbeat", "processed": processed, "queue": QUEUE_NAME},
132
+ )
133
 
134
  # Sleep between polls
135
  await asyncio.sleep(poll_interval)
136
 
137
  except asyncio.CancelledError:
138
  raise
139
+ except Exception:
140
+ logger.error("Worker loop error", exc_info=True, extra={"event": "worker_loop_failure"})
141
  await asyncio.sleep(5)