import json
from typing import Dict, Optional

import redis

from src.config import config


class QueueManager:
    """Small JSON job queue stored in a Redis list.

    Jobs are appended with RPUSH and removed with BLPOP, so the list named
    by ``config.QUEUE_NAME`` is consumed in first-in, first-out order.
    The Redis client is created lazily on first use (see ``connect``).
    """

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        db: Optional[int] = None,
    ):
        """Store connection settings, falling back to ``config`` values.

        Args:
            host: Redis host; a falsy value falls back to ``config.REDIS_HOST``.
            port: Redis port; ``None`` falls back to ``config.REDIS_PORT``.
            db: Redis database index; ``None`` falls back to
                ``config.REDIS_DB``.
        """
        self.host = host or config.REDIS_HOST
        # Compare against None rather than relying on truthiness: 0 is a
        # legitimate database index and must not be replaced by the
        # configured default when the caller passes it explicitly.
        self.port = port if port is not None else config.REDIS_PORT
        self.db = db if db is not None else config.REDIS_DB
        self.queue_name = config.QUEUE_NAME
        self.client = None  # created on demand by connect()

    def connect(self):
        """Create the Redis client if it does not exist yet.

        When ``config.REDIS_URL`` is truthy it takes precedence over the
        host/port/db settings. Calling this again once a client exists is a
        no-op.
        """
        if self.client is not None:
            return

        if config.REDIS_URL:
            self.client = redis.Redis.from_url(
                config.REDIS_URL,
                decode_responses=True,
            )
        else:
            self.client = redis.Redis(
                host=self.host,
                port=self.port,
                db=self.db,
                decode_responses=True,
            )

    def enqueue(self, job_data: Dict):
        """Serialize ``job_data`` to JSON and push it onto the queue tail."""
        self.connect()
        job_json = json.dumps(job_data)
        self.client.rpush(self.queue_name, job_json)

    def dequeue(self, timeout: int = 0) -> Optional[Dict]:
        """Pop the job at the head of the queue and return it decoded.

        Args:
            timeout: Forwarded unchanged to BLPOP. Per the Redis BLPOP
                documentation a timeout of 0 blocks until an item arrives.

        Returns:
            The JSON-decoded job, or ``None`` when BLPOP returned nothing.
        """
        self.connect()
        result = self.client.blpop(self.queue_name, timeout=timeout)
        if not result:
            return None

        # BLPOP yields a (list name, value) pair; only the value is needed.
        _, job_json = result
        return json.loads(job_json)

    def get_queue_length(self) -> int:
        """Return the number of items currently in the queue list (LLEN)."""
        self.connect()
        return self.client.llen(self.queue_name)

    def ping(self) -> bool:
        """Return the result of a Redis PING, or ``False`` if it fails.

        Any ordinary exception raised while connecting or pinging is
        treated as "not reachable".
        """
        try:
            self.connect()
            return self.client.ping()
        except Exception:
            # Deliberately broad best-effort health check, but not a bare
            # ``except`` so KeyboardInterrupt/SystemExit still propagate.
            return False


queue_manager = QueueManager()