test / managers /queue /deque_queue.py
gaoqilan's picture
Upload 103 files
1f1b4db verified
from collections import deque
from typing import Optional
from api.chat.chat_api import ChatAPI
from .base import ChatQueueBase
class DequeQueue(ChatQueueBase):
def __init__(self):
self.queue = deque()
async def add(self, api_key: str) -> None:
self.queue.append(ChatAPI(api_key=api_key))
async def get(self) -> Optional[ChatAPI]:
if not self.queue:
return None
chat = self.queue.popleft()
self.queue.append(chat)
return chat
async def length(self) -> int:
return len(self.queue)