|
|
from typing import Optional |
|
|
from aioredis import Redis |
|
|
from api.chat.chat_api import ChatAPI |
|
|
from .base import ChatQueueBase |
|
|
|
|
|
class RedisQueue(ChatQueueBase):
    """Queue of API keys kept in a single Redis list.

    Keys are appended to the tail of the list stored under ``queue_key``
    and consumed from its head.
    """

    def __init__(self, redis: Redis):
        """Remember the Redis client and the name of the backing list."""
        self.queue_key = "chat_queue"
        self.redis = redis

    async def add(self, api_key: str) -> None:
        """Append *api_key* to the tail of the list unless it is already there.

        Presence is checked with ``LPOS``; the key is pushed with ``RPUSH``
        only when that lookup returns ``None``.
        """
        position = await self.redis.lpos(self.queue_key, api_key)
        if position is not None:
            # Already queued: leave the list untouched.
            return
        await self.redis.rpush(self.queue_key, api_key)

    async def get(self) -> Optional[ChatAPI]:
        """Pop the key at the head of the list and wrap it in a ``ChatAPI``.

        After the ``ChatAPI`` object is built, the key is handed back to
        :meth:`add`, which appends it to the tail when it is no longer
        present in the list.

        Returns:
            A ``ChatAPI`` created with the popped key, or ``None`` when the
            pop yields nothing (or a falsy value).

        NOTE(review): the pop and the re-add are separate Redis calls, and
        the re-add only runs after ``ChatAPI`` is constructed; if that
        construction raises, the key is not pushed back. Confirm this is
        intended.
        """
        popped = await self.redis.lpop(self.queue_key)
        if not popped:
            return None

        # The client may hand back raw bytes; work with text from here on.
        key = popped.decode() if isinstance(popped, bytes) else popped

        client = ChatAPI(api_key=key)
        await self.add(key)
        return client

    async def length(self) -> int:
        """Return the number of entries in the list (``LLEN``)."""
        size = await self.redis.llen(self.queue_key)
        return size