Spaces:
Running
Running
import asyncio
import logging
import time
class GenerationQueue:
    """FIFO queue of generation tasks drained by one background worker.

    Tasks are plain dicts built by :meth:`add_task`. A single asyncio task
    (created by :meth:`start`) pulls them off ``self.queue`` one at a time
    and passes each to a user-supplied async handler.

    Attributes:
        queue: ``asyncio.Queue`` holding the pending task dicts.
        active_task: The task dict currently being handled, or ``None``.
        position_counter: Id given to the most recently added task.
        clients: Mapping of ``client_id`` to an object exposing an async
            ``send_json(message)`` method (a websocket, per the original
            comment). Nothing in this class adds or removes entries.
        lock: Guards id allocation and enqueueing in :meth:`add_task`.
        last_seeds: Mapping of ``client_id`` to the last seed used. This
            class only reads it; presumably handlers fill it in.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self):
        self.queue = asyncio.Queue()
        self.active_task = None
        self.position_counter = 0
        self.clients = {}  # client_id -> websocket
        self.lock = asyncio.Lock()
        self.last_seeds = {}  # client_id -> last seed
        self._processor = None  # asyncio.Task running _loop, set by start()

    async def add_task(self, client_id: str, task_type: str, params: dict) -> tuple:
        """Enqueue a new task and report where it landed.

        Args:
            client_id: Identifier of the client that submitted the task.
            task_type: Free-form task kind, stored under ``"type"``.
            params: Handler-specific parameters, stored under ``"params"``.

        Returns:
            A ``(task_id, queue_size)`` tuple: the id assigned to the new
            task and the number of tasks waiting in the queue right after
            it was added (the task being processed is not counted).
        """
        async with self.lock:
            self.position_counter += 1
            task = {
                "id": self.position_counter,
                "client_id": client_id,
                "type": task_type,
                "params": params,
                "timestamp": time.time(),
            }
            await self.queue.put(task)
            # Report the id stored in the task itself rather than re-reading
            # the shared counter.
            return task["id"], self.queue.qsize()

    async def notify_client(self, client_id: str, message: dict):
        """Send *message* to a registered client on a best-effort basis.

        Unknown clients are ignored. Send failures are logged at debug
        level and never raised, so a broken connection cannot interrupt
        the caller.
        """
        socket = self.clients.get(client_id)
        if socket is None:
            return
        try:
            await socket.send_json(message)
        except Exception:
            # Deliberately best-effort: the client may have disconnected.
            self._logger.debug(
                "Could not notify client %s", client_id, exc_info=True
            )

    def get_last_seed(self, client_id: str) -> int:
        """Return the last seed recorded for *client_id*, or ``-1`` if none."""
        return self.last_seeds.get(client_id, -1)

    def start(self, handler):
        """
        Start the queue processor.

        *handler* is an async callable(task, queue) that processes one task.

        Must be called while an event loop is running, because it uses
        ``asyncio.create_task``. Calling it again while the processor is
        still alive does nothing, so only one consumer ever drains the
        queue; the handler passed to such a repeated call is ignored.
        """
        if self._processor is not None and not self._processor.done():
            return
        self._processor = asyncio.create_task(self._loop(handler))

    async def _loop(self, handler):
        """Run forever, handing queued tasks to *handler* one at a time.

        A handler exception is logged with its traceback and the loop moves
        on to the next task. Cancelling the processor task ends the loop.
        """
        while True:
            task = await self.queue.get()
            self.active_task = task
            try:
                await handler(task, self)
            except Exception:
                # Keep the worker alive, but leave a trace of what failed.
                task_id = task.get("id") if isinstance(task, dict) else task
                self._logger.exception(
                    "Queue handler failed for task %s", task_id
                )
            finally:
                self.active_task = None
                self.queue.task_done()