WebRAG / src /queue_manager.py
Arun21102003
Initial clean commit
97f9138
import redis
import json
from typing import Optional, Dict
from src.config import config
class QueueManager:
def __init__(self, host: str = None, port: int = None, db: int = None):
self.host = host or config.REDIS_HOST
self.port = port or config.REDIS_PORT
self.db = db or config.REDIS_DB
self.queue_name = config.QUEUE_NAME
self.client = None
def connect(self):
if self.client is None:
if config.REDIS_URL:
self.client = redis.Redis.from_url(
config.REDIS_URL,
decode_responses=True
)
else:
self.client = redis.Redis(
host=self.host,
port=self.port,
db=self.db,
decode_responses=True
)
def enqueue(self, job_data: Dict):
self.connect()
job_json = json.dumps(job_data)
self.client.rpush(self.queue_name, job_json)
def dequeue(self, timeout: int = 0) -> Optional[Dict]:
self.connect()
result = self.client.blpop(self.queue_name, timeout=timeout)
if result:
_, job_json = result
return json.loads(job_json)
return None
def get_queue_length(self) -> int:
self.connect()
return self.client.llen(self.queue_name)
def ping(self) -> bool:
try:
self.connect()
return self.client.ping()
except:
return False
queue_manager = QueueManager()