const crypto = require('crypto');
const express = require('express');
const fetch = require('node-fetch');
const cors = require('cors');
const rateLimit = require('express-rate-limit');
const helmet = require('helmet');
require('dotenv').config();

const app = express();

// --- PRODUCTION MODE ---
const PRODUCTION_MODE = process.env.PRODUCTION_MODE === 'true';

/**
 * Write a message to stdout.
 * @param {string} message - Text to log.
 * @param {string} [level='info'] - 'debug' messages are dropped in production mode.
 */
function log(message, level = 'info') {
  if (PRODUCTION_MODE && level === 'debug') return;
  console.log(message);
}

/**
 * Log a message that may contain sensitive details; silent in production mode.
 * @param {string} message - Text to log.
 */
function logSensitive(message) {
  if (!PRODUCTION_MODE) console.log(message);
}

/**
 * Split a comma-separated environment value into trimmed, non-empty entries.
 * Empty entries (e.g. from a trailing comma) are dropped so they can never
 * count as a configured key or origin.
 * @param {string|undefined} value - Raw environment value.
 * @returns {string[]} Parsed entries; empty array when the value is unset.
 */
function parseList(value) {
  return (value || '')
    .split(',')
    .map((item) => item.trim())
    .filter(Boolean);
}

/**
 * Produce a stable per-client key from an address string.
 * Only strips a port from the unambiguous `a.b.c.d:port` and `[v6]:port`
 * forms, and unwraps IPv4-mapped IPv6 (`::ffff:a.b.c.d`). A bare IPv6 address
 * is returned untouched, because a trailing `:digits` group there is part of
 * the address, not a port.
 * @param {string|undefined} rawIp - Address as reported by Express or the socket.
 * @returns {string} Normalized address, or 'unknown' when none is available.
 */
function normalizeIp(rawIp) {
  if (typeof rawIp !== 'string' || rawIp.length === 0) return 'unknown';
  let ip = rawIp;
  const bracketed = /^\[([^\]]+)\](?::\d+)?$/.exec(ip);
  if (bracketed) ip = bracketed[1];
  const v4WithPort = /^(\d{1,3}(?:\.\d{1,3}){3}):\d+$/.exec(ip);
  if (v4WithPort) ip = v4WithPort[1];
  const mapped = /^::ffff:(\d{1,3}(?:\.\d{1,3}){3})$/i.exec(ip);
  if (mapped) ip = mapped[1];
  return ip;
}

// Mask sensitive data

/**
 * Hide the last two octets of an IPv4 address in production mode.
 * @param {string} ip - Client address.
 * @returns {string} The address unchanged outside production mode; otherwise a
 *   partially masked IPv4 address, or 'masked' for anything else.
 */
function maskIP(ip) {
  if (!PRODUCTION_MODE) return ip;
  // req.ip can be undefined (e.g. socket already destroyed); never throw while logging.
  if (typeof ip !== 'string') return 'masked';
  const parts = ip.split('.');
  return parts.length === 4 ? `${parts[0]}.${parts[1]}.***.**` : 'masked';
}

/**
 * Reduce an origin to its protocol plus the first three hostname characters
 * in production mode.
 * @param {string} origin - Origin header value, or 'no-origin'.
 * @returns {string} The origin unchanged outside production mode (or when it is
 *   empty / 'no-origin'); otherwise the shortened form, or 'masked' if unparsable.
 */
function maskOrigin(origin) {
  if (PRODUCTION_MODE && origin && origin !== 'no-origin') {
    try {
      const url = new URL(origin);
      return `${url.protocol}//${url.hostname.substring(0, 3)}***`;
    } catch {
      return 'masked';
    }
  }
  return origin;
}

app.use(helmet({ contentSecurityPolicy: false, crossOriginEmbedderPolicy: false }));
app.set('trust proxy', 1);
app.use(express.json({ limit: '1mb' }));

// --- API KEY AUTHENTICATION ---
const API_KEYS = parseList(process.env.API_KEYS);
const allowedOrigins = parseList(process.env.ALLOWED_ORIGINS);

/**
 * SHA-256 digest of a string, used so keys of any length can be compared with
 * crypto.timingSafeEqual (which requires equal-length buffers).
 * @param {string} value - Text to hash.
 * @returns {Buffer} 32-byte digest.
 */
function sha256(value) {
  return crypto.createHash('sha256').update(value).digest();
}

const API_KEY_DIGESTS = API_KEYS.map(sha256);

/**
 * Check whether a presented API key matches one of the configured keys.
 * Every configured key is compared (no early exit) using a constant-time
 * comparison of digests.
 * @param {unknown} candidate - Value of the x-api-key header.
 * @returns {boolean} True when the key is configured.
 */
function isKnownApiKey(candidate) {
  if (typeof candidate !== 'string' || candidate.length === 0) return false;
  const candidateDigest = sha256(candidate);
  let matched = false;
  for (const digest of API_KEY_DIGESTS) {
    if (crypto.timingSafeEqual(candidateDigest, digest)) matched = true;
  }
  return matched;
}

/**
 * Decide whether a request is authorized, by Origin header first and by
 * x-api-key header otherwise.
 * NOTE(review): the Origin header is set by the client, so outside of browsers
 * it is not a proof of identity — confirm this trust model is intended.
 * @param {import('express').Request} req - Incoming request.
 * @returns {{valid: boolean, source: string}} Verdict and a label for logging.
 */
function authenticateRequest(req) {
  const origin = req.headers.origin;
  const apiKey = req.headers['x-api-key'];

  if (origin) {
    if (allowedOrigins.length === 0) return { valid: true, source: 'open-mode' };
    return {
      valid: allowedOrigins.includes(origin),
      source: PRODUCTION_MODE ? 'authorized-origin' : origin,
    };
  }

  if (apiKey) {
    if (API_KEYS.length === 0) return { valid: true, source: 'no-keys-configured' };
    return { valid: isKnownApiKey(apiKey), source: 'api-key' };
  }

  return { valid: false, source: 'unauthorized' };
}

// --- CORS ---
app.use(cors({
  origin: function (origin, callback) {
    // Requests without an Origin header are passed through here; the
    // authentication middleware below decides whether they are allowed.
    if (!origin) {
      return callback(null, true);
    }
    if (allowedOrigins.length === 0) return callback(null, true);
    if (allowedOrigins.includes(origin)) {
      callback(null, true);
    } else {
      log(`[Security] Blocked origin: ${maskOrigin(origin)}`);
      callback(new Error('Not allowed by CORS'));
    }
  },
  credentials: true,
}));

// Translate the CORS rejection raised above into a 403; pass anything else on.
app.use((err, req, res, next) => {
  if (err.message === 'Not allowed by CORS') {
    return res.status(403).json({ error: 'Access denied' });
  }
  next(err);
});

// --- API KEY AUTHENTICATION MIDDLEWARE ---
app.use((req, res, next) => {
  if (req.path === '/' || req.path === '/health' || req.path.startsWith('/test-')) {
    return next();
  }

  const auth = authenticateRequest(req);
  if (!auth.valid) {
    log(`[Security] Blocked unauthorized request from ${maskIP(normalizeIp(req.ip))}`);
    return res.status(401).json({
      error: 'Unauthorized',
      message: 'Valid origin or API key required',
    });
  }

  log(`[Auth] Request authorized from: ${auth.source}`);
  next();
});

// --- RATE LIMITING ---
const limiter = rateLimit({
  windowMs: 15 * 60 * 1000,
  max: 100,
  message: { error: "Too many requests" },
  keyGenerator: (req) => normalizeIp(req.ip || (req.socket && req.socket.remoteAddress)),
});
app.use(limiter);

// --- REQUEST LOGGING (SAFE) ---
app.use((req, res, next) => {
  const ip = maskIP(normalizeIp(req.ip));
  const origin = maskOrigin(req.headers.origin || 'no-origin');
  const apiKey = req.headers['x-api-key'] ? '***' : 'none';
  // In production only the first path segments are logged.
  const path = PRODUCTION_MODE
    ? `${req.path.split('/').slice(0, 4).join('/')}/***`
    : req.path;
  log(`[${new Date().toISOString()}] ${ip} -> ${req.method} ${path} | Origin: ${origin} | Key: ${apiKey}`);
  next();
});

// --- DAILY USAGE CAPS ---
const DAILY_LIMIT = 200;
const DAILY_MAP_MAX_SIZE = 10000;
const DAILY_MAP_EVICT_COUNT = 1000;

const dailyUsage = new Map();
let lastResetDate = new Date().toDateString();

/** Clear all per-client counters when the local calendar date has changed. */
function checkDailyReset() {
  const today = new Date().toDateString();
  if (today !== lastResetDate) {
    dailyUsage.clear();
    lastResetDate = today;
    log('[System] Daily usage counters reset');
  }
}

setInterval(checkDailyReset, 60 * 60 * 1000);

app.use((req, res, next) => {
  if (req.method === 'POST' && req.path.includes('/prediction/')) {
    checkDailyReset();
    const ip = normalizeIp(req.ip);
    const count = dailyUsage.get(ip) || 0;

    if (count >= DAILY_LIMIT) {
      return res.status(429).json({
        error: 'Daily limit reached',
        message: 'You have reached your daily usage limit. Try again tomorrow.',
      });
    }

    dailyUsage.set(ip, count + 1);

    if (dailyUsage.size > DAILY_MAP_MAX_SIZE) {
      log('[System] Daily usage map too large, clearing oldest entries', 'debug');
      // Map iterates in insertion order, so the first keys are the oldest.
      let removed = 0;
      for (const key of dailyUsage.keys()) {
        if (removed >= DAILY_MAP_EVICT_COUNT) break;
        dailyUsage.delete(key);
        removed += 1;
      }
    }
  }
  next();
});

// --- BOT DETECTION ---
const SUSPICIOUS_BOTS = ['python-requests', 'curl/', 'wget/', 'scrapy', 'crawler'];

app.use((req, res, next) => {
  if (req.method !== 'POST') {
    return next();
  }

  // Callers presenting a configured API key are allowed to be automated.
  if (isKnownApiKey(req.headers['x-api-key'])) {
    return next();
  }

  const userAgent = (req.headers['user-agent'] || '').toLowerCase();
  const isBot = SUSPICIOUS_BOTS.some((bot) => userAgent.includes(bot));

  if (isBot) {
    log(`[Security] Blocked bot from ${maskIP(normalizeIp(req.ip))}`);
    return res.status(403).json({
      error: 'Automated access detected',
      message: 'This service is for web browsers only.',
    });
  }
  next();
});

// --- INSTANCES CONFIGURATION ---
let INSTANCES = [];
try {
  const parsed = JSON.parse(process.env.FLOWISE_INSTANCES || '[]');
  if (Array.isArray(parsed) && parsed.length > 0) {
    INSTANCES = parsed;
    log(`[System] Loaded ${INSTANCES.length} instances`);
  } else {
    // Keep INSTANCES as [] so later lookups fail with a clear "does not exist" error.
    log('ERROR: FLOWISE_INSTANCES must be a non-empty array');
  }
} catch (e) {
  log("CRITICAL ERROR: Could not parse FLOWISE_INSTANCES JSON");
}

// --- CACHE WITH AUTO-CLEANUP ---
const flowCache = new Map();

setInterval(() => {
  const now = Date.now();
  for (const [key, value] of flowCache.entries()) {
    if (value.timestamp && now - value.timestamp > 10 * 60 * 1000) {
      flowCache.delete(key);
    }
  }
}, 10 * 60 * 1000);

// --- FETCH WITH TIMEOUT ---

/**
 * fetch() that rejects with Error('Request timeout') if no response arrives
 * within `timeout` milliseconds.
 * The timer is always cleared once the race settles, and when AbortController
 * is available the underlying request is aborted on timeout instead of being
 * left running. The timeout covers only the wait for the response object, not
 * reading its body.
 * @param {string} url - Target URL.
 * @param {object} [options] - Options passed through to fetch.
 * @param {number} [timeout=10000] - Timeout in milliseconds.
 * @returns {Promise<object>} The fetch response.
 * @throws {Error} 'Request timeout', or whatever fetch rejects with.
 */
async function fetchWithTimeout(url, options, timeout = 10000) {
  const controller = typeof AbortController === 'function' ? new AbortController() : null;
  const requestOptions = controller ? { ...options, signal: controller.signal } : options;

  let timer;
  const timeoutPromise = new Promise((_, reject) => {
    timer = setTimeout(() => {
      // Reject first so callers see 'Request timeout' rather than the abort error.
      reject(new Error('Request timeout'));
      if (controller) controller.abort();
    }, timeout);
  });

  try {
    return await Promise.race([fetch(url, requestOptions), timeoutPromise]);
  } finally {
    clearTimeout(timer);
  }
}

/**
 * Build upstream request headers, adding a Bearer token when the instance has a key.
 * @param {{key?: string}} instance - Instance configuration entry.
 * @param {object} [base={}] - Headers to start from.
 * @returns {object} New headers object.
 */
function buildInstanceHeaders(instance, base = {}) {
  const headers = { ...base };
  if (instance.key && instance.key.length > 0) {
    headers['Authorization'] = `Bearer ${instance.key}`;
  }
  return headers;
}

/**
 * Read the :instanceNum and :botName route parameters.
 * @param {import('express').Request} req - Incoming request.
 * @returns {{instanceNum: number, botName: string}} Parsed instance number
 *   (NaN when not numeric) and lower-cased bot name capped at 100 characters.
 */
function parseTarget(req) {
  return {
    instanceNum: Number.parseInt(req.params.instanceNum, 10),
    botName: req.params.botName.toLowerCase().substring(0, 100),
  };
}

// --- RESOLVE CHATFLOW ID ---

/**
 * Look up the chatflow id for a bot name on a configured instance.
 * Results are cached for five minutes per instance/bot pair.
 * @param {number} instanceNum - 1-based index into INSTANCES.
 * @param {string} botName - Lower-case name with whitespace runs replaced by '-'.
 * @returns {Promise<{id: string, instance: object}>} Chatflow id and instance entry.
 * @throws {Error} When the instance number is invalid, the instance responds
 *   with an error or a non-array body, or no chatflow matches the name.
 */
async function resolveChatflowId(instanceNum, botName) {
  if (!Number.isInteger(instanceNum) || instanceNum < 1 || instanceNum > INSTANCES.length) {
    throw new Error(`Instance ${instanceNum} does not exist. Valid: 1-${INSTANCES.length}`);
  }

  const cacheKey = `${instanceNum}-${botName}`;
  const cached = flowCache.get(cacheKey);
  if (cached && cached.timestamp && Date.now() - cached.timestamp < 5 * 60 * 1000) {
    return { id: cached.id, instance: cached.instance };
  }

  const instance = INSTANCES[instanceNum - 1];
  logSensitive(`[System] Looking up '${botName}' in instance ${instanceNum}...`);

  const headers = buildInstanceHeaders(instance);
  const response = await fetchWithTimeout(`${instance.url}/api/v1/chatflows`, { headers }, 10000);

  if (!response.ok) {
    throw new Error(`Instance ${instanceNum} returned status ${response.status}`);
  }

  const flows = await response.json();
  if (!Array.isArray(flows)) {
    throw new Error(`Instance ${instanceNum} returned invalid response`);
  }

  const match = flows.find(
    (f) => f.name && f.name.toLowerCase().replace(/\s+/g, '-') === botName
  );
  if (!match || !match.id) {
    throw new Error(`Bot '${botName}' not found in instance ${instanceNum}`);
  }

  flowCache.set(cacheKey, { id: match.id, instance: instance, timestamp: Date.now() });
  logSensitive(`[System] Found '${botName}' -> ${match.id}`);
  return { id: match.id, instance };
}

// --- STREAMING HANDLER ---
const STREAM_IDLE_TIMEOUT_MS = 45000;
const STREAM_WATCHDOG_INTERVAL_MS = 5000;

/**
 * Forward an upstream SSE body to the client chunk by chunk.
 * A watchdog ends the exchange when no chunk has arrived for 45 seconds; it
 * keeps running for the whole stream (it is only cleared when the stream is
 * finished), so a stall after the first chunk is detected too. The upstream
 * body is destroyed on timeout and on client disconnect, and nothing is
 * written to the client once the exchange has been finished.
 * @param {object} flowiseResponse - Upstream fetch response with a readable `body`.
 * @param {import('express').Response} clientRes - Response to the client.
 * @returns {Promise<void>} Resolves immediately; streaming continues via events.
 */
async function handleStreamingResponse(flowiseResponse, clientRes) {
  clientRes.setHeader('Content-Type', 'text/event-stream');
  clientRes.setHeader('Cache-Control', 'no-cache');
  clientRes.setHeader('Connection', 'keep-alive');
  clientRes.setHeader('X-Accel-Buffering', 'no');

  log('[Streaming] Forwarding SSE stream...');

  const upstream = flowiseResponse.body;
  let finished = false;
  let dataReceived = false;
  let lastDataTime = Date.now();
  let totalBytes = 0;

  const destroyUpstream = () => {
    if (upstream && typeof upstream.destroy === 'function') upstream.destroy();
  };

  // Marks the exchange as finished exactly once; returns false if it already was.
  const stop = () => {
    if (finished) return false;
    finished = true;
    clearInterval(watchdog);
    return true;
  };

  const watchdog = setInterval(() => {
    const timeSinceData = Date.now() - lastDataTime;
    if (timeSinceData <= STREAM_IDLE_TIMEOUT_MS) return;
    if (!stop()) return;

    log(`[Streaming] Timeout - no data for ${(timeSinceData / 1000).toFixed(1)}s`);
    destroyUpstream();

    if (!dataReceived) {
      // Nothing has been written yet, so a regular JSON error can still be sent.
      log('[Streaming] Stream completed with NO data received!');
      clientRes.status(504).json({
        error: 'Gateway timeout',
        message: 'No response from chatbot within 45 seconds',
      });
      return;
    }
    clientRes.end();
  }, STREAM_WATCHDOG_INTERVAL_MS);

  upstream.on('data', (chunk) => {
    if (finished) return;
    dataReceived = true;
    lastDataTime = Date.now();
    totalBytes += chunk.length;
    logSensitive(`[Streaming] Received chunk: ${chunk.length} bytes (total: ${totalBytes})`);
    clientRes.write(chunk);
  });

  upstream.on('end', () => {
    if (!stop()) return;
    if (dataReceived) {
      log(`[Streaming] Stream completed - ${totalBytes} bytes`);
    } else {
      log('[Streaming] Stream completed but NO data received!');
    }
    clientRes.end();
  });

  upstream.on('error', (err) => {
    if (!stop()) return;
    log('[Streaming Error]');
    logSensitive(`[Streaming Error] ${err && err.message}`);
    if (dataReceived) {
      clientRes.write(`\n\nevent: error\ndata: {"error": "Stream interrupted"}\n\n`);
      clientRes.end();
    } else {
      clientRes.status(500).json({ error: 'Stream failed to start' });
    }
  });

  // 'close' also fires after a normal end; stop() makes that case a no-op.
  clientRes.on('close', () => {
    if (!stop()) return;
    log('[Streaming] Client disconnected', 'debug');
    destroyUpstream();
  });
}

// --- PREDICTION ROUTE ---
app.post('/api/v1/prediction/:instanceNum/:botName', async (req, res) => {
  try {
    const { instanceNum, botName } = parseTarget(req);
    const question = req.body ? req.body.question : undefined;

    if (!question || typeof question !== 'string') {
      return res.status(400).json({
        error: 'Invalid request',
        message: 'Question must be a non-empty string.',
      });
    }

    if (question.length > 2000) {
      return res.status(400).json({
        error: 'Message too long',
        message: 'Please keep messages under 2000 characters.',
      });
    }

    const { id, instance } = await resolveChatflowId(instanceNum, botName);
    const headers = buildInstanceHeaders(instance, { 'Content-Type': 'application/json' });

    const startTime = Date.now();
    logSensitive(`[Timing] Calling Flowise at ${new Date().toISOString()}`);

    // The full client body is forwarded, not just `question`.
    const response = await fetchWithTimeout(
      `${instance.url}/api/v1/prediction/${id}`,
      { method: 'POST', headers, body: JSON.stringify(req.body) },
      60000
    );

    const duration = Date.now() - startTime;
    log(`[Timing] Response received in ${(duration / 1000).toFixed(1)}s`);

    if (!response.ok) {
      const errorText = await response.text();
      logSensitive(`[Error] Instance returned ${response.status}: ${errorText.substring(0, 100)}`);
      return res.status(response.status).json({
        error: 'Flowise instance error',
        message: 'The chatbot instance returned an error.',
      });
    }

    const contentType = response.headers.get('content-type') || '';
    if (contentType.includes('text/event-stream')) {
      log('[Streaming] Detected SSE response');
      return handleStreamingResponse(response, res);
    }

    log('[Non-streaming] Parsing JSON response');
    const text = await response.text();
    try {
      const data = JSON.parse(text);
      res.status(200).json(data);
    } catch (e) {
      log('[Error] Invalid JSON response');
      res.status(500).json({ error: 'Invalid response from Flowise' });
    }
  } catch (error) {
    log(`[Error] ${error.message}`);
    // A second response cannot be sent once one has started.
    if (res.headersSent) return;
    res.status(500).json({ error: 'Request failed', message: error.message });
  }
});

// --- CONFIG ROUTE ---
app.get('/api/v1/public-chatbotConfig/:instanceNum/:botName', async (req, res) => {
  try {
    const { instanceNum, botName } = parseTarget(req);
    const { id, instance } = await resolveChatflowId(instanceNum, botName);
    const headers = buildInstanceHeaders(instance);

    const response = await fetchWithTimeout(
      `${instance.url}/api/v1/public-chatbotConfig/${id}`,
      { headers },
      10000
    );

    if (!response.ok) {
      return res.status(response.status).json({ error: 'Config not available' });
    }

    const data = await response.json();
    res.status(200).json(data);
  } catch (error) {
    log('[Error] Config request failed');
    if (res.headersSent) return;
    res.status(404).json({ error: error.message });
  }
});

// --- STREAMING CHECK ROUTE ---
app.get('/api/v1/chatflows-streaming/:instanceNum/:botName', async (req, res) => {
  try {
    const { instanceNum, botName } = parseTarget(req);
    const { id, instance } = await resolveChatflowId(instanceNum, botName);
    const headers = buildInstanceHeaders(instance);

    const response = await fetchWithTimeout(
      `${instance.url}/api/v1/chatflows-streaming/${id}`,
      { headers },
      10000
    );

    // Any upstream failure is reported to the client as "not streaming".
    if (!response.ok) {
      return res.status(200).json({ isStreaming: false });
    }

    const data = await response.json();
    res.status(200).json(data);
  } catch (error) {
    log('[Error] Streaming check failed', 'debug');
    if (res.headersSent) return;
    res.status(200).json({ isStreaming: false });
  }
});

// --- TEST ENDPOINTS (DISABLED IN PRODUCTION) ---
if (!PRODUCTION_MODE) {
  app.get('/test-stream', (req, res) => {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');

    let count = 0;
    const interval = setInterval(() => {
      count++;
      res.write(`data: {"message": "Test ${count}"}\n\n`);
      if (count >= 5) {
        clearInterval(interval);
        res.end();
      }
    }, 500);

    // Stop emitting if the client goes away before the fifth message.
    res.on('close', () => clearInterval(interval));
  });
}

// --- HEALTH CHECK ---
app.get('/', (req, res) => res.send('Federated Proxy Active'));

app.get('/health', (req, res) => {
  res.json({
    status: 'healthy',
    instances: INSTANCES.length,
    cached_bots: flowCache.size,
    daily_active_ips: dailyUsage.size,
    uptime: process.uptime(),
    production_mode: PRODUCTION_MODE,
  });
});

// --- 404 HANDLER ---
app.use((req, res) => {
  res.status(404).json({ error: 'Route not found' });
});

// --- GLOBAL ERROR HANDLER ---
app.use((err, req, res, next) => {
  // Once a response has started, only Express's default handler can close it.
  if (res.headersSent) return next(err);

  // Errors carrying a 4xx status (e.g. malformed or oversized JSON bodies)
  // are client errors, not server faults.
  const status = err && (err.status || err.statusCode);
  if (Number.isInteger(status) && status >= 400 && status < 500) {
    log(`[Error] Rejected request (${status})`, 'debug');
    return res.status(status).json({ error: 'Invalid request' });
  }

  log('[Error] Unhandled error');
  logSensitive(err && err.stack ? err.stack : String(err));
  res.status(500).json({ error: 'Internal server error' });
});

// --- SERVER START ---
const server = app.listen(7860, '0.0.0.0', () => {
  log('===== Federated Proxy Started =====');
  log(`Port: 7860`);
  log(`Mode: ${PRODUCTION_MODE ? 'PRODUCTION' : 'DEVELOPMENT'}`);
  log(`Instances: ${INSTANCES.length}`);
  log(`Allowed Origins: ${allowedOrigins.length || 'Open'}`);
  log(`API Keys: ${API_KEYS.length || 'None'}`);
  log('====================================');
});

const FORCED_SHUTDOWN_MS = 10000;

/**
 * Stop accepting connections and exit once open ones have finished.
 * server.close() waits for open connections (long-lived SSE streams included),
 * so an unref'd timer forces the exit if that takes too long.
 */
function shutdown() {
  log('[System] Shutting down gracefully...');
  server.close(() => {
    log('[System] Server closed');
    process.exit(0);
  });
  setTimeout(() => {
    log('[System] Forcing shutdown');
    process.exit(1);
  }, FORCED_SHUTDOWN_MS).unref();
}

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);