Todo_App / services /sse_service.py
Abdullahcoder54's picture
push
6a3de9e
# phase3/backend/services/sse_service.py
from asyncio import Queue
from typing import Dict
# In-memory store for SSE queues.
# The key will be a user identifier (e.g., user_id as a string)
# The value will be the asyncio.Queue for that user.
# In a multi-worker setup, this would need to be replaced with
# a more robust solution like Redis Pub/Sub.
sse_connections: Dict[str, Queue] = {}
async def notify_clients(user_id: str, message: str):
"""
Sends a message to a specific user's SSE queue if they are connected.
"""
user_id_str = str(user_id) # Ensure user_id is a string
if user_id_str in sse_connections:
await sse_connections[user_id_str].put(message)
def get_sse_queue(user_id: str) -> Queue:
"""
Retrieves the SSE queue for a user, creating it if it doesn't exist.
"""
user_id_str = str(user_id)
if user_id_str not in sse_connections:
sse_connections[user_id_str] = Queue()
return sse_connections[user_id_str]
def remove_sse_queue(user_id: str):
"""
Removes the SSE queue for a user when they disconnect.
"""
user_id_str = str(user_id)
if user_id_str in sse_connections:
del sse_connections[user_id_str]