"""
Base sync service with Redis queue support.

Template for all entity-specific sync services.
"""

import asyncio
import random
import time
import traceback
from typing import Optional

import redis.asyncio as redis
from motor.motor_asyncio import AsyncIOMotorDatabase
from redis.exceptions import ConnectionError as RedisConnectionError
from sqlalchemy.ext.asyncio import AsyncEngine

from app.core.logging import (
    get_logger,
    log_sync_start,
    log_sync_success,
    log_sync_failure,
    log_worker_heartbeat,
)
from app.sync.common.retry import RetryManager
from app.sync.common.monitoring import SyncMonitoringService
from app.sync.common.models import SyncOperation
from app.sync.common.redis_queue import RedisSyncQueue

logger = get_logger(__name__)


class BaseSyncService:
    """
    Base class for entity sync services with Redis queue support.

    Provides common functionality:
    - Redis-based persistent queue
    - Background worker management
    - Retry logic
    - Monitoring and metrics

    The class-level constants below tune the worker loop and may be
    overridden by subclasses.
    """

    # A heartbeat is logged every HEARTBEAT_INTERVAL worker-loop iterations.
    HEARTBEAT_INTERVAL = 180
    # Delay (seconds) before retrying the loop after the first error; the
    # delay is restored to this value after every dequeue call that returns.
    INITIAL_BACKOFF_SECONDS = 1.0
    # Upper bound (seconds) for the exponential backoff delay.
    MAX_BACKOFF_SECONDS = 30.0

    def __init__(
        self,
        entity_type: str,
        handler,
        mongo_db: AsyncIOMotorDatabase,
        pg_engine: AsyncEngine,
        redis_client: redis.Redis,
        max_queue_size: int = 10000,
        worker_count: int = 5,
        max_retries: int = 3
    ):
        """
        Initialize base sync service.

        Args:
            entity_type: Type of entity (e.g., "catalogue", "merchant")
            handler: Entity-specific sync handler; must expose an awaitable
                ``sync(entity_id=..., mongo_db=..., pg_conn=..., operation=...)``
            mongo_db: MongoDB database instance
            pg_engine: PostgreSQL async engine
            redis_client: Redis client instance
            max_queue_size: Maximum size of sync queue
            worker_count: Number of background workers
            max_retries: Maximum retry attempts for failed syncs
        """
        self.entity_type = entity_type
        self.handler = handler
        self.mongo_db = mongo_db
        self.pg_engine = pg_engine
        self.max_queue_size = max_queue_size
        self.worker_count = worker_count

        # Initialize components
        self.retry_manager = RetryManager(max_retries=max_retries)
        self.monitoring = SyncMonitoringService()

        # Redis-based persistent queue, one queue name per entity type
        self.sync_queue = RedisSyncQueue(
            redis_client=redis_client,
            queue_name=f"sync:queue:{entity_type}",
            max_size=max_queue_size
        )

        # Background workers
        self.workers: list[asyncio.Task] = []
        self.is_running = False

    async def start_workers(self) -> None:
        """
        Start background workers for processing sync queue.

        Spawns ``worker_count`` asyncio tasks running ``_worker``. Calling
        this while workers are already running only logs a warning.
        """
        if self.is_running:
            logger.warning(f"{self.entity_type} sync workers already running")
            return

        self.is_running = True
        logger.info(
            f"Starting {self.entity_type} sync workers",
            extra={
                "entity_type": self.entity_type,
                "worker_count": self.worker_count,
                "max_queue_size": self.max_queue_size
            }
        )

        for i in range(self.worker_count):
            worker = asyncio.create_task(self._worker(worker_id=i))
            self.workers.append(worker)

        logger.info(f"Started {self.worker_count} {self.entity_type} sync workers")

    async def stop_workers(self) -> None:
        """
        Stop all background workers.

        Clears the running flag, cancels every worker task and waits for
        all of them to finish. Calling this while no workers are running
        only logs a warning.
        """
        if not self.is_running:
            logger.warning(f"{self.entity_type} sync workers not running")
            return

        logger.info(f"Stopping {self.entity_type} sync workers")
        self.is_running = False

        # Cancel all workers
        for worker in self.workers:
            worker.cancel()

        # Wait for workers to finish; return_exceptions=True keeps one
        # worker's exception from aborting the wait for the others.
        await asyncio.gather(*self.workers, return_exceptions=True)
        self.workers.clear()

        logger.info(f"{self.entity_type} sync workers stopped")

    async def enqueue_sync(
        self,
        entity_id: str,
        operation: str = "update"
    ) -> bool:
        """
        Queue an entity for synchronization.

        Args:
            entity_id: ID of entity to sync
            operation: Type of operation ("create" | "update")

        Returns:
            True if enqueued successfully; False if the queue rejected the
            operation or any exception occurred (exceptions are logged,
            never raised to the caller).
        """
        try:
            sync_op = SyncOperation(
                entity_type=self.entity_type,
                entity_id=entity_id,
                operation=operation
            )

            # Enqueue to Redis
            success = await self.sync_queue.enqueue(sync_op)

            if success:
                queue_size = await self.sync_queue.size()
                logger.debug(
                    f"{self.entity_type} queued for sync",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id,
                        "operation": operation,
                        "queue_size": queue_size
                    }
                )
            else:
                logger.warning(
                    f"Failed to queue {self.entity_type} for sync (queue full)",
                    extra={
                        "entity_type": self.entity_type,
                        "entity_id": entity_id,
                        "operation": operation
                    }
                )

            return success

        except Exception as e:
            logger.error(
                f"Error queuing {self.entity_type} for sync",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id,
                    "error": str(e)
                }
            )
            return False

    async def _worker(self, worker_id: int) -> None:
        """
        Background worker that processes Redis sync queue.

        Loops while ``is_running`` is set: dequeues one operation at a time
        and hands it to ``_process_sync_operation``. Any exception raised
        inside the loop is logged and followed by an exponentially growing
        sleep (with jitter, capped at ``MAX_BACKOFF_SECONDS``). Cancellation
        at any point, including during the backoff sleep, ends the worker
        with the "Worker cancelled" and "Worker stopped" log entries.

        Args:
            worker_id: Index of this worker, used only for logging
        """
        logger.info(
            "Worker started",
            extra={
                "event": "worker_start",
                "entity_type": self.entity_type,
                "worker_id": worker_id,
            },
        )

        iteration = 0
        redis_backoff = self.INITIAL_BACKOFF_SECONDS  # seconds; resets on success

        # Cancellation is handled once around the whole loop so that a
        # cancel arriving during the backoff sleep is treated exactly like
        # one arriving during dequeue or processing.
        try:
            while self.is_running:
                try:
                    iteration += 1
                    if iteration % self.HEARTBEAT_INTERVAL == 0:
                        # NOTE(review): the trailing -1 is presumably a
                        # "queue size unknown" placeholder; confirm against
                        # log_worker_heartbeat's signature.
                        log_worker_heartbeat(
                            logger, self.entity_type, worker_id, iteration, -1
                        )

                    sync_op = await self.sync_queue.dequeue(timeout=1)
                    # reset on successful dequeue call
                    redis_backoff = self.INITIAL_BACKOFF_SECONDS

                    if not sync_op:
                        continue

                    log_sync_start(
                        logger,
                        self.entity_type,
                        sync_op.entity_id,
                        sync_op.operation,
                        worker_id,
                    )
                    await self._process_sync_operation(sync_op, worker_id)

                except asyncio.CancelledError:
                    # Re-raise explicitly so the outer handler sees it even
                    # on interpreters where CancelledError derives from
                    # Exception.
                    raise
                except Exception as e:
                    is_conn_error = isinstance(e, RedisConnectionError)
                    logger.error(
                        "Redis connection error in worker loop"
                        if is_conn_error
                        else "Unhandled error in worker loop",
                        exc_info=True,
                        extra={
                            "event": "worker_redis_error"
                            if is_conn_error
                            else "worker_loop_error",
                            "entity_type": self.entity_type,
                            "worker_id": worker_id,
                            "backoff_seconds": redis_backoff,
                            "error": str(e),
                        },
                    )
                    await asyncio.sleep(redis_backoff)
                    # Exponential backoff capped at MAX_BACKOFF_SECONDS,
                    # with jitter
                    redis_backoff = min(
                        redis_backoff * 2 + random.uniform(0, 1),
                        self.MAX_BACKOFF_SECONDS,
                    )
        except asyncio.CancelledError:
            logger.info(
                "Worker cancelled",
                extra={
                    "event": "worker_cancel",
                    "entity_type": self.entity_type,
                    "worker_id": worker_id,
                },
            )

        logger.info(
            "Worker stopped",
            extra={
                "event": "worker_stop",
                "entity_type": self.entity_type,
                "worker_id": worker_id,
            },
        )

    async def _process_sync_operation(
        self,
        sync_op: SyncOperation,
        worker_id: int
    ) -> None:
        """
        Process a single sync operation with retry logic.

        Runs ``_sync_with_connection`` through the retry manager, then
        records the outcome in the monitoring service, the queue counters
        and the log. Only an exception raised by the sync itself is recorded
        as a sync failure; an error raised while recording a success
        propagates to the caller instead of being reported as a failed sync.

        Args:
            sync_op: Operation taken from the queue
            worker_id: Index of the calling worker, used only for logging
        """
        # perf_counter is monotonic, so the measured duration is not
        # affected by system clock adjustments.
        start_time = time.perf_counter()

        try:
            # NOTE(review): the boolean returned by the handler is not
            # inspected here; whether a False result counts as a failure
            # depends on RetryManager.execute_with_retry; confirm.
            await self.retry_manager.execute_with_retry(
                self._sync_with_connection,
                sync_op.entity_id,
                sync_op.operation,
                entity_type=self.entity_type,
                entity_id=sync_op.entity_id
            )
        except Exception as e:
            duration_ms = (time.perf_counter() - start_time) * 1000
            error_msg = str(e)

            self.monitoring.record_sync_failure(
                entity_type=self.entity_type,
                entity_id=sync_op.entity_id,
                error=error_msg,
                stack_trace=traceback.format_exc(),
            )
            await self.sync_queue.record_failed()

            log_sync_failure(
                logger,
                self.entity_type,
                sync_op.entity_id,
                sync_op.operation,
                worker_id,
                duration_ms,
                error=error_msg,
                attempt=self.retry_manager.max_retries + 1,
                max_attempts=self.retry_manager.max_retries + 1,
            )
        else:
            duration_ms = (time.perf_counter() - start_time) * 1000

            self.monitoring.record_sync_success(
                entity_type=self.entity_type,
                entity_id=sync_op.entity_id,
                duration_ms=duration_ms,
            )
            await self.sync_queue.record_processed()

            log_sync_success(
                logger,
                self.entity_type,
                sync_op.entity_id,
                sync_op.operation,
                worker_id,
                duration_ms,
            )

    async def _sync_with_connection(self, entity_id: str, operation: str = "update") -> bool:
        """
        Sync a single entity with connection management.

        Opens a transaction on the PostgreSQL engine for the duration of
        the handler call.

        Args:
            entity_id: ID of entity to sync
            operation: Type of operation ("create" | "update" | "delete")

        Returns:
            The value returned by ``handler.sync`` (True if sync successful)
        """
        async with self.pg_engine.begin() as pg_conn:
            # Perform sync using handler
            success = await self.handler.sync(
                entity_id=entity_id,
                mongo_db=self.mongo_db,
                pg_conn=pg_conn,
                operation=operation
            )
            return success

    async def get_queue_size(self) -> int:
        """Get current size of sync queue."""
        return await self.sync_queue.size()

    async def get_queue_stats(self) -> dict:
        """Get detailed queue statistics."""
        return await self.sync_queue.get_stats()

    def get_metrics(self) -> dict:
        """Get sync metrics for this entity type."""
        return self.monitoring.get_entity_metrics(self.entity_type)