File size: 5,789 Bytes
d4abe4b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e12ae4
 
d4abe4b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e12ae4
d4abe4b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e12ae4
d4abe4b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e12ae4
d4abe4b
 
 
3e12ae4
d4abe4b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
/**
 * WebSocket Connection Manager
 * Manages session-based WebSocket connections for ReAct flow streaming
 */

import { WebSocket } from 'ws';
import { logger } from '../utils/logger.js';
import type { WebSocketMessage } from '../types/websocket.js';

export class WebSocketConnectionManager {
  // Map of session_id -> WebSocket connection
  private connections: Map<string, WebSocket> = new Map();

  // Map of session_id -> last activity timestamp
  private lastActivity: Map<string, number> = new Map();

  // Map of session_id -> message count (for rate limiting)
  private messageCount: Map<string, number> = new Map();

  // Cleanup interval (check every 1 minute)
  private cleanupInterval: NodeJS.Timeout;

  // Configuration
  private readonly INACTIVITY_TIMEOUT = 30 * 60 * 1000; // 30 minutes
  private readonly RATE_LIMIT = 100; // messages per minute
  // Rate limit window can be used for future rate limiting implementation
  // private readonly RATE_LIMIT_WINDOW = 60 * 1000; // 1 minute

  constructor() {
    // Start cleanup task
    this.cleanupInterval = setInterval(() => {
      this.cleanupInactiveConnections();
    }, 60 * 1000); // Run every minute

    logger.info('WebSocketConnectionManager initialized');
  }

  /**
   * Add a new connection for a session
   */
  addConnection(sessionId: string, ws: WebSocket): void {
    // Close existing connection if any
    if (this.connections.has(sessionId)) {
      logger.warn(`Replacing existing connection for session: ${sessionId}`);
      this.removeConnection(sessionId);
    }

    this.connections.set(sessionId, ws);
    this.lastActivity.set(sessionId, Date.now());
    this.messageCount.set(sessionId, 0);

    logger.info(`WebSocket connected for session: ${sessionId}`);

    // Setup connection handlers
    ws.on('close', () => {
      this.removeConnection(sessionId);
    });

    ws.on('error', (error) => {
      logger.error({ error, sessionId }, 'WebSocket error');
      this.removeConnection(sessionId);
    });
  }

  /**
   * Remove a connection
   */
  removeConnection(sessionId: string): void {
    const ws = this.connections.get(sessionId);
    if (ws) {
      try {
        if (ws.readyState === WebSocket.OPEN) {
          ws.close();
        }
      } catch (error) {
        logger.error({ error, sessionId }, 'Error closing WebSocket');
      }
    }

    this.connections.delete(sessionId);
    this.lastActivity.delete(sessionId);
    this.messageCount.delete(sessionId);

    logger.info(`WebSocket disconnected for session: ${sessionId}`);
  }

  /**
   * Send a message to a specific session
   */
  async sendToSession(sessionId: string, message: WebSocketMessage): Promise<boolean> {
    const ws = this.connections.get(sessionId);

    if (!ws || ws.readyState !== WebSocket.OPEN) {
      logger.warn(`Cannot send message to session ${sessionId}: connection not open`);
      return false;
    }

    // Check rate limit
    if (!this.checkRateLimit(sessionId)) {
      logger.warn(`Rate limit exceeded for session: ${sessionId}`);
      this.sendError(sessionId, 'RATE_LIMIT', 'Rate limit exceeded (100 messages/minute)');
      return false;
    }

    try {
      // Add timestamp if not present
      if (!message.timestamp) {
        message.timestamp = new Date().toISOString();
      }

      ws.send(JSON.stringify(message));

      // Update activity
      this.lastActivity.set(sessionId, Date.now());

      // Increment message count
      const count = this.messageCount.get(sessionId) || 0;
      this.messageCount.set(sessionId, count + 1);

      logger.debug({ sessionId, messageType: message.type }, 'Message sent to session');

      return true;
    } catch (error) {
      logger.error({ error, sessionId }, 'Error sending message');
      return false;
    }
  }

  /**
   * Send error message to a session
   */
  sendError(sessionId: string, code: string, message: string): void {
    this.sendToSession(sessionId, {
      type: 'error',
      code,
      message,
      timestamp: new Date().toISOString(),
    });
  }

  /**
   * Check if session has active connection
   */
  hasConnection(sessionId: string): boolean {
    const ws = this.connections.get(sessionId);
    return ws !== undefined && ws.readyState === WebSocket.OPEN;
  }

  /**
   * Get number of active connections
   */
  getConnectionCount(): number {
    return this.connections.size;
  }

  /**
   * Check rate limit for a session
   */
  private checkRateLimit(sessionId: string): boolean {
    const count = this.messageCount.get(sessionId) || 0;
    return count < this.RATE_LIMIT;
  }

  /**
   * Reset rate limit counters (called every minute)
   */
  private resetRateLimits(): void {
    this.messageCount.clear();
  }

  /**
   * Cleanup inactive connections
   */
  private cleanupInactiveConnections(): void {
    const now = Date.now();
    const inactiveSessions: string[] = [];

    for (const [sessionId, lastTime] of this.lastActivity.entries()) {
      if (now - lastTime > this.INACTIVITY_TIMEOUT) {
        inactiveSessions.push(sessionId);
      }
    }

    if (inactiveSessions.length > 0) {
      logger.info(`Cleaning up ${inactiveSessions.length} inactive connections`);

      for (const sessionId of inactiveSessions) {
        this.removeConnection(sessionId);
      }
    }

    // Also reset rate limits every cleanup cycle
    this.resetRateLimits();
  }

  /**
   * Cleanup on shutdown
   */
  destroy(): void {
    clearInterval(this.cleanupInterval);

    // Close all connections
    for (const sessionId of this.connections.keys()) {
      this.removeConnection(sessionId);
    }

    logger.info('WebSocketConnectionManager destroyed');
  }
}

// Singleton instance
export const wsConnectionManager = new WebSocketConnectionManager();