File size: 7,085 Bytes
dc3879e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
"""WebSocket ConnectionManager for multi-client broadcasting.

[Task]: T068
[From]: specs/004-ai-chatbot/tasks.md

This module provides connection management for WebSocket connections,
supporting multiple concurrent connections per user (e.g., multiple browser tabs).
"""
import asyncio
import logging
from typing import Dict, List

from fastapi import WebSocket

# Module-level logger shared by everything in this file. It is registered under
# the fixed name "websockets.manager" rather than the module's __name__.
logger = logging.getLogger("websockets.manager")


class ConnectionManager:
    """Manages WebSocket connections for broadcasting progress events.

    [From]: specs/004-ai-chatbot/research.md - Section 4

    This manager:
    - Tracks multiple WebSocket connections per user_id
    - Supports broadcasting to all connections for a user
    - Handles connection lifecycle (connect, disconnect, cleanup)
    - Provides graceful handling of connection errors

    Coroutines that send to or close several connections work on a snapshot
    of the tracked connections: every ``await`` hands control back to the
    event loop, so another task may call ``connect``/``disconnect`` and mutate
    the registry while the loop is still running.

    Attributes:
        active_connections: Mapping of user_id to list of WebSocket connections
    """

    def __init__(self) -> None:
        """Initialize the connection manager with empty connection tracking."""
        # user_id -> list of WebSocket connections
        # Supports multiple tabs per user
        self.active_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, user_id: str, websocket: WebSocket) -> None:
        """Accept and register a new WebSocket connection.

        [From]: specs/004-ai-chatbot/research.md - Section 4

        Args:
            user_id: The user's unique identifier (UUID string)
            websocket: The WebSocket connection to register

        Sends a connection_established event to the client upon successful
        connection. A failure to deliver that event is logged by
        ``send_personal`` and does not undo the registration.
        """
        await websocket.accept()

        # Create the per-user list on first use, then register this socket.
        connections = self.active_connections.setdefault(user_id, [])
        connections.append(websocket)

        logger.info(
            "WebSocket connected for user %s. Total connections for user: %s",
            user_id,
            len(connections),
        )

        # Send confirmation event to client
        await self.send_personal({
            "event_type": "connection_established",
            "message": "Connected to real-time updates",
            "user_id": user_id,
        }, websocket)

    def disconnect(self, user_id: str, websocket: WebSocket) -> None:
        """Remove a WebSocket connection from tracking.

        [From]: specs/004-ai-chatbot/research.md - Section 4

        Args:
            user_id: The user's unique identifier
            websocket: The WebSocket connection to remove

        Unknown users are ignored silently; a known user whose list does not
        contain ``websocket`` only produces a warning. Empty connection lists
        are deleted to prevent memory leaks. The socket itself is not closed
        here.
        """
        connections = self.active_connections.get(user_id)
        if connections is None:
            return

        try:
            connections.remove(websocket)
        except ValueError:
            # Connection already removed
            logger.warning(
                "Attempted to remove non-existent connection for user %s",
                user_id,
            )
            return

        logger.info(
            "WebSocket disconnected for user %s. Remaining connections: %s",
            user_id,
            len(connections),
        )

        # Clean up empty connection lists
        if not connections:
            del self.active_connections[user_id]
            logger.debug("Removed empty connection list for user %s", user_id)

    async def send_personal(self, message: dict, websocket: WebSocket) -> None:
        """Send a message to a specific WebSocket connection.

        [From]: specs/004-ai-chatbot/research.md - Section 4

        Args:
            message: The message dictionary to send (will be JSON serialized)
            websocket: The target WebSocket connection

        Send failures are logged and swallowed; this method never raises for
        them and does not remove the connection from tracking.
        """
        try:
            await websocket.send_json(message)
        except Exception as e:
            # Don't raise - connection may be closing
            logger.error("Failed to send message to WebSocket: %s", e)

    async def broadcast(self, user_id: str, message: dict) -> None:
        """Send a message to all active connections for a user.

        [From]: specs/004-ai-chatbot/research.md - Section 4

        This is the primary method for sending progress events to all
        browser tabs/devices a user has open.

        Args:
            user_id: The user's unique identifier
            message: The message dictionary to broadcast (will be JSON serialized)

        Handles connection errors gracefully - if a connection fails,
        it's removed but other connections continue to receive messages.
        """
        connections = self.active_connections.get(user_id)
        if not connections:
            logger.debug("No active connections for user %s", user_id)
            return

        # Track failed connections for cleanup
        failed_connections: List[WebSocket] = []

        # Iterate over a copy: while a send is awaited another task may
        # connect/disconnect for this user, and mutating a list that is being
        # iterated makes the loop skip entries.
        for connection in list(connections):
            try:
                await connection.send_json(message)
            except Exception as e:
                logger.warning(
                    "Failed to send to connection for user %s: %s", user_id, e
                )
                failed_connections.append(connection)

        # Clean up failed connections
        for failed in failed_connections:
            self.disconnect(user_id, failed)

    def get_connection_count(self, user_id: str) -> int:
        """Get the number of active connections for a user.

        [From]: specs/004-ai-chatbot/research.md - Section 4

        Args:
            user_id: The user's unique identifier

        Returns:
            The number of active WebSocket connections for this user
            (0 when the user is not tracked)
        """
        return len(self.active_connections.get(user_id, []))

    async def broadcast_to_all(self, message: dict) -> None:
        """Broadcast a message to all connected users.

        [From]: specs/004-ai-chatbot/research.md - Section 4

        This is useful for system-wide announcements or maintenance notices.

        Args:
            message: The message dictionary to broadcast
        """
        # Snapshot the user ids: broadcast() may delete users whose
        # connections all fail, which would otherwise resize the dict
        # during iteration.
        for user_id in list(self.active_connections):
            await self.broadcast(user_id, message)

    async def close_all_connections(self) -> None:
        """Close all active WebSocket connections.

        Useful for server shutdown or maintenance.

        The registry is emptied before any socket is closed, so a connection
        registered while the closes are being awaited stays tracked instead of
        being silently dropped from the registry while still open. Errors
        raised by ``close()`` are logged at debug level and otherwise ignored.
        """
        pending: List[WebSocket] = [
            connection
            for connections in self.active_connections.values()
            for connection in connections
        ]
        self.active_connections.clear()

        for connection in pending:
            try:
                await connection.close()
            except Exception as e:
                # Connection may already be closed
                logger.debug("Ignoring error while closing WebSocket: %s", e)

        logger.info("All WebSocket connections closed")


# Global singleton instance
# [From]: specs/004-ai-chatbot/research.md - Section 4
# Import this instance in other modules to use the connection manager.
# It is created once, when this module is first imported, so all importers
# share the same active_connections registry.
manager: ConnectionManager = ConnectionManager()


async def get_manager() -> ConnectionManager:
    """Provide the shared ConnectionManager to FastAPI routes.

    Intended for use as a dependency-injection helper. It performs no work of
    its own and always hands back the module-level ``manager`` object.

    Returns:
        The global ConnectionManager instance
    """
    return manager