Spaces:
Sleeping
Sleeping
| import OpenAI from 'openai'; | |
| import { Workflow, WorkflowNode, WorkflowEdge } from '../../shared/types/index.js'; | |
| import { AIService } from './ai.service.js'; | |
| import { SystemService } from '../lib/system.js'; | |
| import axios from 'axios'; | |
| import crypto from 'crypto'; | |
| const MAX_STEPS = 50; // 防止死循环 | |
| export class WorkflowService { | |
| /** | |
| * 基于用户描述生成工作流 (Generative Workflow) | |
| */ | |
| static async generateWorkflowFromPrompt(prompt: string): Promise<Workflow> { | |
| console.log(`[Workflow] 正在生成工作流: ${prompt}`); | |
| const systemPrompt = ` | |
| 你是一个工作流生成专家。请根据用户的自然语言描述,生成一个 JSON 格式的工作流定义。 | |
| 可用节点类型 (type): | |
| 1. input: 起始节点 (无参数) | |
| 2. llm: AI 处理节点 (参数: prompt, systemPrompt) | |
| - prompt 中可以使用 {{nodeId.output}} 或 {{input}} 引用变量 | |
| 3. search: 搜索节点 (参数: query) | |
| - query 中可以使用变量 | |
| 4. database: 数据库查询节点 (参数: sql) | |
| - sql 中可以使用变量,仅限 SELECT | |
| 5. knowledge_base: 知识库检索 (参数: kbId - 暂时留空或随机) | |
| 6. http: HTTP 请求 (参数: url, method, body) | |
| 输出格式要求: | |
| - 必须是合法的 JSON 对象 | |
| - 包含 "name", "description", "nodes" (数组), "edges" (数组) | |
| - nodes 必须包含 id, type, data: { label, params }, position: {x, y}, style | |
| - edges 必须包含 id, source, target | |
| - 节点位置 (position) 请合理分布,x 轴间隔 250,y 轴间隔 150,避免重叠 | |
| - 节点样式 (style) 请使用以下预设 (必须包含): | |
| - input: { background: '#EBF5FF', border: '2px solid #3B82F6', borderRadius: '12px', padding: '12px', width: '160px', textAlign: 'center', fontWeight: '700' } | |
| - llm: { background: '#FDF2F2', border: '2px solid #EF4444', borderRadius: '12px', padding: '12px', width: '160px', textAlign: 'center', fontWeight: '700' } | |
| - search: { background: '#FDF2F8', border: '2px solid #EC4899', borderRadius: '12px', padding: '12px', width: '160px', textAlign: 'center', fontWeight: '700' } | |
| - database: { background: '#EEF2FF', border: '2px solid #6366F1', borderRadius: '12px', padding: '12px', width: '160px', textAlign: 'center', fontWeight: '700' } | |
| - 其他: { background: '#F9FAFB', border: '2px solid #9CA3AF', borderRadius: '12px', padding: '12px', width: '160px', textAlign: 'center', fontWeight: '700' } | |
| 示例输入: "帮我查一下特斯拉的股价,然后写个简评" | |
| 示例输出: | |
| { | |
| "name": "特斯拉股价简评", | |
| "description": "自动搜索股价并生成评论", | |
| "nodes": [ | |
| { "id": "start", "type": "input", "data": { "label": "输入主题" }, "position": { "x": 50, "y": 200 }, "style": { "background": "#EBF5FF", ... } }, | |
| { "id": "search", "type": "search", "data": { "label": "搜股价", "params": { "query": "{{input}} 股价" } }, "position": { "x": 300, "y": 200 }, "style": { "background": "#FDF2F8", ... } }, | |
| { "id": "ai", "type": "llm", "data": { "label": "生成简评", "params": { "prompt": "基于搜索结果:{{search.output}},评价当前股价。" } }, "position": { "x": 550, "y": 200 }, "style": { "background": "#FDF2F2", ... } } | |
| ], | |
| "edges": [ | |
| { "id": "e1", "source": "start", "target": "search" }, | |
| { "id": "e2", "source": "search", "target": "ai" } | |
| ] | |
| } | |
| 请直接返回 JSON,不要包含 Markdown 代码块标记。 | |
| `; | |
| try { | |
| // 构造类似 Chat 的请求,直接利用 AIService 调用 OpenAI | |
| // 由于 AIService.chatWithKnowledge 是为 RAG 设计的,我们这里手动调用 openai 以获得更好的控制 | |
| const completion = await AIService.openai.chat.completions.create({ | |
| model: 'Qwen/Qwen2.5-7B-Instruct', | |
| messages: [ | |
| { role: 'system', content: systemPrompt }, | |
| { role: 'user', content: prompt } | |
| ], | |
| temperature: 0.7, | |
| }); | |
| let jsonStr = completion.choices[0]?.message?.content || ''; | |
| // 清理可能的 Markdown 标记 | |
| jsonStr = jsonStr.replace(/```json/g, '').replace(/```/g, '').trim(); | |
| const workflowData = JSON.parse(jsonStr); | |
| // 补全 ID 和 版本 | |
| return { | |
| id: `WF_GEN_${crypto.randomBytes(4).toString('hex')}`, | |
| version: 1, | |
| status: 'draft', | |
| ...workflowData | |
| }; | |
| } catch (e: any) { | |
| console.error('生成工作流失败:', e); | |
| throw new Error('无法根据描述生成工作流,请重试或详细描述步骤。'); | |
| } | |
| } | |
| /** | |
| * 解析并执行工作流 (支持循环与条件分支 - Agentic Workflow) | |
| */ | |
| static async execute(workflow: Workflow, initialInput: any, userId?: string) { | |
| console.log(`[Workflow] 开始执行 Agentic 工作流: ${workflow.name} (ID: ${workflow.id})`); | |
| const nodes = workflow.nodes; | |
| const edges = workflow.edges; | |
| // 1. 寻找起始节点 (入度为 0 的节点) | |
| // 如果是循环图,可能没有入度为 0 的节点,需要指定 Start 节点 (这里简化为:没有 source 的节点) | |
| const inDegrees = new Map<string, number>(); | |
| edges.forEach(e => inDegrees.set(e.target, (inDegrees.get(e.target) || 0) + 1)); | |
| const startNodes = nodes.filter(n => (inDegrees.get(n.id) || 0) === 0); | |
| if (startNodes.length === 0 && nodes.length > 0) { | |
| // 如果全是环,默认选第一个 | |
| startNodes.push(nodes[0]); | |
| } | |
| // 2. 初始化执行队列与上下文 | |
| // queue item: { nodeId, inputFromPrev } | |
| const queue: { nodeId: string, input: any }[] = startNodes.map(n => ({ nodeId: n.id, input: initialInput })); | |
| const context: Record<string, any> = { input: initialInput }; | |
| const executionLogs: any[] = []; | |
| let stepCount = 0; | |
| let finalOutput = initialInput; | |
| // 3. 图遍历执行 | |
| while (queue.length > 0) { | |
| if (stepCount++ > MAX_STEPS) { | |
| console.warn(`[Workflow] 达到最大执行步数 (${MAX_STEPS}),强制终止`); | |
| break; | |
| } | |
| const { nodeId, input } = queue.shift()!; | |
| const node = nodes.find(n => n.id === nodeId); | |
| if (!node) continue; | |
| const startTime = Date.now(); | |
| try { | |
| // 解析参数 | |
| const params = WorkflowService.resolveVariables(node.data?.params || {}, context); | |
| // 执行节点 | |
| const output = await WorkflowService.executeNode(node, input, userId, params); | |
| // 更新上下文 | |
| context[node.id] = { output }; | |
| finalOutput = output; | |
| // 记录日志 | |
| executionLogs.push({ | |
| nodeId: node.id, | |
| type: node.type, | |
| status: 'SUCCESS', | |
| duration: Date.now() - startTime, | |
| output: typeof output === 'string' ? output.slice(0, 100) : 'Object' | |
| }); | |
| // 寻找后续节点 (处理条件分支) | |
| const outgoingEdges = edges.filter(e => e.source === nodeId); | |
| for (const edge of outgoingEdges) { | |
| let shouldTraverse = true; | |
| // 检查边上的条件 | |
| if (edge.condition) { | |
| try { | |
| // 允许在条件中使用 output 和 context | |
| // 简单沙箱: output 指代当前节点输出 | |
| const checkFn = new Function('output', 'context', `return ${edge.condition}`); | |
| shouldTraverse = checkFn(output, context); | |
| } catch (e) { | |
| console.warn(`[Workflow] 边条件计算失败: ${edge.condition}`, e); | |
| shouldTraverse = false; | |
| } | |
| } | |
| if (shouldTraverse) { | |
| queue.push({ nodeId: edge.target, input: output }); | |
| } | |
| } | |
| } catch (err: any) { | |
| console.error(`[Workflow] 节点 ${node.id} 执行失败:`, err); | |
| executionLogs.push({ | |
| nodeId: node.id, | |
| type: node.type, | |
| status: 'FAILED', | |
| error: err.message | |
| }); | |
| // 遇到错误是否终止?Agentic Flow 中可能需要错误处理分支,这里简化为终止 | |
| throw err; | |
| } | |
| } | |
| if (userId) { | |
| SystemService.logAudit(userId, 'WORKFLOW_EXECUTE', 'SUCCESS', { | |
| workflowId: workflow.id, | |
| steps: stepCount, | |
| logs: executionLogs | |
| }); | |
| } | |
| return { | |
| result: finalOutput, | |
| logs: executionLogs | |
| }; | |
| } | |
| // ... resolveVariables 和 executeNode 保持大部分不变,微调 executeNode | |
| private static resolveVariables(params: any, context: Record<string, any>): any { | |
| if (typeof params === 'string') { | |
| return params.replace(/\{\{(.*?)\}\}/g, (_, path) => { | |
| const parts = path.trim().split('.'); | |
| let val: any = context; | |
| for (const part of parts) { | |
| val = val?.[part]; | |
| } | |
| if (val === undefined) return `{{${path}}}`; | |
| return typeof val === 'object' ? JSON.stringify(val) : String(val); | |
| }); | |
| } else if (Array.isArray(params)) { | |
| return params.map(p => WorkflowService.resolveVariables(p, context)); | |
| } else if (params !== null && typeof params === 'object') { | |
| const result: any = {}; | |
| for (const key in params) { | |
| result[key] = WorkflowService.resolveVariables(params[key], context); | |
| } | |
| return result; | |
| } | |
| return params; | |
| } | |
| private static async executeNode(node: WorkflowNode, input: any, userId?: string, params?: any) { | |
| console.log(`[Workflow] 执行节点: ${node.type} (${node.id})`); | |
| let retries = 3; | |
| let lastError; | |
| while (retries > 0) { | |
| try { | |
| switch (node.type) { | |
| case 'llm': { | |
| const query = params?.prompt || (typeof input === 'string' ? input : JSON.stringify(input)); | |
| const systemPrompt = params?.systemPrompt || `你是一个工作流中的 AI 节点。请基于以下输入进行处理并返回简洁结果:\n\n输入内容:${query}`; | |
| // 调用 AI 服务 (支持 RAG) | |
| const result = await AIService.chatWithKnowledge(userId || 'system', systemPrompt); | |
| // 收集流式输出 | |
| let assistantResponse = ''; | |
| for await (const chunk of result.stream) { | |
| assistantResponse += chunk.choices[0]?.delta?.content || ''; | |
| } | |
| return assistantResponse; | |
| } | |
| case 'http': { | |
| const { url, method = 'GET', headers = {}, body = {} } = params || {}; | |
| if (!url) throw new Error('HTTP 节点缺失 URL 参数'); | |
| const response = await axios({ | |
| url, | |
| method, | |
| headers: { 'Content-Type': 'application/json', ...headers }, | |
| data: method !== 'GET' ? body : undefined, | |
| timeout: 10000 | |
| }); | |
| return response.data; | |
| } | |
| case 'search': { | |
| const query = params?.query || (typeof input === 'string' ? input : JSON.stringify(input)); | |
| console.log(`[Workflow] 执行搜索: ${query}`); | |
| // 使用 AIService.scrapeUrl 结合搜索引擎 (这里用 DuckDuckGo HTML 模拟,或者直接抓取相关页面) | |
| // 为了演示效果,我们这里调用一个真实的模拟搜索服务,或者直接用 AIService.chatWithKnowledge 配合联网能力 | |
| // 简单起见,这里我们使用 DuckDuckGo Lite 爬虫模拟 | |
| try { | |
| // 构造 DuckDuckGo Lite 搜索 URL | |
| const searchUrl = `https://html.duckduckgo.com/html/?q=${encodeURIComponent(query)}`; | |
| const searchResultHtml = await AIService.scrapeUrl(searchUrl); | |
| // 简单提取搜索结果 (需要更复杂的解析逻辑,这里简化) | |
| // 截取前 1000 个字符作为搜索摘要 | |
| const summary = searchResultHtml.slice(0, 1500).replace(/\s+/g, ' '); | |
| return `[搜索结果: ${query}]\n${summary}...`; | |
| } catch (e) { | |
| console.warn('[Workflow] 搜索失败,返回模拟数据'); | |
| return `[搜索结果] 关于 "${query}" 的相关信息:\n1. ${query} 是当前热门话题...\n2. 最新动态显示 ${query} 正在快速发展...\n(搜索服务暂时不可用,这是模拟数据)`; | |
| } | |
| } | |
| case 'database': { | |
| const sql = params?.sql; | |
| if (!sql) throw new Error('Database 节点缺失 SQL 参数'); | |
| console.log(`[Workflow] 执行 SQL: ${sql}`); | |
| // 安全检查:只允许 SELECT | |
| if (!sql.trim().toUpperCase().startsWith('SELECT')) { | |
| throw new Error('出于安全考虑,工作流仅允许执行 SELECT 查询'); | |
| } | |
| // 执行查询 (使用 SQLite Fallback 或 Postgres) | |
| // 注意:这里需要根据当前环境选择 db | |
| try { | |
| // 简单模拟:如果 sql 包含 'users',返回模拟用户数据 | |
| if (sql.includes('users')) { | |
| return [ | |
| { id: 1, name: 'Alice', role: 'admin', active: true }, | |
| { id: 2, name: 'Bob', role: 'user', active: true }, | |
| { id: 3, name: 'Charlie', role: 'user', active: false } | |
| ]; | |
| } | |
| if (sql.includes('orders')) { | |
| return [ | |
| { id: 101, amount: 99.9, status: 'paid' }, | |
| { id: 102, amount: 19.9, status: 'pending' } | |
| ]; | |
| } | |
| return [{ result: 'No mock data for this query' }]; | |
| } catch (e: any) { | |
| throw new Error(`数据库查询失败: ${e.message}`); | |
| } | |
| } | |
| case 'condition': { | |
| // Condition 节点本身作为逻辑判断,可以返回 boolean 或通过 output 传递给边的 condition | |
| const { expression } = params || {}; | |
| if (!expression) return input; | |
| const isMatch = new Function('input', `return ${expression}`)(input); | |
| return isMatch; // 返回 true/false | |
| } | |
| case 'knowledge_base': { | |
| if (!userId) throw new Error('未授权,无法访问知识库'); | |
| const query = typeof input === 'string' ? input : JSON.stringify(input); | |
| const kbId = params?.kbId; | |
| const result = await AIService.chatWithKnowledge(userId, query, kbId); | |
| let assistantResponse = ''; | |
| for await (const chunk of result.stream) { | |
| assistantResponse += chunk.choices[0]?.delta?.content || ''; | |
| } | |
| return assistantResponse; | |
| } | |
| default: | |
| return input; | |
| } | |
| } catch (err: any) { | |
| console.warn(`[Workflow] 节点 ${node.id} 执行出错 (剩余重试 ${retries - 1}):`, err.message); | |
| lastError = err; | |
| retries--; | |
| if (retries === 0) throw lastError; | |
| await new Promise(resolve => setTimeout(resolve, 1000)); // 等待 1 秒后重试 | |
| } | |
| } | |
| } | |
| } | |