Spaces:
Runtime error
Runtime error
fix(redis): cut workers to 7 total, remove per-iteration Redis calls to reduce connection pressure
dfbedd9 | """ | |
| Redis-based persistent queue for sync operations. | |
| Replaces in-memory asyncio.Queue with Redis Lists for durability. | |
| """ | |
import asyncio
import json
import random
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import redis as redis_sync  # for redis.exceptions
import redis.asyncio as aioredis

from app.core.logging import get_logger
from app.sync.common.models import SyncOperation
| logger = get_logger(__name__) | |
class RedisSyncQueue:
    """
    Redis-backed persistent queue for sync operations.

    Uses a Redis List (LPUSH at the head, BRPOP from the tail) as a FIFO
    queue so operations survive application restarts. A companion hash
    (``<queue_name>:stats``) holds lifetime counters.

    NOTE: ``max_size`` is a soft limit — :meth:`enqueue` logs a warning when
    the list grows past it but never rejects the operation, so no extra
    Redis round-trip is needed per enqueue (reducing connection pressure
    was the motivation for this design).
    """

    def __init__(
        self,
        redis_client: aioredis.Redis,
        queue_name: str,
        max_size: int = 10000
    ):
        # Async Redis client; assumed to be a shared, pooled client.
        self.redis_client = redis_client
        # Redis key of the list backing the queue.
        self.queue_name = queue_name
        # Soft capacity limit; see class docstring.
        self.max_size = max_size
        # Hash key holding lifetime counters for this queue.
        self.stats_key = f"{queue_name}:stats"
        logger.info(
            "Initialized RedisSyncQueue",
            extra={"queue_name": queue_name, "max_size": max_size},
        )

    async def enqueue(self, sync_op: SyncOperation) -> bool:
        """
        Add a sync operation to the queue.

        Args:
            sync_op: Sync operation to enqueue.

        Returns:
            True if enqueued successfully, False on Redis error.
            ``max_size`` is advisory: exceeding it logs a warning but the
            operation is still accepted (see class docstring).
        """
        try:
            payload = {
                "entity_type": sync_op.entity_type,
                "entity_id": sync_op.entity_id,
                "operation": sync_op.operation,
                # Timezone-aware timestamp; datetime.utcnow() is deprecated.
                "enqueued_at": datetime.now(timezone.utc).isoformat(),
            }
            # LPUSH + BRPOP gives FIFO ordering. LPUSH returns the new list
            # length, letting us detect overflow without an extra LLEN call.
            new_length = await self.redis_client.lpush(
                self.queue_name, json.dumps(payload)
            )
            if self.max_size and new_length > self.max_size:
                logger.warning(
                    "Sync queue exceeds max_size",
                    extra={
                        "queue_name": self.queue_name,
                        "current_size": new_length,
                        "max_size": self.max_size,
                    },
                )
            await self.redis_client.hincrby(self.stats_key, "total_enqueued", 1)
            return True
        except Exception as e:
            logger.error(
                "Error enqueuing sync operation",
                exc_info=e,
                extra={
                    "queue_name": self.queue_name,
                    "entity_type": sync_op.entity_type,
                    "entity_id": sync_op.entity_id,
                },
            )
            return False

    async def dequeue(self, timeout: int = 1) -> Optional[SyncOperation]:
        """
        Remove and return the oldest sync operation (blocking up to
        ``timeout`` seconds); None if the queue stayed empty.

        Raises:
            redis.exceptions.ConnectionError: re-raised so the caller
                (worker loop) can apply its own backoff instead of
                tight-looping on Redis failures.
        """
        try:
            # BRPOP blocks up to `timeout` seconds and returns None on expiry.
            result = await self.redis_client.brpop(self.queue_name, timeout=timeout)
            if not result:
                return None
            _, payload_json = result
            payload = json.loads(payload_json)
            # NOTE: the stats hash's "total_dequeued" counter is deliberately
            # NOT incremented here, to avoid a per-iteration Redis call.
            sync_op = SyncOperation(
                entity_type=payload["entity_type"],
                entity_id=payload["entity_id"],
                operation=payload["operation"],
            )
            logger.info(
                "Dequeued sync operation",
                extra={
                    "event": "dequeued",
                    "queue_name": self.queue_name,
                    "entity_type": sync_op.entity_type,
                    "entity_id": sync_op.entity_id,
                    "operation": sync_op.operation,
                },
            )
            return sync_op
        except asyncio.TimeoutError:
            # Defensive: brpop normally signals timeout by returning None.
            return None
        except redis_sync.exceptions.ConnectionError:
            # Re-raise so the worker loop's except-branch applies backoff sleep.
            # Do NOT swallow — that causes a tight retry loop that exhausts the pool.
            raise
        except Exception as e:
            logger.error(
                "Error dequeuing sync operation",
                exc_info=True,
                extra={"queue_name": self.queue_name, "error": str(e)},
            )
            return None

    async def size(self) -> int:
        """
        Get the current queue size.

        Returns:
            Number of operations in the queue (0 on Redis error).
        """
        try:
            return await self.redis_client.llen(self.queue_name)
        except Exception as e:
            logger.error(
                f"Error getting queue size for {self.queue_name}",
                exc_info=e,
                extra={"queue_name": self.queue_name, "error": str(e)},
            )
            return 0

    async def clear(self) -> bool:
        """
        Delete all operations from the queue (stats counters are kept).

        Returns:
            True if cleared successfully, False on Redis error.
        """
        try:
            await self.redis_client.delete(self.queue_name)
            logger.info(
                "Cleared queue",
                extra={"queue_name": self.queue_name},
            )
            return True
        except Exception as e:
            logger.error(
                "Error clearing queue",
                exc_info=e,
                extra={"queue_name": self.queue_name},
            )
            return False

    async def get_stats(self) -> Dict[str, Any]:
        """
        Get queue statistics.

        Returns:
            Dict with current size, capacity, and lifetime counters; on
            Redis error, a dict with the queue name and an "error" key.
        """
        try:
            raw = await self.redis_client.hgetall(self.stats_key)
            # Normalize hash keys: clients created with decode_responses=False
            # return bytes keys, which would otherwise silently read as 0.
            stats = {
                (k.decode() if isinstance(k, bytes) else k): v
                for k, v in raw.items()
            }
            current_size = await self.size()
            return {
                "queue_name": self.queue_name,
                "current_size": current_size,
                "max_size": self.max_size,
                "total_enqueued": int(stats.get("total_enqueued", 0)),
                # total_dequeued is not updated by dequeue() (kept cheap);
                # it only moves if some other component increments it.
                "total_dequeued": int(stats.get("total_dequeued", 0)),
                "total_processed": int(stats.get("total_processed", 0)),
                "total_failed": int(stats.get("total_failed", 0)),
            }
        except Exception as e:
            logger.error(
                "Error getting queue stats",
                exc_info=e,
                extra={"queue_name": self.queue_name},
            )
            return {
                "queue_name": self.queue_name,
                "error": str(e),
            }

    async def record_processed(self) -> None:
        """Record successful processing of an operation."""
        try:
            await self.redis_client.hincrby(self.stats_key, "total_processed", 1)
        except Exception as e:
            logger.error("Error recording processed stat", exc_info=e)

    async def record_failed(self) -> None:
        """Record failed processing of an operation."""
        try:
            await self.redis_client.hincrby(self.stats_key, "total_failed", 1)
        except Exception as e:
            logger.error("Error recording failed stat", exc_info=e)

    async def peek(self, count: int = 10) -> list:
        """
        Peek at up to ``count`` operations without removing them.

        Args:
            count: Maximum number of operations to return.

        Returns:
            List of payload dicts, oldest (next to be dequeued) first.
            Empty list when count <= 0 or on Redis error.
        """
        # Guard: lrange(-0, -1) == lrange(0, -1) would return the WHOLE list.
        if count <= 0:
            return []
        try:
            items = await self.redis_client.lrange(self.queue_name, -count, -1)
            # LRANGE returns head->tail order; the tail (BRPOP side) is the
            # oldest entry, so reverse to present oldest first.
            return [json.loads(item) for item in reversed(items)]
        except Exception as e:
            logger.error(
                "Error peeking at queue",
                exc_info=e,
                extra={"queue_name": self.queue_name},
            )
            return []