File size: 1,206 Bytes
6a3de9e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# phase3/backend/services/sse_service.py

from asyncio import Queue
from typing import Dict

# In-memory store for SSE queues.
# The key will be a user identifier (e.g., user_id as a string)
# The value will be the asyncio.Queue for that user.
# In a multi-worker setup, this would need to be replaced with
# a more robust solution like Redis Pub/Sub.
sse_connections: Dict[str, Queue] = {}

async def notify_clients(user_id: str, message: str):
    """
    Sends a message to a specific user's SSE queue if they are connected.
    """
    user_id_str = str(user_id) # Ensure user_id is a string
    if user_id_str in sse_connections:
        await sse_connections[user_id_str].put(message)

def get_sse_queue(user_id: str) -> Queue:
    """
    Retrieves the SSE queue for a user, creating it if it doesn't exist.
    """
    user_id_str = str(user_id)
    if user_id_str not in sse_connections:
        sse_connections[user_id_str] = Queue()
    return sse_connections[user_id_str]

def remove_sse_queue(user_id: str):
    """
    Removes the SSE queue for a user when they disconnect.
    """
    user_id_str = str(user_id)
    if user_id_str in sse_connections:
        del sse_connections[user_id_str]