File size: 1,067 Bytes
1f1b4db |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
from typing import Optional
from aioredis import Redis
from api.chat.chat_api import ChatAPI
from .base import ChatQueueBase
class RedisQueue(ChatQueueBase):
def __init__(self, redis: Redis):
self.redis = redis
self.queue_key = "chat_queue"
async def add(self, api_key: str) -> None:
# 检查 api_key 是否已经在队列中
exists = await self.redis.lpos(self.queue_key, api_key)
if exists is None: # 如果不存在,则添加
await self.redis.rpush(self.queue_key, api_key)
async def get(self) -> Optional[ChatAPI]:
api_key = await self.redis.lpop(self.queue_key)
if api_key:
# Handle both bytes and str cases
if isinstance(api_key, bytes):
api_key = api_key.decode()
chat = ChatAPI(api_key=api_key)
# 重新加入队列
await self.add(api_key)
return chat
return None
async def length(self) -> int:
return await self.redis.llen(self.queue_key) |