cuatrolabs-scm-ms / app /sync /common /redis_queue.py
MukeshKapoor25's picture
Refactor logging in Trade Returns, UOM, and Warehouse services
7bd7f5e
"""
Redis-based persistent queue for sync operations.
Replaces in-memory asyncio.Queue with Redis Lists for durability.
"""
import json
import asyncio
from typing import Optional, Dict, Any
from datetime import datetime
import redis.asyncio as redis
from app.core.logging import get_logger
from app.sync.common.models import SyncOperation
logger = get_logger(__name__)
class RedisSyncQueue:
"""
Redis-backed persistent queue for sync operations.
Uses Redis Lists (LPUSH/BRPOP) for FIFO queue with persistence.
Operations survive application restarts.
"""
def __init__(
self,
redis_client: redis.Redis,
queue_name: str,
max_size: int = 10000
):
"""
Initialize Redis sync queue.
Args:
redis_client: Redis client instance (can be None if Redis unavailable)
queue_name: Name of the Redis list key (e.g., "sync:queue:catalogues")
max_size: Maximum queue size (soft limit)
"""
self.redis_client = redis_client
self.queue_name = queue_name
self.max_size = max_size
self.stats_key = f"{queue_name}:stats"
# Log Redis connection info for debugging
logger.info(
"Initialized RedisSyncQueue",
extra={
"event": "queue_init",
"queue_name": queue_name,
"max_size": max_size,
"redis_available": redis_client is not None,
"note": "Sync queue will not persist if Redis is unavailable"
}
)
async def enqueue(self, sync_op: SyncOperation) -> bool:
"""
Add sync operation to queue.
Args:
sync_op: Sync operation to enqueue
Returns:
True if enqueued successfully, False if queue is full or Redis unavailable
"""
if not self.redis_client:
logger.debug(
"Redis unavailable - skipping enqueue",
extra={
"event": "enqueue_skipped",
"queue_name": self.queue_name,
"entity_type": sync_op.entity_type,
"entity_id": sync_op.entity_id
}
)
return False
try:
# Check queue size (soft limit)
current_size = await self.size()
if current_size >= self.max_size:
logger.warning(
"Queue is full",
extra={
"event": "queue_full",
"queue_name": self.queue_name,
"current_size": current_size,
"max_size": self.max_size
}
)
return False
# Serialize operation
payload = {
"entity_type": sync_op.entity_type,
"entity_id": sync_op.entity_id,
"operation": sync_op.operation,
"enqueued_at": datetime.utcnow().isoformat()
}
# Push to Redis list (left push for FIFO with right pop)
await self.redis_client.lpush(self.queue_name, json.dumps(payload))
# Update stats
await self.redis_client.hincrby(self.stats_key, "total_enqueued", 1)
logger.debug(
"Enqueued sync operation",
extra={
"event": "enqueued",
"queue_name": self.queue_name,
"entity_type": sync_op.entity_type,
"entity_id": sync_op.entity_id,
"operation": sync_op.operation,
"queue_size": current_size + 1
}
)
return True
except Exception as e:
logger.error(
"Error enqueuing sync operation",
exc_info=True,
extra={
"event": "enqueue_error",
"queue_name": self.queue_name,
"entity_type": sync_op.entity_type,
"entity_id": sync_op.entity_id
}
)
return False
async def dequeue(self, timeout: int = 1) -> Optional[SyncOperation]:
"""
Remove and return sync operation from queue (blocking).
Args:
timeout: Timeout in seconds for blocking pop (0 = block forever)
Returns:
SyncOperation if available, None if timeout or Redis unavailable
"""
if not self.redis_client:
await asyncio.sleep(timeout) # Simulate timeout
return None
try:
# Blocking right pop (FIFO)
logger.debug(
"Attempting to dequeue",
extra={"event": "dequeue_attempt", "queue_name": self.queue_name, "timeout": timeout},
)
result = await self.redis_client.brpop(self.queue_name, timeout=timeout)
logger.info(
"Redis BRPOP result",
extra={"event": "dequeue_result", "queue_name": self.queue_name, "has_result": result is not None, "result_type": str(type(result))},
)
if not result:
logger.debug("Dequeue timeout", extra={"event": "dequeue_timeout", "queue_name": self.queue_name})
return None
# result is tuple: (key, value)
_, payload_json = result
payload = json.loads(payload_json)
# Deserialize operation
sync_op = SyncOperation(
entity_type=payload["entity_type"],
entity_id=payload["entity_id"],
operation=payload["operation"]
)
# Update stats
await self.redis_client.hincrby(self.stats_key, "total_dequeued", 1)
logger.info(
"Dequeued sync operation",
extra={
"event": "dequeued",
"queue_name": self.queue_name,
"entity_type": sync_op.entity_type,
"entity_id": sync_op.entity_id,
"operation": sync_op.operation
}
)
return sync_op
except asyncio.TimeoutError:
logger.debug("Dequeue asyncio timeout", extra={"event": "dequeue_async_timeout", "queue_name": self.queue_name})
return None
except Exception as e:
logger.error(
"Error dequeuing sync operation",
exc_info=True,
extra={"queue_name": self.queue_name, "error": str(e)}
)
return None
async def size(self) -> int:
"""
Get current queue size.
Returns:
Number of operations in queue (0 if Redis unavailable)
"""
if not self.redis_client:
return 0
try:
size = await self.redis_client.llen(self.queue_name)
return size
except Exception as e:
logger.error(
"Error getting queue size",
exc_info=True,
extra={"queue_name": self.queue_name, "error": str(e)}
)
return 0
async def clear(self) -> bool:
"""
Clear all operations from queue.
Returns:
True if cleared successfully
"""
try:
await self.redis_client.delete(self.queue_name)
logger.info(
"Cleared queue",
extra={"event": "queue_cleared", "queue_name": self.queue_name}
)
return True
except Exception as e:
logger.error(
"Error clearing queue",
exc_info=True,
extra={"queue_name": self.queue_name}
)
return False
async def get_stats(self) -> Dict[str, Any]:
"""
Get queue statistics.
Returns:
Dictionary with queue stats
"""
try:
stats = await self.redis_client.hgetall(self.stats_key)
current_size = await self.size()
return {
"queue_name": self.queue_name,
"current_size": current_size,
"max_size": self.max_size,
"total_enqueued": int(stats.get("total_enqueued", 0)),
"total_dequeued": int(stats.get("total_dequeued", 0)),
"total_processed": int(stats.get("total_processed", 0)),
"total_failed": int(stats.get("total_failed", 0))
}
except Exception as e:
logger.error(
"Error getting queue stats",
exc_info=True,
extra={"queue_name": self.queue_name}
)
return {
"queue_name": self.queue_name,
"error": str(e)
}
async def record_processed(self) -> None:
"""Record successful processing of an operation."""
try:
await self.redis_client.hincrby(self.stats_key, "total_processed", 1)
except Exception as e:
logger.error("Error recording processed stat", exc_info=True, extra={"event": "queue_stat_record_failure", "stat": "total_processed", "error": str(e)})
async def record_failed(self) -> None:
"""Record failed processing of an operation."""
try:
await self.redis_client.hincrby(self.stats_key, "total_failed", 1)
except Exception as e:
logger.error("Error recording failed stat", exc_info=True, extra={"event": "queue_stat_record_failure", "stat": "total_failed", "error": str(e)})
async def peek(self, count: int = 10) -> list:
"""
Peek at operations in queue without removing them.
Args:
count: Number of operations to peek at
Returns:
List of operations (as dicts)
"""
try:
# Get operations from right (oldest first)
items = await self.redis_client.lrange(self.queue_name, -count, -1)
return [json.loads(item) for item in items]
except Exception as e:
logger.error(
"Error peeking at queue",
exc_info=True,
extra={"queue_name": self.queue_name}
)
return []