Spaces:
Runtime error
Runtime error
fix(redis): cut workers to 7 total, remove per-iteration Redis calls to reduce connection pressure
dfbedd9 | """ | |
| Base sync service with Redis queue support. | |
| Template for all entity-specific sync services. | |
| """ | |
| import asyncio | |
| from typing import Optional | |
| from motor.motor_asyncio import AsyncIOMotorDatabase | |
| from sqlalchemy.ext.asyncio import AsyncEngine | |
| import redis.asyncio as redis | |
| from app.core.logging import ( | |
| get_logger, | |
| log_sync_start, | |
| log_sync_success, | |
| log_sync_failure, | |
| log_worker_heartbeat, | |
| ) | |
| import time | |
| import traceback | |
| from app.sync.common.retry import RetryManager | |
| from app.sync.common.monitoring import SyncMonitoringService | |
| from app.sync.common.models import SyncOperation | |
| from app.sync.common.redis_queue import RedisSyncQueue | |
| logger = get_logger(__name__) | |
class BaseSyncService:
    """
    Base class for entity sync services with Redis queue support.

    Provides common functionality:
    - Redis-based persistent queue
    - Background worker management
    - Retry logic
    - Monitoring and metrics

    Entity-specific behaviour is supplied through ``handler``, whose
    ``sync(entity_id=..., mongo_db=..., pg_conn=..., operation=...)``
    coroutine is awaited for every dequeued operation.
    """

    def __init__(
        self,
        entity_type: str,
        handler,
        mongo_db: AsyncIOMotorDatabase,
        pg_engine: AsyncEngine,
        redis_client: redis.Redis,
        max_queue_size: int = 10000,
        worker_count: int = 5,
        max_retries: int = 3
    ):
        """
        Initialize base sync service.

        Args:
            entity_type: Type of entity (e.g., "catalogue", "merchant");
                also used to build the queue name ``sync:queue:<entity_type>``
            handler: Entity-specific sync handler (must expose ``sync``)
            mongo_db: MongoDB database instance, forwarded to the handler
            pg_engine: PostgreSQL async engine; one transaction is opened
                per sync attempt
            redis_client: Redis client instance; only handed to the queue,
                it is not stored on the service itself
            max_queue_size: Maximum size of sync queue
            worker_count: Number of background workers
            max_retries: Maximum retry attempts for failed syncs
        """
        self.entity_type = entity_type
        self.handler = handler
        self.mongo_db = mongo_db
        self.pg_engine = pg_engine
        self.max_queue_size = max_queue_size
        self.worker_count = worker_count

        # Initialize components
        self.retry_manager = RetryManager(max_retries=max_retries)
        self.monitoring = SyncMonitoringService()

        # Redis-based persistent queue
        self.sync_queue = RedisSyncQueue(
            redis_client=redis_client,
            queue_name=f"sync:queue:{entity_type}",
            max_size=max_queue_size
        )

        # Background workers
        self.workers: list[asyncio.Task] = []
        self.is_running = False

    async def start_workers(self) -> None:
        """
        Start background workers for processing sync queue.

        Spawns ``worker_count`` asyncio tasks running ``_worker``. Calling
        this while workers are already running only logs a warning.
        """
        if self.is_running:
            logger.warning(f"{self.entity_type} sync workers already running")
            return

        # Flag must be set before the tasks start: each worker loops on it.
        self.is_running = True
        logger.info(
            f"Starting {self.entity_type} sync workers",
            extra={
                "entity_type": self.entity_type,
                "worker_count": self.worker_count,
                "max_queue_size": self.max_queue_size
            }
        )

        for i in range(self.worker_count):
            worker = asyncio.create_task(self._worker(worker_id=i))
            self.workers.append(worker)

        logger.info(f"Started {self.worker_count} {self.entity_type} sync workers")

    async def stop_workers(self) -> None:
        """
        Stop all background workers.

        Clears the running flag, cancels every worker task and waits for all
        of them to finish. Cancellation is requested immediately, so a worker
        that is in the middle of an operation is interrupted rather than
        allowed to complete it. Calling this while no workers are running
        only logs a warning.
        """
        if not self.is_running:
            logger.warning(f"{self.entity_type} sync workers not running")
            return

        logger.info(f"Stopping {self.entity_type} sync workers")
        self.is_running = False

        # Cancel all workers
        for worker in self.workers:
            worker.cancel()

        # Wait for workers to finish; return_exceptions=True keeps one
        # failed/cancelled task from aborting the wait for the others.
        await asyncio.gather(*self.workers, return_exceptions=True)
        self.workers.clear()

        logger.info(f"{self.entity_type} sync workers stopped")

    async def enqueue_sync(
        self,
        entity_id: str,
        operation: str = "update"
    ) -> bool:
        """
        Queue an entity for synchronization.

        Args:
            entity_id: ID of entity to sync
            operation: Operation label stored on the queued item and later
                forwarded unchanged to the handler (e.g. "create", "update")

        Returns:
            The result of the queue's ``enqueue`` call (truthy when the item
            was accepted), or False if building or enqueuing the operation
            raised. Exceptions are logged, never propagated.
        """
        try:
            sync_op = SyncOperation(
                entity_type=self.entity_type,
                entity_id=entity_id,
                operation=operation
            )
            # Enqueue to Redis
            success = await self.sync_queue.enqueue(sync_op)
        except Exception as e:
            logger.error(
                f"Error queuing {self.entity_type} for sync",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id,
                    "error": str(e)
                }
            )
            return False

        if not success:
            logger.warning(
                f"Failed to queue {self.entity_type} for sync (queue full)",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id,
                    "operation": operation
                }
            )
            return success

        # The queue size is only used for the debug log below. It is looked
        # up outside the enqueue try-block so that a failure here cannot turn
        # an already successful enqueue into a reported failure (which could
        # make callers enqueue the same entity twice).
        try:
            queue_size = await self.sync_queue.size()
        except Exception:
            # Deliberate best-effort: the log simply carries no size.
            queue_size = None

        logger.debug(
            f"{self.entity_type} queued for sync",
            extra={
                "entity_type": self.entity_type,
                "entity_id": entity_id,
                "operation": operation,
                "queue_size": queue_size
            }
        )
        return success

    async def _worker(self, worker_id: int) -> None:
        """
        Background worker that processes Redis sync queue.

        Loops while ``is_running`` is set: dequeues one operation (1 second
        timeout), processes it, and on any error sleeps with an exponential
        backoff (doubling plus up to 1s of jitter, capped at 30s, reset after
        every successful dequeue call). Cancellation ends the loop and is not
        re-raised, so the task finishes normally.

        Args:
            worker_id: Index of this worker, used only for logging
        """
        # Imported once per worker instead of on every failing iteration.
        # The alias avoids clashing with the module-level ``redis`` name,
        # which is bound to ``redis.asyncio``.
        import random
        import redis as redis_lib

        logger.info(
            "Worker started",
            extra={
                "event": "worker_start",
                "entity_type": self.entity_type,
                "worker_id": worker_id,
            },
        )

        iteration = 0
        redis_backoff = 1.0  # seconds; resets on success

        # The outer try catches cancellation wherever it lands, including
        # during the backoff sleep inside the error handler, so the
        # "Worker cancelled" / "Worker stopped" events are always emitted.
        try:
            while self.is_running:
                try:
                    iteration += 1
                    if iteration % 180 == 0:
                        # NOTE(review): the trailing -1 is presumably a
                        # "queue size not sampled" placeholder; confirm
                        # against log_worker_heartbeat.
                        log_worker_heartbeat(logger, self.entity_type, worker_id, iteration, -1)

                    sync_op = await self.sync_queue.dequeue(timeout=1)
                    redis_backoff = 1.0  # reset on successful dequeue call

                    if not sync_op:
                        continue

                    log_sync_start(logger, self.entity_type, sync_op.entity_id, sync_op.operation, worker_id)
                    await self._process_sync_operation(sync_op, worker_id)
                except asyncio.CancelledError:
                    # Never treat cancellation as a loop error, regardless of
                    # where CancelledError sits in the exception hierarchy.
                    raise
                except Exception as e:
                    is_conn_error = isinstance(e, redis_lib.exceptions.ConnectionError)
                    logger.error(
                        "Redis connection error in worker loop" if is_conn_error else "Unhandled error in worker loop",
                        exc_info=True,
                        extra={
                            "event": "worker_redis_error" if is_conn_error else "worker_loop_error",
                            "entity_type": self.entity_type,
                            "worker_id": worker_id,
                            "backoff_seconds": redis_backoff,
                            "error": str(e),
                        },
                    )
                    await asyncio.sleep(redis_backoff)
                    # Exponential backoff capped at 30s, with jitter
                    redis_backoff = min(redis_backoff * 2 + random.uniform(0, 1), 30.0)
        except asyncio.CancelledError:
            logger.info(
                "Worker cancelled",
                extra={"event": "worker_cancel", "entity_type": self.entity_type, "worker_id": worker_id},
            )

        logger.info(
            "Worker stopped",
            extra={"event": "worker_stop", "entity_type": self.entity_type, "worker_id": worker_id},
        )

    async def _process_sync_operation(
        self,
        sync_op: SyncOperation,
        worker_id: int
    ) -> None:
        """
        Process a single sync operation with retry logic.

        Runs ``_sync_with_connection`` through the retry manager and records
        the outcome in monitoring, in the queue counters and in the logs.
        A failing sync is recorded and swallowed; errors raised by the
        bookkeeping calls themselves propagate to the caller.

        Args:
            sync_op: Dequeued operation (``entity_id`` and ``operation`` are read)
            worker_id: Index of the calling worker, used only for logging
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments.
        start_time = time.perf_counter()

        # Only the sync itself is guarded: a bookkeeping error after a
        # successful sync must not be recorded as a sync failure.
        # NOTE(review): the value returned by execute_with_retry is ignored,
        # so completing without an exception is what counts as success.
        try:
            await self.retry_manager.execute_with_retry(
                self._sync_with_connection,
                sync_op.entity_id,
                sync_op.operation,
                entity_type=self.entity_type,
                entity_id=sync_op.entity_id
            )
        except Exception as e:
            duration_ms = (time.perf_counter() - start_time) * 1000
            error_msg = str(e)
            self.monitoring.record_sync_failure(
                entity_type=self.entity_type,
                entity_id=sync_op.entity_id,
                error=error_msg,
                stack_trace=traceback.format_exc(),
            )
            await self.sync_queue.record_failed()
            log_sync_failure(
                logger, self.entity_type, sync_op.entity_id,
                sync_op.operation, worker_id, duration_ms,
                error=error_msg,
                attempt=self.retry_manager.max_retries + 1,
                max_attempts=self.retry_manager.max_retries + 1,
            )
            return

        duration_ms = (time.perf_counter() - start_time) * 1000
        self.monitoring.record_sync_success(
            entity_type=self.entity_type,
            entity_id=sync_op.entity_id,
            duration_ms=duration_ms,
        )
        await self.sync_queue.record_processed()
        log_sync_success(
            logger, self.entity_type, sync_op.entity_id,
            sync_op.operation, worker_id, duration_ms,
        )

    async def _sync_with_connection(self, entity_id: str, operation: str = "update") -> bool:
        """
        Sync a single entity with connection management.

        Opens a transaction via ``pg_engine.begin()`` and awaits the
        handler's ``sync`` inside it.

        Args:
            entity_id: ID of entity to sync
            operation: Operation label passed through unchanged to the
                handler (e.g. "create", "update", "delete")

        Returns:
            Whatever the handler's ``sync`` returns (documented as True if
            sync successful)
        """
        async with self.pg_engine.begin() as pg_conn:
            # Perform sync using handler
            success = await self.handler.sync(
                entity_id=entity_id,
                mongo_db=self.mongo_db,
                pg_conn=pg_conn,
                operation=operation
            )
        return success

    async def get_queue_size(self) -> int:
        """Get current size of sync queue."""
        return await self.sync_queue.size()

    async def get_queue_stats(self) -> dict:
        """Get detailed queue statistics."""
        return await self.sync_queue.get_stats()

    def get_metrics(self) -> dict:
        """Get sync metrics for this entity type."""
        return self.monitoring.get_entity_metrics(self.entity_type)