File size: 1,952 Bytes
1905805
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import asyncio
from typing import List, AsyncGenerator

'''ObserverClient asynchronously handles events in queue populated by an ObserverServer'''
class BaseObserverClient():
    def __init__(self, server = None):
        self.server = None
        if server:
            self.listen(server)
            
        self.queue = asyncio.Queue()
        self.event_listener = None

    def listen(self, server):
        if self.server:
            self.close()

        self.server = server
        self.server.join(self)
        
        self.event_listener = asyncio.create_task(self._event_listener())

    def close(self):
        self.server.detach(self)
        self.server = None
        
    async def _event_listener(self):
        while True:
            next_event = await self.queue.get()
            await self.handle_event(next_event['event'], next_event['payload'])
            
    # To Be Implement
    async def handle_event(self, event_id: str, payload) -> None:
        raise NotImplementedError

'''ObserverServer adds events and payloads to all listening client queues.'''
class ObserverServer():
    def __init__(self):
        self.clients: List[ObserverClient] = []

    def join(self, client):
        if client not in self.clients:
            self.clients.append(client)

    def detach(self, client):
        if client in self.clients:
            self.clients.remove(client)

    async def broadcast_event(self, event_id: str, payload: dict = {}):
        for client in self.clients:
            await client.queue.put({
                "event": event_id,
                "payload": payload
            })
            
    async def broadcast_stream(self, event_id: str, payload_stream: AsyncGenerator):
        async for payload in payload_stream:
            for client in self.clients:
                await client.queue.put({
                    "event": event_id,
                    "payload": payload
                })