import asyncio
import logging
import time
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple

logger = logging.getLogger(__name__)


class GenerationQueue:
    """FIFO queue of generation tasks consumed by one background worker.

    Tasks are plain dicts with the keys ``id``, ``client_id``, ``type``,
    ``params`` and ``timestamp``.  A single asyncio task (created by
    :meth:`start`) pulls them off the queue one at a time and hands each to
    a user-supplied async handler.
    """

    def __init__(self):
        self.queue = asyncio.Queue()
        # The task dict currently being handled, or None when idle.
        self.active_task: Optional[Dict[str, Any]] = None
        # Monotonically increasing counter used as the task id.
        self.position_counter = 0
        self.clients: Dict[str, Any] = {}  # client_id -> websocket
        self.lock = asyncio.Lock()
        # Not written by this class; presumably maintained by the handler.
        self.last_seeds: Dict[str, int] = {}  # client_id -> last seed
        self._processor: Optional["asyncio.Task"] = None

    async def add_task(
        self, client_id: str, task_type: str, params: dict
    ) -> Tuple[int, int]:
        """Enqueue a new task and report where it landed.

        Args:
            client_id: Identifier of the client that owns the task.
            task_type: Stored under the task's ``"type"`` key.
            params: Stored under the task's ``"params"`` key.

        Returns:
            ``(task_id, queue_size)`` where *queue_size* is the number of
            tasks waiting in the queue right after this one was added.
        """
        async with self.lock:
            self.position_counter += 1
            task = {
                "id": self.position_counter,
                "client_id": client_id,
                "type": task_type,
                "params": params,
                "timestamp": time.time(),
            }
            await self.queue.put(task)
            # Report the id stored on the task itself rather than re-reading
            # the shared counter.
            return task["id"], self.queue.qsize()

    async def notify_client(self, client_id: str, message: dict) -> None:
        """Send *message* to the client's websocket on a best-effort basis.

        Unknown clients are ignored.  A failing ``send_json`` never
        propagates to the caller; the failure is logged at debug level.
        """
        websocket = self.clients.get(client_id)
        if websocket is None:
            return
        try:
            await websocket.send_json(message)
        except Exception:
            # Best effort: a broken connection must not break the caller.
            logger.debug(
                "Could not notify client %r", client_id, exc_info=True
            )

    def get_last_seed(self, client_id: str) -> int:
        """Return the last seed recorded for *client_id*, or -1 if none."""
        return self.last_seeds.get(client_id, -1)

    def start(
        self, handler: Callable[[Dict[str, Any], "GenerationQueue"], Awaitable[Any]]
    ) -> None:
        """
        Start the queue processor.

        *handler* is an async callable(task, queue) that processes one task.

        Must be called while an event loop is running.  Calling it again
        while a processor is still alive is a no-op: a second consumer
        would compete for ``active_task``, and overwriting ``_processor``
        would drop the only strong reference to the first one.
        """
        if self._processor is not None and not self._processor.done():
            return
        self._processor = asyncio.create_task(self._loop(handler))

    async def _loop(self, handler) -> None:
        """Consume tasks forever, passing each one to *handler*.

        An exception raised by *handler* is logged and the loop moves on to
        the next task, so one bad task cannot kill the worker.
        """
        while True:
            task = await self.queue.get()
            self.active_task = task
            try:
                await handler(task, self)
            except Exception:
                logger.exception(
                    "Handler failed for task %s (client %r)",
                    task.get("id"),
                    task.get("client_id"),
                )
            finally:
                self.active_task = None
                self.queue.task_done()