// proxy / server.js
// unknownfriend00007's picture
// Update server.js
// 1321522 verified
const express = require('express');
const fetch = require('node-fetch');
const cors = require('cors');
const rateLimit = require('express-rate-limit');
const helmet = require('helmet');
require('dotenv').config();
// Express application instance; all middleware and routes below are registered on it.
const app = express();
// --- PRODUCTION MODE ---
// True only when the PRODUCTION_MODE environment variable is exactly the string 'true'.
// The logging and masking helpers in this file read this flag.
const PRODUCTION_MODE = process.env.PRODUCTION_MODE === 'true';
/**
 * Prints a message with console.log, except that 'debug'-level messages are
 * dropped while PRODUCTION_MODE is on.
 *
 * @param {*} message - Value handed to console.log.
 * @param {string} [level='info'] - Severity label; only 'debug' is special-cased.
 * @returns {void}
 */
function log(message, level = 'info') {
  const suppressed = PRODUCTION_MODE && level === 'debug';
  if (!suppressed) {
    console.log(message);
  }
}
/**
 * Prints a message that may contain sensitive details. Nothing is printed
 * while PRODUCTION_MODE is on.
 *
 * @param {*} message - Value handed to console.log in non-production mode.
 * @returns {void}
 */
function logSensitive(message) {
  if (PRODUCTION_MODE) {
    return;
  }
  console.log(message);
}
// Mask sensitive data
/**
 * Masks an IP address for logging.
 *
 * Outside production mode the value is returned unchanged. In production mode
 * a dotted four-part address keeps its first two parts ("a.b.***.**"); any
 * other value, including a missing or non-string address, becomes 'masked'.
 *
 * @param {*} ip - Address to mask, typically req.ip (which may be undefined).
 * @returns {*} Masked string in production mode, otherwise the input as-is.
 */
function maskIP(ip) {
  if (!PRODUCTION_MODE) {
    return ip;
  }
  // Guard against undefined/non-string input: calling .split on it used to
  // throw inside logging code and turn a log line into a failed request.
  if (typeof ip !== 'string') {
    return 'masked';
  }
  const parts = ip.split('.');
  return parts.length === 4 ? `${parts[0]}.${parts[1]}.***.**` : 'masked';
}
/**
 * Masks an Origin value for logging.
 *
 * Masking applies only in production mode and only to a non-empty origin
 * other than the 'no-origin' placeholder. A parseable origin is reduced to its
 * protocol plus the first three hostname characters followed by '***'; an
 * unparseable one becomes 'masked'. In every other case the input is returned.
 *
 * @param {*} origin - Origin header value or the 'no-origin' placeholder.
 * @returns {*} Masked string, or the input unchanged.
 */
function maskOrigin(origin) {
  const needsMasking = PRODUCTION_MODE && origin && origin !== 'no-origin';
  if (!needsMasking) {
    return origin;
  }
  try {
    const { protocol, hostname } = new URL(origin);
    return `${protocol}//${hostname.substring(0, 3)}***`;
  } catch {
    return 'masked';
  }
}
// Security headers via helmet, with its contentSecurityPolicy and
// crossOriginEmbedderPolicy options explicitly switched off.
app.use(helmet({
contentSecurityPolicy: false,
crossOriginEmbedderPolicy: false
}));
// Express 'trust proxy' setting with a hop count of 1. This influences how
// req.ip is derived; the IP-based logging and limits in this file read req.ip.
// NOTE(review): confirm the deployment really has exactly one proxy in front.
app.set('trust proxy', 1);
// Parse JSON request bodies, with a 1mb size limit.
app.use(express.json({ limit: '1mb' }));
// --- API KEY AUTHENTICATION ---
// Accepted API keys, read once at startup from the comma-separated API_KEYS
// environment variable (each entry trimmed). Empty when the variable is unset
// or empty.
const API_KEYS = (() => {
  const rawKeys = process.env.API_KEYS;
  if (!rawKeys) {
    return [];
  }
  return rawKeys.split(',').map((entry) => entry.trim());
})();
/**
 * Decides whether a request is authorized, based on its Origin header or its
 * x-api-key header. The Origin header takes precedence when both are present.
 *
 * - Origin present: valid when no origins are configured ('open-mode') or
 *   when the origin is in allowedOrigins.
 * - Otherwise, API key present: valid when no keys are configured
 *   ('no-keys-configured') or when the key is in API_KEYS.
 * - Neither present: invalid.
 *
 * @param {object} req - Request object; only req.headers is read.
 * @returns {{valid: boolean, source: string}} Decision plus a label for logs.
 */
function authenticateRequest(req) {
  const { origin, 'x-api-key': apiKey } = req.headers;

  if (origin) {
    if (allowedOrigins.length === 0) {
      return { valid: true, source: 'open-mode' };
    }
    // The raw origin is only exposed as the source label outside production.
    const source = PRODUCTION_MODE ? 'authorized-origin' : origin;
    return { valid: allowedOrigins.includes(origin), source };
  }

  if (!apiKey) {
    return { valid: false, source: 'unauthorized' };
  }

  if (API_KEYS.length === 0) {
    return { valid: true, source: 'no-keys-configured' };
  }
  return { valid: API_KEYS.includes(apiKey), source: 'api-key' };
}
// --- CORS ---
// Origins permitted to call the proxy, read once at startup from the
// comma-separated ALLOWED_ORIGINS environment variable (each entry trimmed).
// Empty when the variable is unset or empty.
const allowedOrigins = (() => {
  const rawOrigins = process.env.ALLOWED_ORIGINS;
  if (!rawOrigins) {
    return [];
  }
  return rawOrigins.split(',').map((entry) => entry.trim());
})();
// CORS policy. A request is accepted when it carries no Origin header, when
// no origins are configured, or when its origin is in allowedOrigins. Any
// other origin is logged (masked) and rejected with an Error whose message
// is 'Not allowed by CORS'.
app.use(cors({
  origin(requestOrigin, done) {
    const unrestricted = !requestOrigin || allowedOrigins.length === 0;
    if (unrestricted || allowedOrigins.includes(requestOrigin)) {
      return done(null, true);
    }
    log(`[Security] Blocked origin: ${maskOrigin(requestOrigin)}`);
    return done(new Error('Not allowed by CORS'));
  },
  credentials: true,
}));
// Error middleware: answers 403 for errors whose message is
// 'Not allowed by CORS'; every other error is passed on to the next handler.
app.use((err, req, res, next) => {
  const isCorsRejection = err.message === 'Not allowed by CORS';
  if (!isCorsRejection) {
    return next(err);
  }
  return res.status(403).json({ error: 'Access denied' });
});
// --- API KEY AUTHENTICATION MIDDLEWARE ---
// Skips '/', '/health' and any path starting with '/test-'. Every other
// request must pass authenticateRequest or it is answered with 401.
app.use((req, res, next) => {
  const requestPath = req.path;
  const isPublicPath =
    requestPath === '/' ||
    requestPath === '/health' ||
    requestPath.startsWith('/test-');
  if (isPublicPath) {
    return next();
  }

  const { valid, source } = authenticateRequest(req);
  if (valid) {
    log(`[Auth] Request authorized from: ${source}`);
    return next();
  }

  log(`[Security] Blocked unauthorized request from ${maskIP(req.ip)}`);
  return res.status(401).json({
    error: 'Unauthorized',
    message: 'Valid origin or API key required',
  });
});
// --- CLIENT IP NORMALIZATION ---
/**
 * Derives a stable per-client key from a raw address string.
 *
 * Only an explicit port suffix is removed ("1.2.3.4:5678" or "[::1]:5678").
 * Bare IPv6 addresses are returned untouched: stripping everything after the
 * last ":<digits>" would merge unrelated clients (for example every
 * "::ffff:a.b.c.d" address would collapse to "::ffff").
 *
 * @param {*} rawIp - Candidate address, typically req.ip.
 * @returns {string} Normalized address, or 'unknown' when none is available.
 */
function normalizeClientIp(rawIp) {
  if (typeof rawIp !== 'string' || rawIp.trim() === '') {
    return 'unknown';
  }
  const candidate = rawIp.trim();
  const bracketedIpv6 = /^\[([^\]]+)\](?::\d+)?$/.exec(candidate);
  if (bracketedIpv6) {
    return bracketedIpv6[1];
  }
  const ipv4WithPort = /^(\d{1,3}(?:\.\d{1,3}){3}):\d+$/.exec(candidate);
  if (ipv4WithPort) {
    return ipv4WithPort[1];
  }
  return candidate;
}
// --- RATE LIMITING ---
// At most 100 requests per 15-minute window for each normalized client IP.
const limiter = rateLimit({
  windowMs: 15 * 60 * 1000,
  max: 100,
  message: { error: "Too many requests" },
  keyGenerator: (req) => normalizeClientIp(req.ip || req.connection.remoteAddress),
});
app.use(limiter);
// --- REQUEST LOGGING (SAFE) ---
// One line per request. IP and origin go through the masking helpers, the API
// key is never printed, and in production mode the path is cut to its first
// segments.
app.use((req, res, next) => {
  const ip = maskIP(normalizeClientIp(req.ip));
  const origin = maskOrigin(req.headers.origin || 'no-origin');
  const apiKey = req.headers['x-api-key'] ? '***' : 'none';
  const path = PRODUCTION_MODE ? req.path.split('/').slice(0, 4).join('/') + '/***' : req.path;
  log(`[${new Date().toISOString()}] ${ip} -> ${req.method} ${path} | Origin: ${origin} | Key: ${apiKey}`);
  next();
});
// --- DAILY USAGE CAPS ---
// Per-IP count of prediction POSTs for the current calendar day (in memory).
const dailyUsage = new Map();
let lastResetDate = new Date().toDateString();
/**
 * Clears all daily counters once the local calendar date has changed since
 * the last reset.
 */
function checkDailyReset() {
  const today = new Date().toDateString();
  if (today !== lastResetDate) {
    dailyUsage.clear();
    lastResetDate = today;
    log('[System] Daily usage counters reset');
  }
}
// Hourly safety net for idle periods; unref'd so this timer alone never keeps
// the process alive.
setInterval(checkDailyReset, 60 * 60 * 1000).unref();
app.use((req, res, next) => {
  if (req.method === 'POST' && req.path.includes('/prediction/')) {
    checkDailyReset();
    const ip = normalizeClientIp(req.ip);
    const count = dailyUsage.get(ip) || 0;
    // Hard cap of 200 prediction calls per IP per day.
    if (count >= 200) {
      return res.status(429).json({
        error: 'Daily limit reached',
        message: 'You have reached your daily usage limit. Try again tomorrow.'
      });
    }
    dailyUsage.set(ip, count + 1);
    // Bound memory: above 10000 tracked IPs, drop the 1000 oldest entries
    // (a Map iterates in insertion order).
    if (dailyUsage.size > 10000) {
      log('[System] Daily usage map too large, clearing oldest entries', 'debug');
      Array.from(dailyUsage.keys())
        .slice(0, 1000)
        .forEach((key) => dailyUsage.delete(key));
    }
  }
  next();
});
// --- BOT DETECTION ---
// Applies to POST requests only. A request carrying an API key listed in
// API_KEYS is always let through; otherwise a User-Agent containing one of
// the known automation signatures is answered with 403.
app.use((req, res, next) => {
  if (req.method !== 'POST') {
    return next();
  }

  const agent = (req.headers['user-agent'] || '').toLowerCase();
  const presentedKey = req.headers['x-api-key'];
  if (presentedKey && API_KEYS.includes(presentedKey)) {
    return next();
  }

  const botSignatures = ['python-requests', 'curl/', 'wget/', 'scrapy', 'crawler'];
  const looksAutomated = botSignatures.some((signature) => agent.includes(signature));
  if (!looksAutomated) {
    return next();
  }

  log(`[Security] Blocked bot from ${maskIP(req.ip)}`);
  return res.status(403).json({
    error: 'Automated access detected',
    message: 'This service is for web browsers only.',
  });
});
// --- INSTANCES CONFIGURATION ---
// Upstream instances, parsed once at startup from the FLOWISE_INSTANCES
// environment variable (a JSON array). Code in this file reads `url` and
// `key` from each entry. INSTANCES always stays an array: a value that parses
// to something else (null, an object, a number) is rejected instead of being
// assigned, so later `INSTANCES.length` reads cannot throw.
let INSTANCES = [];
try {
  const parsedInstances = JSON.parse(process.env.FLOWISE_INSTANCES || '[]');
  if (Array.isArray(parsedInstances)) {
    INSTANCES = parsedInstances;
  }
  log(`[System] Loaded ${INSTANCES.length} instances`);
  if (INSTANCES.length === 0) {
    log('ERROR: FLOWISE_INSTANCES must be a non-empty array');
  }
} catch {
  // The parse error is deliberately not logged: its text can quote parts of
  // the raw value, which may contain instance keys.
  log("CRITICAL ERROR: Could not parse FLOWISE_INSTANCES JSON");
}
// --- CACHE WITH AUTO-CLEANUP ---
// Maps "<instanceNum>-<botName>" to { id, instance, timestamp }.
const flowCache = new Map();
// Every 10 minutes, drop entries whose timestamp is more than 10 minutes old.
// The timer is unref'd so the sweep alone never keeps the process alive.
setInterval(() => {
  const now = Date.now();
  for (const [key, value] of flowCache.entries()) {
    if (value.timestamp && now - value.timestamp > 10 * 60 * 1000) {
      flowCache.delete(key);
    }
  }
}, 10 * 60 * 1000).unref();
// --- FETCH WITH TIMEOUT ---
/**
 * Calls fetch and rejects with Error('Request timeout') when no response
 * arrives within `timeout` milliseconds.
 *
 * The timer is always cleared once the call settles, so completed requests do
 * not leave pending timers behind. On timeout the in-flight request is also
 * aborted (when AbortController is available) instead of being left running.
 * The timer only covers the wait for the response object; reading the body
 * afterwards is not time-limited here.
 *
 * @param {string} url - Target URL.
 * @param {object} [options] - Options forwarded to fetch. A caller-supplied
 *   `signal` is kept as-is; in that case the timeout does not abort it.
 * @param {number} [timeout=10000] - Timeout in milliseconds.
 * @returns {Promise<*>} Whatever fetch resolves with.
 * @throws {Error} 'Request timeout' on timeout; otherwise fetch's own error.
 */
async function fetchWithTimeout(url, options, timeout = 10000) {
  const callerSignal = options && options.signal;
  const controller =
    !callerSignal && typeof AbortController === 'function' ? new AbortController() : null;
  const requestOptions = controller ? { ...options, signal: controller.signal } : options;

  let timer;
  const timeoutPromise = new Promise((_, reject) => {
    timer = setTimeout(() => {
      // Reject first so the race settles with the timeout error, not with the
      // abort error that the aborted request produces.
      reject(new Error('Request timeout'));
      if (controller) {
        controller.abort();
      }
    }, timeout);
  });

  try {
    return await Promise.race([fetch(url, requestOptions), timeoutPromise]);
  } finally {
    clearTimeout(timer);
  }
}
// --- RESOLVE CHATFLOW ID ---
/**
 * Resolves a bot name to a chatflow id on one of the configured instances.
 *
 * A cache entry younger than 5 minutes is returned without any network call.
 * Otherwise the instance's chatflow list is fetched and searched for a flow
 * whose name, lower-cased with whitespace runs replaced by '-', equals
 * `botName`. A successful lookup is stored in flowCache.
 *
 * @param {number} instanceNum - 1-based index into INSTANCES.
 * @param {string} botName - Slug to look for.
 * @returns {Promise<{id: *, instance: object}>} Chatflow id and its instance.
 * @throws {Error} When the instance number is out of range, the instance
 *   answers with a non-OK status or a non-array body, or no flow matches.
 */
async function resolveChatflowId(instanceNum, botName) {
  const cacheKey = `${instanceNum}-${botName}`;
  const hit = flowCache.get(cacheKey);
  const isFresh = hit && hit.timestamp && Date.now() - hit.timestamp < 5 * 60 * 1000;
  if (isFresh) {
    const { id, instance } = hit;
    return { id, instance };
  }

  const outOfRange = isNaN(instanceNum) || instanceNum < 1 || instanceNum > INSTANCES.length;
  if (outOfRange) {
    throw new Error(`Instance ${instanceNum} does not exist. Valid: 1-${INSTANCES.length}`);
  }

  const instance = INSTANCES[instanceNum - 1];
  logSensitive(`[System] Looking up '${botName}' in instance ${instanceNum}...`);

  // Only send an Authorization header when the instance has a non-empty key.
  const hasKey = instance.key && instance.key.length > 0;
  const headers = hasKey ? { Authorization: `Bearer ${instance.key}` } : {};

  const listing = await fetchWithTimeout(`${instance.url}/api/v1/chatflows`, { headers }, 10000);
  if (!listing.ok) {
    throw new Error(`Instance ${instanceNum} returned status ${listing.status}`);
  }

  const flows = await listing.json();
  if (!Array.isArray(flows)) {
    throw new Error(`Instance ${instanceNum} returned invalid response`);
  }

  const toSlug = (name) => name.toLowerCase().replace(/\s+/g, '-');
  const match = flows.find((flow) => flow.name && toSlug(flow.name) === botName);
  if (!match || !match.id) {
    throw new Error(`Bot '${botName}' not found in instance ${instanceNum}`);
  }

  flowCache.set(cacheKey, { id: match.id, instance, timestamp: Date.now() });
  logSensitive(`[System] Found '${botName}' -> ${match.id}`);
  return { id: match.id, instance };
}
// --- STREAMING HANDLER ---
/**
 * Pipes an upstream SSE response to the client, with an idle watchdog.
 *
 * Behaviour:
 * - Every upstream chunk is forwarded as-is and resets the idle clock.
 * - If no chunk arrives for `idleTimeoutMs`, the upstream body is destroyed.
 *   The client gets a 504 JSON error when nothing was forwarded yet, or an
 *   SSE `error` event followed by end-of-stream otherwise.
 * - An upstream error yields a 500 JSON error before any data, or an SSE
 *   `error` event after data.
 * - If the client connection closes first, the upstream body is destroyed so
 *   it does not keep running with nobody listening.
 *
 * The watchdog stays armed for the whole stream. It must not be cancelled
 * when data arrives: in Node, clearTimeout on an interval handle stops the
 * interval for good, which would disable stall detection after the first
 * chunk.
 *
 * @param {object} flowiseResponse - Upstream response; `body` must be a
 *   readable stream emitting 'data', 'end' and 'error'.
 * @param {object} clientRes - Express response to write to.
 * @param {number} [idleTimeoutMs=45000] - Allowed silence before giving up.
 * @param {number} [checkIntervalMs=5000] - How often the watchdog checks.
 * @returns {Promise<void>} Resolves once the handlers are attached, not when
 *   the stream finishes.
 */
async function handleStreamingResponse(flowiseResponse, clientRes, idleTimeoutMs = 45000, checkIntervalMs = 5000) {
  clientRes.setHeader('Content-Type', 'text/event-stream');
  clientRes.setHeader('Cache-Control', 'no-cache');
  clientRes.setHeader('Connection', 'keep-alive');
  clientRes.setHeader('X-Accel-Buffering', 'no');
  log('[Streaming] Forwarding SSE stream...');

  const upstream = flowiseResponse.body;
  let dataReceived = false;
  let lastDataTime = Date.now();
  let totalBytes = 0;
  let settled = false;
  let watchdog = null;

  // Marks the stream as finished exactly once; returns false on later calls
  // so that competing events (timeout, end, error, close) cannot each try to
  // finish the response.
  const settle = () => {
    if (settled) {
      return false;
    }
    settled = true;
    clearInterval(watchdog);
    return true;
  };

  const stopUpstream = () => {
    if (upstream && typeof upstream.destroy === 'function') {
      upstream.destroy();
    }
  };

  // The SSE content type was set above; replace it before sending JSON so the
  // error body is not labelled as an event stream.
  const sendJsonError = (status, payload) => {
    clientRes.setHeader('Content-Type', 'application/json');
    clientRes.status(status).json(payload);
  };

  watchdog = setInterval(() => {
    const idleMs = Date.now() - lastDataTime;
    if (idleMs <= idleTimeoutMs || !settle()) {
      return;
    }
    log(`[Streaming] Timeout - no data for ${(idleMs / 1000).toFixed(1)}s`);
    stopUpstream();
    if (dataReceived) {
      clientRes.write('\n\nevent: error\ndata: {"error": "Response timeout - no data received"}\n\n');
      clientRes.end();
      return;
    }
    log('[Streaming] Stream completed with NO data received!');
    sendJsonError(504, {
      error: 'Gateway timeout',
      message: `No response from chatbot within ${idleTimeoutMs / 1000} seconds`
    });
  }, checkIntervalMs);

  upstream.on('data', (chunk) => {
    if (settled) {
      return;
    }
    dataReceived = true;
    lastDataTime = Date.now();
    totalBytes += chunk.length;
    logSensitive(`[Streaming] Received chunk: ${chunk.length} bytes (total: ${totalBytes})`);
    clientRes.write(chunk);
  });

  upstream.on('end', () => {
    if (!settle()) {
      return;
    }
    if (dataReceived) {
      log(`[Streaming] Stream completed - ${totalBytes} bytes`);
    } else {
      log('[Streaming] Stream completed but NO data received!');
    }
    clientRes.end();
  });

  upstream.on('error', () => {
    if (!settle()) {
      return;
    }
    log('[Streaming Error]');
    if (dataReceived) {
      clientRes.write(`\n\nevent: error\ndata: {"error": "Stream interrupted"}\n\n`);
      clientRes.end();
    } else {
      sendJsonError(500, { error: 'Stream failed to start' });
    }
  });

  // Client went away before the stream finished: stop pulling from upstream.
  clientRes.on('close', () => {
    if (settle()) {
      stopUpstream();
    }
  });
}
// --- PREDICTION ROUTE ---
// POST /api/v1/prediction/:instanceNum/:botName
// Validates the question, resolves the bot to a chatflow id, forwards the
// whole request body to that instance, and relays the answer either as an
// SSE stream or as JSON. Upstream failures are reported without their body.
app.post('/api/v1/prediction/:instanceNum/:botName', async (req, res) => {
  try {
    const instanceNum = parseInt(req.params.instanceNum);
    const botName = req.params.botName.toLowerCase().substring(0, 100);

    const question = req.body.question;
    if (!question || typeof question !== 'string') {
      return res.status(400).json({
        error: 'Invalid request',
        message: 'Question must be a non-empty string.',
      });
    }
    if (question.length > 2000) {
      return res.status(400).json({
        error: 'Message too long',
        message: 'Please keep messages under 2000 characters.',
      });
    }

    const { id, instance } = await resolveChatflowId(instanceNum, botName);

    const hasKey = instance.key && instance.key.length > 0;
    const headers = {
      'Content-Type': 'application/json',
      ...(hasKey ? { Authorization: `Bearer ${instance.key}` } : {}),
    };

    const startTime = Date.now();
    logSensitive(`[Timing] Calling Flowise at ${new Date().toISOString()}`);
    const upstream = await fetchWithTimeout(
      `${instance.url}/api/v1/prediction/${id}`,
      { method: 'POST', headers, body: JSON.stringify(req.body) },
      60000
    );
    const elapsedSeconds = ((Date.now() - startTime) / 1000).toFixed(1);
    log(`[Timing] Response received in ${elapsedSeconds}s`);

    if (!upstream.ok) {
      const errorText = await upstream.text();
      logSensitive(`[Error] Instance returned ${upstream.status}: ${errorText.substring(0, 100)}`);
      return res.status(upstream.status).json({
        error: 'Flowise instance error',
        message: 'The chatbot instance returned an error.',
      });
    }

    const contentType = upstream.headers.get('content-type') || '';
    const isEventStream = contentType.includes('text/event-stream');
    if (isEventStream) {
      log('[Streaming] Detected SSE response');
      return handleStreamingResponse(upstream, res);
    }

    log('[Non-streaming] Parsing JSON response');
    const rawBody = await upstream.text();
    try {
      res.status(200).json(JSON.parse(rawBody));
    } catch (parseError) {
      log('[Error] Invalid JSON response');
      res.status(500).json({ error: 'Invalid response from Flowise' });
    }
  } catch (error) {
    log(`[Error] ${error.message}`);
    res.status(500).json({
      error: 'Request failed',
      message: error.message,
    });
  }
});
// --- CONFIG ROUTE ---
// GET /api/v1/public-chatbotConfig/:instanceNum/:botName
// Relays the instance's public chatbot config for the resolved chatflow.
// A non-OK upstream status is passed through; any thrown error becomes a 404
// carrying the error message.
app.get('/api/v1/public-chatbotConfig/:instanceNum/:botName', async (req, res) => {
  try {
    const instanceNum = parseInt(req.params.instanceNum);
    const botName = req.params.botName.toLowerCase().substring(0, 100);
    const { id, instance } = await resolveChatflowId(instanceNum, botName);

    const hasKey = instance.key && instance.key.length > 0;
    const headers = hasKey ? { Authorization: `Bearer ${instance.key}` } : {};

    const upstream = await fetchWithTimeout(
      `${instance.url}/api/v1/public-chatbotConfig/${id}`,
      { headers },
      10000
    );
    if (!upstream.ok) {
      return res.status(upstream.status).json({ error: 'Config not available' });
    }

    const config = await upstream.json();
    res.status(200).json(config);
  } catch (error) {
    log('[Error] Config request failed');
    res.status(404).json({ error: error.message });
  }
});
// --- STREAMING CHECK ROUTE ---
// GET /api/v1/chatflows-streaming/:instanceNum/:botName
// Relays the instance's streaming-capability answer for the resolved
// chatflow. Every failure (non-OK status or thrown error) is reported as
// 200 { isStreaming: false }.
app.get('/api/v1/chatflows-streaming/:instanceNum/:botName', async (req, res) => {
  const notStreaming = () => res.status(200).json({ isStreaming: false });
  try {
    const instanceNum = parseInt(req.params.instanceNum);
    const botName = req.params.botName.toLowerCase().substring(0, 100);
    const { id, instance } = await resolveChatflowId(instanceNum, botName);

    const hasKey = instance.key && instance.key.length > 0;
    const headers = hasKey ? { Authorization: `Bearer ${instance.key}` } : {};

    const upstream = await fetchWithTimeout(
      `${instance.url}/api/v1/chatflows-streaming/${id}`,
      { headers },
      10000
    );
    if (!upstream.ok) {
      return notStreaming();
    }

    const payload = await upstream.json();
    res.status(200).json(payload);
  } catch (error) {
    log('[Error] Streaming check failed', 'debug');
    notStreaming();
  }
});
// --- TEST ENDPOINTS (DISABLED IN PRODUCTION) ---
// GET /test-stream is registered only outside production mode. It emits five
// SSE messages, one every 500 ms, then ends the response.
if (!PRODUCTION_MODE) {
  app.get('/test-stream', (req, res) => {
    const sseHeaders = {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    };
    Object.entries(sseHeaders).forEach(([name, value]) => res.setHeader(name, value));

    let sent = 0;
    const ticker = setInterval(() => {
      sent += 1;
      res.write(`data: {"message": "Test ${sent}"}\n\n`);
      if (sent < 5) {
        return;
      }
      clearInterval(ticker);
      res.end();
    }, 500);
  });
}
// --- HEALTH CHECK ---
// GET / answers with a fixed liveness string.
app.get('/', (req, res) => {
  res.send('Federated Proxy Active');
});
// GET /health reports counters taken from this module's in-memory state.
app.get('/health', (req, res) => {
  const report = {
    status: 'healthy',
    instances: INSTANCES.length,
    cached_bots: flowCache.size,
    daily_active_ips: dailyUsage.size,
    uptime: process.uptime(),
    production_mode: PRODUCTION_MODE,
  };
  res.json(report);
});
// --- 404 HANDLER ---
// Reached only when no earlier middleware or route answered the request.
app.use((req, res) => {
  const notFound = { error: 'Route not found' };
  res.status(404).json(notFound);
});
// --- GLOBAL ERROR HANDLER ---
// Last-resort error middleware: logs a generic line and answers 500 without
// exposing any detail of the error. The four-parameter signature is what
// marks it as an error handler for Express.
app.use((err, req, res, next) => {
  log('[Error] Unhandled error');
  const internalError = { error: 'Internal server error' };
  res.status(500).json(internalError);
});
// --- SERVER START ---
// Listens on port 7860 on all IPv4 interfaces and prints a startup banner.
const server = app.listen(7860, '0.0.0.0', () => {
  const banner = [
    '===== Federated Proxy Started =====',
    `Port: 7860`,
    `Mode: ${PRODUCTION_MODE ? 'PRODUCTION' : 'DEVELOPMENT'}`,
    `Instances: ${INSTANCES.length}`,
    `Allowed Origins: ${allowedOrigins.length || 'Open'}`,
    `API Keys: ${API_KEYS.length || 'None'}`,
    '====================================',
  ];
  banner.forEach((line) => log(line));
});
// Graceful shutdown: on SIGTERM or SIGINT, stop accepting connections and
// exit with code 0 once the server has closed.
['SIGTERM', 'SIGINT'].forEach((signal) => {
  process.on(signal, () => {
    log('[System] Shutting down gracefully...');
    server.close(() => {
      log('[System] Server closed');
      process.exit(0);
    });
  });
});