Spaces:
Runtime error
Runtime error
| """ | |
| Redis-based persistent queue for sync operations. | |
| Replaces in-memory asyncio.Queue with Redis Lists for durability. | |
| """ | |
| import json | |
| import asyncio | |
| from typing import Optional, Dict, Any | |
| from datetime import datetime | |
| import redis.asyncio as redis | |
| from app.core.logging import get_logger | |
| from app.sync.common.models import SyncOperation | |
| logger = get_logger(__name__) | |
class RedisSyncQueue:
    """
    Redis-backed persistent queue for sync operations.

    Uses a Redis List as a FIFO queue: producers LPUSH onto the head and
    consumers BRPOP from the tail, so the tail always holds the oldest item.
    Counters (enqueued / dequeued / processed / failed) are kept in a Redis
    hash stored under ``<queue_name>:stats``.

    The queue tolerates a missing Redis client (``redis_client=None``): every
    method then degrades to a no-op result instead of raising.

    Note:
        ``dequeue`` pops destructively. An item that has been popped but
        whose processing fails is not put back by this class.
    """

    def __init__(
        self,
        redis_client: Optional["redis.Redis"],
        queue_name: str,
        max_size: int = 10000
    ):
        """
        Initialize Redis sync queue.

        Args:
            redis_client: Redis client instance (can be None if Redis unavailable)
            queue_name: Name of the Redis list key (e.g., "sync:queue:catalogues")
            max_size: Maximum queue size (soft limit)
        """
        self.redis_client = redis_client
        self.queue_name = queue_name
        self.max_size = max_size
        self.stats_key = f"{queue_name}:stats"

        # Log Redis connection info for debugging
        logger.info(
            "Initialized RedisSyncQueue",
            extra={
                "event": "queue_init",
                "queue_name": queue_name,
                "max_size": max_size,
                "redis_available": redis_client is not None,
                "note": "Sync queue will not persist if Redis is unavailable"
            }
        )

    async def _bump_stat(self, field: str) -> None:
        """
        Best-effort increment of one counter in the stats hash.

        Never raises: a failed statistics update must not change the outcome
        of the queue operation that triggered it.

        Args:
            field: Name of the counter inside the stats hash
        """
        try:
            await self.redis_client.hincrby(self.stats_key, field, 1)
        except Exception as e:
            logger.warning(
                "Error updating queue stat",
                exc_info=True,
                extra={
                    "event": "queue_stat_record_failure",
                    "queue_name": self.queue_name,
                    "stat": field,
                    "error": str(e)
                }
            )

    async def enqueue(self, sync_op: "SyncOperation") -> bool:
        """
        Add sync operation to queue.

        Args:
            sync_op: Sync operation to enqueue

        Returns:
            True if enqueued successfully, False if queue is full, Redis is
            unavailable, or the push itself failed
        """
        if not self.redis_client:
            logger.debug(
                "Redis unavailable - skipping enqueue",
                extra={
                    "event": "enqueue_skipped",
                    "queue_name": self.queue_name,
                    "entity_type": sync_op.entity_type,
                    "entity_id": sync_op.entity_id
                }
            )
            return False

        try:
            # Soft limit: the size check and the push are two separate
            # commands, so concurrent producers can overshoot max_size slightly.
            current_size = await self.size()
            if current_size >= self.max_size:
                logger.warning(
                    "Queue is full",
                    extra={
                        "event": "queue_full",
                        "queue_name": self.queue_name,
                        "current_size": current_size,
                        "max_size": self.max_size
                    }
                )
                return False

            # Serialize operation
            # NOTE: datetime.utcnow() is deprecated since Python 3.12; it is
            # kept so the stored timestamp format (naive ISO-8601) is unchanged.
            payload = {
                "entity_type": sync_op.entity_type,
                "entity_id": sync_op.entity_id,
                "operation": sync_op.operation,
                "enqueued_at": datetime.utcnow().isoformat()
            }

            # Push to Redis list (left push for FIFO with right pop).
            # LPUSH returns the list length after the push.
            new_size = await self.redis_client.lpush(
                self.queue_name, json.dumps(payload)
            )

            # The item is in the queue now; a failing stats update must not
            # turn this into a reported failure (the caller could retry and
            # enqueue a duplicate).
            await self._bump_stat("total_enqueued")

            logger.debug(
                "Enqueued sync operation",
                extra={
                    "event": "enqueued",
                    "queue_name": self.queue_name,
                    "entity_type": sync_op.entity_type,
                    "entity_id": sync_op.entity_id,
                    "operation": sync_op.operation,
                    "queue_size": new_size
                }
            )
            return True
        except Exception:
            logger.error(
                "Error enqueuing sync operation",
                exc_info=True,
                extra={
                    "event": "enqueue_error",
                    "queue_name": self.queue_name,
                    "entity_type": sync_op.entity_type,
                    "entity_id": sync_op.entity_id
                }
            )
            return False

    async def dequeue(self, timeout: int = 1) -> Optional["SyncOperation"]:
        """
        Remove and return sync operation from queue (blocking).

        Args:
            timeout: Timeout in seconds for blocking pop (0 = block forever)

        Returns:
            SyncOperation if available, None if timeout, Redis unavailable,
            or the popped payload could not be decoded
        """
        if not self.redis_client:
            # Simulate the blocking timeout so polling callers are throttled.
            # NOTE(review): with timeout=0 this returns immediately, so a
            # tight consumer loop would spin while Redis is unavailable.
            await asyncio.sleep(timeout)
            return None

        # Holds the raw payload once popped, so that a decode failure can
        # still report what was removed from the queue.
        raw_payload = None
        try:
            # Blocking right pop (FIFO)
            logger.debug(
                "Attempting to dequeue",
                extra={"event": "dequeue_attempt", "queue_name": self.queue_name, "timeout": timeout},
            )
            result = await self.redis_client.brpop(self.queue_name, timeout=timeout)
            logger.info(
                "Redis BRPOP result",
                extra={"event": "dequeue_result", "queue_name": self.queue_name, "has_result": result is not None, "result_type": str(type(result))},
            )
            if not result:
                logger.debug("Dequeue timeout", extra={"event": "dequeue_timeout", "queue_name": self.queue_name})
                return None

            # result is tuple: (key, value)
            _, raw_payload = result
            payload = json.loads(raw_payload)

            # Deserialize operation
            sync_op = SyncOperation(
                entity_type=payload["entity_type"],
                entity_id=payload["entity_id"],
                operation=payload["operation"]
            )

            # The item has already been removed from Redis; a failing stats
            # update must not make us drop it by returning None.
            await self._bump_stat("total_dequeued")

            logger.info(
                "Dequeued sync operation",
                extra={
                    "event": "dequeued",
                    "queue_name": self.queue_name,
                    "entity_type": sync_op.entity_type,
                    "entity_id": sync_op.entity_id,
                    "operation": sync_op.operation
                }
            )
            return sync_op
        except asyncio.TimeoutError:
            logger.debug("Dequeue asyncio timeout", extra={"event": "dequeue_async_timeout", "queue_name": self.queue_name})
            return None
        except Exception as e:
            error_extra = {"queue_name": self.queue_name, "error": str(e)}
            if raw_payload is not None:
                # Already popped and therefore gone from the queue: log it so
                # the operation can be recovered manually.
                error_extra["raw_payload"] = repr(raw_payload)
            logger.error(
                "Error dequeuing sync operation",
                exc_info=True,
                extra=error_extra
            )
            return None

    async def size(self) -> int:
        """
        Get current queue size.

        Returns:
            Number of operations in queue (0 if Redis unavailable or on error)
        """
        if not self.redis_client:
            return 0

        try:
            return await self.redis_client.llen(self.queue_name)
        except Exception as e:
            logger.error(
                "Error getting queue size",
                exc_info=True,
                extra={"queue_name": self.queue_name, "error": str(e)}
            )
            return 0

    async def clear(self) -> bool:
        """
        Clear all operations from queue.

        Only the list key is deleted; the stats hash is left untouched.

        Returns:
            True if cleared successfully, False if Redis is unavailable or
            the delete failed
        """
        if not self.redis_client:
            return False

        try:
            await self.redis_client.delete(self.queue_name)
            logger.info(
                "Cleared queue",
                extra={"event": "queue_cleared", "queue_name": self.queue_name}
            )
            return True
        except Exception:
            logger.error(
                "Error clearing queue",
                exc_info=True,
                extra={"queue_name": self.queue_name}
            )
            return False

    async def get_stats(self) -> Dict[str, Any]:
        """
        Get queue statistics.

        Returns:
            Dictionary with queue name, current/max size and the four
            counters. If Redis is unavailable or the lookup fails, a
            dictionary with only ``queue_name`` and ``error`` is returned.
        """
        if not self.redis_client:
            return {
                "queue_name": self.queue_name,
                "error": "Redis unavailable"
            }

        try:
            raw_stats = await self.redis_client.hgetall(self.stats_key)
            # Without decode_responses=True the client returns bytes keys;
            # normalise them so the lookups below work in both modes.
            # (int() accepts bytes values, so only keys need decoding.)
            stats = {
                (key.decode() if isinstance(key, bytes) else key): value
                for key, value in raw_stats.items()
            }
            current_size = await self.size()
            return {
                "queue_name": self.queue_name,
                "current_size": current_size,
                "max_size": self.max_size,
                "total_enqueued": int(stats.get("total_enqueued", 0)),
                "total_dequeued": int(stats.get("total_dequeued", 0)),
                "total_processed": int(stats.get("total_processed", 0)),
                "total_failed": int(stats.get("total_failed", 0))
            }
        except Exception as e:
            logger.error(
                "Error getting queue stats",
                exc_info=True,
                extra={"queue_name": self.queue_name}
            )
            return {
                "queue_name": self.queue_name,
                "error": str(e)
            }

    async def record_processed(self) -> None:
        """Record successful processing of an operation (no-op without Redis)."""
        if not self.redis_client:
            return

        try:
            await self.redis_client.hincrby(self.stats_key, "total_processed", 1)
        except Exception as e:
            logger.error("Error recording processed stat", exc_info=True, extra={"event": "queue_stat_record_failure", "stat": "total_processed", "error": str(e)})

    async def record_failed(self) -> None:
        """Record failed processing of an operation (no-op without Redis)."""
        if not self.redis_client:
            return

        try:
            await self.redis_client.hincrby(self.stats_key, "total_failed", 1)
        except Exception as e:
            logger.error("Error recording failed stat", exc_info=True, extra={"event": "queue_stat_record_failure", "stat": "total_failed", "error": str(e)})

    async def peek(self, count: int = 10) -> list:
        """
        Peek at operations in queue without removing them.

        Args:
            count: Number of operations to peek at

        Returns:
            List of operations (as dicts), oldest first, i.e. in the order
            they would be dequeued. Empty if ``count`` is not positive,
            Redis is unavailable, or the lookup fails.
        """
        # count <= 0 must be rejected explicitly: LRANGE key 0 -1 (count=0)
        # would return the entire queue.
        if not self.redis_client or count <= 0:
            return []

        try:
            # The `count` items nearest the tail are the oldest ones. LRANGE
            # yields them head-to-tail (newest of them first), so reverse to
            # get oldest first.
            items = await self.redis_client.lrange(self.queue_name, -count, -1)
            return [json.loads(item) for item in reversed(items)]
        except Exception:
            logger.error(
                "Error peeking at queue",
                exc_info=True,
                extra={"queue_name": self.queue_name}
            )
            return []