File size: 1,411 Bytes
e19b795
 
d51aaa4
c303abd
22a85f1
e19b795
 
 
 
 
 
 
 
 
 
 
 
22a85f1
e19b795
 
 
 
 
 
 
 
22a85f1
 
 
e19b795
22a85f1
e19b795
22a85f1
 
 
 
 
e19b795
22a85f1
 
e19b795
 
 
 
 
 
c303abd
 
e19b795
22a85f1
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from backend.api.routers.metrics import active_dashboards 
import asyncio
import traceback
import redis.asyncio as aioredis

router = APIRouter()

@router.websocket("/stream")
async def dashboard_websocket(websocket: WebSocket):
    """
    WebScoket sending updates to the dashboard. 

    url: ws://127.0.0.1:8000/dashboard/stream
    """
    state = websocket.app.state
    logger = state.logger
    redis = state.redis

    # Accept the client connection. 
    await websocket.accept()

    # Logging and tracking 
    active_dashboards.inc()
    logger.info("Dashboard Connected...")

    pubsub = redis.pubsub()
    await pubsub.subscribe("dashboard_stream")

    try:

        while True:
            message = await pubsub.get_message(ignore_subscribe_messages=True)

            if message:
                logger.debug("Sending updates to Dashboard...")
                await websocket.send_text(message["data"])

            await asyncio.sleep(0.01)  # giving time to detect server disconnection. 
                

    except WebSocketDisconnect:
        logger.warn("Dashboard Disconnected Normally...")

    except Exception as e:
        logger.error(f"Dashboard Error: {e}")
        traceback.print_exc()

    finally:
        active_dashboards.dec()
        await pubsub.unsubscribe("dashboard_stream")
        await pubsub.close()