"""
Redis-based persistent queue for sync operations.

Replaces in-memory asyncio.Queue with Redis Lists for durability.
"""

import json
import asyncio
from typing import Optional, Dict, Any
from datetime import datetime

import redis.asyncio as redis

from app.core.logging import get_logger
from app.sync.common.models import SyncOperation

logger = get_logger(__name__)


class RedisSyncQueue:
    """
    Redis-backed persistent queue for sync operations.

    Operations are pushed with LPUSH and consumed with BRPOP, so the head of
    the Redis list holds the newest item and the tail holds the oldest (FIFO).
    Counters are kept in a Redis hash stored under ``<queue_name>:stats``.

    Every method tolerates ``redis_client`` being ``None``: it then returns a
    neutral value (False / None / 0 / []) instead of raising.
    """

    def __init__(
        self,
        redis_client: Optional[redis.Redis],
        queue_name: str,
        max_size: int = 10000
    ):
        """
        Initialize Redis sync queue.

        Args:
            redis_client: Redis client instance (can be None if Redis unavailable)
            queue_name: Name of the Redis list key (e.g., "sync:queue:catalogues")
            max_size: Maximum queue size (soft limit)
        """
        self.redis_client = redis_client
        self.queue_name = queue_name
        self.max_size = max_size
        self.stats_key = f"{queue_name}:stats"

        # Log Redis connection info for debugging
        logger.info(
            "Initialized RedisSyncQueue",
            extra={
                "event": "queue_init",
                "queue_name": queue_name,
                "max_size": max_size,
                "redis_available": redis_client is not None,
                "note": "Sync queue will not persist if Redis is unavailable"
            }
        )

    async def _increment_stat(self, field: str, error_message: str) -> None:
        """
        Best-effort increment of one counter in the stats hash.

        Failures are logged and swallowed so that bookkeeping can never change
        the outcome of the queue operation that triggered it.

        Args:
            field: Name of the hash field to increment by 1
            error_message: Log message to emit if the increment fails
        """
        if not self.redis_client:
            return

        try:
            await self.redis_client.hincrby(self.stats_key, field, 1)
        except Exception as e:
            logger.error(
                error_message,
                exc_info=True,
                extra={
                    "event": "queue_stat_record_failure",
                    "stat": field,
                    "error": str(e)
                }
            )

    async def enqueue(self, sync_op: SyncOperation) -> bool:
        """
        Add sync operation to queue.

        The size check and the push are two separate Redis calls, so
        ``max_size`` is a soft limit under concurrent producers.

        Args:
            sync_op: Sync operation to enqueue

        Returns:
            True if enqueued successfully, False if queue is full, Redis is
            unavailable, or the push itself failed
        """
        if not self.redis_client:
            logger.debug(
                "Redis unavailable - skipping enqueue",
                extra={
                    "event": "enqueue_skipped",
                    "queue_name": self.queue_name,
                    "entity_type": sync_op.entity_type,
                    "entity_id": sync_op.entity_id
                }
            )
            return False

        try:
            # Check queue size (soft limit)
            current_size = await self.size()
            if current_size >= self.max_size:
                logger.warning(
                    "Queue is full",
                    extra={
                        "event": "queue_full",
                        "queue_name": self.queue_name,
                        "current_size": current_size,
                        "max_size": self.max_size
                    }
                )
                return False

            # Serialize operation (enqueued_at is a naive UTC ISO-8601 string)
            payload = {
                "entity_type": sync_op.entity_type,
                "entity_id": sync_op.entity_id,
                "operation": sync_op.operation,
                "enqueued_at": datetime.utcnow().isoformat()
            }

            # Push to Redis list (left push for FIFO with right pop)
            await self.redis_client.lpush(self.queue_name, json.dumps(payload))
        except Exception:
            logger.error(
                "Error enqueuing sync operation",
                exc_info=True,
                extra={
                    "event": "enqueue_error",
                    "queue_name": self.queue_name,
                    "entity_type": sync_op.entity_type,
                    "entity_id": sync_op.entity_id
                }
            )
            return False

        # The item is already in the queue at this point. A stats failure must
        # not be reported as an enqueue failure, otherwise the caller may retry
        # and enqueue a duplicate.
        await self._increment_stat("total_enqueued", "Error recording enqueued stat")

        logger.debug(
            "Enqueued sync operation",
            extra={
                "event": "enqueued",
                "queue_name": self.queue_name,
                "entity_type": sync_op.entity_type,
                "entity_id": sync_op.entity_id,
                "operation": sync_op.operation,
                "queue_size": current_size + 1
            }
        )
        return True

    async def dequeue(self, timeout: int = 1) -> Optional[SyncOperation]:
        """
        Remove and return sync operation from queue (blocking).

        A payload that cannot be decoded into a SyncOperation has already been
        popped when the failure is detected; it is logged and dropped.

        Args:
            timeout: Timeout in seconds for blocking pop (0 = block forever)

        Returns:
            SyncOperation if available, None if timeout, error, or Redis
            unavailable
        """
        if not self.redis_client:
            await asyncio.sleep(timeout)  # Simulate timeout
            return None

        try:
            # Blocking right pop (FIFO)
            logger.debug(
                "Attempting to dequeue",
                extra={
                    "event": "dequeue_attempt",
                    "queue_name": self.queue_name,
                    "timeout": timeout
                },
            )
            result = await self.redis_client.brpop(self.queue_name, timeout=timeout)

            # Emitted on every poll (including timeouts), hence DEBUG level.
            logger.debug(
                "Redis BRPOP result",
                extra={
                    "event": "dequeue_result",
                    "queue_name": self.queue_name,
                    "has_result": result is not None,
                    "result_type": str(type(result))
                },
            )

            if not result:
                logger.debug(
                    "Dequeue timeout",
                    extra={"event": "dequeue_timeout", "queue_name": self.queue_name}
                )
                return None

            # result is tuple: (key, value)
            _, payload_json = result
            payload = json.loads(payload_json)

            # Deserialize operation
            sync_op = SyncOperation(
                entity_type=payload["entity_type"],
                entity_id=payload["entity_id"],
                operation=payload["operation"]
            )
        except asyncio.TimeoutError:
            logger.debug(
                "Dequeue asyncio timeout",
                extra={"event": "dequeue_async_timeout", "queue_name": self.queue_name}
            )
            return None
        except Exception as e:
            logger.error(
                "Error dequeuing sync operation",
                exc_info=True,
                extra={"queue_name": self.queue_name, "error": str(e)}
            )
            return None

        # The operation has been removed from Redis. A stats failure must not
        # discard it, so the counter update is best-effort and happens outside
        # the try block above.
        await self._increment_stat("total_dequeued", "Error recording dequeued stat")

        logger.info(
            "Dequeued sync operation",
            extra={
                "event": "dequeued",
                "queue_name": self.queue_name,
                "entity_type": sync_op.entity_type,
                "entity_id": sync_op.entity_id,
                "operation": sync_op.operation
            }
        )
        return sync_op

    async def size(self) -> int:
        """
        Get current queue size.

        Returns:
            Number of operations in queue (0 if Redis unavailable or on error)
        """
        if not self.redis_client:
            return 0

        try:
            return await self.redis_client.llen(self.queue_name)
        except Exception as e:
            logger.error(
                "Error getting queue size",
                exc_info=True,
                extra={"queue_name": self.queue_name, "error": str(e)}
            )
            return 0

    async def clear(self) -> bool:
        """
        Clear all operations from queue.

        Only the list key is deleted; the stats hash is left untouched.

        Returns:
            True if cleared successfully, False on error or if Redis is
            unavailable
        """
        if not self.redis_client:
            return False

        try:
            await self.redis_client.delete(self.queue_name)
        except Exception:
            logger.error(
                "Error clearing queue",
                exc_info=True,
                extra={"queue_name": self.queue_name}
            )
            return False

        logger.info(
            "Cleared queue",
            extra={"event": "queue_cleared", "queue_name": self.queue_name}
        )
        return True

    async def get_stats(self) -> Dict[str, Any]:
        """
        Get queue statistics.

        Returns:
            Dictionary with queue_name, current_size, max_size and the four
            counters (total_enqueued, total_dequeued, total_processed,
            total_failed). On failure, or when Redis is unavailable, a
            dictionary with only queue_name and error.
        """
        if not self.redis_client:
            return {
                "queue_name": self.queue_name,
                "error": "Redis unavailable"
            }

        try:
            raw_stats = await self.redis_client.hgetall(self.stats_key)
            current_size = await self.size()

            # Without decode_responses=True the client returns bytes keys, and
            # a lookup by str key would silently miss every counter.
            stats = {
                (key.decode() if isinstance(key, bytes) else key): value
                for key, value in raw_stats.items()
            }

            return {
                "queue_name": self.queue_name,
                "current_size": current_size,
                "max_size": self.max_size,
                "total_enqueued": int(stats.get("total_enqueued", 0)),
                "total_dequeued": int(stats.get("total_dequeued", 0)),
                "total_processed": int(stats.get("total_processed", 0)),
                "total_failed": int(stats.get("total_failed", 0))
            }
        except Exception as e:
            logger.error(
                "Error getting queue stats",
                exc_info=True,
                extra={"queue_name": self.queue_name}
            )
            return {
                "queue_name": self.queue_name,
                "error": str(e)
            }

    async def record_processed(self) -> None:
        """Record successful processing of an operation (best-effort)."""
        await self._increment_stat("total_processed", "Error recording processed stat")

    async def record_failed(self) -> None:
        """Record failed processing of an operation (best-effort)."""
        await self._increment_stat("total_failed", "Error recording failed stat")

    async def peek(self, count: int = 10) -> list:
        """
        Peek at operations in queue without removing them.

        Args:
            count: Number of operations to peek at; values <= 0 yield []

        Returns:
            List of operations (as dicts), oldest first (i.e. in the order
            they would be dequeued). Empty list on error or if Redis is
            unavailable.
        """
        # LRANGE with a start of -0 means index 0, which would return the
        # whole list, so non-positive counts are answered without calling Redis.
        if not self.redis_client or count <= 0:
            return []

        try:
            # The tail of the list holds the oldest items; LRANGE returns them
            # head-to-tail, so reverse to put the oldest first.
            items = await self.redis_client.lrange(self.queue_name, -count, -1)
            return [json.loads(item) for item in reversed(items)]
        except Exception:
            logger.error(
                "Error peeking at queue",
                exc_info=True,
                extra={"queue_name": self.queue_name}
            )
            return []