test / managers /queue /redis_queue.py
gaoqilan's picture
Upload 103 files
1f1b4db verified
from typing import Optional
from aioredis import Redis
from api.chat.chat_api import ChatAPI
from .base import ChatQueueBase
class RedisQueue(ChatQueueBase):
    """Round-robin pool of API keys stored in a single Redis list.

    Keys live in the Redis list named by ``queue_key``. ``get`` takes the key
    at the head of the list, moves it to the tail, and returns a ``ChatAPI``
    built from it, so successive calls cycle through the stored keys.
    """

    def __init__(self, redis: Redis):
        """Bind the queue to an existing Redis client.

        Args:
            redis: Client used for every list operation issued by this queue.
        """
        self.redis = redis
        # Name of the Redis list that holds the API keys.
        self.queue_key = "chat_queue"

    async def add(self, api_key: str) -> None:
        """Append ``api_key`` to the tail of the list unless it is already there.

        Args:
            api_key: Key to enqueue.
        """
        # LPOS yields None when the element is absent from the list.
        # NOTE(review): the LPOS check and the RPUSH are two separate
        # commands, so concurrent callers could still enqueue the same key
        # twice -- confirm whether that matters for the deployment.
        position = await self.redis.lpos(self.queue_key, api_key)
        if position is None:
            await self.redis.rpush(self.queue_key, api_key)

    async def get(self) -> Optional[ChatAPI]:
        """Take the next key, rotate it to the tail, and wrap it in a client.

        Returns:
            A ``ChatAPI`` built from the key at the head of the list, or
            ``None`` when LPOP returns nothing (or an empty value).
        """
        api_key = await self.redis.lpop(self.queue_key)
        if not api_key:
            return None

        # The client may hand back either bytes or str; normalise to str.
        if isinstance(api_key, bytes):
            api_key = api_key.decode()

        # Put the key back *before* building the client: if constructing
        # ChatAPI raises, the key has already been returned to the list
        # instead of being silently dropped from the rotation.
        await self.add(api_key)
        return ChatAPI(api_key=api_key)

    async def length(self) -> int:
        """Return the number of entries currently in the Redis list."""
        return await self.redis.llen(self.queue_key)