client / server /queue.py
P01yH3dr0n's picture
launch
774fe36
Raw
History Blame Contribute Delete
1.76 kB
import asyncio
import logging
import time
class GenerationQueue:
    """FIFO queue of generation tasks consumed by one background loop.

    Tasks are plain dicts created by :meth:`add_task` and stored on an
    ``asyncio.Queue``. After :meth:`start` is called, a background
    ``asyncio.Task`` pulls them off one at a time and awaits the supplied
    handler for each.
    """

    # Shared logger for handler and notification failures.
    _log = logging.getLogger(__name__)

    def __init__(self):
        self.queue = asyncio.Queue()  # pending task dicts, oldest first
        self.active_task = None  # task dict being handled right now, else None
        self.position_counter = 0  # id handed to the most recently added task
        self.clients = {}  # client_id -> websocket
        self.lock = asyncio.Lock()  # serialises id allocation in add_task
        self.last_seeds = {}  # client_id -> last seed
        self._processor = None  # asyncio.Task running _loop, set by start()

    async def add_task(self, client_id: str, task_type: str, params: dict):
        """Create a task dict, enqueue it and report where it landed.

        Args:
            client_id: Identifier stored under the task's ``"client_id"`` key.
            task_type: Stored under the task's ``"type"`` key.
            params: Stored as-is under the task's ``"params"`` key.

        Returns:
            A ``(task_id, queue_size)`` tuple: the id assigned to this task
            and ``queue.qsize()`` measured right after the task was enqueued.
        """
        async with self.lock:
            self.position_counter += 1
            task = {
                "id": self.position_counter,
                "client_id": client_id,
                "type": task_type,
                "params": params,
                "timestamp": time.time(),
            }
            await self.queue.put(task)
            # Return the id recorded on the task itself instead of re-reading
            # the shared counter after an await.
            return task["id"], self.queue.qsize()

    async def notify_client(self, client_id: str, message: dict):
        """Send *message* to a registered client on a best-effort basis.

        Does nothing when *client_id* has no entry in ``self.clients``.
        A failing send never propagates to the caller; it is only recorded
        at DEBUG level.
        """
        websocket = self.clients.get(client_id)
        if websocket is None:
            return
        try:
            await websocket.send_json(message)
        except Exception:
            # Delivery is deliberately best-effort, so keep going, but leave
            # a trace for anyone debugging missing notifications.
            self._log.debug(
                "Could not notify client %s", client_id, exc_info=True
            )

    def get_last_seed(self, client_id: str) -> int:
        """Return the seed stored for *client_id*, or -1 when none is stored."""
        return self.last_seeds.get(client_id, -1)

    def start(self, handler):
        """
        Start the queue processor.

        *handler* is an async callable(task, queue) that processes one task.
        The created ``asyncio.Task`` is kept in ``self._processor``; calling
        this relies on ``asyncio.create_task`` and therefore needs a running
        event loop. Each call creates a new processor task.
        """
        self._processor = asyncio.create_task(self._loop(handler))

    async def _loop(self, handler):
        """Consume tasks forever, awaiting *handler* once per task.

        A handler that raises ``Exception`` does not stop the loop: the
        failure is logged with its traceback and the next task is taken.
        ``active_task`` is cleared and ``task_done()`` is called for every
        task, whether the handler succeeded or not.
        """
        while True:
            task = await self.queue.get()
            self.active_task = task
            try:
                await handler(task, self)
            except Exception:
                # Surface the failure instead of discarding it; the loop must
                # still survive so later tasks are processed.
                task_id = task.get("id") if isinstance(task, dict) else None
                self._log.exception(
                    "Handler failed for queued task %s", task_id
                )
            finally:
                self.active_task = None
                self.queue.task_done()