|
|
const express = require('express'); |
|
|
const fetch = require('node-fetch'); |
|
|
const cors = require('cors'); |
|
|
const rateLimit = require('express-rate-limit'); |
|
|
const helmet = require('helmet'); |
|
|
require('dotenv').config(); |
|
|
|
|
|
// Single Express application; every middleware and route in this file attaches to it.
const app = express();

// Production mode is on only when the environment variable is exactly 'true'.
// It suppresses debug/sensitive logging and turns on masking of IPs, origins and paths.
const { PRODUCTION_MODE: productionModeFlag } = process.env;
const PRODUCTION_MODE = productionModeFlag === 'true';
|
|
|
|
|
/**
 * Print a message to stdout unless it is a debug message and the process
 * runs in production mode.
 *
 * @param {string} message - Text to print.
 * @param {string} [level='info'] - Only the value 'debug' has an effect (suppressed in production).
 */
function log(message, level = 'info') {
  const isSuppressed = PRODUCTION_MODE && level === 'debug';
  if (!isSuppressed) {
    console.log(message);
  }
}
|
|
|
|
|
/**
 * Print a message that may contain sensitive details (bot names, flow ids,
 * upstream error text). Nothing is printed in production mode.
 *
 * @param {string} message - Text to print outside production.
 */
function logSensitive(message) {
  if (PRODUCTION_MODE) {
    return;
  }
  console.log(message);
}
|
|
|
|
|
|
|
|
/**
 * Hide the host part of an IP address before it is written to the logs.
 *
 * Outside production the value is returned untouched. In production a value
 * that splits into exactly four dot-separated parts keeps its first two parts
 * and everything else becomes the literal 'masked'.
 *
 * @param {string|undefined} ip - Address to mask; callers pass `req.ip`, which may be undefined.
 * @returns {string|undefined} Masked address in production, the input otherwise.
 */
function maskIP(ip) {
  if (!PRODUCTION_MODE) {
    return ip;
  }

  // req.ip can be undefined (e.g. the socket is already gone); never throw from a logging helper.
  if (typeof ip !== 'string') {
    return 'masked';
  }

  const parts = ip.split('.');
  return parts.length === 4 ? `${parts[0]}.${parts[1]}.***.**` : 'masked';
}
|
|
|
|
|
/**
 * Shorten an Origin header value for production logs.
 *
 * In production a parseable origin becomes `<protocol>//<first 3 hostname chars>***`
 * and an unparseable one becomes 'masked'. Falsy values, the placeholder
 * 'no-origin', and every value outside production are returned unchanged.
 *
 * @param {string|undefined} origin - Origin header value or the 'no-origin' placeholder.
 * @returns {string|undefined} Masked or original value.
 */
function maskOrigin(origin) {
  const needsMasking = PRODUCTION_MODE && origin && origin !== 'no-origin';
  if (!needsMasking) {
    return origin;
  }

  let parsed;
  try {
    parsed = new URL(origin);
  } catch {
    return 'masked';
  }

  const hostPrefix = parsed.hostname.substring(0, 3);
  return `${parsed.protocol}//${hostPrefix}***`;
}
|
|
|
|
|
// Security headers from helmet, with the content-security-policy and
// cross-origin-embedder-policy headers switched off.
const helmetOptions = {
  contentSecurityPolicy: false,
  crossOriginEmbedderPolicy: false
};
app.use(helmet(helmetOptions));

// Trust exactly one proxy hop so req.ip is taken from the forwarded-for chain.
app.set('trust proxy', 1);

// JSON request bodies, capped at 1mb.
const jsonBodyParser = express.json({ limit: '1mb' });
app.use(jsonBodyParser);
|
|
|
|
|
|
|
|
// Accepted API keys: comma-separated in the API_KEYS environment variable,
// each entry trimmed. An unset or empty variable yields an empty list.
const rawApiKeys = process.env.API_KEYS;
const API_KEYS = rawApiKeys
  ? rawApiKeys.split(',').map((entry) => entry.trim())
  : [];
|
|
|
|
|
/**
 * Decide whether a request is authorized, by Origin header first and by
 * `x-api-key` header second.
 *
 * - An Origin header always decides the outcome on its own: any origin passes
 *   when no allowed origins are configured ('open-mode'), otherwise it must be
 *   in the allow-list. The API key is not consulted in that case.
 * - Without an Origin header, an API key passes when no keys are configured
 *   ('no-keys-configured') or when it is one of the configured keys.
 * - A request with neither header is rejected.
 *
 * @param {{headers: Object}} req - Incoming request (only `headers` is read).
 * @returns {{valid: boolean, source: string}} Verdict plus a label for the logs;
 *   for origin checks the label is the raw origin outside production.
 */
function authenticateRequest(req) {
  const { origin, 'x-api-key': apiKey } = req.headers;

  if (origin) {
    return allowedOrigins.length === 0
      ? { valid: true, source: 'open-mode' }
      : {
          valid: allowedOrigins.includes(origin),
          source: PRODUCTION_MODE ? 'authorized-origin' : origin
        };
  }

  if (!apiKey) {
    return { valid: false, source: 'unauthorized' };
  }

  if (API_KEYS.length === 0) {
    return { valid: true, source: 'no-keys-configured' };
  }

  return { valid: API_KEYS.includes(apiKey), source: 'api-key' };
}
|
|
|
|
|
|
|
|
// Origins allowed to call the proxy: comma-separated in ALLOWED_ORIGINS,
// each entry trimmed. An empty list means every origin is accepted.
const rawAllowedOrigins = process.env.ALLOWED_ORIGINS;
const allowedOrigins = rawAllowedOrigins
  ? rawAllowedOrigins.split(',').map((entry) => entry.trim())
  : [];
|
|
|
|
|
/**
 * Origin callback for the cors middleware.
 *
 * Requests without an Origin header, any origin while no allow-list is
 * configured, and allow-listed origins are accepted. Everything else is
 * logged (masked in production) and rejected with the error
 * 'Not allowed by CORS'.
 */
function checkCorsOrigin(origin, callback) {
  const isPermitted =
    !origin || allowedOrigins.length === 0 || allowedOrigins.includes(origin);

  if (isPermitted) {
    return callback(null, true);
  }

  log(`[Security] Blocked origin: ${maskOrigin(origin)}`);
  callback(new Error('Not allowed by CORS'));
}

app.use(cors({ origin: checkCorsOrigin, credentials: true }));
|
|
|
|
|
// Translate the CORS rejection raised by the cors origin callback into a
// 403 response; any other error is passed on to the next error handler.
app.use((err, req, res, next) => {
  if (err.message !== 'Not allowed by CORS') {
    return next(err);
  }
  return res.status(403).json({ error: 'Access denied' });
});
|
|
|
|
|
|
|
|
// Authorization gate. '/', '/health' and any path starting with '/test-'
// are public; every other request must pass authenticateRequest or gets 401.
app.use((req, res, next) => {
  const { path } = req;
  const isPublicPath = path === '/' || path === '/health' || path.startsWith('/test-');
  if (isPublicPath) {
    return next();
  }

  const { valid, source } = authenticateRequest(req);
  if (valid) {
    log(`[Auth] Request authorized from: ${source}`);
    return next();
  }

  log(`[Security] Blocked unauthorized request from ${maskIP(req.ip)}`);
  return res.status(401).json({
    error: 'Unauthorized',
    message: 'Valid origin or API key required'
  });
});
|
|
|
|
|
|
|
|
// Per-client rate limit: 100 requests per 15-minute window.
const limiter = rateLimit({
  windowMs: 15 * 60 * 1000,
  max: 100,
  message: { error: 'Too many requests' },
  keyGenerator: (req) => {
    const ip = req.ip || (req.socket && req.socket.remoteAddress) || 'unknown';

    // Strip a port only from the two unambiguous forms, "a.b.c.d:port" and
    // "[v6]:port". A generic "last colon + digits" pattern also eats the tail
    // of plain IPv6 addresses (e.g. "::ffff:1.2.3.4" -> "::ffff"), which would
    // put unrelated clients into the same rate-limit bucket.
    return ip
      .replace(/^(\d{1,3}(?:\.\d{1,3}){3}):\d+$/, '$1')
      .replace(/^\[([^\]]+)\](?::\d+)?$/, '$1');
  }
});

app.use(limiter);
|
|
|
|
|
|
|
|
// Access log: one line per request with timestamp, client address, method,
// path, origin and whether an API key was presented. In production the
// address and origin are masked and the path is cut to its first three
// segments followed by '/***'. The API key value itself is never logged.
app.use((req, res, next) => {
  // Strip a port only from "a.b.c.d:port" and "[v6]:port"; a generic
  // "last colon + digits" pattern truncates plain IPv6 addresses
  // (e.g. "::ffff:1.2.3.4" -> "::ffff") and makes the log useless.
  const address = (req.ip || 'unknown')
    .replace(/^(\d{1,3}(?:\.\d{1,3}){3}):\d+$/, '$1')
    .replace(/^\[([^\]]+)\](?::\d+)?$/, '$1');

  const ip = maskIP(address);
  const origin = maskOrigin(req.headers.origin || 'no-origin');
  const apiKey = req.headers['x-api-key'] ? '***' : 'none';
  const path = PRODUCTION_MODE
    ? `${req.path.split('/').slice(0, 4).join('/')}/***`
    : req.path;

  log(`[${new Date().toISOString()}] ${ip} -> ${req.method} ${path} | Origin: ${origin} | Key: ${apiKey}`);
  next();
});
|
|
|
|
|
|
|
|
// Per-client counter of prediction calls made during the current day.
const dailyUsage = new Map();

// Local calendar day (Date#toDateString form) the counters belong to;
// compared against "today" to decide when to wipe the counters.
let lastResetDate = new Date(Date.now()).toDateString();
|
|
|
|
|
/**
 * Wipe all daily usage counters once the local calendar day has changed
 * since the last reset. Does nothing while the day is unchanged.
 */
function checkDailyReset() {
  const currentDay = new Date().toDateString();
  if (currentDay === lastResetDate) {
    return;
  }

  dailyUsage.clear();
  lastResetDate = currentDay;
  log('[System] Daily usage counters reset');
}
|
|
|
|
|
// Re-check the day rollover once an hour so counters are also reset while
// no prediction traffic arrives.
const DAILY_RESET_CHECK_INTERVAL_MS = 60 * 60 * 1000;
setInterval(checkDailyReset, DAILY_RESET_CHECK_INTERVAL_MS);
|
|
|
|
|
// Daily quota: at most 200 prediction POSTs per client address per day.
// Only POST requests whose path contains '/prediction/' are counted.
app.use((req, res, next) => {
  const isPredictionCall = req.method === 'POST' && req.path.includes('/prediction/');
  if (!isPredictionCall) {
    return next();
  }

  checkDailyReset();

  // Strip a port only from "a.b.c.d:port" and "[v6]:port". A generic
  // "last colon + digits" pattern truncates plain IPv6 addresses
  // (e.g. "::ffff:1.2.3.4" -> "::ffff"), making unrelated clients share one quota.
  const ip = (req.ip || 'unknown')
    .replace(/^(\d{1,3}(?:\.\d{1,3}){3}):\d+$/, '$1')
    .replace(/^\[([^\]]+)\](?::\d+)?$/, '$1');

  const count = dailyUsage.get(ip) || 0;
  if (count >= 200) {
    return res.status(429).json({
      error: 'Daily limit reached',
      message: 'You have reached your daily usage limit. Try again tomorrow.'
    });
  }

  dailyUsage.set(ip, count + 1);

  // Bound memory: once the map exceeds 10000 clients, drop the 1000 entries
  // that were inserted first (Map iteration order is insertion order).
  if (dailyUsage.size > 10000) {
    log('[System] Daily usage map too large, clearing oldest entries', 'debug');
    const oldestKeys = Array.from(dailyUsage.keys()).slice(0, 1000);
    oldestKeys.forEach((key) => dailyUsage.delete(key));
  }

  next();
});
|
|
|
|
|
|
|
|
// Bot filter for POST requests: callers presenting a configured API key are
// always let through; otherwise a User-Agent containing one of the known
// automation markers is rejected with 403.
app.use((req, res, next) => {
  if (req.method !== 'POST') {
    return next();
  }

  const presentedKey = req.headers['x-api-key'];
  if (presentedKey && API_KEYS.includes(presentedKey)) {
    return next();
  }

  const agent = (req.headers['user-agent'] || '').toLowerCase();
  const botMarkers = ['python-requests', 'curl/', 'wget/', 'scrapy', 'crawler'];
  const looksAutomated = botMarkers.some((marker) => agent.includes(marker));
  if (!looksAutomated) {
    return next();
  }

  log(`[Security] Blocked bot from ${maskIP(req.ip)}`);
  return res.status(403).json({
    error: 'Automated access detected',
    message: 'This service is for web browsers only.'
  });
});
|
|
|
|
|
|
|
|
/**
 * Parse the FLOWISE_INSTANCES setting into a list of instance descriptors.
 *
 * Always returns an array: invalid JSON, a JSON value that is not an array
 * (including `null`), and an empty array are all reported through `log` and
 * yield `[]`, so later code can rely on `INSTANCES.length` and indexing.
 *
 * @param {string|undefined} raw - Raw JSON text; undefined/empty is treated as '[]'.
 * @returns {Array} Parsed instance list, or an empty array on any problem.
 */
function parseInstances(raw) {
  let parsed;
  try {
    parsed = JSON.parse(raw || '[]');
  } catch (e) {
    log('CRITICAL ERROR: Could not parse FLOWISE_INSTANCES JSON');
    return [];
  }

  if (!Array.isArray(parsed) || parsed.length === 0) {
    log('ERROR: FLOWISE_INSTANCES must be a non-empty array');
    return [];
  }

  log(`[System] Loaded ${parsed.length} instances`);
  return parsed;
}

// Upstream Flowise instances; routes address them by 1-based position.
let INSTANCES = parseInstances(process.env.FLOWISE_INSTANCES);
|
|
|
|
|
|
|
|
// Cache of resolved chatflows, keyed by "<instanceNum>-<botName>"; each entry
// stores the flow id, its instance descriptor and the time it was cached.
const flowCache = new Map();

const FLOW_CACHE_SWEEP_MS = 10 * 60 * 1000;

// Remove entries that carry a timestamp older than ten minutes.
function evictStaleFlows() {
  const now = Date.now();
  flowCache.forEach((entry, key) => {
    if (entry.timestamp && now - entry.timestamp > FLOW_CACHE_SWEEP_MS) {
      flowCache.delete(key);
    }
  });
}

setInterval(evictStaleFlows, FLOW_CACHE_SWEEP_MS);
|
|
|
|
|
|
|
|
/**
 * Perform `fetch(url, options)` but give up after `timeout` milliseconds.
 *
 * The deadline covers the time until `fetch` settles (response headers
 * received); reading the body afterwards is not limited by it.
 *
 * @param {string} url - Request URL.
 * @param {Object} [options] - Options passed through to `fetch`.
 * @param {number} [timeout=10000] - Deadline in milliseconds.
 * @returns {Promise<Object>} The fetch response.
 * @throws {Error} 'Request timeout' when the deadline passes first; otherwise
 *   whatever `fetch` rejects with.
 */
async function fetchWithTimeout(url, options, timeout = 10000) {
  // Abort the abandoned request on timeout when the runtime offers
  // AbortController and the caller did not bring its own signal.
  const callerHasSignal = Boolean(options && options.signal);
  const controller =
    !callerHasSignal && typeof AbortController === 'function' ? new AbortController() : null;
  const fetchOptions = controller ? { ...options, signal: controller.signal } : options;

  let timer;
  const deadline = new Promise((_, reject) => {
    timer = setTimeout(() => {
      // Reject first so callers see 'Request timeout' rather than the abort error.
      reject(new Error('Request timeout'));
      if (controller) {
        controller.abort();
      }
    }, timeout);
  });

  try {
    return await Promise.race([fetch(url, fetchOptions), deadline]);
  } finally {
    // Without this every call leaves a live timer behind for the full timeout.
    clearTimeout(timer);
  }
}
|
|
|
|
|
|
|
|
/**
 * Translate an (instance number, bot name) pair into a chatflow id.
 *
 * A cache entry younger than five minutes is returned directly. Otherwise the
 * instance's `/api/v1/chatflows` list is fetched (10 s timeout, bearer token
 * when the instance has a key) and searched for a flow whose name, lowercased
 * with whitespace runs replaced by '-', equals `botName`. A hit is cached.
 *
 * @param {number} instanceNum - 1-based index into INSTANCES.
 * @param {string} botName - Slug form of the flow name.
 * @returns {Promise<{id: string, instance: Object}>} Flow id and its instance descriptor.
 * @throws {Error} When the instance number is out of range, the instance
 *   answers with a non-OK status or a non-array body, or no flow matches.
 */
async function resolveChatflowId(instanceNum, botName) {
  const FRESH_FOR_MS = 5 * 60 * 1000;
  const cacheKey = `${instanceNum}-${botName}`;

  const hit = flowCache.get(cacheKey);
  const isFresh = hit && hit.timestamp && Date.now() - hit.timestamp < FRESH_FOR_MS;
  if (isFresh) {
    return { id: hit.id, instance: hit.instance };
  }

  const outOfRange = isNaN(instanceNum) || instanceNum < 1 || instanceNum > INSTANCES.length;
  if (outOfRange) {
    throw new Error(`Instance ${instanceNum} does not exist. Valid: 1-${INSTANCES.length}`);
  }

  const instance = INSTANCES[instanceNum - 1];
  logSensitive(`[System] Looking up '${botName}' in instance ${instanceNum}...`);

  const hasKey = instance.key && instance.key.length > 0;
  const headers = hasKey ? { Authorization: `Bearer ${instance.key}` } : {};

  const response = await fetchWithTimeout(`${instance.url}/api/v1/chatflows`, { headers }, 10000);
  if (!response.ok) {
    throw new Error(`Instance ${instanceNum} returned status ${response.status}`);
  }

  const flows = await response.json();
  if (!Array.isArray(flows)) {
    throw new Error(`Instance ${instanceNum} returned invalid response`);
  }

  const toSlug = (name) => name.toLowerCase().replace(/\s+/g, '-');
  const match = flows.find((flow) => flow.name && toSlug(flow.name) === botName);
  if (!match || !match.id) {
    throw new Error(`Bot '${botName}' not found in instance ${instanceNum}`);
  }

  flowCache.set(cacheKey, { id: match.id, instance, timestamp: Date.now() });
  logSensitive(`[System] Found '${botName}' -> ${match.id}`);

  return { id: match.id, instance };
}
|
|
|
|
|
|
|
|
/**
 * Relay a server-sent-events response from Flowise to the client.
 *
 * Chunks are forwarded as they arrive. A watchdog runs every 5 s and closes
 * the exchange when no chunk has arrived for more than 45 s:
 *   - nothing received yet -> 504 JSON response;
 *   - stalled mid-stream   -> an SSE `error` event, then the response ends.
 * The upstream body is destroyed on timeout and when the client disconnects,
 * and nothing is written once the client response has been finished.
 *
 * The function only wires up listeners; the returned promise resolves
 * immediately, not when the stream ends.
 *
 * @param {Object} flowiseResponse - Upstream fetch response; `body` is used as an
 *   event-emitting stream ('data', 'end', 'error').
 * @param {Object} clientRes - Express response for the waiting client.
 * @returns {Promise<void>}
 */
async function handleStreamingResponse(flowiseResponse, clientRes) {
  const STALL_LIMIT_MS = 45000;
  const WATCHDOG_PERIOD_MS = 5000;
  const upstream = flowiseResponse.body;

  clientRes.setHeader('Content-Type', 'text/event-stream');
  clientRes.setHeader('Cache-Control', 'no-cache');
  clientRes.setHeader('Connection', 'keep-alive');
  clientRes.setHeader('X-Accel-Buffering', 'no');

  log('[Streaming] Forwarding SSE stream...');

  let dataReceived = false;
  let lastDataTime = Date.now();
  let totalBytes = 0;
  // Set once the exchange is over; late upstream events are ignored afterwards.
  let finished = false;

  const stopUpstream = () => {
    if (typeof upstream.destroy === 'function') {
      upstream.destroy();
    }
  };

  const finish = () => {
    if (finished) {
      return;
    }
    finished = true;
    clearInterval(timeoutCheck);
    if (!clientRes.writableEnded) {
      clientRes.end();
    }
  };

  const timeoutCheck = setInterval(() => {
    const timeSinceData = Date.now() - lastDataTime;
    if (timeSinceData <= STALL_LIMIT_MS) {
      return;
    }

    log(`[Streaming] Timeout - no data for ${(timeSinceData / 1000).toFixed(1)}s`);

    if (!dataReceived) {
      log('[Streaming] Stream completed with NO data received!');
      if (!clientRes.headersSent) {
        clientRes.status(504).json({
          error: 'Gateway timeout',
          message: 'No response from chatbot within 45 seconds'
        });
      }
    } else if (!clientRes.writableEnded) {
      clientRes.write('\n\nevent: error\ndata: {"error": "Response timeout - no data received"}\n\n');
    }

    // Mark finished before destroying so events caused by destroy() are ignored.
    finish();
    stopUpstream();
  }, WATCHDOG_PERIOD_MS);

  upstream.on('data', (chunk) => {
    if (finished) {
      return;
    }

    // Updating lastDataTime is what re-arms the watchdog. The interval itself
    // must stay alive: clearTimeout/clearInterval are interchangeable in Node,
    // so clearing it here would disable stall detection after the first chunk.
    dataReceived = true;
    lastDataTime = Date.now();
    totalBytes += chunk.length;

    logSensitive(`[Streaming] Received chunk: ${chunk.length} bytes (total: ${totalBytes})`);
    clientRes.write(chunk);
  });

  upstream.on('end', () => {
    if (finished) {
      return;
    }

    if (dataReceived) {
      log(`[Streaming] Stream completed - ${totalBytes} bytes`);
    } else {
      log('[Streaming] Stream completed but NO data received!');
    }

    finish();
  });

  upstream.on('error', () => {
    if (finished) {
      return;
    }

    log('[Streaming Error]');

    if (dataReceived) {
      clientRes.write(`\n\nevent: error\ndata: {"error": "Stream interrupted"}\n\n`);
    } else if (!clientRes.headersSent) {
      clientRes.status(500).json({ error: 'Stream failed to start' });
    }

    finish();
  });

  // Client went away before the stream finished: stop the watchdog and the upstream.
  clientRes.on('close', () => {
    if (finished) {
      return;
    }
    finished = true;
    clearInterval(timeoutCheck);
    stopUpstream();
  });
}
|
|
|
|
|
|
|
|
// POST /api/v1/prediction/:instanceNum/:botName
// Validates the question, resolves the bot name to a chatflow id and forwards
// the request body to that Flowise instance. SSE answers are streamed through;
// anything else is parsed as JSON and returned.
app.post('/api/v1/prediction/:instanceNum/:botName', async (req, res) => {
  try {
    // Explicit radix: without it strings such as "0x2" are read as hexadecimal.
    const instanceNum = Number.parseInt(req.params.instanceNum, 10);
    const botName = req.params.botName.toLowerCase().substring(0, 100);

    const question = req.body && req.body.question;
    if (!question || typeof question !== 'string') {
      return res.status(400).json({
        error: 'Invalid request',
        message: 'Question must be a non-empty string.'
      });
    }

    if (question.length > 2000) {
      return res.status(400).json({
        error: 'Message too long',
        message: 'Please keep messages under 2000 characters.'
      });
    }

    const { id, instance } = await resolveChatflowId(instanceNum, botName);

    const headers = { 'Content-Type': 'application/json' };
    if (instance.key && instance.key.length > 0) {
      headers['Authorization'] = `Bearer ${instance.key}`;
    }

    const startTime = Date.now();
    logSensitive(`[Timing] Calling Flowise at ${new Date().toISOString()}`);

    const response = await fetchWithTimeout(
      `${instance.url}/api/v1/prediction/${id}`,
      {
        method: 'POST',
        headers,
        body: JSON.stringify(req.body)
      },
      60000
    );

    const duration = Date.now() - startTime;
    log(`[Timing] Response received in ${(duration / 1000).toFixed(1)}s`);

    if (!response.ok) {
      const errorText = await response.text();
      logSensitive(`[Error] Instance returned ${response.status}: ${errorText.substring(0, 100)}`);
      return res.status(response.status).json({
        error: 'Flowise instance error',
        message: 'The chatbot instance returned an error.'
      });
    }

    const contentType = response.headers.get('content-type') || '';
    if (contentType.includes('text/event-stream')) {
      log('[Streaming] Detected SSE response');
      return handleStreamingResponse(response, res);
    }

    log('[Non-streaming] Parsing JSON response');
    const text = await response.text();

    try {
      const data = JSON.parse(text);
      res.status(200).json(data);
    } catch (e) {
      log('[Error] Invalid JSON response');
      res.status(500).json({ error: 'Invalid response from Flowise' });
    }
  } catch (error) {
    // node-fetch error messages embed the upstream URL; keep those out of the
    // client response and out of production logs.
    const isFetchError = error.name === 'FetchError';
    const message = isFetchError ? 'Upstream request failed' : error.message;

    log(`[Error] ${message}`);
    if (isFetchError) {
      logSensitive(`[Error] ${error.message}`);
    }

    // A response that has already started cannot be replaced by a JSON error.
    if (res.headersSent) {
      return;
    }

    res.status(500).json({
      error: 'Request failed',
      message
    });
  }
});
|
|
|
|
|
|
|
|
// GET /api/v1/public-chatbotConfig/:instanceNum/:botName
// Resolves the bot name to a chatflow id and returns that flow's public
// chatbot configuration from the Flowise instance. Any failure while
// resolving or fetching is answered with 404.
app.get('/api/v1/public-chatbotConfig/:instanceNum/:botName', async (req, res) => {
  try {
    // Explicit radix: without it strings such as "0x2" are read as hexadecimal.
    const instanceNum = Number.parseInt(req.params.instanceNum, 10);
    const botName = req.params.botName.toLowerCase().substring(0, 100);

    const { id, instance } = await resolveChatflowId(instanceNum, botName);

    const headers = {};
    if (instance.key && instance.key.length > 0) {
      headers['Authorization'] = `Bearer ${instance.key}`;
    }

    const response = await fetchWithTimeout(
      `${instance.url}/api/v1/public-chatbotConfig/${id}`,
      { headers },
      10000
    );

    if (!response.ok) {
      return res.status(response.status).json({ error: 'Config not available' });
    }

    const data = await response.json();
    res.status(200).json(data);
  } catch (error) {
    log('[Error] Config request failed');
    logSensitive(`[Error] ${error.message}`);

    if (res.headersSent) {
      return;
    }

    // node-fetch error messages embed the upstream URL; do not hand them to the client.
    const message = error.name === 'FetchError' ? 'Upstream request failed' : error.message;
    res.status(404).json({ error: message });
  }
});
|
|
|
|
|
|
|
|
// GET /api/v1/chatflows-streaming/:instanceNum/:botName
// Reports whether the resolved chatflow supports streaming. Every failure
// (unknown bot, upstream error, timeout) is answered with
// 200 { isStreaming: false } so the client falls back to non-streaming mode.
app.get('/api/v1/chatflows-streaming/:instanceNum/:botName', async (req, res) => {
  try {
    // Explicit radix: without it strings such as "0x2" are read as hexadecimal.
    const instanceNum = Number.parseInt(req.params.instanceNum, 10);
    const botName = req.params.botName.toLowerCase().substring(0, 100);

    const { id, instance } = await resolveChatflowId(instanceNum, botName);

    const headers = {};
    if (instance.key && instance.key.length > 0) {
      headers['Authorization'] = `Bearer ${instance.key}`;
    }

    const response = await fetchWithTimeout(
      `${instance.url}/api/v1/chatflows-streaming/${id}`,
      { headers },
      10000
    );

    if (!response.ok) {
      return res.status(200).json({ isStreaming: false });
    }

    const data = await response.json();
    res.status(200).json(data);
  } catch (error) {
    log('[Error] Streaming check failed', 'debug');
    res.status(200).json({ isStreaming: false });
  }
});
|
|
|
|
|
|
|
|
// Development-only endpoint: emits five SSE test messages, one every 500 ms,
// then closes the response. Not registered in production mode.
if (!PRODUCTION_MODE) {
  app.get('/test-stream', (req, res) => {
    const sseHeaders = [
      ['Content-Type', 'text/event-stream'],
      ['Cache-Control', 'no-cache'],
      ['Connection', 'keep-alive']
    ];
    sseHeaders.forEach(([name, value]) => res.setHeader(name, value));

    let sent = 0;
    const timer = setInterval(() => {
      sent += 1;
      res.write(`data: {"message": "Test ${sent}"}\n\n`);

      if (sent < 5) {
        return;
      }

      clearInterval(timer);
      res.end();
    }, 500);
  });
}
|
|
|
|
|
|
|
|
// Liveness banner.
app.get('/', (req, res) => {
  res.send('Federated Proxy Active');
});

// Health snapshot: configured instance count, cache and quota map sizes,
// process uptime in seconds and the active mode.
app.get('/health', (req, res) => {
  const snapshot = {
    status: 'healthy',
    instances: INSTANCES.length,
    cached_bots: flowCache.size,
    daily_active_ips: dailyUsage.size,
    uptime: process.uptime(),
    production_mode: PRODUCTION_MODE
  };

  res.json(snapshot);
});
|
|
|
|
|
|
|
|
// Fallback for requests that matched none of the routes registered before it.
function respondNotFound(req, res) {
  res.status(404).json({ error: 'Route not found' });
}

app.use(respondNotFound);
|
|
|
|
|
|
|
|
// Last-resort error handler.
// - If the response has already started, hand the error to Express' default
//   handler, which closes the connection.
// - Errors that carry a 4xx status (e.g. malformed or oversized JSON bodies
//   rejected by the body parser) are client errors and keep their status
//   instead of being reported as a server failure.
// - Everything else becomes a generic 500 without internal details.
app.use((err, req, res, next) => {
  if (res.headersSent) {
    return next(err);
  }

  const status = Number(err && (err.status || err.statusCode));
  if (status >= 400 && status < 500) {
    log(`[Error] Rejected request (${status})`);
    return res.status(status).json({
      error: status === 413 ? 'Payload too large' : 'Invalid request'
    });
  }

  log('[Error] Unhandled error');
  res.status(500).json({ error: 'Internal server error' });
});
|
|
|
|
|
|
|
|
// Start listening on port 7860 on all IPv4 interfaces and print a startup
// banner with the active mode and the counts of instances, allowed origins
// and API keys ('Open' / 'None' when the respective list is empty).
const server = app.listen(7860, '0.0.0.0', () => {
  const modeLabel = PRODUCTION_MODE ? 'PRODUCTION' : 'DEVELOPMENT';
  const banner = [
    '===== Federated Proxy Started =====',
    `Port: 7860`,
    `Mode: ${modeLabel}`,
    `Instances: ${INSTANCES.length}`,
    `Allowed Origins: ${allowedOrigins.length || 'Open'}`,
    `API Keys: ${API_KEYS.length || 'None'}`,
    '===================================='
  ];

  banner.forEach((line) => log(line));
});
|
|
|
|
|
/**
 * Stop accepting connections, wait for the server to close, then exit with
 * status 0. Used for both termination signals.
 */
function shutdownGracefully() {
  log('[System] Shutting down gracefully...');
  server.close(() => {
    log('[System] Server closed');
    process.exit(0);
  });
}

['SIGTERM', 'SIGINT'].forEach((signal) => {
  process.on(signal, shutdownGracefully);
});
|
|
|