| from typing import Optional | |
| from managers.queue.base import ChatQueueBase | |
| from managers.queue.deque_queue import DequeQueue | |
| from api.chat.chat_api import ChatAPI | |
class ChatManager:
    """Manage chat instances through a pluggable queue backend.

    The backend is selected by name when the manager is constructed and is
    stored on ``self.queue``; each public coroutine delegates to it.
    """

    def __init__(self, queue_type: str = "deque", **kwargs):
        """Create the manager together with its backing queue.

        Args:
            queue_type: Name of the queue implementation to instantiate.
            **kwargs: Passed through unchanged to the queue constructor.

        Raises:
            ValueError: If ``queue_type`` is not a supported name.
        """
        backend = self._create_queue(queue_type, **kwargs)
        self.queue: ChatQueueBase = backend

    def _create_queue(self, queue_type: str, **kwargs) -> ChatQueueBase:
        """Instantiate the queue class registered under ``queue_type``.

        Args:
            queue_type: Key identifying the queue implementation.
            **kwargs: Constructor arguments for the chosen queue class.

        Returns:
            A new instance of the selected queue class.

        Raises:
            ValueError: If no implementation is registered for ``queue_type``.
        """
        # Name -> implementation lookup table; currently a single backend.
        registry = {
            "deque": DequeQueue,
        }
        queue_cls = registry.get(queue_type)
        if queue_cls is None:
            raise ValueError(f"Unsupported queue type: {queue_type}")
        return queue_cls(**kwargs)

    async def add_chat(self, api_key: str):
        """Add a new chat instance to the queue.

        Args:
            api_key: Value handed to the queue's ``add`` coroutine.
        """
        await self.queue.add(api_key)

    async def get_chat(self) -> Optional[ChatAPI]:
        """Get the next chat instance from the queue.

        Returns:
            Whatever the queue's ``get`` coroutine yields.
            NOTE(review): presumably ``None`` when nothing is available --
            confirm against the queue implementation.
        """
        next_chat = await self.queue.get()
        return next_chat

    async def length(self) -> int:
        """Get the length of the queue."""
        size = await self.queue.length()
        return size