Spaces:
Configuration error
Configuration error
| import express, { Request, Response } from 'express' | |
| import multer from 'multer' | |
| import path from 'path' | |
| import cors from 'cors' | |
| import http from 'http' | |
| import * as fs from 'fs' | |
| import basicAuth from 'express-basic-auth' | |
| import { Server } from 'socket.io' | |
| import { | |
| IChatFlow, | |
| IncomingInput, | |
| IReactFlowNode, | |
| IReactFlowObject, | |
| INodeData, | |
| IDatabaseExport, | |
| IRunChatflowMessageValue, | |
| IChildProcessMessage | |
| } from './Interface' | |
| import { | |
| getNodeModulesPackagePath, | |
| getStartingNodes, | |
| buildLangchain, | |
| getEndingNode, | |
| constructGraphs, | |
| resolveVariables, | |
| isStartNodeDependOnInput, | |
| getAPIKeys, | |
| addAPIKey, | |
| updateAPIKey, | |
| deleteAPIKey, | |
| compareKeys, | |
| mapMimeTypeToInputField, | |
| findAvailableConfigs, | |
| isSameOverrideConfig, | |
| replaceAllAPIKeys, | |
| isFlowValidForStream, | |
| isVectorStoreFaiss | |
| } from './utils' | |
| import { cloneDeep } from 'lodash' | |
| import { getDataSource } from './DataSource' | |
| import { NodesPool } from './NodesPool' | |
| import { ChatFlow } from './entity/ChatFlow' | |
| import { ChatMessage } from './entity/ChatMessage' | |
| import { ChatflowPool } from './ChatflowPool' | |
| import { ICommonObject } from 'flowise-components' | |
| import { fork } from 'child_process' | |
| export class App { | |
| app: express.Application | |
| nodesPool: NodesPool | |
| chatflowPool: ChatflowPool | |
| AppDataSource = getDataSource() | |
| constructor() { | |
| this.app = express() | |
| } | |
| async initDatabase() { | |
| // Initialize database | |
| this.AppDataSource.initialize() | |
| .then(async () => { | |
| console.info('📦[server]: Data Source has been initialized!') | |
| // Initialize pools | |
| this.nodesPool = new NodesPool() | |
| await this.nodesPool.initialize() | |
| this.chatflowPool = new ChatflowPool() | |
| // Initialize API keys | |
| await getAPIKeys() | |
| }) | |
| .catch((err) => { | |
| console.error('❌[server]: Error during Data Source initialization:', err) | |
| }) | |
| } | |
| async config(socketIO?: Server) { | |
| // Limit is needed to allow sending/receiving base64 encoded string | |
| this.app.use(express.json({ limit: '50mb' })) | |
| this.app.use(express.urlencoded({ limit: '50mb', extended: true })) | |
| // Allow access from * | |
| this.app.use(cors()) | |
| if (process.env.FLOWISE_USERNAME && process.env.FLOWISE_PASSWORD) { | |
| const username = process.env.FLOWISE_USERNAME | |
| const password = process.env.FLOWISE_PASSWORD | |
| const basicAuthMiddleware = basicAuth({ | |
| users: { [username]: password } | |
| }) | |
| const whitelistURLs = ['/api/v1/prediction/', '/api/v1/node-icon/', '/api/v1/chatflows-streaming'] | |
| this.app.use((req, res, next) => { | |
| if (req.url.includes('/api/v1/')) { | |
| whitelistURLs.some((url) => req.url.includes(url)) ? next() : basicAuthMiddleware(req, res, next) | |
| } else next() | |
| }) | |
| } | |
| const upload = multer({ dest: `${path.join(__dirname, '..', 'uploads')}/` }) | |
| // ---------------------------------------- | |
| // Nodes | |
| // ---------------------------------------- | |
| // Get all component nodes | |
| this.app.get('/api/v1/nodes', (req: Request, res: Response) => { | |
| const returnData = [] | |
| for (const nodeName in this.nodesPool.componentNodes) { | |
| const clonedNode = cloneDeep(this.nodesPool.componentNodes[nodeName]) | |
| returnData.push(clonedNode) | |
| } | |
| return res.json(returnData) | |
| }) | |
| // Get specific component node via name | |
| this.app.get('/api/v1/nodes/:name', (req: Request, res: Response) => { | |
| if (Object.prototype.hasOwnProperty.call(this.nodesPool.componentNodes, req.params.name)) { | |
| return res.json(this.nodesPool.componentNodes[req.params.name]) | |
| } else { | |
| throw new Error(`Node ${req.params.name} not found`) | |
| } | |
| }) | |
| // Returns specific component node icon via name | |
| this.app.get('/api/v1/node-icon/:name', (req: Request, res: Response) => { | |
| if (Object.prototype.hasOwnProperty.call(this.nodesPool.componentNodes, req.params.name)) { | |
| const nodeInstance = this.nodesPool.componentNodes[req.params.name] | |
| if (nodeInstance.icon === undefined) { | |
| throw new Error(`Node ${req.params.name} icon not found`) | |
| } | |
| if (nodeInstance.icon.endsWith('.svg') || nodeInstance.icon.endsWith('.png') || nodeInstance.icon.endsWith('.jpg')) { | |
| const filepath = nodeInstance.icon | |
| res.sendFile(filepath) | |
| } else { | |
| throw new Error(`Node ${req.params.name} icon is missing icon`) | |
| } | |
| } else { | |
| throw new Error(`Node ${req.params.name} not found`) | |
| } | |
| }) | |
| // ---------------------------------------- | |
| // Chatflows | |
| // ---------------------------------------- | |
| // Get all chatflows | |
| this.app.get('/api/v1/chatflows', async (req: Request, res: Response) => { | |
| const chatflows: IChatFlow[] = await this.AppDataSource.getRepository(ChatFlow).find() | |
| return res.json(chatflows) | |
| }) | |
| // Get specific chatflow via id | |
| this.app.get('/api/v1/chatflows/:id', async (req: Request, res: Response) => { | |
| const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({ | |
| id: req.params.id | |
| }) | |
| if (chatflow) return res.json(chatflow) | |
| return res.status(404).send(`Chatflow ${req.params.id} not found`) | |
| }) | |
| // Save chatflow | |
| this.app.post('/api/v1/chatflows', async (req: Request, res: Response) => { | |
| const body = req.body | |
| const newChatFlow = new ChatFlow() | |
| Object.assign(newChatFlow, body) | |
| const chatflow = this.AppDataSource.getRepository(ChatFlow).create(newChatFlow) | |
| const results = await this.AppDataSource.getRepository(ChatFlow).save(chatflow) | |
| return res.json(results) | |
| }) | |
| // Update chatflow | |
| this.app.put('/api/v1/chatflows/:id', async (req: Request, res: Response) => { | |
| const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({ | |
| id: req.params.id | |
| }) | |
| if (!chatflow) { | |
| res.status(404).send(`Chatflow ${req.params.id} not found`) | |
| return | |
| } | |
| const body = req.body | |
| const updateChatFlow = new ChatFlow() | |
| Object.assign(updateChatFlow, body) | |
| this.AppDataSource.getRepository(ChatFlow).merge(chatflow, updateChatFlow) | |
| const result = await this.AppDataSource.getRepository(ChatFlow).save(chatflow) | |
| // Update chatflowpool inSync to false, to build Langchain again because data has been changed | |
| this.chatflowPool.updateInSync(chatflow.id, false) | |
| return res.json(result) | |
| }) | |
| // Delete chatflow via id | |
| this.app.delete('/api/v1/chatflows/:id', async (req: Request, res: Response) => { | |
| const results = await this.AppDataSource.getRepository(ChatFlow).delete({ id: req.params.id }) | |
| return res.json(results) | |
| }) | |
| // Check if chatflow valid for streaming | |
| this.app.get('/api/v1/chatflows-streaming/:id', async (req: Request, res: Response) => { | |
| const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({ | |
| id: req.params.id | |
| }) | |
| if (!chatflow) return res.status(404).send(`Chatflow ${req.params.id} not found`) | |
| /*** Get Ending Node with Directed Graph ***/ | |
| const flowData = chatflow.flowData | |
| const parsedFlowData: IReactFlowObject = JSON.parse(flowData) | |
| const nodes = parsedFlowData.nodes | |
| const edges = parsedFlowData.edges | |
| const { graph, nodeDependencies } = constructGraphs(nodes, edges) | |
| const endingNodeId = getEndingNode(nodeDependencies, graph) | |
| if (!endingNodeId) return res.status(500).send(`Ending node must be either a Chain or Agent`) | |
| const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data | |
| if (!endingNodeData) return res.status(500).send(`Ending node must be either a Chain or Agent`) | |
| const obj = { | |
| isStreaming: isFlowValidForStream(nodes, endingNodeData) | |
| } | |
| return res.json(obj) | |
| }) | |
| // ---------------------------------------- | |
| // ChatMessage | |
| // ---------------------------------------- | |
| // Get all chatmessages from chatflowid | |
| this.app.get('/api/v1/chatmessage/:id', async (req: Request, res: Response) => { | |
| const chatmessages = await this.AppDataSource.getRepository(ChatMessage).findBy({ | |
| chatflowid: req.params.id | |
| }) | |
| return res.json(chatmessages) | |
| }) | |
| // Add chatmessages for chatflowid | |
| this.app.post('/api/v1/chatmessage/:id', async (req: Request, res: Response) => { | |
| const body = req.body | |
| const newChatMessage = new ChatMessage() | |
| Object.assign(newChatMessage, body) | |
| const chatmessage = this.AppDataSource.getRepository(ChatMessage).create(newChatMessage) | |
| const results = await this.AppDataSource.getRepository(ChatMessage).save(chatmessage) | |
| return res.json(results) | |
| }) | |
| // Delete all chatmessages from chatflowid | |
| this.app.delete('/api/v1/chatmessage/:id', async (req: Request, res: Response) => { | |
| const results = await this.AppDataSource.getRepository(ChatMessage).delete({ chatflowid: req.params.id }) | |
| return res.json(results) | |
| }) | |
| // ---------------------------------------- | |
| // Configuration | |
| // ---------------------------------------- | |
| this.app.get('/api/v1/flow-config/:id', async (req: Request, res: Response) => { | |
| const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({ | |
| id: req.params.id | |
| }) | |
| if (!chatflow) return res.status(404).send(`Chatflow ${req.params.id} not found`) | |
| const flowData = chatflow.flowData | |
| const parsedFlowData: IReactFlowObject = JSON.parse(flowData) | |
| const nodes = parsedFlowData.nodes | |
| const availableConfigs = findAvailableConfigs(nodes) | |
| return res.json(availableConfigs) | |
| }) | |
| // ---------------------------------------- | |
| // Export Load Chatflow & ChatMessage & Apikeys | |
| // ---------------------------------------- | |
| this.app.get('/api/v1/database/export', async (req: Request, res: Response) => { | |
| const chatmessages = await this.AppDataSource.getRepository(ChatMessage).find() | |
| const chatflows = await this.AppDataSource.getRepository(ChatFlow).find() | |
| const apikeys = await getAPIKeys() | |
| const result: IDatabaseExport = { | |
| chatmessages, | |
| chatflows, | |
| apikeys | |
| } | |
| return res.json(result) | |
| }) | |
| this.app.post('/api/v1/database/load', async (req: Request, res: Response) => { | |
| const databaseItems: IDatabaseExport = req.body | |
| await this.AppDataSource.getRepository(ChatFlow).delete({}) | |
| await this.AppDataSource.getRepository(ChatMessage).delete({}) | |
| let error = '' | |
| // Get a new query runner instance | |
| const queryRunner = this.AppDataSource.createQueryRunner() | |
| // Start a new transaction | |
| await queryRunner.startTransaction() | |
| try { | |
| const chatflows: ChatFlow[] = databaseItems.chatflows | |
| const chatmessages: ChatMessage[] = databaseItems.chatmessages | |
| await queryRunner.manager.insert(ChatFlow, chatflows) | |
| await queryRunner.manager.insert(ChatMessage, chatmessages) | |
| await queryRunner.commitTransaction() | |
| } catch (err: any) { | |
| error = err?.message ?? 'Error loading database' | |
| await queryRunner.rollbackTransaction() | |
| } finally { | |
| await queryRunner.release() | |
| } | |
| await replaceAllAPIKeys(databaseItems.apikeys) | |
| if (error) return res.status(500).send(error) | |
| return res.status(201).send('OK') | |
| }) | |
| // ---------------------------------------- | |
| // Prediction | |
| // ---------------------------------------- | |
| // Send input message and get prediction result (External) | |
| this.app.post('/api/v1/prediction/:id', upload.array('files'), async (req: Request, res: Response) => { | |
| await this.processPrediction(req, res, socketIO) | |
| }) | |
| // Send input message and get prediction result (Internal) | |
| this.app.post('/api/v1/internal-prediction/:id', async (req: Request, res: Response) => { | |
| await this.processPrediction(req, res, socketIO, true) | |
| }) | |
| // ---------------------------------------- | |
| // Marketplaces | |
| // ---------------------------------------- | |
| // Get all chatflows for marketplaces | |
| this.app.get('/api/v1/marketplaces', async (req: Request, res: Response) => { | |
| const marketplaceDir = path.join(__dirname, '..', 'marketplaces') | |
| const jsonsInDir = fs.readdirSync(marketplaceDir).filter((file) => path.extname(file) === '.json') | |
| const templates: any[] = [] | |
| jsonsInDir.forEach((file, index) => { | |
| const filePath = path.join(__dirname, '..', 'marketplaces', file) | |
| const fileData = fs.readFileSync(filePath) | |
| const fileDataObj = JSON.parse(fileData.toString()) | |
| const template = { | |
| id: index, | |
| name: file.split('.json')[0], | |
| flowData: fileData.toString(), | |
| description: fileDataObj?.description || '' | |
| } | |
| templates.push(template) | |
| }) | |
| return res.json(templates) | |
| }) | |
| // ---------------------------------------- | |
| // API Keys | |
| // ---------------------------------------- | |
| // Get api keys | |
| this.app.get('/api/v1/apikey', async (req: Request, res: Response) => { | |
| const keys = await getAPIKeys() | |
| return res.json(keys) | |
| }) | |
| // Add new api key | |
| this.app.post('/api/v1/apikey', async (req: Request, res: Response) => { | |
| const keys = await addAPIKey(req.body.keyName) | |
| return res.json(keys) | |
| }) | |
| // Update api key | |
| this.app.put('/api/v1/apikey/:id', async (req: Request, res: Response) => { | |
| const keys = await updateAPIKey(req.params.id, req.body.keyName) | |
| return res.json(keys) | |
| }) | |
| // Delete new api key | |
| this.app.delete('/api/v1/apikey/:id', async (req: Request, res: Response) => { | |
| const keys = await deleteAPIKey(req.params.id) | |
| return res.json(keys) | |
| }) | |
| // ---------------------------------------- | |
| // Serve UI static | |
| // ---------------------------------------- | |
| const packagePath = getNodeModulesPackagePath('flowise-ui') | |
| const uiBuildPath = path.join(packagePath, 'build') | |
| const uiHtmlPath = path.join(packagePath, 'build', 'index.html') | |
| this.app.use('/', express.static(uiBuildPath)) | |
| // All other requests not handled will return React app | |
| this.app.use((req, res) => { | |
| res.sendFile(uiHtmlPath) | |
| }) | |
| } | |
| /** | |
| * Validate API Key | |
| * @param {Request} req | |
| * @param {Response} res | |
| * @param {ChatFlow} chatflow | |
| */ | |
| async validateKey(req: Request, res: Response, chatflow: ChatFlow) { | |
| const chatFlowApiKeyId = chatflow.apikeyid | |
| const authorizationHeader = (req.headers['Authorization'] as string) ?? (req.headers['authorization'] as string) ?? '' | |
| if (chatFlowApiKeyId && !authorizationHeader) return res.status(401).send(`Unauthorized`) | |
| const suppliedKey = authorizationHeader.split(`Bearer `).pop() | |
| if (chatFlowApiKeyId && suppliedKey) { | |
| const keys = await getAPIKeys() | |
| const apiSecret = keys.find((key) => key.id === chatFlowApiKeyId)?.apiSecret | |
| if (!compareKeys(apiSecret, suppliedKey)) return res.status(401).send(`Unauthorized`) | |
| } | |
| } | |
| /** | |
| * Start child process | |
| * @param {ChatFlow} chatflow | |
| * @param {IncomingInput} incomingInput | |
| * @param {INodeData} endingNodeData | |
| */ | |
| async startChildProcess(chatflow: ChatFlow, chatId: string, incomingInput: IncomingInput, endingNodeData?: INodeData) { | |
| try { | |
| const controller = new AbortController() | |
| const { signal } = controller | |
| let childpath = path.join(__dirname, '..', 'dist', 'ChildProcess.js') | |
| if (!fs.existsSync(childpath)) childpath = 'ChildProcess.ts' | |
| const childProcess = fork(childpath, [], { signal }) | |
| const value = { | |
| chatflow, | |
| chatId, | |
| incomingInput, | |
| componentNodes: cloneDeep(this.nodesPool.componentNodes), | |
| endingNodeData | |
| } as IRunChatflowMessageValue | |
| childProcess.send({ key: 'start', value } as IChildProcessMessage) | |
| let childProcessTimeout: NodeJS.Timeout | |
| return new Promise((resolve, reject) => { | |
| childProcess.on('message', async (message: IChildProcessMessage) => { | |
| if (message.key === 'finish') { | |
| const { result, addToChatFlowPool } = message.value as ICommonObject | |
| if (childProcessTimeout) { | |
| clearTimeout(childProcessTimeout) | |
| } | |
| if (Object.keys(addToChatFlowPool).length) { | |
| const { chatflowid, nodeToExecuteData, startingNodes, overrideConfig } = addToChatFlowPool | |
| this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, overrideConfig) | |
| } | |
| resolve(result) | |
| } | |
| if (message.key === 'start') { | |
| if (process.env.EXECUTION_TIMEOUT) { | |
| childProcessTimeout = setTimeout(async () => { | |
| childProcess.kill() | |
| resolve(undefined) | |
| }, parseInt(process.env.EXECUTION_TIMEOUT, 10)) | |
| } | |
| } | |
| if (message.key === 'error') { | |
| let errMessage = message.value as string | |
| if (childProcessTimeout) { | |
| clearTimeout(childProcessTimeout) | |
| } | |
| reject(errMessage) | |
| } | |
| }) | |
| }) | |
| } catch (err) { | |
| console.error(err) | |
| } | |
| } | |
| /** | |
| * Process Prediction | |
| * @param {Request} req | |
| * @param {Response} res | |
| * @param {Server} socketIO | |
| * @param {boolean} isInternal | |
| */ | |
| async processPrediction(req: Request, res: Response, socketIO?: Server, isInternal = false) { | |
| try { | |
| const chatflowid = req.params.id | |
| let incomingInput: IncomingInput = req.body | |
| let nodeToExecuteData: INodeData | |
| const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({ | |
| id: chatflowid | |
| }) | |
| if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`) | |
| const chatId = await getChatId(chatflow.id) | |
| if (!chatId) return res.status(500).send(`Chatflow ${chatflowid} first message not found`) | |
| if (!isInternal) { | |
| await this.validateKey(req, res, chatflow) | |
| } | |
| let isStreamValid = false | |
| const files = (req.files as any[]) || [] | |
| if (files.length) { | |
| const overrideConfig: ICommonObject = { ...req.body } | |
| for (const file of files) { | |
| const fileData = fs.readFileSync(file.path, { encoding: 'base64' }) | |
| const dataBase64String = `data:${file.mimetype};base64,${fileData},filename:${file.filename}` | |
| const fileInputField = mapMimeTypeToInputField(file.mimetype) | |
| if (overrideConfig[fileInputField]) { | |
| overrideConfig[fileInputField] = JSON.stringify([...JSON.parse(overrideConfig[fileInputField]), dataBase64String]) | |
| } else { | |
| overrideConfig[fileInputField] = JSON.stringify([dataBase64String]) | |
| } | |
| } | |
| incomingInput = { | |
| question: req.body.question ?? 'hello', | |
| overrideConfig, | |
| history: [] | |
| } | |
| } | |
| /* Don't rebuild the flow (to avoid duplicated upsert, recomputation) when all these conditions met: | |
| * - Node Data already exists in pool | |
| * - Still in sync (i.e the flow has not been modified since) | |
| * - Existing overrideConfig and new overrideConfig are the same | |
| * - Flow doesn't start with nodes that depend on incomingInput.question | |
| ***/ | |
| const isRebuildNeeded = () => { | |
| return ( | |
| Object.prototype.hasOwnProperty.call(this.chatflowPool.activeChatflows, chatflowid) && | |
| this.chatflowPool.activeChatflows[chatflowid].inSync && | |
| isSameOverrideConfig( | |
| isInternal, | |
| this.chatflowPool.activeChatflows[chatflowid].overrideConfig, | |
| incomingInput.overrideConfig | |
| ) && | |
| !isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes) | |
| ) | |
| } | |
| if (process.env.EXECUTION_MODE === 'child') { | |
| if (isRebuildNeeded()) { | |
| nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData | |
| try { | |
| const result = await this.startChildProcess(chatflow, chatId, incomingInput, nodeToExecuteData) | |
| return res.json(result) | |
| } catch (error) { | |
| return res.status(500).send(error) | |
| } | |
| } else { | |
| try { | |
| const result = await this.startChildProcess(chatflow, chatId, incomingInput) | |
| return res.json(result) | |
| } catch (error) { | |
| return res.status(500).send(error) | |
| } | |
| } | |
| } else { | |
| /*** Get chatflows and prepare data ***/ | |
| const flowData = chatflow.flowData | |
| const parsedFlowData: IReactFlowObject = JSON.parse(flowData) | |
| const nodes = parsedFlowData.nodes | |
| const edges = parsedFlowData.edges | |
| if (isRebuildNeeded()) { | |
| nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData | |
| isStreamValid = isFlowValidForStream(nodes, nodeToExecuteData) | |
| } else { | |
| /*** Get Ending Node with Directed Graph ***/ | |
| const { graph, nodeDependencies } = constructGraphs(nodes, edges) | |
| const directedGraph = graph | |
| const endingNodeId = getEndingNode(nodeDependencies, directedGraph) | |
| if (!endingNodeId) return res.status(500).send(`Ending node must be either a Chain or Agent`) | |
| const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data | |
| if (!endingNodeData) return res.status(500).send(`Ending node must be either a Chain or Agent`) | |
| if ( | |
| endingNodeData.outputs && | |
| Object.keys(endingNodeData.outputs).length && | |
| !Object.values(endingNodeData.outputs).includes(endingNodeData.name) | |
| ) { | |
| return res | |
| .status(500) | |
| .send( | |
| `Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction` | |
| ) | |
| } | |
| isStreamValid = isFlowValidForStream(nodes, endingNodeData) | |
| /*** Get Starting Nodes with Non-Directed Graph ***/ | |
| const constructedObj = constructGraphs(nodes, edges, true) | |
| const nonDirectedGraph = constructedObj.graph | |
| const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId) | |
| /*** BFS to traverse from Starting Nodes to Ending Node ***/ | |
| const reactFlowNodes = await buildLangchain( | |
| startingNodeIds, | |
| nodes, | |
| graph, | |
| depthQueue, | |
| this.nodesPool.componentNodes, | |
| incomingInput.question, | |
| chatId, | |
| incomingInput?.overrideConfig | |
| ) | |
| const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId) | |
| if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`) | |
| const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question) | |
| nodeToExecuteData = reactFlowNodeData | |
| const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id)) | |
| this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig) | |
| } | |
| const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string | |
| const nodeModule = await import(nodeInstanceFilePath) | |
| const nodeInstance = new nodeModule.nodeClass() | |
| isStreamValid = isStreamValid && !isVectorStoreFaiss(nodeToExecuteData) | |
| const result = isStreamValid | |
| ? await nodeInstance.run(nodeToExecuteData, incomingInput.question, { | |
| chatHistory: incomingInput.history, | |
| socketIO, | |
| socketIOClientId: incomingInput.socketIOClientId | |
| }) | |
| : await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history }) | |
| return res.json(result) | |
| } | |
| } catch (e: any) { | |
| return res.status(500).send(e.message) | |
| } | |
| } | |
| async stopApp() { | |
| try { | |
| const removePromises: any[] = [] | |
| await Promise.all(removePromises) | |
| } catch (e) { | |
| console.error(`❌[server]: Flowise Server shut down error: ${e}`) | |
| } | |
| } | |
| } | |
| /** | |
| * Get first chat message id | |
| * @param {string} chatflowid | |
| * @returns {string} | |
| */ | |
| export async function getChatId(chatflowid: string) { | |
| // first chatmessage id as the unique chat id | |
| const firstChatMessage = await getDataSource() | |
| .getRepository(ChatMessage) | |
| .createQueryBuilder('cm') | |
| .select('cm.id') | |
| .where('chatflowid = :chatflowid', { chatflowid }) | |
| .orderBy('cm.createdDate', 'ASC') | |
| .getOne() | |
| return firstChatMessage ? firstChatMessage.id : '' | |
| } | |
| let serverApp: App | undefined | |
| export async function start(): Promise<void> { | |
| serverApp = new App() | |
| const port = parseInt(process.env.PORT || '', 10) || 3000 | |
| const server = http.createServer(serverApp.app) | |
| const io = new Server(server, { | |
| cors: { | |
| origin: '*' | |
| } | |
| }) | |
| await serverApp.initDatabase() | |
| await serverApp.config(io) | |
| server.listen(port, () => { | |
| console.info(`⚡️[server]: Flowise Server is listening at ${port}`) | |
| }) | |
| } | |
| export function getInstance(): App | undefined { | |
| return serverApp | |
| } | |