Spaces:
Sleeping
Sleeping
File size: 1,206 Bytes
6a3de9e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | # phase3/backend/services/sse_service.py
from asyncio import Queue
from typing import Dict
# In-memory store for SSE queues.
# The key will be a user identifier (e.g., user_id as a string)
# The value will be the asyncio.Queue for that user.
# In a multi-worker setup, this would need to be replaced with
# a more robust solution like Redis Pub/Sub.
sse_connections: Dict[str, Queue] = {}
async def notify_clients(user_id: str, message: str):
"""
Sends a message to a specific user's SSE queue if they are connected.
"""
user_id_str = str(user_id) # Ensure user_id is a string
if user_id_str in sse_connections:
await sse_connections[user_id_str].put(message)
def get_sse_queue(user_id: str) -> Queue:
"""
Retrieves the SSE queue for a user, creating it if it doesn't exist.
"""
user_id_str = str(user_id)
if user_id_str not in sse_connections:
sse_connections[user_id_str] = Queue()
return sse_connections[user_id_str]
def remove_sse_queue(user_id: str):
"""
Removes the SSE queue for a user when they disconnect.
"""
user_id_str = str(user_id)
if user_id_str in sse_connections:
del sse_connections[user_id_str] |