Spaces:
Sleeping
Sleeping
| import redis | |
| import json | |
| from typing import Optional, Dict | |
| from src.config import config | |
class QueueManager:
    """Thin wrapper around a Redis list used as a FIFO job queue.

    Jobs are JSON-encoded and pushed onto the tail of the list named by
    ``config.QUEUE_NAME``; consumers pop from the head. The Redis client is
    created lazily by :meth:`connect`, so constructing a manager performs no
    network I/O.
    """

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        db: Optional[int] = None,
    ):
        """Record connection settings, falling back to ``config`` defaults.

        Args:
            host: Redis host; ``config.REDIS_HOST`` is used when omitted/empty.
            port: Redis port; ``config.REDIS_PORT`` is used when omitted/zero.
            db: Redis database index; ``config.REDIS_DB`` is used only when
                this is ``None``.

        Note:
            These settings are ignored by :meth:`connect` when
            ``config.REDIS_URL`` is set.
        """
        self.host = host or config.REDIS_HOST
        self.port = port or config.REDIS_PORT
        # 0 is a valid database index, so test against None explicitly; a
        # plain `or` would silently replace an explicit db=0 with the default.
        self.db = db if db is not None else config.REDIS_DB
        self.queue_name = config.QUEUE_NAME
        self.client = None

    def connect(self):
        """Create the Redis client on first use; later calls are no-ops."""
        if self.client is not None:
            return
        if config.REDIS_URL:
            # A full connection URL takes precedence over host/port/db.
            self.client = redis.Redis.from_url(
                config.REDIS_URL,
                decode_responses=True,
            )
        else:
            self.client = redis.Redis(
                host=self.host,
                port=self.port,
                db=self.db,
                decode_responses=True,
            )

    def enqueue(self, job_data: Dict):
        """Serialize ``job_data`` to JSON and append it to the queue tail."""
        self.connect()
        job_json = json.dumps(job_data)
        self.client.rpush(self.queue_name, job_json)

    def dequeue(self, timeout: int = 0) -> Optional[Dict]:
        """Pop the job at the head of the queue and return it decoded.

        Args:
            timeout: Passed straight through to the client's ``blpop``
                (for Redis BLPOP, 0 means wait until an item is available).

        Returns:
            The decoded job, or ``None`` if ``blpop`` returned nothing.
        """
        self.connect()
        result = self.client.blpop(self.queue_name, timeout=timeout)
        if not result:
            return None
        # blpop yields a (list_name, value) pair; only the value is needed.
        _, job_json = result
        return json.loads(job_json)

    def get_queue_length(self) -> int:
        """Return the number of jobs currently waiting in the queue."""
        self.connect()
        return self.client.llen(self.queue_name)

    def ping(self) -> bool:
        """Return True if the Redis server answers a PING, False on any error.

        This is a best-effort health check: connection and configuration
        errors are reported as ``False`` rather than raised.
        """
        try:
            self.connect()
            return bool(self.client.ping())
        except Exception:
            # Deliberately not a bare `except:` so KeyboardInterrupt and
            # SystemExit still propagate instead of being reported as "down".
            return False
# Shared module-level instance built from the config defaults. Creating it
# opens no connection: the client is only created on the first connect().
queue_manager = QueueManager()