MukeshKapoor25 committed on
Commit
f2bfacd
·
1 Parent(s): c021ed8

feat(logging): implement production-grade structured logging with JSON and rotating files

Browse files

- Add JSONFormatter for structured JSON output compatible with log aggregators (ELK, CloudWatch, Datadog)
- Add ConsoleFormatter with color-coded output for local development
- Implement rotating file handlers with configurable size and backup limits
- Add separate error.log file for ERROR and CRITICAL level logs
- Create LoggerAdapter for injecting worker/entity context into log records
- Add structured startup/shutdown events with service metadata
- Extend .env.example with LOG_FORMAT, LOG_DIR, LOG_MAX_BYTES, and LOG_BACKUP_COUNT configuration options
- Update worker_manager.py to use new logging context for worker lifecycle events
- Update base_service.py to emit structured logs for sync operations
- Update main.py to initialize logging with startup event
- Enables production-ready log aggregation while maintaining human-readable console output for local development

.env.example CHANGED
@@ -24,3 +24,7 @@ UOM_WORKERS=2
24
 
25
  # Logging
26
  LOG_LEVEL=INFO
 
 
 
 
 
24
 
25
  # Logging
26
  LOG_LEVEL=INFO
27
+ # LOG_FORMAT=json # json (default, for production/log aggregators) | console (human-readable, for local dev)
28
+ # LOG_DIR=logs # directory for rotating log files (default: logs/)
29
+ # LOG_MAX_BYTES=52428800 # max size per log file in bytes (default: 50 MB)
30
+ # LOG_BACKUP_COUNT=10 # number of rotated backups to keep per log file (default: 10)
app/core/logging.py CHANGED
@@ -1,19 +1,326 @@
1
  """
2
- Logging configuration.
 
 
 
 
 
 
 
 
3
  """
4
  import logging
 
5
  import sys
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6
 
 
 
 
 
7
 
8
- def setup_logging(level: str = "INFO"):
9
- """Setup logging configuration."""
10
- logging.basicConfig(
11
- level=getattr(logging, level.upper()),
12
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
13
- handlers=[logging.StreamHandler(sys.stdout)]
 
 
14
  )
15
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
 
17
  def get_logger(name: str) -> logging.Logger:
18
- """Get logger instance."""
19
  return logging.getLogger(name)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """
2
+ Production-grade logging configuration for Sync Worker Microservice.
3
+
4
+ Features:
5
+ - Structured JSON output for log aggregation (ELK, CloudWatch, Datadog, etc.)
6
+ - Rotating file handlers with size/backup limits
7
+ - Separate error log file
8
+ - Console handler with human-readable format for local dev
9
+ - Worker/entity context injection via LoggerAdapter
10
+ - Startup/shutdown structured events
11
  """
12
  import logging
13
+ import logging.handlers
14
  import sys
15
+ import os
16
+ import json
17
+ import traceback
18
+ from datetime import datetime, timezone
19
+ from typing import Optional, Any, Dict
20
+ from pathlib import Path
21
+
22
+
23
+ # ── Constants ────────────────────────────────────────────────────────────────
24
+
25
# Identifier stamped on log records as the "service" field.
SERVICE_NAME = "sync-worker-ms"

# The settings below are read from the environment once, at import time.
LOG_DIR = Path(os.environ.get("LOG_DIR", "logs"))
LOG_MAX_BYTES = int(os.environ.get("LOG_MAX_BYTES", 50 * 1024 * 1024))  # default: 50 MB
LOG_BACKUP_COUNT = int(os.environ.get("LOG_BACKUP_COUNT", "10"))
29
+
30
+
31
+ # ── JSON Formatter ────────────────────────────────────────────────────────────
32
+
33
class JSONFormatter(logging.Formatter):
    """
    Emit log records as single-line JSON objects.

    Every record includes:
        timestamp – ISO-8601 UTC
        level     – DEBUG / INFO / WARNING / ERROR / CRITICAL
        logger    – name of the logger that emitted the record
        message   – formatted log message
        service   – SERVICE_NAME constant
        pid       – process id

    Records at WARNING and above also carry ``caller`` (``pathname:lineno``).

    Any key/value pairs passed via ``extra={}`` are merged at the top level.
    Exception info is serialised into ``exception.type``, ``exception.message``
    and ``exception.stacktrace`` (a list of formatted traceback lines).
    """

    # Attributes the logging machinery itself stores on a LogRecord; anything
    # else found in ``record.__dict__`` is treated as a caller-supplied extra.
    # ``asctime`` is written onto the shared record by any time-using formatter
    # that ran on an earlier handler, and ``taskName`` exists on Python 3.12+;
    # neither is an extra, so both must be listed here.
    RESERVED = frozenset(
        {
            "args", "asctime", "created", "exc_info", "exc_text", "filename",
            "funcName", "levelname", "levelno", "lineno", "message",
            "module", "msecs", "msg", "name", "pathname", "process",
            "processName", "relativeCreated", "stack_info", "taskName",
            "thread", "threadName",
        }
    )

    def format(self, record: logging.LogRecord) -> str:
        """
        Serialise *record* to a one-line JSON string.

        Values JSON cannot encode natively are stringified (``default=str``).
        If encoding still fails, the extras are dropped and the base fields are
        emitted together with a ``serialization_error`` note.
        """
        # Base fields
        base: Dict[str, Any] = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "service": SERVICE_NAME,
            "pid": record.process,
        }

        # Caller location is only attached for WARNING and above
        if record.levelno >= logging.WARNING:
            base["caller"] = f"{record.pathname}:{record.lineno}"

        # Merge extra fields (skip stdlib internals and private attributes)
        payload: Dict[str, Any] = dict(base)
        for key, val in record.__dict__.items():
            if key not in self.RESERVED and not key.startswith("_"):
                payload[key] = val

        # Serialise exception
        exc_payload = None
        if record.exc_info and record.exc_info[0] is not None:
            exc_type, exc_value, exc_tb = record.exc_info
            exc_payload = {
                "type": exc_type.__name__,
                "message": str(exc_value),
                "stacktrace": traceback.format_exception(exc_type, exc_value, exc_tb),
            }
            payload["exception"] = exc_payload

        try:
            return json.dumps(payload, default=str)
        except Exception as exc:
            # Even with default=str, json.dumps can fail (circular references,
            # unsupported dict keys, a __str__ that raises). Retrying with the
            # same payload would fail the same way, so fall back to the fields
            # built above from plain strings and ints.
            fallback: Dict[str, Any] = dict(base)
            if exc_payload is not None:
                fallback["exception"] = exc_payload
            fallback["serialization_error"] = f"{type(exc).__name__}: {exc}"
            return json.dumps(fallback, default=str)
95
+
96
+
97
+ # ── Human-readable formatter (console / local dev) ───────────────────────────
98
+
99
class ConsoleFormatter(logging.Formatter):
    """
    Coloured, human-readable formatter for local development.

    Each line is ``<time> <LEVEL> <logger> — <message>`` wrapped in an ANSI
    colour chosen by level, followed by a grey dict of any ``extra={}`` fields.
    """

    GREY = "\x1b[38;5;240m"
    CYAN = "\x1b[36m"
    YELLOW = "\x1b[33m"
    RED = "\x1b[31m"
    BOLD_RED = "\x1b[1;31m"
    RESET = "\x1b[0m"

    LEVEL_COLOURS = {
        logging.DEBUG: GREY,
        logging.INFO: CYAN,
        logging.WARNING: YELLOW,
        logging.ERROR: RED,
        logging.CRITICAL: BOLD_RED,
    }

    FMT = "%(asctime)s %(levelname)-8s %(name)s — %(message)s"
    DATE_FMT = "%Y-%m-%d %H:%M:%S"

    # Record attributes that must never be shown as "extra" context.
    # ``asctime`` is set on the record by the delegate formatter below (FMT
    # uses %(asctime)s), and ``taskName`` is a stdlib attribute on Python
    # 3.12+; without these entries every line would end in a spurious dict.
    _NOT_EXTRA = JSONFormatter.RESERVED | {"asctime", "taskName", "color_message"}

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        # One delegate Formatter per colour, built lazily and reused instead
        # of constructing a new Formatter for every record.
        self._delegates: Dict[str, logging.Formatter] = {}

    def format(self, record: logging.LogRecord) -> str:
        """Return the coloured line for *record*, with extras appended in grey."""
        colour = self.LEVEL_COLOURS.get(record.levelno, self.RESET)
        delegate = self._delegates.get(colour)
        if delegate is None:
            delegate = logging.Formatter(
                f"{colour}{self.FMT}{self.RESET}",
                datefmt=self.DATE_FMT,
            )
            self._delegates[colour] = delegate
        result = delegate.format(record)

        # Append extra context fields inline
        extras = {
            k: v for k, v in record.__dict__.items()
            if k not in self._NOT_EXTRA and not k.startswith("_")
        }
        if extras:
            result += f" {self.GREY}{extras}{self.RESET}"

        return result
137
+
138
+
139
+ # ── Setup ─────────────────────────────────────────────────────────────────────
140
+
141
def setup_logging(level: str = "INFO") -> None:
    """
    Configure root logger with production-grade handlers.

    Handlers created:
        - stdout    – JSON (default) or coloured console (LOG_FORMAT=console)
        - app.log   – rotating JSON, all levels ≥ level
        - error.log – rotating JSON, ERROR and above only

    Handlers already attached to the root logger are removed and closed
    first, so calling this function again does not leak open log files.

    Args:
        level: Minimum log level string (DEBUG / INFO / WARNING / ERROR /
            CRITICAL). Unknown names fall back to INFO.
    """
    # getattr() alone would also accept non-level attributes of the logging
    # module (e.g. "BASIC_FORMAT"); only integer level constants are valid.
    candidate = getattr(logging, level.upper(), None)
    numeric_level = candidate if isinstance(candidate, int) else logging.INFO
    log_format = os.getenv("LOG_FORMAT", "json").lower()

    # Ensure log directory exists
    LOG_DIR.mkdir(parents=True, exist_ok=True)

    root = logging.getLogger()
    root.setLevel(numeric_level)

    # Remove handlers left by earlier basicConfig()/setup_logging() calls and
    # close them so their streams/files are flushed and released.
    for stale in list(root.handlers):
        root.removeHandler(stale)
        stale.close()

    # ── stdout handler ────────────────────────────────────────────────────────
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(numeric_level)
    if log_format == "console":
        stdout_handler.setFormatter(ConsoleFormatter())
    else:
        stdout_handler.setFormatter(JSONFormatter())
    root.addHandler(stdout_handler)

    # ── Rotating file handlers: app.log (all levels), error.log (ERROR+) ─────
    app_log_path = LOG_DIR / "app.log"
    error_log_path = LOG_DIR / "error.log"
    for path, handler_level in (
        (app_log_path, numeric_level),
        (error_log_path, logging.ERROR),
    ):
        rotating = logging.handlers.RotatingFileHandler(
            filename=path,
            maxBytes=LOG_MAX_BYTES,
            backupCount=LOG_BACKUP_COUNT,
            encoding="utf-8",
        )
        rotating.setLevel(handler_level)
        rotating.setFormatter(JSONFormatter())
        root.addHandler(rotating)

    # Silence noisy third-party loggers
    for noisy in ("motor", "pymongo", "asyncpg", "sqlalchemy.engine", "aioredis"):
        logging.getLogger(noisy).setLevel(logging.WARNING)

    # Emit a structured startup event so log pipelines can detect restarts.
    # Report the level actually applied, not the raw (possibly invalid) input.
    startup_logger = logging.getLogger(SERVICE_NAME)
    startup_logger.info(
        "Logging initialised",
        extra={
            "event": "logging_init",
            "log_level": logging.getLevelName(numeric_level),
            "log_format": log_format,
            "log_dir": str(LOG_DIR.resolve()),
            "app_log": str(app_log_path.resolve()),
            "error_log": str(error_log_path.resolve()),
        },
    )
215
+
216
+
217
+ # ── Logger factory ────────────────────────────────────────────────────────────
218
 
219
  def get_logger(name: str) -> logging.Logger:
220
+ """Return a standard Logger for the given module name."""
221
  return logging.getLogger(name)
222
+
223
+
224
class _WorkerContextAdapter(logging.LoggerAdapter):
    """LoggerAdapter that merges its context dict with per-call ``extra`` fields."""

    def process(self, msg, kwargs):
        # The stock LoggerAdapter.process() replaces any ``extra`` passed to
        # the logging call with the adapter's own dict, which would silently
        # drop per-call fields. Merge instead; per-call values win on key
        # collisions.
        call_extra = kwargs.get("extra") or {}
        kwargs["extra"] = {**(self.extra or {}), **call_extra}
        return msg, kwargs


def get_worker_logger(name: str, entity_type: str, worker_id: int) -> logging.LoggerAdapter:
    """
    Return a LoggerAdapter that automatically injects worker context into every
    log record, so you never have to repeat entity_type / worker_id in extra={}.

    Fields passed via ``extra={}`` on an individual call are kept and merged
    with the worker context.

    Usage::

        logger = get_worker_logger(__name__, "catalogue", 3)
        logger.info("Processing item", extra={"entity_id": "abc-123"})
        # → {"entity_type": "catalogue", "worker_id": 3, "entity_id": "abc-123", ...}
    """
    base = logging.getLogger(name)
    return _WorkerContextAdapter(base, {"entity_type": entity_type, "worker_id": worker_id})
237
+
238
+
239
+ # ── Structured event helpers ──────────────────────────────────────────────────
240
+
241
def log_sync_start(logger: logging.Logger, entity_type: str, entity_id: str, operation: str, worker_id: int) -> None:
    """Emit an INFO ``sync_start`` event describing the operation about to run."""
    fields = {
        "event": "sync_start",
        "entity_type": entity_type,
        "entity_id": entity_id,
        "operation": operation,
        "worker_id": worker_id,
    }
    logger.info("Sync started", extra=fields)
252
+
253
+
254
def log_sync_success(
    logger: logging.Logger,
    entity_type: str,
    entity_id: str,
    operation: str,
    worker_id: int,
    duration_ms: float,
) -> None:
    """Emit an INFO ``sync_success`` event; the duration is rounded to 2 decimals."""
    fields = {
        "event": "sync_success",
        "entity_type": entity_type,
        "entity_id": entity_id,
        "operation": operation,
        "worker_id": worker_id,
        "duration_ms": round(duration_ms, 2),
    }
    logger.info("Sync succeeded", extra=fields)
273
+
274
+
275
def log_sync_failure(
    logger: logging.Logger,
    entity_type: str,
    entity_id: str,
    operation: str,
    worker_id: int,
    duration_ms: float,
    error: str,
    attempt: int,
    max_attempts: int,
) -> None:
    """Emit an ERROR ``sync_failure`` event with the error text and attempt counters."""
    fields = {
        "event": "sync_failure",
        "entity_type": entity_type,
        "entity_id": entity_id,
        "operation": operation,
        "worker_id": worker_id,
        "duration_ms": round(duration_ms, 2),
        "error": error,
        "attempt": attempt,
        "max_attempts": max_attempts,
    }
    logger.error("Sync failed", extra=fields)
300
+
301
+
302
def log_worker_heartbeat(
    logger: logging.Logger,
    entity_type: str,
    worker_id: int,
    iteration: int,
    queue_size: int,
) -> None:
    """Emit an INFO ``worker_heartbeat`` event with the loop iteration and queue size."""
    fields = {
        "event": "worker_heartbeat",
        "entity_type": entity_type,
        "worker_id": worker_id,
        "iteration": iteration,
        "queue_size": queue_size,
    }
    logger.info("Worker heartbeat", extra=fields)
319
+
320
+
321
def log_service_lifecycle(logger: logging.Logger, event: str, **kwargs: Any) -> None:
    """
    Emit a structured INFO lifecycle event (startup, shutdown, connection, etc.).

    The message is *event* with underscores turned into spaces and then
    capitalised; *event* and all keyword arguments are attached as extras.
    """
    headline = event.replace("_", " ").capitalize()
    context: Dict[str, Any] = {"event": event}
    context.update(kwargs)
    logger.info(headline, extra=context)
app/main.py CHANGED
@@ -7,7 +7,7 @@ No HTTP server, just pure queue processing workers.
7
  import asyncio
8
  import signal
9
  from app.core.config import settings
10
- from app.core.logging import setup_logging, get_logger
11
  from app.sync.worker_manager import WorkerManager
12
 
13
  # Setup logging
@@ -17,66 +17,62 @@ logger = get_logger(__name__)
17
 
18
  class SyncWorkerService:
19
  """Main sync worker service."""
20
-
21
  def __init__(self):
22
  self.worker_manager = None
23
  self.running = False
24
-
25
  async def start(self):
26
  """Start the sync worker service."""
27
- logger.info("=" * 70)
28
- logger.info("Starting Sync Worker Microservice")
29
- logger.info("=" * 70)
30
-
31
  try:
32
- # Initialize worker manager
33
  self.worker_manager = WorkerManager()
34
  await self.worker_manager.initialize()
35
-
36
- # Start all workers
37
  await self.worker_manager.start_all_workers()
38
-
39
  self.running = True
40
- logger.info(" Sync Worker Service started successfully")
41
- logger.info("=" * 70)
42
-
43
- # Keep running
44
  while self.running:
45
  await asyncio.sleep(1)
46
-
47
  except Exception as e:
48
- logger.error(f"Failed to start sync worker service: {e}", exc_info=True)
 
 
 
 
49
  raise
50
-
51
  async def stop(self):
52
  """Stop the sync worker service."""
53
- logger.info("Shutting down Sync Worker Service...")
54
  self.running = False
55
-
56
  if self.worker_manager:
57
  await self.worker_manager.stop_all_workers()
58
-
59
- logger.info(" Sync Worker Service stopped")
60
 
61
 
62
  async def main():
63
  """Main entry point."""
64
  service = SyncWorkerService()
65
-
66
- # Setup signal handlers for graceful shutdown
67
  loop = asyncio.get_running_loop()
68
-
69
  def signal_handler():
70
- logger.info("Received shutdown signal")
71
  asyncio.create_task(service.stop())
72
-
73
  for sig in (signal.SIGTERM, signal.SIGINT):
74
  loop.add_signal_handler(sig, signal_handler)
75
-
76
  try:
77
  await service.start()
78
  except KeyboardInterrupt:
79
- logger.info("Keyboard interrupt received")
80
  finally:
81
  await service.stop()
82
 
 
7
  import asyncio
8
  import signal
9
  from app.core.config import settings
10
+ from app.core.logging import setup_logging, get_logger, log_service_lifecycle
11
  from app.sync.worker_manager import WorkerManager
12
 
13
  # Setup logging
 
17
 
18
  class SyncWorkerService:
19
  """Main sync worker service."""
20
+
21
  def __init__(self):
22
  self.worker_manager = None
23
  self.running = False
24
+
25
  async def start(self):
26
  """Start the sync worker service."""
27
+ log_service_lifecycle(logger, "service_starting", service="sync-worker-ms")
28
+
 
 
29
  try:
 
30
  self.worker_manager = WorkerManager()
31
  await self.worker_manager.initialize()
 
 
32
  await self.worker_manager.start_all_workers()
33
+
34
  self.running = True
35
+ log_service_lifecycle(logger, "service_ready", service="sync-worker-ms")
36
+
 
 
37
  while self.running:
38
  await asyncio.sleep(1)
39
+
40
  except Exception as e:
41
+ logger.critical(
42
+ "Service failed to start",
43
+ exc_info=True,
44
+ extra={"event": "service_start_error", "error": str(e)},
45
+ )
46
  raise
47
+
48
  async def stop(self):
49
  """Stop the sync worker service."""
50
+ log_service_lifecycle(logger, "service_stopping", service="sync-worker-ms")
51
  self.running = False
52
+
53
  if self.worker_manager:
54
  await self.worker_manager.stop_all_workers()
55
+
56
+ log_service_lifecycle(logger, "service_stopped", service="sync-worker-ms")
57
 
58
 
59
  async def main():
60
  """Main entry point."""
61
  service = SyncWorkerService()
62
+
 
63
  loop = asyncio.get_running_loop()
64
+
65
  def signal_handler():
66
+ logger.info("Received shutdown signal", extra={"event": "signal_received"})
67
  asyncio.create_task(service.stop())
68
+
69
  for sig in (signal.SIGTERM, signal.SIGINT):
70
  loop.add_signal_handler(sig, signal_handler)
71
+
72
  try:
73
  await service.start()
74
  except KeyboardInterrupt:
75
+ logger.info("Keyboard interrupt received", extra={"event": "keyboard_interrupt"})
76
  finally:
77
  await service.stop()
78
 
app/sync/common/base_service.py CHANGED
@@ -7,7 +7,13 @@ from typing import Optional
7
  from motor.motor_asyncio import AsyncIOMotorDatabase
8
  from sqlalchemy.ext.asyncio import AsyncEngine
9
  import redis.asyncio as redis
10
- from app.core.logging import get_logger
 
 
 
 
 
 
11
  import time
12
  import traceback
13
 
@@ -182,56 +188,53 @@ class BaseSyncService:
182
 
183
  async def _worker(self, worker_id: int) -> None:
184
  """Background worker that processes Redis sync queue."""
185
- logger.info(f"{self.entity_type} sync worker {worker_id} started (is_running={self.is_running})")
186
-
187
- # Log immediately to confirm we enter the loop
188
- logger.info(f"{self.entity_type} worker {worker_id} entering main loop")
189
-
 
 
 
 
190
  iteration = 0
191
  while self.is_running:
192
  try:
193
  iteration += 1
194
- if iteration % 180 == 0: # Log every ~3 minutes (1s poll interval)
195
  queue_size = await self.sync_queue.size()
196
- logger.info(f"{self.entity_type} worker {worker_id} alive (iteration {iteration}, queue_size={queue_size})")
197
-
198
- # Get next operation from Redis queue (blocking with timeout)
199
  sync_op = await self.sync_queue.dequeue(timeout=1)
200
-
201
  if not sync_op:
202
- # Timeout, no operations available
203
  continue
204
-
205
- logger.info(
206
- f"{self.entity_type} worker {worker_id} processing {sync_op.entity_id}",
207
- extra={
208
- "worker_id": worker_id,
209
- "entity_type": self.entity_type,
210
- "entity_id": sync_op.entity_id,
211
- "operation": sync_op.operation
212
- }
213
- )
214
-
215
- # Process the sync operation
216
  await self._process_sync_operation(sync_op, worker_id)
217
-
218
  except asyncio.CancelledError:
219
- logger.info(f"{self.entity_type} sync worker {worker_id} cancelled")
 
 
 
220
  break
221
  except Exception as e:
222
  logger.error(
223
- f"Error in {self.entity_type} sync worker {worker_id}",
224
- exc_info=e,
225
  extra={
 
226
  "entity_type": self.entity_type,
227
  "worker_id": worker_id,
228
- "error": str(e)
229
- }
230
  )
231
- # Continue processing despite error
232
  await asyncio.sleep(1.0)
233
-
234
- logger.info(f"{self.entity_type} sync worker {worker_id} stopped")
 
 
 
235
 
236
  async def _process_sync_operation(
237
  self,
@@ -240,19 +243,8 @@ class BaseSyncService:
240
  ) -> None:
241
  """Process a single sync operation with retry logic."""
242
  start_time = time.time()
243
-
244
  try:
245
- logger.debug(
246
- f"Worker {worker_id} processing {self.entity_type} sync",
247
- extra={
248
- "worker_id": worker_id,
249
- "entity_type": sync_op.entity_type,
250
- "entity_id": sync_op.entity_id,
251
- "operation": sync_op.operation
252
- }
253
- )
254
-
255
- # Execute sync with retry logic
256
  await self.retry_manager.execute_with_retry(
257
  self._sync_with_connection,
258
  sync_op.entity_id,
@@ -260,38 +252,36 @@ class BaseSyncService:
260
  entity_type=self.entity_type,
261
  entity_id=sync_op.entity_id
262
  )
263
-
264
- # Record success
265
  duration_ms = (time.time() - start_time) * 1000
266
  self.monitoring.record_sync_success(
267
  entity_type=self.entity_type,
268
  entity_id=sync_op.entity_id,
269
- duration_ms=duration_ms
270
  )
271
  await self.sync_queue.record_processed()
272
-
 
 
 
 
273
  except Exception as e:
274
- # Record failure
275
  error_msg = str(e)
276
- stack_trace = traceback.format_exc()
277
-
278
  self.monitoring.record_sync_failure(
279
  entity_type=self.entity_type,
280
  entity_id=sync_op.entity_id,
281
  error=error_msg,
282
- stack_trace=stack_trace
283
  )
284
  await self.sync_queue.record_failed()
285
-
286
- logger.error(
287
- f"Worker {worker_id} failed to sync {self.entity_type} after retries",
288
- exc_info=e,
289
- extra={
290
- "worker_id": worker_id,
291
- "entity_type": sync_op.entity_type,
292
- "entity_id": sync_op.entity_id,
293
- "error": error_msg
294
- }
295
  )
296
 
297
  async def _sync_with_connection(self, entity_id: str, operation: str = "update") -> bool:
 
7
  from motor.motor_asyncio import AsyncIOMotorDatabase
8
  from sqlalchemy.ext.asyncio import AsyncEngine
9
  import redis.asyncio as redis
10
+ from app.core.logging import (
11
+ get_logger,
12
+ log_sync_start,
13
+ log_sync_success,
14
+ log_sync_failure,
15
+ log_worker_heartbeat,
16
+ )
17
  import time
18
  import traceback
19
 
 
188
 
189
  async def _worker(self, worker_id: int) -> None:
190
  """Background worker that processes Redis sync queue."""
191
+ logger.info(
192
+ "Worker started",
193
+ extra={
194
+ "event": "worker_start",
195
+ "entity_type": self.entity_type,
196
+ "worker_id": worker_id,
197
+ },
198
+ )
199
+
200
  iteration = 0
201
  while self.is_running:
202
  try:
203
  iteration += 1
204
+ if iteration % 180 == 0: # heartbeat every ~3 minutes
205
  queue_size = await self.sync_queue.size()
206
+ log_worker_heartbeat(logger, self.entity_type, worker_id, iteration, queue_size)
207
+
 
208
  sync_op = await self.sync_queue.dequeue(timeout=1)
 
209
  if not sync_op:
 
210
  continue
211
+
212
+ log_sync_start(logger, self.entity_type, sync_op.entity_id, sync_op.operation, worker_id)
 
 
 
 
 
 
 
 
 
 
213
  await self._process_sync_operation(sync_op, worker_id)
214
+
215
  except asyncio.CancelledError:
216
+ logger.info(
217
+ "Worker cancelled",
218
+ extra={"event": "worker_cancel", "entity_type": self.entity_type, "worker_id": worker_id},
219
+ )
220
  break
221
  except Exception as e:
222
  logger.error(
223
+ "Unhandled error in worker loop",
224
+ exc_info=True,
225
  extra={
226
+ "event": "worker_loop_error",
227
  "entity_type": self.entity_type,
228
  "worker_id": worker_id,
229
+ "error": str(e),
230
+ },
231
  )
 
232
  await asyncio.sleep(1.0)
233
+
234
+ logger.info(
235
+ "Worker stopped",
236
+ extra={"event": "worker_stop", "entity_type": self.entity_type, "worker_id": worker_id},
237
+ )
238
 
239
  async def _process_sync_operation(
240
  self,
 
243
  ) -> None:
244
  """Process a single sync operation with retry logic."""
245
  start_time = time.time()
246
+
247
  try:
 
 
 
 
 
 
 
 
 
 
 
248
  await self.retry_manager.execute_with_retry(
249
  self._sync_with_connection,
250
  sync_op.entity_id,
 
252
  entity_type=self.entity_type,
253
  entity_id=sync_op.entity_id
254
  )
255
+
 
256
  duration_ms = (time.time() - start_time) * 1000
257
  self.monitoring.record_sync_success(
258
  entity_type=self.entity_type,
259
  entity_id=sync_op.entity_id,
260
+ duration_ms=duration_ms,
261
  )
262
  await self.sync_queue.record_processed()
263
+ log_sync_success(
264
+ logger, self.entity_type, sync_op.entity_id,
265
+ sync_op.operation, worker_id, duration_ms,
266
+ )
267
+
268
  except Exception as e:
269
+ duration_ms = (time.time() - start_time) * 1000
270
  error_msg = str(e)
271
+
 
272
  self.monitoring.record_sync_failure(
273
  entity_type=self.entity_type,
274
  entity_id=sync_op.entity_id,
275
  error=error_msg,
276
+ stack_trace=traceback.format_exc(),
277
  )
278
  await self.sync_queue.record_failed()
279
+ log_sync_failure(
280
+ logger, self.entity_type, sync_op.entity_id,
281
+ sync_op.operation, worker_id, duration_ms,
282
+ error=error_msg,
283
+ attempt=self.retry_manager.max_retries + 1,
284
+ max_attempts=self.retry_manager.max_retries + 1,
 
 
 
 
285
  )
286
 
287
  async def _sync_with_connection(self, entity_id: str, operation: str = "update") -> bool:
app/sync/worker_manager.py CHANGED
@@ -8,7 +8,7 @@ from sqlalchemy import text
8
  import redis.asyncio as redis
9
 
10
  from app.core.config import settings
11
- from app.core.logging import get_logger
12
 
13
  logger = get_logger(__name__)
14
 
@@ -25,42 +25,41 @@ class WorkerManager:
25
 
26
  async def initialize(self):
27
  """Initialize database connections."""
28
- logger.info("Initializing connections...")
29
-
30
  # MongoDB
31
- logger.info(f"Connecting to MongoDB: {settings.MONGODB_URI}")
32
  self.mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
33
  self.mongo_db = self.mongo_client[settings.MONGODB_DB_NAME]
34
  await self.mongo_client.admin.command('ping')
35
- logger.info("✅ Connected to MongoDB")
36
-
37
  # PostgreSQL
38
  pg_url = f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_HOST}:{settings.POSTGRES_PORT}/{settings.POSTGRES_DB}"
39
- logger.info(f"Connecting to PostgreSQL: {settings.POSTGRES_HOST}:{settings.POSTGRES_PORT}")
40
  self.pg_engine = create_async_engine(pg_url, echo=False, pool_pre_ping=True)
41
  async with self.pg_engine.begin() as conn:
42
  await conn.execute(text("SELECT 1"))
43
- logger.info("✅ Connected to PostgreSQL")
44
-
45
- # Redis with connection pooling
46
- logger.info(f"Connecting to Redis: {settings.REDIS_HOST}:{settings.REDIS_PORT}")
47
  pool = redis.ConnectionPool(
48
  host=settings.REDIS_HOST,
49
  port=settings.REDIS_PORT,
50
  password=settings.REDIS_PASSWORD,
51
  db=settings.REDIS_DB,
52
  decode_responses=True,
53
- max_connections=15, # Slightly higher for sync workers
54
  socket_keepalive=True,
55
  socket_connect_timeout=5,
56
  retry_on_timeout=True,
57
- health_check_interval=30
58
  )
59
  self.redis_client = redis.Redis(connection_pool=pool)
60
  await self.redis_client.ping()
61
- logger.info(" Connected to Redis with connection pool (max_connections=15)")
62
-
63
- # Initialize sync services
64
  await self._initialize_sync_services()
65
 
66
  async def _initialize_sync_services(self):
@@ -70,89 +69,72 @@ class WorkerManager:
70
  from app.sync.merchants.service import MerchantSyncService
71
  from app.sync.warehouses.service import WarehouseSyncService
72
  from app.sync.uom.service import UOMSyncService
73
-
74
- logger.info("Initializing sync services...")
75
-
76
  self.sync_services = {
77
  "catalogues": CatalogueSyncService(
78
- mongo_db=self.mongo_db,
79
- pg_engine=self.pg_engine,
80
- redis_client=self.redis_client,
81
- max_queue_size=10000,
82
- worker_count=settings.CATALOGUE_WORKERS,
83
- max_retries=3
84
  ),
85
  "employees": EmployeeSyncService(
86
- mongo_db=self.mongo_db,
87
- pg_engine=self.pg_engine,
88
- redis_client=self.redis_client,
89
- max_queue_size=10000,
90
- worker_count=settings.EMPLOYEE_WORKERS,
91
- max_retries=3
92
  ),
93
  "merchants": MerchantSyncService(
94
- mongo_db=self.mongo_db,
95
- pg_engine=self.pg_engine,
96
- redis_client=self.redis_client,
97
- max_queue_size=10000,
98
- worker_count=settings.MERCHANT_WORKERS,
99
- max_retries=3
100
  ),
101
  "warehouses": WarehouseSyncService(
102
- mongo_db=self.mongo_db,
103
- pg_engine=self.pg_engine,
104
- redis_client=self.redis_client,
105
- max_queue_size=10000,
106
- worker_count=settings.WAREHOUSE_WORKERS,
107
- max_retries=3
108
  ),
109
  "uom": UOMSyncService(
110
- mongo_db=self.mongo_db,
111
- pg_engine=self.pg_engine,
112
- redis_client=self.redis_client,
113
- max_queue_size=5000,
114
- worker_count=settings.UOM_WORKERS,
115
- max_retries=3
116
- )
117
  }
118
-
119
- logger.info(f" Initialized {len(self.sync_services)} sync services")
 
120
 
121
  async def start_all_workers(self):
122
  """Start all sync workers."""
123
- logger.info("Starting all sync workers...")
124
-
125
  for entity_type, service in self.sync_services.items():
126
  try:
127
  await service.start_workers()
128
- logger.info(f" Started {entity_type} workers")
 
129
  except Exception as e:
130
- logger.error(f"Failed to start {entity_type} workers: {e}", exc_info=True)
131
-
132
- # Log summary
133
  total_workers = sum(s.worker_count for s in self.sync_services.values())
134
- logger.info(f" Started {total_workers} workers across {len(self.sync_services)} services")
135
-
 
136
  async def stop_all_workers(self):
137
  """Stop all sync workers."""
138
- logger.info("Stopping all sync workers...")
139
-
140
  for entity_type, service in self.sync_services.items():
141
  try:
142
  await service.stop_workers()
143
- logger.info(f" Stopped {entity_type} workers")
144
  except Exception as e:
145
- logger.error(f"Failed to stop {entity_type} workers: {e}", exc_info=True)
146
-
147
- # Close connections
148
  if self.redis_client:
149
  await self.redis_client.aclose()
150
- logger.info("✅ Closed Redis connection")
151
-
152
  if self.pg_engine:
153
  await self.pg_engine.dispose()
154
- logger.info("✅ Closed PostgreSQL connection")
155
-
156
  if self.mongo_client:
157
  self.mongo_client.close()
158
- logger.info("✅ Closed MongoDB connection")
 
8
  import redis.asyncio as redis
9
 
10
  from app.core.config import settings
11
+ from app.core.logging import get_logger, log_service_lifecycle
12
 
13
  logger = get_logger(__name__)
14
 
 
25
 
26
  async def initialize(self):
27
  """Initialize database connections."""
28
+ log_service_lifecycle(logger, "connections_init")
29
+
30
  # MongoDB
31
+ log_service_lifecycle(logger, "mongodb_connecting", host=settings.MONGODB_URI.split("@")[-1])
32
  self.mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
33
  self.mongo_db = self.mongo_client[settings.MONGODB_DB_NAME]
34
  await self.mongo_client.admin.command('ping')
35
+ log_service_lifecycle(logger, "mongodb_connected", db=settings.MONGODB_DB_NAME)
36
+
37
  # PostgreSQL
38
  pg_url = f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_HOST}:{settings.POSTGRES_PORT}/{settings.POSTGRES_DB}"
39
+ log_service_lifecycle(logger, "postgres_connecting", host=settings.POSTGRES_HOST, port=settings.POSTGRES_PORT)
40
  self.pg_engine = create_async_engine(pg_url, echo=False, pool_pre_ping=True)
41
  async with self.pg_engine.begin() as conn:
42
  await conn.execute(text("SELECT 1"))
43
+ log_service_lifecycle(logger, "postgres_connected", db=settings.POSTGRES_DB)
44
+
45
+ # Redis
46
+ log_service_lifecycle(logger, "redis_connecting", host=settings.REDIS_HOST, port=settings.REDIS_PORT)
47
  pool = redis.ConnectionPool(
48
  host=settings.REDIS_HOST,
49
  port=settings.REDIS_PORT,
50
  password=settings.REDIS_PASSWORD,
51
  db=settings.REDIS_DB,
52
  decode_responses=True,
53
+ max_connections=15,
54
  socket_keepalive=True,
55
  socket_connect_timeout=5,
56
  retry_on_timeout=True,
57
+ health_check_interval=30,
58
  )
59
  self.redis_client = redis.Redis(connection_pool=pool)
60
  await self.redis_client.ping()
61
+ log_service_lifecycle(logger, "redis_connected", max_connections=15)
62
+
 
63
  await self._initialize_sync_services()
64
 
65
  async def _initialize_sync_services(self):
 
69
  from app.sync.merchants.service import MerchantSyncService
70
  from app.sync.warehouses.service import WarehouseSyncService
71
  from app.sync.uom.service import UOMSyncService
72
+
73
+ log_service_lifecycle(logger, "sync_services_init")
74
+
75
  self.sync_services = {
76
  "catalogues": CatalogueSyncService(
77
+ mongo_db=self.mongo_db, pg_engine=self.pg_engine, redis_client=self.redis_client,
78
+ max_queue_size=10000, worker_count=settings.CATALOGUE_WORKERS, max_retries=3,
 
 
 
 
79
  ),
80
  "employees": EmployeeSyncService(
81
+ mongo_db=self.mongo_db, pg_engine=self.pg_engine, redis_client=self.redis_client,
82
+ max_queue_size=10000, worker_count=settings.EMPLOYEE_WORKERS, max_retries=3,
 
 
 
 
83
  ),
84
  "merchants": MerchantSyncService(
85
+ mongo_db=self.mongo_db, pg_engine=self.pg_engine, redis_client=self.redis_client,
86
+ max_queue_size=10000, worker_count=settings.MERCHANT_WORKERS, max_retries=3,
 
 
 
 
87
  ),
88
  "warehouses": WarehouseSyncService(
89
+ mongo_db=self.mongo_db, pg_engine=self.pg_engine, redis_client=self.redis_client,
90
+ max_queue_size=10000, worker_count=settings.WAREHOUSE_WORKERS, max_retries=3,
 
 
 
 
91
  ),
92
  "uom": UOMSyncService(
93
+ mongo_db=self.mongo_db, pg_engine=self.pg_engine, redis_client=self.redis_client,
94
+ max_queue_size=5000, worker_count=settings.UOM_WORKERS, max_retries=3,
95
+ ),
 
 
 
 
96
  }
97
+
98
+ log_service_lifecycle(logger, "sync_services_ready", count=len(self.sync_services),
99
+ services=list(self.sync_services.keys()))
100
 
101
  async def start_all_workers(self):
102
  """Start all sync workers."""
103
+ log_service_lifecycle(logger, "workers_starting", count=len(self.sync_services))
104
+
105
  for entity_type, service in self.sync_services.items():
106
  try:
107
  await service.start_workers()
108
+ log_service_lifecycle(logger, "workers_started", entity_type=entity_type,
109
+ worker_count=service.worker_count)
110
  except Exception as e:
111
+ logger.error("Failed to start workers", exc_info=True,
112
+ extra={"event": "workers_start_error", "entity_type": entity_type, "error": str(e)})
113
+
114
  total_workers = sum(s.worker_count for s in self.sync_services.values())
115
+ log_service_lifecycle(logger, "all_workers_started", total_workers=total_workers,
116
+ service_count=len(self.sync_services))
117
+
118
  async def stop_all_workers(self):
119
  """Stop all sync workers."""
120
+ log_service_lifecycle(logger, "workers_stopping")
121
+
122
  for entity_type, service in self.sync_services.items():
123
  try:
124
  await service.stop_workers()
125
+ log_service_lifecycle(logger, "workers_stopped", entity_type=entity_type)
126
  except Exception as e:
127
+ logger.error("Failed to stop workers", exc_info=True,
128
+ extra={"event": "workers_stop_error", "entity_type": entity_type, "error": str(e)})
129
+
130
  if self.redis_client:
131
  await self.redis_client.aclose()
132
+ log_service_lifecycle(logger, "redis_disconnected")
133
+
134
  if self.pg_engine:
135
  await self.pg_engine.dispose()
136
+ log_service_lifecycle(logger, "postgres_disconnected")
137
+
138
  if self.mongo_client:
139
  self.mongo_client.close()
140
+ log_service_lifecycle(logger, "mongodb_disconnected")