# MukeshKapoor25's picture
# fix(redis): cut workers to 7 total, remove per-iteration Redis calls to reduce connection pressure
# dfbedd9
"""
Base sync service with Redis queue support.
Template for all entity-specific sync services.
"""
import asyncio
from typing import Optional
from motor.motor_asyncio import AsyncIOMotorDatabase
from sqlalchemy.ext.asyncio import AsyncEngine
import redis.asyncio as redis
from app.core.logging import (
get_logger,
log_sync_start,
log_sync_success,
log_sync_failure,
log_worker_heartbeat,
)
import time
import traceback
from app.sync.common.retry import RetryManager
from app.sync.common.monitoring import SyncMonitoringService
from app.sync.common.models import SyncOperation
from app.sync.common.redis_queue import RedisSyncQueue
# Module-level logger used by BaseSyncService and passed to the log_* helpers.
logger = get_logger(__name__)
class BaseSyncService:
    """
    Base class for entity sync services with Redis queue support.

    Provides common functionality:
    - Redis-based persistent queue
    - Background worker management
    - Retry logic
    - Monitoring and metrics
    """

    # Worker-loop iterations between two heartbeat log lines.
    _HEARTBEAT_EVERY = 180
    # Error backoff for the worker loop, in seconds (fed to asyncio.sleep).
    _BACKOFF_START = 1.0
    _BACKOFF_CAP = 30.0

    def __init__(
        self,
        entity_type: str,
        handler,
        mongo_db: AsyncIOMotorDatabase,
        pg_engine: AsyncEngine,
        redis_client: redis.Redis,
        max_queue_size: int = 10000,
        worker_count: int = 5,
        max_retries: int = 3
    ):
        """
        Initialize base sync service.

        Args:
            entity_type: Type of entity (e.g., "catalogue", "merchant").
                Also used to build the queue name ``sync:queue:<entity_type>``.
            handler: Entity-specific sync handler; must expose an awaitable
                ``sync(entity_id=, mongo_db=, pg_conn=, operation=)``.
            mongo_db: MongoDB database instance
            pg_engine: PostgreSQL async engine
            redis_client: Redis client instance (handed to the queue only;
                it is not stored on the service itself)
            max_queue_size: Maximum size of sync queue
            worker_count: Number of background workers
            max_retries: Maximum retry attempts for failed syncs
        """
        self.entity_type = entity_type
        self.handler = handler
        self.mongo_db = mongo_db
        self.pg_engine = pg_engine
        self.max_queue_size = max_queue_size
        self.worker_count = worker_count

        # Initialize components
        self.retry_manager = RetryManager(max_retries=max_retries)
        self.monitoring = SyncMonitoringService()

        # Redis-based persistent queue
        self.sync_queue = RedisSyncQueue(
            redis_client=redis_client,
            queue_name=f"sync:queue:{entity_type}",
            max_size=max_queue_size
        )

        # Background workers
        self.workers: list[asyncio.Task] = []
        self.is_running = False

    async def start_workers(self) -> None:
        """
        Start background workers for processing sync queue.

        Creates ``worker_count`` asyncio tasks running ``_worker``. Calling
        this while workers are already running only logs a warning.
        """
        if self.is_running:
            logger.warning(f"{self.entity_type} sync workers already running")
            return

        self.is_running = True
        logger.info(
            f"Starting {self.entity_type} sync workers",
            extra={
                "entity_type": self.entity_type,
                "worker_count": self.worker_count,
                "max_queue_size": self.max_queue_size
            }
        )

        self.workers.extend(
            asyncio.create_task(self._worker(worker_id=i))
            for i in range(self.worker_count)
        )
        logger.info(f"Started {self.worker_count} {self.entity_type} sync workers")

    async def stop_workers(self) -> None:
        """
        Stop all background workers.

        Clears ``is_running``, cancels every worker task and waits until all
        of them have finished. A worker that is in the middle of an operation
        is interrupted at its current await point.
        NOTE(review): whether an interrupted operation is re-queued depends on
        RedisSyncQueue, which is not visible here — confirm.
        """
        if not self.is_running:
            logger.warning(f"{self.entity_type} sync workers not running")
            return

        logger.info(f"Stopping {self.entity_type} sync workers")
        self.is_running = False

        # Cancel all workers
        for worker in self.workers:
            worker.cancel()

        # Wait for workers to finish; return_exceptions=True keeps one
        # failing/cancelled task from aborting the wait for the others.
        await asyncio.gather(*self.workers, return_exceptions=True)
        self.workers.clear()
        logger.info(f"{self.entity_type} sync workers stopped")

    async def enqueue_sync(
        self,
        entity_id: str,
        operation: str = "update"
    ) -> bool:
        """
        Queue an entity for synchronization.

        Args:
            entity_id: ID of entity to sync
            operation: Type of operation (e.g. "create" | "update"); it is
                not validated by this method.

        Returns:
            The queue's enqueue result: truthy if enqueued, falsy if the
            queue rejected the item. ``False`` if building or enqueuing the
            operation raised; the error is logged, not propagated.
        """
        try:
            sync_op = SyncOperation(
                entity_type=self.entity_type,
                entity_id=entity_id,
                operation=operation
            )
            # Enqueue to Redis
            success = await self.sync_queue.enqueue(sync_op)
        except Exception as e:
            logger.error(
                f"Error queuing {self.entity_type} for sync",
                exc_info=e,
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id,
                    "error": str(e)
                }
            )
            return False

        if not success:
            logger.warning(
                f"Failed to queue {self.entity_type} for sync (queue full)",
                extra={
                    "entity_type": self.entity_type,
                    "entity_id": entity_id,
                    "operation": operation
                }
            )
            return success

        # The size is only needed for the debug log. The item is already
        # queued at this point, so a failing size lookup must not turn the
        # result into False (the caller might enqueue the entity again).
        try:
            queue_size = await self.sync_queue.size()
        except Exception:
            queue_size = -1  # size unavailable

        logger.debug(
            f"{self.entity_type} queued for sync",
            extra={
                "entity_type": self.entity_type,
                "entity_id": entity_id,
                "operation": operation,
                "queue_size": queue_size
            }
        )
        return success

    async def _worker(self, worker_id: int) -> None:
        """
        Background worker that processes Redis sync queue.

        Loops while ``is_running`` is set: dequeues one operation, processes
        it, and on any error sleeps with exponential backoff before trying
        again. Cancellation ends the loop; it is logged and not re-raised, so
        the task finishes normally.

        Args:
            worker_id: Index of this worker, used only for logging.
        """
        # Function-scope imports, hoisted out of the error handler so they are
        # not re-executed on every failure.
        import random
        from redis.exceptions import ConnectionError as RedisConnectionError

        logger.info(
            "Worker started",
            extra={
                "event": "worker_start",
                "entity_type": self.entity_type,
                "worker_id": worker_id,
            },
        )

        iteration = 0
        redis_backoff = self._BACKOFF_START  # seconds; resets on success

        # The cancellation handler wraps the whole loop so that a cancel
        # arriving during the backoff sleep is handled exactly like one
        # arriving during dequeue/processing. CancelledError derives from
        # BaseException, so the inner `except Exception` never catches it.
        try:
            while self.is_running:
                try:
                    iteration += 1
                    if iteration % self._HEARTBEAT_EVERY == 0:
                        # -1: no Redis call is made here to fetch a real value
                        # (presumably the queue size — confirm against
                        # log_worker_heartbeat).
                        log_worker_heartbeat(
                            logger, self.entity_type, worker_id, iteration, -1
                        )

                    sync_op = await self.sync_queue.dequeue(timeout=1)
                    redis_backoff = self._BACKOFF_START  # reset on successful dequeue call

                    if not sync_op:
                        continue

                    log_sync_start(
                        logger, self.entity_type, sync_op.entity_id,
                        sync_op.operation, worker_id,
                    )
                    await self._process_sync_operation(sync_op, worker_id)
                except Exception as e:
                    is_conn_error = isinstance(e, RedisConnectionError)
                    logger.error(
                        "Redis connection error in worker loop" if is_conn_error else "Unhandled error in worker loop",
                        exc_info=True,
                        extra={
                            "event": "worker_redis_error" if is_conn_error else "worker_loop_error",
                            "entity_type": self.entity_type,
                            "worker_id": worker_id,
                            "backoff_seconds": redis_backoff,
                            "error": str(e),
                        },
                    )
                    await asyncio.sleep(redis_backoff)
                    # Exponential backoff capped at 30s, with jitter
                    redis_backoff = min(
                        redis_backoff * 2 + random.uniform(0, 1),
                        self._BACKOFF_CAP,
                    )
        except asyncio.CancelledError:
            logger.info(
                "Worker cancelled",
                extra={"event": "worker_cancel", "entity_type": self.entity_type, "worker_id": worker_id},
            )

        logger.info(
            "Worker stopped",
            extra={"event": "worker_stop", "entity_type": self.entity_type, "worker_id": worker_id},
        )

    async def _process_sync_operation(
        self,
        sync_op: SyncOperation,
        worker_id: int
    ) -> None:
        """
        Process a single sync operation with retry logic.

        Runs ``_sync_with_connection`` through the retry manager, then records
        the outcome in monitoring, in the log and in the queue counters.

        Only an exception from the sync itself counts as a sync failure.
        Errors raised by the bookkeeping that follows a successful sync
        propagate to the caller instead of being recorded as a failed sync.

        Args:
            sync_op: Operation taken from the queue.
            worker_id: Index of the calling worker, used only for logging.
        """
        # perf_counter is monotonic, so wall-clock adjustments cannot distort
        # the measured duration.
        start_time = time.perf_counter()
        try:
            await self.retry_manager.execute_with_retry(
                self._sync_with_connection,
                sync_op.entity_id,
                sync_op.operation,
                entity_type=self.entity_type,
                entity_id=sync_op.entity_id
            )
        except Exception as e:
            duration_ms = (time.perf_counter() - start_time) * 1000
            error_msg = str(e)
            self.monitoring.record_sync_failure(
                entity_type=self.entity_type,
                entity_id=sync_op.entity_id,
                error=error_msg,
                stack_trace=traceback.format_exc(),
            )
            # Log before touching Redis so the failure is reported even if
            # the counter update itself raises.
            log_sync_failure(
                logger, self.entity_type, sync_op.entity_id,
                sync_op.operation, worker_id, duration_ms,
                error=error_msg,
                attempt=self.retry_manager.max_retries + 1,
                max_attempts=self.retry_manager.max_retries + 1,
            )
            await self.sync_queue.record_failed()
        else:
            duration_ms = (time.perf_counter() - start_time) * 1000
            self.monitoring.record_sync_success(
                entity_type=self.entity_type,
                entity_id=sync_op.entity_id,
                duration_ms=duration_ms,
            )
            # Log before touching Redis, for the same reason as above.
            log_sync_success(
                logger, self.entity_type, sync_op.entity_id,
                sync_op.operation, worker_id, duration_ms,
            )
            await self.sync_queue.record_processed()

    async def _sync_with_connection(self, entity_id: str, operation: str = "update") -> bool:
        """
        Sync a single entity with connection management.

        Opens a transaction on the PostgreSQL engine for the duration of the
        handler call.

        Args:
            entity_id: ID of entity to sync
            operation: Type of operation ("create" | "update" | "delete")

        Returns:
            Whatever the handler's ``sync`` returns (True if sync successful).
            NOTE(review): a falsy result returned without raising still leaves
            the ``begin()`` block cleanly — confirm that is intended.
        """
        async with self.pg_engine.begin() as pg_conn:
            # Perform sync using handler
            success = await self.handler.sync(
                entity_id=entity_id,
                mongo_db=self.mongo_db,
                pg_conn=pg_conn,
                operation=operation
            )
            return success

    async def get_queue_size(self) -> int:
        """Get current size of sync queue."""
        return await self.sync_queue.size()

    async def get_queue_stats(self) -> dict:
        """Get detailed queue statistics."""
        return await self.sync_queue.get_stats()

    def get_metrics(self) -> dict:
        """Get sync metrics for this entity type."""
        return self.monitoring.get_entity_metrics(self.entity_type)