unknownfriend00007 commited on
Commit
3b3a317
·
verified ·
1 Parent(s): dec253d

Update server.js

Browse files
Files changed (1) hide show
  1. server.js +284 -54
server.js CHANGED
@@ -2,13 +2,21 @@ const express = require('express');
2
  const fetch = require('node-fetch');
3
  const cors = require('cors');
4
  const rateLimit = require('express-rate-limit');
 
5
  require('dotenv').config();
6
 
7
  const app = express();
8
 
 
 
 
 
 
 
9
  app.set('trust proxy', 1);
10
  app.use(express.json({ limit: '1mb' }));
11
 
 
12
  const allowedOrigins = process.env.ALLOWED_ORIGINS
13
  ? process.env.ALLOWED_ORIGINS.split(',').map(o => o.trim())
14
  : [];
@@ -19,9 +27,11 @@ app.use(cors({
19
  if (allowedOrigins.includes(origin)) {
20
  callback(null, true);
21
  } else {
 
22
  callback(new Error('Not allowed by CORS'));
23
  }
24
- }
 
25
  }));
26
 
27
  app.use((err, req, res, next) => {
@@ -31,165 +41,385 @@ app.use((err, req, res, next) => {
31
  next(err);
32
  });
33
 
 
34
  const limiter = rateLimit({
35
  windowMs: 15 * 60 * 1000,
36
  max: 100,
37
- message: { error: "Too many requests" }
 
 
 
 
38
  });
39
  app.use(limiter);
40
 
 
41
  app.use((req, res, next) => {
42
  const ip = (req.ip || 'unknown').replace(/:\d+[^:]*$/, '');
43
- console.log(`[${new Date().toISOString()}] ${ip} -> ${req.method} ${req.path} from ${req.headers.origin || 'unknown'}`);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
44
  next();
45
  });
46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
  let INSTANCES = [];
48
  try {
49
  INSTANCES = JSON.parse(process.env.FLOWISE_INSTANCES || '[]');
50
  console.log(`[System] Loaded ${INSTANCES.length} instances`);
 
 
 
51
  } catch (e) {
52
- console.error("ERROR parsing FLOWISE_INSTANCES:", e);
53
  }
54
 
 
55
  const flowCache = new Map();
56
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57
  async function resolveChatflowId(instanceNum, botName) {
58
  const cacheKey = `${instanceNum}-${botName}`;
 
59
  const cached = flowCache.get(cacheKey);
60
- if (cached) return cached;
 
 
61
 
62
- if (instanceNum < 1 || instanceNum > INSTANCES.length) {
63
- throw new Error(`Invalid instance: ${instanceNum}`);
64
  }
65
 
66
  const instance = INSTANCES[instanceNum - 1];
67
  console.log(`[System] Looking up '${botName}' in instance ${instanceNum}...`);
68
 
69
  const headers = {};
70
- if (instance.key) headers['Authorization'] = `Bearer ${instance.key}`;
 
 
 
 
71
 
72
- const response = await fetch(`${instance.url}/api/v1/chatflows`, { headers });
73
  if (!response.ok) {
74
- throw new Error(`Instance ${instanceNum} returned status ${response.status}. Instance may be paused.`);
75
  }
76
 
77
  const flows = await response.json();
78
- const match = flows.find(f => f.name.toLowerCase().replace(/\s+/g, '-') === botName);
79
 
80
- if (!match) throw new Error(`Bot '${botName}' not found in instance ${instanceNum}`);
 
 
81
 
82
- const result = { id: match.id, instance };
83
- flowCache.set(cacheKey, result);
 
 
 
 
 
 
 
 
 
84
 
85
  console.log(`[System] Found '${botName}' -> ${match.id}`);
86
- return result;
 
87
  }
88
 
89
- // IMPROVED: Safe JSON parsing
90
- async function safeJsonParse(response, url) {
91
- const text = await response.text();
 
 
92
 
93
- try {
94
- return JSON.parse(text);
95
- } catch (e) {
96
- console.error(`[Error] Non-JSON response from ${url}`);
97
- console.error(`[Error] Response starts with: ${text.substring(0, 200)}`);
 
 
 
 
 
 
 
 
 
 
 
98
 
99
- // Return a user-friendly error
100
- throw new Error('The Flowise instance returned an invalid response. It may be paused or experiencing errors.');
101
- }
 
 
 
102
  }
103
 
 
104
  app.post('/api/v1/prediction/:instanceNum/:botName', async (req, res) => {
105
  try {
106
  const instanceNum = parseInt(req.params.instanceNum);
107
- const botName = req.params.botName.toLowerCase();
108
 
109
- if (req.body.question && req.body.question.length > 2000) {
110
- return res.status(400).json({ error: 'Message too long (max 2000 characters)' });
 
 
 
 
 
 
 
 
 
 
111
  }
112
 
113
  const { id, instance } = await resolveChatflowId(instanceNum, botName);
114
 
115
  const headers = { 'Content-Type': 'application/json' };
116
- if (instance.key) headers['Authorization'] = `Bearer ${instance.key}`;
 
 
117
 
118
- const response = await fetch(`${instance.url}/api/v1/prediction/${id}`, {
119
- method: 'POST',
120
- headers,
121
- body: JSON.stringify(req.body)
122
- });
 
 
 
 
123
 
124
- // Check if response is OK before parsing
125
  if (!response.ok) {
126
  const errorText = await response.text();
127
  console.error(`[Error] Instance returned ${response.status}: ${errorText.substring(0, 200)}`);
128
- return res.status(response.status).json({
129
  error: 'Flowise instance error',
130
- message: 'The chatbot instance may be paused or misconfigured. Please check the Flowise instance logs.'
131
  });
132
  }
133
 
134
- const data = await safeJsonParse(response, instance.url);
135
- res.status(200).json(data);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
136
 
137
  } catch (error) {
138
  console.error('[Error]', error.message);
139
- res.status(500).json({
140
  error: 'Request failed',
141
- message: error.message
142
  });
143
  }
144
  });
145
 
 
146
  app.get('/api/v1/public-chatbotConfig/:instanceNum/:botName', async (req, res) => {
147
  try {
148
  const instanceNum = parseInt(req.params.instanceNum);
149
- const botName = req.params.botName.toLowerCase();
 
150
  const { id, instance } = await resolveChatflowId(instanceNum, botName);
151
 
152
  const headers = {};
153
- if (instance.key) headers['Authorization'] = `Bearer ${instance.key}`;
 
 
154
 
155
- const response = await fetch(`${instance.url}/api/v1/public-chatbotConfig/${id}`, { headers });
 
 
 
 
156
 
157
  if (!response.ok) {
158
  return res.status(response.status).json({ error: 'Config not available' });
159
  }
160
 
161
- const data = await safeJsonParse(response, instance.url);
162
  res.status(200).json(data);
 
163
  } catch (error) {
164
  console.error('[Error Config]', error.message);
165
  res.status(404).json({ error: error.message });
166
  }
167
  });
168
 
 
169
  app.get('/api/v1/chatflows-streaming/:instanceNum/:botName', async (req, res) => {
170
  try {
171
  const instanceNum = parseInt(req.params.instanceNum);
172
- const botName = req.params.botName.toLowerCase();
 
173
  const { id, instance } = await resolveChatflowId(instanceNum, botName);
174
 
175
  const headers = {};
176
- if (instance.key) headers['Authorization'] = `Bearer ${instance.key}`;
 
 
177
 
178
- const response = await fetch(`${instance.url}/api/v1/chatflows-streaming/${id}`, { headers });
 
 
 
 
179
 
180
  if (!response.ok) {
181
- return res.status(response.status).json({ isStreaming: false });
182
  }
183
 
184
- const data = await safeJsonParse(response, instance.url);
185
  res.status(200).json(data);
 
186
  } catch (error) {
187
  console.error('[Error Streaming]', error.message);
188
- res.status(200).json({ isStreaming: false }); // Default to non-streaming
189
  }
190
  });
191
 
 
192
  app.get('/', (req, res) => res.send('Federated Proxy Active'));
193
- app.get('/health', (req, res) => res.json({ status: 'ok', instances: INSTANCES.length }));
194
 
195
- app.listen(7860, '0.0.0.0', () => console.log('Federated Proxy running on port 7860'));
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
  const fetch = require('node-fetch');
3
  const cors = require('cors');
4
  const rateLimit = require('express-rate-limit');
5
+ const helmet = require('helmet');
6
  require('dotenv').config();
7
 
8
  const app = express();
9
 
10
+ // --- SECURITY HEADERS ---
11
+ app.use(helmet({
12
+ contentSecurityPolicy: false,
13
+ crossOriginEmbedderPolicy: false
14
+ }));
15
+
16
  app.set('trust proxy', 1);
17
  app.use(express.json({ limit: '1mb' }));
18
 
19
+ // --- CORS PROTECTION ---
20
  const allowedOrigins = process.env.ALLOWED_ORIGINS
21
  ? process.env.ALLOWED_ORIGINS.split(',').map(o => o.trim())
22
  : [];
 
27
  if (allowedOrigins.includes(origin)) {
28
  callback(null, true);
29
  } else {
30
+ console.log(`[Security] Blocked origin: ${origin}`);
31
  callback(new Error('Not allowed by CORS'));
32
  }
33
+ },
34
+ credentials: true
35
  }));
36
 
37
  app.use((err, req, res, next) => {
 
41
  next(err);
42
  });
43
 
44
+ // --- RATE LIMITING (15 minutes) ---
45
  const limiter = rateLimit({
46
  windowMs: 15 * 60 * 1000,
47
  max: 100,
48
+ message: { error: "Too many requests" },
49
+ keyGenerator: (req) => {
50
+ const ip = req.ip || req.connection.remoteAddress || 'unknown';
51
+ return ip.replace(/:\d+[^:]*$/, '');
52
+ }
53
  });
54
  app.use(limiter);
55
 
56
+ // --- REQUEST LOGGING ---
57
  app.use((req, res, next) => {
58
  const ip = (req.ip || 'unknown').replace(/:\d+[^:]*$/, '');
59
+ const origin = (req.headers.origin || 'unknown').substring(0, 100);
60
+ console.log(`[${new Date().toISOString()}] ${ip} -> ${req.method} ${req.path} from ${origin}`);
61
+ next();
62
+ });
63
+
64
+ // --- DAILY USAGE CAPS ---
65
+ const dailyUsage = new Map();
66
+ let lastResetDate = new Date().toDateString();
67
+
68
+ function checkDailyReset() {
69
+ const today = new Date().toDateString();
70
+ if (today !== lastResetDate) {
71
+ dailyUsage.clear();
72
+ lastResetDate = today;
73
+ console.log('[System] Daily usage counters reset');
74
+ }
75
+ }
76
+
77
+ setInterval(checkDailyReset, 60 * 60 * 1000);
78
+
79
+ app.use((req, res, next) => {
80
+ if (req.method === 'POST' && req.path.includes('/prediction/')) {
81
+ checkDailyReset();
82
+
83
+ const ip = (req.ip || 'unknown').replace(/:\d+[^:]*$/, '');
84
+ const count = dailyUsage.get(ip) || 0;
85
+
86
+ if (count >= 200) {
87
+ return res.status(429).json({
88
+ error: 'Daily limit reached',
89
+ message: 'You have reached your daily usage limit. Try again tomorrow.'
90
+ });
91
+ }
92
+
93
+ dailyUsage.set(ip, count + 1);
94
+
95
+ // FIXED: Prevent unbounded growth
96
+ if (dailyUsage.size > 10000) {
97
+ console.warn('[System] Daily usage map too large, clearing oldest entries');
98
+ const entries = Array.from(dailyUsage.entries()).slice(0, 1000);
99
+ entries.forEach(([key]) => dailyUsage.delete(key));
100
+ }
101
+ }
102
  next();
103
  });
104
 
105
+ // --- BOT DETECTION ---
106
+ app.use((req, res, next) => {
107
+ // FIXED: Check all POST requests, not just /prediction/
108
+ if (req.method !== 'POST') {
109
+ return next();
110
+ }
111
+
112
+ const userAgent = (req.headers['user-agent'] || '').toLowerCase();
113
+ const suspiciousBots = ['python-requests', 'curl/', 'wget/', 'scrapy', 'crawler'];
114
+
115
+ const isBot = suspiciousBots.some(bot => userAgent.includes(bot));
116
+
117
+ if (isBot) {
118
+ console.log(`[Security] Blocked bot: ${userAgent.substring(0, 50)} from ${req.ip}`);
119
+ return res.status(403).json({
120
+ error: 'Automated access detected',
121
+ message: 'This service is for web browsers only.'
122
+ });
123
+ }
124
+ next();
125
+ });
126
+
127
+ // --- INSTANCES CONFIGURATION ---
128
  let INSTANCES = [];
129
  try {
130
  INSTANCES = JSON.parse(process.env.FLOWISE_INSTANCES || '[]');
131
  console.log(`[System] Loaded ${INSTANCES.length} instances`);
132
+ if (!Array.isArray(INSTANCES) || INSTANCES.length === 0) {
133
+ console.error('ERROR: FLOWISE_INSTANCES must be a non-empty array');
134
+ }
135
  } catch (e) {
136
+ console.error("CRITICAL ERROR: Could not parse FLOWISE_INSTANCES JSON", e);
137
  }
138
 
139
+ // --- CACHE WITH AUTO-CLEANUP ---
140
  const flowCache = new Map();
141
 
142
+ setInterval(() => {
143
+ const now = Date.now();
144
+ for (const [key, value] of flowCache.entries()) {
145
+ // FIXED: Safety check for timestamp
146
+ if (value.timestamp && now - value.timestamp > 10 * 60 * 1000) {
147
+ flowCache.delete(key);
148
+ }
149
+ }
150
+ }, 10 * 60 * 1000);
151
+
152
+ // --- FETCH WITH TIMEOUT ---
153
+ async function fetchWithTimeout(url, options, timeout = 10000) {
154
+ return Promise.race([
155
+ fetch(url, options),
156
+ new Promise((_, reject) =>
157
+ setTimeout(() => reject(new Error('Request timeout')), timeout)
158
+ )
159
+ ]);
160
+ }
161
+
162
+ // --- RESOLVE CHATFLOW ID ---
163
  async function resolveChatflowId(instanceNum, botName) {
164
  const cacheKey = `${instanceNum}-${botName}`;
165
+
166
  const cached = flowCache.get(cacheKey);
167
+ if (cached && cached.timestamp && Date.now() - cached.timestamp < 5 * 60 * 1000) {
168
+ return { id: cached.id, instance: cached.instance };
169
+ }
170
 
171
+ if (isNaN(instanceNum) || instanceNum < 1 || instanceNum > INSTANCES.length) {
172
+ throw new Error(`Instance ${instanceNum} does not exist. Valid: 1-${INSTANCES.length}`);
173
  }
174
 
175
  const instance = INSTANCES[instanceNum - 1];
176
  console.log(`[System] Looking up '${botName}' in instance ${instanceNum}...`);
177
 
178
  const headers = {};
179
+ if (instance.key && instance.key.length > 0) {
180
+ headers['Authorization'] = `Bearer ${instance.key}`;
181
+ }
182
+
183
+ const response = await fetchWithTimeout(`${instance.url}/api/v1/chatflows`, { headers }, 10000);
184
 
 
185
  if (!response.ok) {
186
+ throw new Error(`Instance ${instanceNum} returned status ${response.status}`);
187
  }
188
 
189
  const flows = await response.json();
 
190
 
191
+ if (!Array.isArray(flows)) {
192
+ throw new Error(`Instance ${instanceNum} returned invalid response`);
193
+ }
194
 
195
+ const match = flows.find(f => f.name && f.name.toLowerCase().replace(/\s+/g, '-') === botName);
196
+
197
+ if (!match || !match.id) {
198
+ throw new Error(`Bot '${botName}' not found in instance ${instanceNum}`);
199
+ }
200
+
201
+ flowCache.set(cacheKey, {
202
+ id: match.id,
203
+ instance: instance,
204
+ timestamp: Date.now()
205
+ });
206
 
207
  console.log(`[System] Found '${botName}' -> ${match.id}`);
208
+
209
+ return { id: match.id, instance };
210
  }
211
 
212
+ // --- STREAMING HANDLER ---
213
+ async function handleStreamingResponse(flowiseResponse, clientRes) {
214
+ clientRes.setHeader('Content-Type', 'text/event-stream');
215
+ clientRes.setHeader('Cache-Control', 'no-cache');
216
+ clientRes.setHeader('Connection', 'keep-alive');
217
 
218
+ console.log('[Streaming] Forwarding SSE stream...');
219
+
220
+ let streamStarted = false;
221
+
222
+ flowiseResponse.body.on('data', (chunk) => {
223
+ streamStarted = true;
224
+ clientRes.write(chunk);
225
+ });
226
+
227
+ flowiseResponse.body.on('end', () => {
228
+ console.log('[Streaming] Stream completed');
229
+ clientRes.end();
230
+ });
231
+
232
+ flowiseResponse.body.on('error', (err) => {
233
+ console.error('[Streaming Error]', err.message);
234
 
235
+ // FIXED: Send error event if stream already started
236
+ if (streamStarted) {
237
+ clientRes.write(`\n\nevent: error\ndata: {"error": "Stream interrupted"}\n\n`);
238
+ }
239
+ clientRes.end();
240
+ });
241
  }
242
 
243
+ // --- ROUTE 1: PREDICTION (WITH STREAMING SUPPORT) ---
244
  app.post('/api/v1/prediction/:instanceNum/:botName', async (req, res) => {
245
  try {
246
  const instanceNum = parseInt(req.params.instanceNum);
247
+ const botName = req.params.botName.toLowerCase().substring(0, 100);
248
 
249
+ if (!req.body.question || typeof req.body.question !== 'string') {
250
+ return res.status(400).json({
251
+ error: 'Invalid request',
252
+ message: 'Question must be a non-empty string.'
253
+ });
254
+ }
255
+
256
+ if (req.body.question.length > 2000) {
257
+ return res.status(400).json({
258
+ error: 'Message too long',
259
+ message: 'Please keep messages under 2000 characters.'
260
+ });
261
  }
262
 
263
  const { id, instance } = await resolveChatflowId(instanceNum, botName);
264
 
265
  const headers = { 'Content-Type': 'application/json' };
266
+ if (instance.key && instance.key.length > 0) {
267
+ headers['Authorization'] = `Bearer ${instance.key}`;
268
+ }
269
 
270
+ const response = await fetchWithTimeout(
271
+ `${instance.url}/api/v1/prediction/${id}`,
272
+ {
273
+ method: 'POST',
274
+ headers,
275
+ body: JSON.stringify(req.body)
276
+ },
277
+ 30000
278
+ );
279
 
 
280
  if (!response.ok) {
281
  const errorText = await response.text();
282
  console.error(`[Error] Instance returned ${response.status}: ${errorText.substring(0, 200)}`);
283
+ return res.status(response.status).json({
284
  error: 'Flowise instance error',
285
+ message: 'The chatbot instance returned an error.'
286
  });
287
  }
288
 
289
+ const contentType = response.headers.get('content-type') || '';
290
+
291
+ // STREAMING RESPONSE
292
+ if (contentType.includes('text/event-stream')) {
293
+ console.log('[Streaming] Detected SSE response');
294
+ return handleStreamingResponse(response, res);
295
+ }
296
+
297
+ // NON-STREAMING RESPONSE
298
+ console.log('[Non-streaming] Parsing JSON response');
299
+ const text = await response.text();
300
+
301
+ try {
302
+ const data = JSON.parse(text);
303
+ res.status(200).json(data);
304
+ } catch (e) {
305
+ console.error('[Error] Invalid JSON:', text.substring(0, 200));
306
+ res.status(500).json({ error: 'Invalid response from Flowise' });
307
+ }
308
 
309
  } catch (error) {
310
  console.error('[Error]', error.message);
311
+ res.status(500).json({
312
  error: 'Request failed',
313
+ message: error.message
314
  });
315
  }
316
  });
317
 
318
+ // --- ROUTE 2: CHATBOT CONFIG ---
319
  app.get('/api/v1/public-chatbotConfig/:instanceNum/:botName', async (req, res) => {
320
  try {
321
  const instanceNum = parseInt(req.params.instanceNum);
322
+ const botName = req.params.botName.toLowerCase().substring(0, 100);
323
+
324
  const { id, instance } = await resolveChatflowId(instanceNum, botName);
325
 
326
  const headers = {};
327
+ if (instance.key && instance.key.length > 0) {
328
+ headers['Authorization'] = `Bearer ${instance.key}`;
329
+ }
330
 
331
+ const response = await fetchWithTimeout(
332
+ `${instance.url}/api/v1/public-chatbotConfig/${id}`,
333
+ { headers },
334
+ 10000
335
+ );
336
 
337
  if (!response.ok) {
338
  return res.status(response.status).json({ error: 'Config not available' });
339
  }
340
 
341
+ const data = await response.json();
342
  res.status(200).json(data);
343
+
344
  } catch (error) {
345
  console.error('[Error Config]', error.message);
346
  res.status(404).json({ error: error.message });
347
  }
348
  });
349
 
350
+ // --- ROUTE 3: STREAMING CHECK ---
351
  app.get('/api/v1/chatflows-streaming/:instanceNum/:botName', async (req, res) => {
352
  try {
353
  const instanceNum = parseInt(req.params.instanceNum);
354
+ const botName = req.params.botName.toLowerCase().substring(0, 100);
355
+
356
  const { id, instance } = await resolveChatflowId(instanceNum, botName);
357
 
358
  const headers = {};
359
+ if (instance.key && instance.key.length > 0) {
360
+ headers['Authorization'] = `Bearer ${instance.key}`;
361
+ }
362
 
363
+ const response = await fetchWithTimeout(
364
+ `${instance.url}/api/v1/chatflows-streaming/${id}`,
365
+ { headers },
366
+ 10000
367
+ );
368
 
369
  if (!response.ok) {
370
+ return res.status(200).json({ isStreaming: false });
371
  }
372
 
373
+ const data = await response.json();
374
  res.status(200).json(data);
375
+
376
  } catch (error) {
377
  console.error('[Error Streaming]', error.message);
378
+ res.status(200).json({ isStreaming: false });
379
  }
380
  });
381
 
382
+ // --- HEALTH CHECK ---
383
  app.get('/', (req, res) => res.send('Federated Proxy Active'));
 
384
 
385
+ app.get('/health', (req, res) => {
386
+ res.json({
387
+ status: 'healthy',
388
+ instances: INSTANCES.length,
389
+ cached_bots: flowCache.size,
390
+ daily_active_ips: dailyUsage.size,
391
+ uptime: process.uptime()
392
+ });
393
+ });
394
+
395
+ // --- 404 HANDLER ---
396
+ app.use((req, res) => {
397
+ res.status(404).json({ error: 'Route not found' });
398
+ });
399
+
400
+ // --- GLOBAL ERROR HANDLER ---
401
+ app.use((err, req, res, next) => {
402
+ console.error('[Error] Unhandled error:', err);
403
+ res.status(500).json({ error: 'Internal server error' });
404
+ });
405
+
406
+ // --- GRACEFUL SHUTDOWN ---
407
+ const server = app.listen(7860, '0.0.0.0', () => {
408
+ console.log('Federated Proxy running on port 7860');
409
+ });
410
+
411
+ process.on('SIGTERM', () => {
412
+ console.log('[System] SIGTERM received, shutting down gracefully...');
413
+ server.close(() => {
414
+ console.log('[System] Server closed');
415
+ process.exit(0);
416
+ });
417
+ });
418
+
419
+ process.on('SIGINT', () => {
420
+ console.log('[System] SIGINT received, shutting down gracefully...');
421
+ server.close(() => {
422
+ console.log('[System] Server closed');
423
+ process.exit(0);
424
+ });
425
+ });