# GodSpeed/src/redis/queues.py
# Samyuktha24's picture
# Add Redis utilities for session management, caching, and task queues
# 58ace4b
"""Redis-backed task queues for document ingestion and webhook events."""
import json
from datetime import datetime
from typing import Optional
from enum import Enum
import redis
from pydantic import BaseModel
class Priority(str, Enum):
    """Urgency tiers that can be assigned to a queued task.

    Members are ``str`` instances as well as enum members, so a member
    compares equal to its plain string value.
    """

    # Real-time handling (errors, alerts).
    CRITICAL = "critical"
    # Fast track (key decisions, breaking changes).
    HIGH = "high"
    # Standard handling (routine data, logs).
    NORMAL = "normal"
    # Background work (metrics, non-urgent items).
    LOW = "low"
class QueuedTask(BaseModel):
    """A single unit of work as handed out by a queue."""

    # Identifier assigned when the task was enqueued.
    id: str
    # Kind of source the payload originates from.
    source_type: str
    # Task body.
    payload: dict
    # Access-control tags carried alongside the payload.
    rbac_tags: dict
    # Urgency tier of the task.
    priority: Priority
    # Moment the task was created.
    created_at: datetime
    # Retry counter; starts at zero.
    attempt: int = 0
    # Retry budget; defaults to three.
    max_retries: int = 3
class IngestQueue:
    """Priority queue of documents waiting for the ingestion pipeline.

    Pending tasks are stored as JSON strings in the Redis sorted set
    ``<key_prefix>:queue``, scored by priority (0 = CRITICAL ... 3 = LOW).
    Tasks that exhaust their retries, and queue members that cannot be
    decoded, are pushed onto the Redis list ``<key_prefix>:deadletter``.

    NOTE(review): every method is declared ``async`` but calls
    ``self.redis`` synchronously; with a blocking ``redis.Redis`` client
    these calls block the event loop. Confirm whether an asyncio client
    was intended.
    """

    # Sorted-set score per priority; lower scores are popped first.
    _PRIORITY_SCORES = {
        Priority.CRITICAL: 0,
        Priority.HIGH: 1,
        Priority.NORMAL: 2,
        Priority.LOW: 3,
    }
    # Retry budget stored with tasks created through ``add``.
    _DEFAULT_MAX_RETRIES = 3
    # The dead-letter list expires 30 days after the most recent push.
    _DEADLETTER_TTL_SECONDS = 86400 * 30

    def __init__(self, redis_client: redis.Redis, key_prefix: str = "ingest"):
        """Bind the queue to a Redis client and a key namespace."""
        self.redis = redis_client
        self.key_prefix = key_prefix

    async def add(
        self,
        source_type: str,
        payload: dict,
        rbac_tags: dict,
        priority: Priority = Priority.NORMAL,
    ) -> str:
        """
        Add document to ingest queue.

        Uses sorted set with priority as score for ordering:
        - CRITICAL (0): real-time processing
        - HIGH (1): priority batch
        - NORMAL (2): standard batch
        - LOW (3): low-priority batch

        Members with the same score are ordered by Redis lexicographically
        on the serialized task, so ordering inside one priority level is
        not strictly first-in-first-out.

        Returns the generated task id.
        """
        return self._enqueue(
            source_type,
            payload,
            rbac_tags,
            priority,
            attempt=0,
            max_retries=self._DEFAULT_MAX_RETRIES,
        )

    def _enqueue(
        self,
        source_type: str,
        payload: dict,
        rbac_tags: dict,
        priority: Priority,
        *,
        attempt: int,
        max_retries: int,
    ) -> str:
        """Serialize one task and insert it into the sorted set; return its id."""
        now = datetime.utcnow()
        task_id = f"{source_type}:{payload.get('uri', 'unknown')}:{now.timestamp()}"
        record = {
            "id": task_id,
            "source_type": source_type,
            "payload": json.dumps(payload),
            "rbac_tags": json.dumps(rbac_tags),
            "priority": priority.value,
            "created_at": now.isoformat(),
            # Persist the retry state so it survives a requeue round trip.
            "attempt": attempt,
            "max_retries": max_retries,
        }
        # Score = priority (lower = higher priority).
        self.redis.zadd(
            f"{self.key_prefix}:queue",
            {json.dumps(record): self._PRIORITY_SCORES[priority]},
        )
        return task_id

    async def pop(self, batch_size: int = 10) -> list[QueuedTask]:
        """
        Pop highest-priority tasks from queue.

        Returns up to batch_size tasks, prioritizing CRITICAL > HIGH > NORMAL > LOW.

        The members are removed with a single ZPOPMIN call, so concurrent
        consumers never receive the same task (requires Redis >= 5.0).
        Members that cannot be decoded into a ``QueuedTask`` are moved to
        the dead-letter list instead of blocking the head of the queue.
        """
        if batch_size <= 0:
            return []

        tasks = []
        popped = self.redis.zpopmin(f"{self.key_prefix}:queue", batch_size)
        for member, _score in popped:
            # Clients created with decode_responses=True already return str.
            raw = member.decode("utf-8") if isinstance(member, bytes) else member
            try:
                tasks.append(self._decode_task(raw))
            except (KeyError, TypeError, ValueError) as exc:
                # The member is already off the queue; keep it inspectable.
                self._push_deadletter(raw, f"malformed_task: {exc!r}")
        return tasks

    @staticmethod
    def _decode_task(raw: str) -> QueuedTask:
        """Rebuild a ``QueuedTask`` from its serialized queue member."""
        record = json.loads(raw)
        fields = {
            "id": record["id"],
            "source_type": record["source_type"],
            "payload": json.loads(record["payload"]),
            "rbac_tags": json.loads(record["rbac_tags"]),
            "priority": Priority(record["priority"]),
            "created_at": datetime.fromisoformat(record["created_at"]),
            "attempt": record["attempt"],
        }
        # Members written before max_retries was persisted fall back to the
        # model default.
        if "max_retries" in record:
            fields["max_retries"] = record["max_retries"]
        return QueuedTask(**fields)

    async def requeue_on_failure(self, task: QueuedTask) -> bool:
        """
        Re-queue task if it hasn't exceeded max retries.

        Increments ``task.attempt`` in place and stores the new count with
        the re-queued task, so the retry budget is enforced across pops.
        The re-queued task receives a fresh id and creation time.

        Returns True if re-queued, False if max retries exceeded (the task
        is then sent to the dead-letter list).
        """
        if task.attempt >= task.max_retries:
            await self.send_to_deadletter(task, "max_retries_exceeded")
            return False

        task.attempt += 1
        self._enqueue(
            task.source_type,
            task.payload,
            task.rbac_tags,
            task.priority,
            attempt=task.attempt,
            max_retries=task.max_retries,
        )
        return True

    async def send_to_deadletter(self, task: QueuedTask, reason: str):
        """Send task to deadletter queue for manual inspection."""
        # Built field by field: the model holds a datetime, which
        # json.dumps cannot serialize on its own.
        record = {
            "id": task.id,
            "source_type": task.source_type,
            "payload": task.payload,
            "rbac_tags": task.rbac_tags,
            "priority": task.priority.value,
            "created_at": task.created_at.isoformat(),
            "attempt": task.attempt,
            "max_retries": task.max_retries,
        }
        self._push_deadletter(json.dumps(record), reason)

    def _push_deadletter(self, serialized_task: str, reason: str) -> None:
        """Push one entry onto the dead-letter list and refresh its expiry."""
        dlq_key = f"{self.key_prefix}:deadletter"
        dlq_item = {
            "task": serialized_task,
            "reason": reason,
            "failed_at": datetime.utcnow().isoformat(),
        }
        self.redis.lpush(dlq_key, json.dumps(dlq_item))
        # The expiry applies to the whole list and restarts on every push.
        self.redis.expire(dlq_key, self._DEADLETTER_TTL_SECONDS)

    async def get_stats(self) -> dict:
        """Get queue statistics.

        Returns a dict with ``queue_length``, ``deadletter_length`` and
        their sum under ``total``.
        """
        key = f"{self.key_prefix}:queue"
        dlq_key = f"{self.key_prefix}:deadletter"
        queue_length = self.redis.zcard(key)
        dlq_length = self.redis.llen(dlq_key)
        return {
            "queue_length": queue_length,
            "deadletter_length": dlq_length,
            "total": queue_length + dlq_length,
        }
class WebhookQueue:
    """Queue for raw webhook events (before processing).

    Pending events are stored as JSON strings in the Redis sorted set
    ``<key_prefix>:pending``, scored by priority (0 = CRITICAL ... 3 = LOW).
    Members that cannot be parsed when popped are pushed onto the Redis
    list ``<key_prefix>:deadletter``.

    NOTE(review): the methods are declared ``async`` but call
    ``self.redis`` synchronously; with a blocking ``redis.Redis`` client
    these calls block the event loop. Confirm whether an asyncio client
    was intended.
    """

    # Sorted-set score per priority; lower scores are popped first.
    _PRIORITY_SCORES = {
        Priority.CRITICAL: 0,
        Priority.HIGH: 1,
        Priority.NORMAL: 2,
        Priority.LOW: 3,
    }
    # The dead-letter list expires 30 days after the most recent push.
    _DEADLETTER_TTL_SECONDS = 86400 * 30

    def __init__(self, redis_client: redis.Redis, key_prefix: str = "webhooks"):
        """Bind the queue to a Redis client and a key namespace."""
        self.redis = redis_client
        self.key_prefix = key_prefix

    async def add(
        self,
        event_type: str,
        payload: dict,
        priority: Priority = Priority.NORMAL,
    ) -> str:
        """Queue a raw webhook event for async processing.

        Returns the generated event id.
        """
        event_id = f"{event_type}:{datetime.utcnow().timestamp()}"
        event = {
            "id": event_id,
            "event_type": event_type,
            "payload": json.dumps(payload),
            "priority": priority.value,
            "received_at": datetime.utcnow().isoformat(),
        }
        # Add to sorted set by priority (lower score = higher priority).
        self.redis.zadd(
            f"{self.key_prefix}:pending",
            {json.dumps(event): self._PRIORITY_SCORES[priority]},
        )
        return event_id

    async def pop(self, batch_size: int = 5) -> list[dict]:
        """Pop highest-priority webhook events.

        Returns up to ``batch_size`` events as dicts. ``payload`` is left
        as the JSON string written by ``add``.

        The members are removed with a single ZPOPMIN call, so concurrent
        consumers never receive the same event (requires Redis >= 5.0).
        Members that are not valid JSON are moved to the dead-letter list
        instead of blocking the head of the queue.
        """
        if batch_size <= 0:
            return []

        events = []
        popped = self.redis.zpopmin(f"{self.key_prefix}:pending", batch_size)
        for member, _score in popped:
            # Clients created with decode_responses=True already return str.
            raw = member.decode("utf-8") if isinstance(member, bytes) else member
            try:
                events.append(json.loads(raw))
            except ValueError as exc:
                # The member is already off the queue; keep it inspectable.
                self._push_deadletter(raw, f"malformed_event: {exc!r}")
        return events

    def _push_deadletter(self, raw_event: str, reason: str) -> None:
        """Push one unparseable member onto the dead-letter list."""
        dlq_key = f"{self.key_prefix}:deadletter"
        dlq_item = {
            "event": raw_event,
            "reason": reason,
            "failed_at": datetime.utcnow().isoformat(),
        }
        self.redis.lpush(dlq_key, json.dumps(dlq_item))
        # The expiry applies to the whole list and restarts on every push.
        self.redis.expire(dlq_key, self._DEADLETTER_TTL_SECONDS)
def get_queue(queue_type: str, redis_client: redis.Redis) -> Optional[object]:
    """Build the queue registered under ``queue_type``.

    Recognized names are ``"ingest"`` and ``"webhook"``. The queue is
    constructed with ``redis_client`` and its default key prefix. Any
    other name yields ``None``.
    """
    registry = {
        "ingest": IngestQueue,
        "webhook": WebhookQueue,
    }
    try:
        factory = registry[queue_type]
    except KeyError:
        return None
    return factory(redis_client)