File size: 5,789 Bytes
d4abe4b 3e12ae4 d4abe4b 3e12ae4 d4abe4b 3e12ae4 d4abe4b 3e12ae4 d4abe4b 3e12ae4 d4abe4b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
/**
* WebSocket Connection Manager
* Manages session-based WebSocket connections for ReAct flow streaming
*/
import { WebSocket } from 'ws';
import { logger } from '../utils/logger.js';
import type { WebSocketMessage } from '../types/websocket.js';
/**
 * Tracks one WebSocket per session and provides rate-limited, timestamped
 * message delivery plus periodic eviction of idle sessions.
 *
 * Bookkeeping is kept in three maps keyed by session id. A background interval
 * (started in the constructor, stopped by {@link destroy}) evicts sessions that
 * have not been sent a message within `INACTIVITY_TIMEOUT` and resets the
 * per-session send counters.
 */
export class WebSocketConnectionManager {
  /** session_id -> WebSocket currently registered for that session. */
  private readonly connections: Map<string, WebSocket> = new Map();

  /** session_id -> epoch ms of registration or of the last successful send. */
  private readonly lastActivity: Map<string, number> = new Map();

  /** session_id -> messages sent since the counters were last reset. */
  private readonly messageCount: Map<string, number> = new Map();

  /** Handle of the periodic cleanup timer; cleared in {@link destroy}. */
  private cleanupInterval: NodeJS.Timeout;

  // Configuration
  private readonly INACTIVITY_TIMEOUT = 30 * 60 * 1000; // 30 minutes
  private readonly RATE_LIMIT = 100; // messages per cleanup cycle (1 minute)
  // Rate limit window can be used for future rate limiting implementation
  // private readonly RATE_LIMIT_WINDOW = 60 * 1000; // 1 minute

  /** Starts the once-a-minute cleanup / rate-limit-reset task. */
  constructor() {
    this.cleanupInterval = setInterval(() => {
      this.cleanupInactiveConnections();
    }, 60 * 1000); // Run every minute

    logger.info('WebSocketConnectionManager initialized');
  }

  /**
   * Register `ws` as the connection for `sessionId`.
   *
   * An existing connection for the same session is closed and replaced.
   *
   * @param sessionId - Session the socket belongs to.
   * @param ws - Socket to register; `close` and `error` listeners are attached.
   */
  addConnection(sessionId: string, ws: WebSocket): void {
    if (this.connections.has(sessionId)) {
      logger.warn(`Replacing existing connection for session: ${sessionId}`);
      this.removeConnection(sessionId);
    }

    this.connections.set(sessionId, ws);
    this.lastActivity.set(sessionId, Date.now());
    this.messageCount.set(sessionId, 0);
    logger.info(`WebSocket connected for session: ${sessionId}`);

    // The handlers below outlive a replacement: when this socket is swapped
    // out, its 'close' event still fires later. Only tear the session down if
    // this socket is still the registered one, otherwise the stale event would
    // evict the newer connection.
    ws.on('close', () => {
      if (this.connections.get(sessionId) === ws) {
        this.removeConnection(sessionId);
      }
    });

    ws.on('error', (error) => {
      logger.error({ error, sessionId }, 'WebSocket error');
      if (this.connections.get(sessionId) === ws) {
        this.removeConnection(sessionId);
      }
    });
  }

  /**
   * Unregister the connection for `sessionId`, closing the socket if it is
   * still open. Does nothing when the session has no registered connection.
   *
   * @param sessionId - Session whose connection should be dropped.
   */
  removeConnection(sessionId: string): void {
    const ws = this.connections.get(sessionId);
    if (!ws) {
      return;
    }

    // Drop the bookkeeping first so that a 'close' event triggered by the
    // close() call below finds nothing registered for this socket.
    this.connections.delete(sessionId);
    this.lastActivity.delete(sessionId);
    this.messageCount.delete(sessionId);

    try {
      if (ws.readyState === WebSocket.OPEN) {
        ws.close();
      }
    } catch (error) {
      logger.error({ error, sessionId }, 'Error closing WebSocket');
    }

    logger.info(`WebSocket disconnected for session: ${sessionId}`);
  }

  /**
   * Send `message` to the session's socket as JSON.
   *
   * A timestamp is added to the serialized payload when `message` has none;
   * the caller's object is left untouched. When the session has exhausted its
   * rate limit the message is dropped and a single `RATE_LIMIT` error frame is
   * sent instead.
   *
   * @param sessionId - Target session.
   * @param message - Message to serialize and send.
   * @returns `true` if the message was handed to the socket; `false` if the
   *   connection is missing/not open, the rate limit was hit, or sending threw.
   */
  async sendToSession(sessionId: string, message: WebSocketMessage): Promise<boolean> {
    const ws = this.connections.get(sessionId);
    if (!ws || ws.readyState !== WebSocket.OPEN) {
      logger.warn(`Cannot send message to session ${sessionId}: connection not open`);
      return false;
    }

    if (!this.checkRateLimit(sessionId)) {
      logger.warn(`Rate limit exceeded for session: ${sessionId}`);
      // Write the notice straight to the socket. Routing it back through
      // sendToSession()/sendError() would hit this same branch again and
      // recurse until the stack overflows.
      const notice: WebSocketMessage = {
        type: 'error',
        code: 'RATE_LIMIT',
        message: `Rate limit exceeded (${this.RATE_LIMIT} messages/minute)`,
        timestamp: new Date().toISOString(),
      };
      this.deliver(sessionId, ws, notice);
      return false;
    }

    if (!this.deliver(sessionId, ws, message)) {
      return false;
    }

    // Only successfully sent messages count towards the limit.
    const sent = this.messageCount.get(sessionId) ?? 0;
    this.messageCount.set(sessionId, sent + 1);
    return true;
  }

  /**
   * Send an error frame (`type: 'error'`) to a session.
   *
   * Fire-and-forget: delivery goes through {@link sendToSession}, so it is
   * subject to the same open-connection and rate-limit checks, and the outcome
   * is only logged.
   *
   * @param sessionId - Target session.
   * @param code - Machine-readable error code.
   * @param message - Human-readable description.
   */
  sendError(sessionId: string, code: string, message: string): void {
    // sendToSession handles its own failures and resolves to a boolean, so the
    // promise is intentionally not awaited.
    void this.sendToSession(sessionId, {
      type: 'error',
      code,
      message,
      timestamp: new Date().toISOString(),
    });
  }

  /**
   * @returns `true` when the session has a registered socket in the OPEN state.
   */
  hasConnection(sessionId: string): boolean {
    const ws = this.connections.get(sessionId);
    return ws !== undefined && ws.readyState === WebSocket.OPEN;
  }

  /**
   * @returns Number of registered connections, regardless of socket state.
   */
  getConnectionCount(): number {
    return this.connections.size;
  }

  /**
   * Serialize `message` and write it to `ws`, bypassing the rate limiter.
   * Refreshes the session's activity timestamp on success.
   *
   * @returns `true` if `ws.send` completed without throwing.
   */
  private deliver(sessionId: string, ws: WebSocket, message: WebSocketMessage): boolean {
    try {
      // Stamp a copy rather than mutating the caller's object.
      const payload = message.timestamp
        ? message
        : { ...message, timestamp: new Date().toISOString() };

      ws.send(JSON.stringify(payload));
      this.lastActivity.set(sessionId, Date.now());
      logger.debug({ sessionId, messageType: message.type }, 'Message sent to session');
      return true;
    } catch (error) {
      logger.error({ error, sessionId }, 'Error sending message');
      return false;
    }
  }

  /**
   * @returns `true` while the session is still below `RATE_LIMIT` sends in the
   *   current window.
   */
  private checkRateLimit(sessionId: string): boolean {
    const sent = this.messageCount.get(sessionId) ?? 0;
    return sent < this.RATE_LIMIT;
  }

  /** Clear every session's send counter (invoked once per cleanup cycle). */
  private resetRateLimits(): void {
    this.messageCount.clear();
  }

  /**
   * Evict sessions whose last activity is older than `INACTIVITY_TIMEOUT`,
   * then reset the rate-limit counters for the next window.
   */
  private cleanupInactiveConnections(): void {
    const now = Date.now();

    // Collect first, remove afterwards, so the map is not modified while it is
    // being scanned.
    const inactiveSessions: string[] = [];
    for (const [sessionId, lastTime] of this.lastActivity.entries()) {
      if (now - lastTime > this.INACTIVITY_TIMEOUT) {
        inactiveSessions.push(sessionId);
      }
    }

    if (inactiveSessions.length > 0) {
      logger.info(`Cleaning up ${inactiveSessions.length} inactive connections`);
      for (const sessionId of inactiveSessions) {
        this.removeConnection(sessionId);
      }
    }

    // Also reset rate limits every cleanup cycle
    this.resetRateLimits();
  }

  /**
   * Stop the cleanup timer and close every registered connection.
   * Intended to be called on shutdown.
   */
  destroy(): void {
    clearInterval(this.cleanupInterval);

    // Iterate over a snapshot because removeConnection() deletes from the map.
    for (const sessionId of [...this.connections.keys()]) {
      this.removeConnection(sessionId);
    }

    logger.info('WebSocketConnectionManager destroyed');
  }
}
// Singleton instance shared by every importer of this module.
// NOTE: it is constructed at import time, and the constructor starts the
// once-a-minute cleanup interval; call `wsConnectionManager.destroy()` on
// shutdown to clear that timer and close the remaining connections.
export const wsConnectionManager = new WebSocketConnectionManager();
|