import OpenAI from 'openai'; import { Workflow, WorkflowNode, WorkflowEdge } from '../../shared/types/index.js'; import { AIService } from './ai.service.js'; import { SystemService } from '../lib/system.js'; import axios from 'axios'; import crypto from 'crypto'; const MAX_STEPS = 50; // 防止死循环 export class WorkflowService { /** * 基于用户描述生成工作流 (Generative Workflow) */ static async generateWorkflowFromPrompt(prompt: string): Promise { console.log(`[Workflow] 正在生成工作流: ${prompt}`); const systemPrompt = ` 你是一个工作流生成专家。请根据用户的自然语言描述,生成一个 JSON 格式的工作流定义。 可用节点类型 (type): 1. input: 起始节点 (无参数) 2. llm: AI 处理节点 (参数: prompt, systemPrompt) - prompt 中可以使用 {{nodeId.output}} 或 {{input}} 引用变量 3. search: 搜索节点 (参数: query) - query 中可以使用变量 4. database: 数据库查询节点 (参数: sql) - sql 中可以使用变量,仅限 SELECT 5. knowledge_base: 知识库检索 (参数: kbId - 暂时留空或随机) 6. http: HTTP 请求 (参数: url, method, body) 输出格式要求: - 必须是合法的 JSON 对象 - 包含 "name", "description", "nodes" (数组), "edges" (数组) - nodes 必须包含 id, type, data: { label, params }, position: {x, y}, style - edges 必须包含 id, source, target - 节点位置 (position) 请合理分布,x 轴间隔 250,y 轴间隔 150,避免重叠 - 节点样式 (style) 请使用以下预设 (必须包含): - input: { background: '#EBF5FF', border: '2px solid #3B82F6', borderRadius: '12px', padding: '12px', width: '160px', textAlign: 'center', fontWeight: '700' } - llm: { background: '#FDF2F2', border: '2px solid #EF4444', borderRadius: '12px', padding: '12px', width: '160px', textAlign: 'center', fontWeight: '700' } - search: { background: '#FDF2F8', border: '2px solid #EC4899', borderRadius: '12px', padding: '12px', width: '160px', textAlign: 'center', fontWeight: '700' } - database: { background: '#EEF2FF', border: '2px solid #6366F1', borderRadius: '12px', padding: '12px', width: '160px', textAlign: 'center', fontWeight: '700' } - 其他: { background: '#F9FAFB', border: '2px solid #9CA3AF', borderRadius: '12px', padding: '12px', width: '160px', textAlign: 'center', fontWeight: '700' } 示例输入: "帮我查一下特斯拉的股价,然后写个简评" 示例输出: { "name": "特斯拉股价简评", "description": "自动搜索股价并生成评论", "nodes": [ { "id": "start", "type": "input", "data": { "label": "输入主题" }, "position": { "x": 50, "y": 200 }, "style": { "background": "#EBF5FF", ... } }, { "id": "search", "type": "search", "data": { "label": "搜股价", "params": { "query": "{{input}} 股价" } }, "position": { "x": 300, "y": 200 }, "style": { "background": "#FDF2F8", ... } }, { "id": "ai", "type": "llm", "data": { "label": "生成简评", "params": { "prompt": "基于搜索结果:{{search.output}},评价当前股价。" } }, "position": { "x": 550, "y": 200 }, "style": { "background": "#FDF2F2", ... } } ], "edges": [ { "id": "e1", "source": "start", "target": "search" }, { "id": "e2", "source": "search", "target": "ai" } ] } 请直接返回 JSON,不要包含 Markdown 代码块标记。 `; try { // 构造类似 Chat 的请求,直接利用 AIService 调用 OpenAI // 由于 AIService.chatWithKnowledge 是为 RAG 设计的,我们这里手动调用 openai 以获得更好的控制 const completion = await AIService.openai.chat.completions.create({ model: 'Qwen/Qwen2.5-7B-Instruct', messages: [ { role: 'system', content: systemPrompt }, { role: 'user', content: prompt } ], temperature: 0.7, }); let jsonStr = completion.choices[0]?.message?.content || ''; // 清理可能的 Markdown 标记 jsonStr = jsonStr.replace(/```json/g, '').replace(/```/g, '').trim(); const workflowData = JSON.parse(jsonStr); // 补全 ID 和 版本 return { id: `WF_GEN_${crypto.randomBytes(4).toString('hex')}`, version: 1, status: 'draft', ...workflowData }; } catch (e: any) { console.error('生成工作流失败:', e); throw new Error('无法根据描述生成工作流,请重试或详细描述步骤。'); } } /** * 解析并执行工作流 (支持循环与条件分支 - Agentic Workflow) */ static async execute(workflow: Workflow, initialInput: any, userId?: string) { console.log(`[Workflow] 开始执行 Agentic 工作流: ${workflow.name} (ID: ${workflow.id})`); const nodes = workflow.nodes; const edges = workflow.edges; // 1. 寻找起始节点 (入度为 0 的节点) // 如果是循环图,可能没有入度为 0 的节点,需要指定 Start 节点 (这里简化为:没有 source 的节点) const inDegrees = new Map(); edges.forEach(e => inDegrees.set(e.target, (inDegrees.get(e.target) || 0) + 1)); const startNodes = nodes.filter(n => (inDegrees.get(n.id) || 0) === 0); if (startNodes.length === 0 && nodes.length > 0) { // 如果全是环,默认选第一个 startNodes.push(nodes[0]); } // 2. 初始化执行队列与上下文 // queue item: { nodeId, inputFromPrev } const queue: { nodeId: string, input: any }[] = startNodes.map(n => ({ nodeId: n.id, input: initialInput })); const context: Record = { input: initialInput }; const executionLogs: any[] = []; let stepCount = 0; let finalOutput = initialInput; // 3. 图遍历执行 while (queue.length > 0) { if (stepCount++ > MAX_STEPS) { console.warn(`[Workflow] 达到最大执行步数 (${MAX_STEPS}),强制终止`); break; } const { nodeId, input } = queue.shift()!; const node = nodes.find(n => n.id === nodeId); if (!node) continue; const startTime = Date.now(); try { // 解析参数 const params = WorkflowService.resolveVariables(node.data?.params || {}, context); // 执行节点 const output = await WorkflowService.executeNode(node, input, userId, params); // 更新上下文 context[node.id] = { output }; finalOutput = output; // 记录日志 executionLogs.push({ nodeId: node.id, type: node.type, status: 'SUCCESS', duration: Date.now() - startTime, output: typeof output === 'string' ? output.slice(0, 100) : 'Object' }); // 寻找后续节点 (处理条件分支) const outgoingEdges = edges.filter(e => e.source === nodeId); for (const edge of outgoingEdges) { let shouldTraverse = true; // 检查边上的条件 if (edge.condition) { try { // 允许在条件中使用 output 和 context // 简单沙箱: output 指代当前节点输出 const checkFn = new Function('output', 'context', `return ${edge.condition}`); shouldTraverse = checkFn(output, context); } catch (e) { console.warn(`[Workflow] 边条件计算失败: ${edge.condition}`, e); shouldTraverse = false; } } if (shouldTraverse) { queue.push({ nodeId: edge.target, input: output }); } } } catch (err: any) { console.error(`[Workflow] 节点 ${node.id} 执行失败:`, err); executionLogs.push({ nodeId: node.id, type: node.type, status: 'FAILED', error: err.message }); // 遇到错误是否终止?Agentic Flow 中可能需要错误处理分支,这里简化为终止 throw err; } } if (userId) { SystemService.logAudit(userId, 'WORKFLOW_EXECUTE', 'SUCCESS', { workflowId: workflow.id, steps: stepCount, logs: executionLogs }); } return { result: finalOutput, logs: executionLogs }; } // ... resolveVariables 和 executeNode 保持大部分不变,微调 executeNode private static resolveVariables(params: any, context: Record): any { if (typeof params === 'string') { return params.replace(/\{\{(.*?)\}\}/g, (_, path) => { const parts = path.trim().split('.'); let val: any = context; for (const part of parts) { val = val?.[part]; } if (val === undefined) return `{{${path}}}`; return typeof val === 'object' ? JSON.stringify(val) : String(val); }); } else if (Array.isArray(params)) { return params.map(p => WorkflowService.resolveVariables(p, context)); } else if (params !== null && typeof params === 'object') { const result: any = {}; for (const key in params) { result[key] = WorkflowService.resolveVariables(params[key], context); } return result; } return params; } private static async executeNode(node: WorkflowNode, input: any, userId?: string, params?: any) { console.log(`[Workflow] 执行节点: ${node.type} (${node.id})`); let retries = 3; let lastError; while (retries > 0) { try { switch (node.type) { case 'llm': { const query = params?.prompt || (typeof input === 'string' ? input : JSON.stringify(input)); const systemPrompt = params?.systemPrompt || `你是一个工作流中的 AI 节点。请基于以下输入进行处理并返回简洁结果:\n\n输入内容:${query}`; // 调用 AI 服务 (支持 RAG) const result = await AIService.chatWithKnowledge(userId || 'system', systemPrompt); // 收集流式输出 let assistantResponse = ''; for await (const chunk of result.stream) { assistantResponse += chunk.choices[0]?.delta?.content || ''; } return assistantResponse; } case 'http': { const { url, method = 'GET', headers = {}, body = {} } = params || {}; if (!url) throw new Error('HTTP 节点缺失 URL 参数'); const response = await axios({ url, method, headers: { 'Content-Type': 'application/json', ...headers }, data: method !== 'GET' ? body : undefined, timeout: 10000 }); return response.data; } case 'search': { const query = params?.query || (typeof input === 'string' ? input : JSON.stringify(input)); console.log(`[Workflow] 执行搜索: ${query}`); // 使用 AIService.scrapeUrl 结合搜索引擎 (这里用 DuckDuckGo HTML 模拟,或者直接抓取相关页面) // 为了演示效果,我们这里调用一个真实的模拟搜索服务,或者直接用 AIService.chatWithKnowledge 配合联网能力 // 简单起见,这里我们使用 DuckDuckGo Lite 爬虫模拟 try { // 构造 DuckDuckGo Lite 搜索 URL const searchUrl = `https://html.duckduckgo.com/html/?q=${encodeURIComponent(query)}`; const searchResultHtml = await AIService.scrapeUrl(searchUrl); // 简单提取搜索结果 (需要更复杂的解析逻辑,这里简化) // 截取前 1000 个字符作为搜索摘要 const summary = searchResultHtml.slice(0, 1500).replace(/\s+/g, ' '); return `[搜索结果: ${query}]\n${summary}...`; } catch (e) { console.warn('[Workflow] 搜索失败,返回模拟数据'); return `[搜索结果] 关于 "${query}" 的相关信息:\n1. ${query} 是当前热门话题...\n2. 最新动态显示 ${query} 正在快速发展...\n(搜索服务暂时不可用,这是模拟数据)`; } } case 'database': { const sql = params?.sql; if (!sql) throw new Error('Database 节点缺失 SQL 参数'); console.log(`[Workflow] 执行 SQL: ${sql}`); // 安全检查:只允许 SELECT if (!sql.trim().toUpperCase().startsWith('SELECT')) { throw new Error('出于安全考虑,工作流仅允许执行 SELECT 查询'); } // 执行查询 (使用 SQLite Fallback 或 Postgres) // 注意:这里需要根据当前环境选择 db try { // 简单模拟:如果 sql 包含 'users',返回模拟用户数据 if (sql.includes('users')) { return [ { id: 1, name: 'Alice', role: 'admin', active: true }, { id: 2, name: 'Bob', role: 'user', active: true }, { id: 3, name: 'Charlie', role: 'user', active: false } ]; } if (sql.includes('orders')) { return [ { id: 101, amount: 99.9, status: 'paid' }, { id: 102, amount: 19.9, status: 'pending' } ]; } return [{ result: 'No mock data for this query' }]; } catch (e: any) { throw new Error(`数据库查询失败: ${e.message}`); } } case 'condition': { // Condition 节点本身作为逻辑判断,可以返回 boolean 或通过 output 传递给边的 condition const { expression } = params || {}; if (!expression) return input; const isMatch = new Function('input', `return ${expression}`)(input); return isMatch; // 返回 true/false } case 'knowledge_base': { if (!userId) throw new Error('未授权,无法访问知识库'); const query = typeof input === 'string' ? input : JSON.stringify(input); const kbId = params?.kbId; const result = await AIService.chatWithKnowledge(userId, query, kbId); let assistantResponse = ''; for await (const chunk of result.stream) { assistantResponse += chunk.choices[0]?.delta?.content || ''; } return assistantResponse; } default: return input; } } catch (err: any) { console.warn(`[Workflow] 节点 ${node.id} 执行出错 (剩余重试 ${retries - 1}):`, err.message); lastError = err; retries--; if (retries === 0) throw lastError; await new Promise(resolve => setTimeout(resolve, 1000)); // 等待 1 秒后重试 } } } }