Spaces:
Sleeping
Sleeping
| # phase3/backend/services/sse_service.py | |
| from asyncio import Queue | |
| from typing import Dict | |
| # In-memory store for SSE queues. | |
| # The key will be a user identifier (e.g., user_id as a string) | |
| # The value will be the asyncio.Queue for that user. | |
| # In a multi-worker setup, this would need to be replaced with | |
| # a more robust solution like Redis Pub/Sub. | |
| sse_connections: Dict[str, Queue] = {} | |
| async def notify_clients(user_id: str, message: str): | |
| """ | |
| Sends a message to a specific user's SSE queue if they are connected. | |
| """ | |
| user_id_str = str(user_id) # Ensure user_id is a string | |
| if user_id_str in sse_connections: | |
| await sse_connections[user_id_str].put(message) | |
| def get_sse_queue(user_id: str) -> Queue: | |
| """ | |
| Retrieves the SSE queue for a user, creating it if it doesn't exist. | |
| """ | |
| user_id_str = str(user_id) | |
| if user_id_str not in sse_connections: | |
| sse_connections[user_id_str] = Queue() | |
| return sse_connections[user_id_str] | |
| def remove_sse_queue(user_id: str): | |
| """ | |
| Removes the SSE queue for a user when they disconnect. | |
| """ | |
| user_id_str = str(user_id) | |
| if user_id_str in sse_connections: | |
| del sse_connections[user_id_str] |