File size: 1,761 Bytes
774fe36
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import asyncio
import logging
import time


class GenerationQueue:
    """FIFO queue of generation tasks drained by one background worker.

    Tasks are plain dicts built by :meth:`add_task` and handed, one at a
    time, to the handler passed to :meth:`start`. Failures in the handler or
    while notifying a client are logged and never propagate, so one bad task
    or one broken connection cannot stop the worker.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self):
        self.queue = asyncio.Queue()
        self.active_task = None    # task dict currently being handled, or None
        self.position_counter = 0  # incremented per add_task; used as task id
        self.clients = {}          # client_id -> websocket
        self.lock = asyncio.Lock()
        self.last_seeds = {}       # client_id -> last seed
        self._processor = None     # asyncio.Task running _loop, once started

    async def add_task(self, client_id: str, task_type: str, params: dict):
        """Enqueue a new task and return ``(task_id, queue_size)``.

        The id is allocated and the task is enqueued under ``self.lock`` so
        ids are handed out in the same order tasks enter the queue.
        ``queue_size`` is ``queue.qsize()`` read right after the put.
        """
        async with self.lock:
            self.position_counter += 1
            task_id = self.position_counter
            task = {
                "id": task_id,
                "client_id": client_id,
                "type": task_type,
                "params": params,
                "timestamp": time.time(),
            }
            await self.queue.put(task)
            return task_id, self.queue.qsize()

    async def notify_client(self, client_id: str, message: dict):
        """Send *message* to the client's websocket, best-effort.

        Unknown client ids are ignored. A failing ``send_json`` is logged
        rather than raised, so callers never have to guard this call.
        """
        websocket = self.clients.get(client_id)
        if websocket is None:
            return
        try:
            await websocket.send_json(message)
        except Exception:
            # Deliberately best-effort: keep going, but leave a trace instead
            # of hiding the failure completely.
            self._logger.warning(
                "Failed to notify client %r", client_id, exc_info=True
            )

    def get_last_seed(self, client_id: str) -> int:
        """Return the last seed recorded for *client_id*, or -1 if none."""
        return self.last_seeds.get(client_id, -1)

    def start(self, handler):
        """
        Start the queue processor.
        *handler* is an async callable(task, queue) that processes one task.

        Calling this again while the processor is still running is a no-op:
        a second worker would race the first over ``active_task`` and the
        reference to the first worker would be lost.
        """
        if self._processor is not None and not self._processor.done():
            self._logger.warning(
                "Queue processor already running; ignoring start()"
            )
            return
        self._processor = asyncio.create_task(self._loop(handler))

    async def _loop(self, handler):
        """Consume tasks forever, passing each to *handler* in turn."""
        while True:
            task = await self.queue.get()
            self.active_task = task
            try:
                await handler(task, self)
            except Exception:
                # One failing task must not stop the worker; log and move on.
                self._logger.exception(
                    "Handler failed for task %r", task.get("id")
                )
            finally:
                self.active_task = None
                self.queue.task_done()