from typing import Optional

from aioredis import Redis

from api.chat.chat_api import ChatAPI

from .base import ChatQueueBase


class RedisQueue(ChatQueueBase):
    """Rotating queue of chat API keys kept in a Redis list.

    Keys are stored under the Redis list named by ``queue_key``. ``get``
    takes the key at the head of the list and pushes it back to the tail,
    so successive calls cycle through the stored keys.
    """

    def __init__(self, redis: Redis):
        """Store the Redis client and the name of the list used as the queue.

        Args:
            redis: Client used for every queue operation.
        """
        self.redis = redis
        self.queue_key = "chat_queue"

    async def add(self, api_key: str) -> None:
        """Append ``api_key`` to the tail of the queue unless already present.

        Args:
            api_key: The API key to enqueue.
        """
        # LPOS yields None when the value is not in the list.
        # NOTE(review): the membership check and the push are two separate
        # Redis commands, so concurrent callers could still enqueue the same
        # key twice; confirm whether that matters for this deployment.
        position = await self.redis.lpos(self.queue_key, api_key)
        if position is None:
            await self.redis.rpush(self.queue_key, api_key)

    async def get(self) -> Optional[ChatAPI]:
        """Take the next API key, rotate it to the tail, and wrap it.

        Returns:
            A ``ChatAPI`` built from the key at the head of the queue, or
            ``None`` when the pop yields nothing (or an empty value).
        """
        api_key = await self.redis.lpop(self.queue_key)
        if not api_key:
            return None

        # The client may return bytes or str depending on its configuration.
        if isinstance(api_key, bytes):
            api_key = api_key.decode()

        # Put the key back *before* building the client: if ChatAPI() raised
        # after the pop but before the re-add, the key would be lost from
        # the rotation permanently.
        await self.add(api_key)
        return ChatAPI(api_key=api_key)

    async def length(self) -> int:
        """Return the number of entries currently in the queue list."""
        return await self.redis.llen(self.queue_key)