Spaces:
Sleeping
Sleeping
File size: 1,594 Bytes
97f9138 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | import redis
import json
from typing import Optional, Dict
from src.config import config
class QueueManager:
def __init__(self, host: str = None, port: int = None, db: int = None):
self.host = host or config.REDIS_HOST
self.port = port or config.REDIS_PORT
self.db = db or config.REDIS_DB
self.queue_name = config.QUEUE_NAME
self.client = None
def connect(self):
if self.client is None:
if config.REDIS_URL:
self.client = redis.Redis.from_url(
config.REDIS_URL,
decode_responses=True
)
else:
self.client = redis.Redis(
host=self.host,
port=self.port,
db=self.db,
decode_responses=True
)
def enqueue(self, job_data: Dict):
self.connect()
job_json = json.dumps(job_data)
self.client.rpush(self.queue_name, job_json)
def dequeue(self, timeout: int = 0) -> Optional[Dict]:
self.connect()
result = self.client.blpop(self.queue_name, timeout=timeout)
if result:
_, job_json = result
return json.loads(job_json)
return None
def get_queue_length(self) -> int:
self.connect()
return self.client.llen(self.queue_name)
def ping(self) -> bool:
try:
self.connect()
return self.client.ping()
except:
return False
queue_manager = QueueManager()
|