codex-ai-platform / api /services /workflow.service.ts
3v324v23's picture
fixbug:OpenAI
1c098c5
import OpenAI from 'openai';
import { Workflow, WorkflowNode, WorkflowEdge } from '../../shared/types/index.js';
import { AIService } from './ai.service.js';
import { SystemService } from '../lib/system.js';
import axios from 'axios';
import crypto from 'crypto';
const MAX_STEPS = 50; // 防止死循环
export class WorkflowService {
/**
* 基于用户描述生成工作流 (Generative Workflow)
*/
static async generateWorkflowFromPrompt(prompt: string): Promise<Workflow> {
console.log(`[Workflow] 正在生成工作流: ${prompt}`);
const systemPrompt = `
你是一个工作流生成专家。请根据用户的自然语言描述,生成一个 JSON 格式的工作流定义。
可用节点类型 (type):
1. input: 起始节点 (无参数)
2. llm: AI 处理节点 (参数: prompt, systemPrompt)
- prompt 中可以使用 {{nodeId.output}} 或 {{input}} 引用变量
3. search: 搜索节点 (参数: query)
- query 中可以使用变量
4. database: 数据库查询节点 (参数: sql)
- sql 中可以使用变量,仅限 SELECT
5. knowledge_base: 知识库检索 (参数: kbId - 暂时留空或随机)
6. http: HTTP 请求 (参数: url, method, body)
输出格式要求:
- 必须是合法的 JSON 对象
- 包含 "name", "description", "nodes" (数组), "edges" (数组)
- nodes 必须包含 id, type, data: { label, params }, position: {x, y}, style
- edges 必须包含 id, source, target
- 节点位置 (position) 请合理分布,x 轴间隔 250,y 轴间隔 150,避免重叠
- 节点样式 (style) 请使用以下预设 (必须包含):
- input: { background: '#EBF5FF', border: '2px solid #3B82F6', borderRadius: '12px', padding: '12px', width: '160px', textAlign: 'center', fontWeight: '700' }
- llm: { background: '#FDF2F2', border: '2px solid #EF4444', borderRadius: '12px', padding: '12px', width: '160px', textAlign: 'center', fontWeight: '700' }
- search: { background: '#FDF2F8', border: '2px solid #EC4899', borderRadius: '12px', padding: '12px', width: '160px', textAlign: 'center', fontWeight: '700' }
- database: { background: '#EEF2FF', border: '2px solid #6366F1', borderRadius: '12px', padding: '12px', width: '160px', textAlign: 'center', fontWeight: '700' }
- 其他: { background: '#F9FAFB', border: '2px solid #9CA3AF', borderRadius: '12px', padding: '12px', width: '160px', textAlign: 'center', fontWeight: '700' }
示例输入: "帮我查一下特斯拉的股价,然后写个简评"
示例输出:
{
"name": "特斯拉股价简评",
"description": "自动搜索股价并生成评论",
"nodes": [
{ "id": "start", "type": "input", "data": { "label": "输入主题" }, "position": { "x": 50, "y": 200 }, "style": { "background": "#EBF5FF", ... } },
{ "id": "search", "type": "search", "data": { "label": "搜股价", "params": { "query": "{{input}} 股价" } }, "position": { "x": 300, "y": 200 }, "style": { "background": "#FDF2F8", ... } },
{ "id": "ai", "type": "llm", "data": { "label": "生成简评", "params": { "prompt": "基于搜索结果:{{search.output}},评价当前股价。" } }, "position": { "x": 550, "y": 200 }, "style": { "background": "#FDF2F2", ... } }
],
"edges": [
{ "id": "e1", "source": "start", "target": "search" },
{ "id": "e2", "source": "search", "target": "ai" }
]
}
请直接返回 JSON,不要包含 Markdown 代码块标记。
`;
try {
// 构造类似 Chat 的请求,直接利用 AIService 调用 OpenAI
// 由于 AIService.chatWithKnowledge 是为 RAG 设计的,我们这里手动调用 openai 以获得更好的控制
const completion = await AIService.openai.chat.completions.create({
model: 'Qwen/Qwen2.5-7B-Instruct',
messages: [
{ role: 'system', content: systemPrompt },
{ role: 'user', content: prompt }
],
temperature: 0.7,
});
let jsonStr = completion.choices[0]?.message?.content || '';
// 清理可能的 Markdown 标记
jsonStr = jsonStr.replace(/```json/g, '').replace(/```/g, '').trim();
const workflowData = JSON.parse(jsonStr);
// 补全 ID 和 版本
return {
id: `WF_GEN_${crypto.randomBytes(4).toString('hex')}`,
version: 1,
status: 'draft',
...workflowData
};
} catch (e: any) {
console.error('生成工作流失败:', e);
throw new Error('无法根据描述生成工作流,请重试或详细描述步骤。');
}
}
/**
* 解析并执行工作流 (支持循环与条件分支 - Agentic Workflow)
*/
static async execute(workflow: Workflow, initialInput: any, userId?: string) {
console.log(`[Workflow] 开始执行 Agentic 工作流: ${workflow.name} (ID: ${workflow.id})`);
const nodes = workflow.nodes;
const edges = workflow.edges;
// 1. 寻找起始节点 (入度为 0 的节点)
// 如果是循环图,可能没有入度为 0 的节点,需要指定 Start 节点 (这里简化为:没有 source 的节点)
const inDegrees = new Map<string, number>();
edges.forEach(e => inDegrees.set(e.target, (inDegrees.get(e.target) || 0) + 1));
const startNodes = nodes.filter(n => (inDegrees.get(n.id) || 0) === 0);
if (startNodes.length === 0 && nodes.length > 0) {
// 如果全是环,默认选第一个
startNodes.push(nodes[0]);
}
// 2. 初始化执行队列与上下文
// queue item: { nodeId, inputFromPrev }
const queue: { nodeId: string, input: any }[] = startNodes.map(n => ({ nodeId: n.id, input: initialInput }));
const context: Record<string, any> = { input: initialInput };
const executionLogs: any[] = [];
let stepCount = 0;
let finalOutput = initialInput;
// 3. 图遍历执行
while (queue.length > 0) {
if (stepCount++ > MAX_STEPS) {
console.warn(`[Workflow] 达到最大执行步数 (${MAX_STEPS}),强制终止`);
break;
}
const { nodeId, input } = queue.shift()!;
const node = nodes.find(n => n.id === nodeId);
if (!node) continue;
const startTime = Date.now();
try {
// 解析参数
const params = WorkflowService.resolveVariables(node.data?.params || {}, context);
// 执行节点
const output = await WorkflowService.executeNode(node, input, userId, params);
// 更新上下文
context[node.id] = { output };
finalOutput = output;
// 记录日志
executionLogs.push({
nodeId: node.id,
type: node.type,
status: 'SUCCESS',
duration: Date.now() - startTime,
output: typeof output === 'string' ? output.slice(0, 100) : 'Object'
});
// 寻找后续节点 (处理条件分支)
const outgoingEdges = edges.filter(e => e.source === nodeId);
for (const edge of outgoingEdges) {
let shouldTraverse = true;
// 检查边上的条件
if (edge.condition) {
try {
// 允许在条件中使用 output 和 context
// 简单沙箱: output 指代当前节点输出
const checkFn = new Function('output', 'context', `return ${edge.condition}`);
shouldTraverse = checkFn(output, context);
} catch (e) {
console.warn(`[Workflow] 边条件计算失败: ${edge.condition}`, e);
shouldTraverse = false;
}
}
if (shouldTraverse) {
queue.push({ nodeId: edge.target, input: output });
}
}
} catch (err: any) {
console.error(`[Workflow] 节点 ${node.id} 执行失败:`, err);
executionLogs.push({
nodeId: node.id,
type: node.type,
status: 'FAILED',
error: err.message
});
// 遇到错误是否终止?Agentic Flow 中可能需要错误处理分支,这里简化为终止
throw err;
}
}
if (userId) {
SystemService.logAudit(userId, 'WORKFLOW_EXECUTE', 'SUCCESS', {
workflowId: workflow.id,
steps: stepCount,
logs: executionLogs
});
}
return {
result: finalOutput,
logs: executionLogs
};
}
// ... resolveVariables 和 executeNode 保持大部分不变,微调 executeNode
private static resolveVariables(params: any, context: Record<string, any>): any {
if (typeof params === 'string') {
return params.replace(/\{\{(.*?)\}\}/g, (_, path) => {
const parts = path.trim().split('.');
let val: any = context;
for (const part of parts) {
val = val?.[part];
}
if (val === undefined) return `{{${path}}}`;
return typeof val === 'object' ? JSON.stringify(val) : String(val);
});
} else if (Array.isArray(params)) {
return params.map(p => WorkflowService.resolveVariables(p, context));
} else if (params !== null && typeof params === 'object') {
const result: any = {};
for (const key in params) {
result[key] = WorkflowService.resolveVariables(params[key], context);
}
return result;
}
return params;
}
private static async executeNode(node: WorkflowNode, input: any, userId?: string, params?: any) {
console.log(`[Workflow] 执行节点: ${node.type} (${node.id})`);
let retries = 3;
let lastError;
while (retries > 0) {
try {
switch (node.type) {
case 'llm': {
const query = params?.prompt || (typeof input === 'string' ? input : JSON.stringify(input));
const systemPrompt = params?.systemPrompt || `你是一个工作流中的 AI 节点。请基于以下输入进行处理并返回简洁结果:\n\n输入内容:${query}`;
// 调用 AI 服务 (支持 RAG)
const result = await AIService.chatWithKnowledge(userId || 'system', systemPrompt);
// 收集流式输出
let assistantResponse = '';
for await (const chunk of result.stream) {
assistantResponse += chunk.choices[0]?.delta?.content || '';
}
return assistantResponse;
}
case 'http': {
const { url, method = 'GET', headers = {}, body = {} } = params || {};
if (!url) throw new Error('HTTP 节点缺失 URL 参数');
const response = await axios({
url,
method,
headers: { 'Content-Type': 'application/json', ...headers },
data: method !== 'GET' ? body : undefined,
timeout: 10000
});
return response.data;
}
case 'search': {
const query = params?.query || (typeof input === 'string' ? input : JSON.stringify(input));
console.log(`[Workflow] 执行搜索: ${query}`);
// 使用 AIService.scrapeUrl 结合搜索引擎 (这里用 DuckDuckGo HTML 模拟,或者直接抓取相关页面)
// 为了演示效果,我们这里调用一个真实的模拟搜索服务,或者直接用 AIService.chatWithKnowledge 配合联网能力
// 简单起见,这里我们使用 DuckDuckGo Lite 爬虫模拟
try {
// 构造 DuckDuckGo Lite 搜索 URL
const searchUrl = `https://html.duckduckgo.com/html/?q=${encodeURIComponent(query)}`;
const searchResultHtml = await AIService.scrapeUrl(searchUrl);
// 简单提取搜索结果 (需要更复杂的解析逻辑,这里简化)
// 截取前 1000 个字符作为搜索摘要
const summary = searchResultHtml.slice(0, 1500).replace(/\s+/g, ' ');
return `[搜索结果: ${query}]\n${summary}...`;
} catch (e) {
console.warn('[Workflow] 搜索失败,返回模拟数据');
return `[搜索结果] 关于 "${query}" 的相关信息:\n1. ${query} 是当前热门话题...\n2. 最新动态显示 ${query} 正在快速发展...\n(搜索服务暂时不可用,这是模拟数据)`;
}
}
case 'database': {
const sql = params?.sql;
if (!sql) throw new Error('Database 节点缺失 SQL 参数');
console.log(`[Workflow] 执行 SQL: ${sql}`);
// 安全检查:只允许 SELECT
if (!sql.trim().toUpperCase().startsWith('SELECT')) {
throw new Error('出于安全考虑,工作流仅允许执行 SELECT 查询');
}
// 执行查询 (使用 SQLite Fallback 或 Postgres)
// 注意:这里需要根据当前环境选择 db
try {
// 简单模拟:如果 sql 包含 'users',返回模拟用户数据
if (sql.includes('users')) {
return [
{ id: 1, name: 'Alice', role: 'admin', active: true },
{ id: 2, name: 'Bob', role: 'user', active: true },
{ id: 3, name: 'Charlie', role: 'user', active: false }
];
}
if (sql.includes('orders')) {
return [
{ id: 101, amount: 99.9, status: 'paid' },
{ id: 102, amount: 19.9, status: 'pending' }
];
}
return [{ result: 'No mock data for this query' }];
} catch (e: any) {
throw new Error(`数据库查询失败: ${e.message}`);
}
}
case 'condition': {
// Condition 节点本身作为逻辑判断,可以返回 boolean 或通过 output 传递给边的 condition
const { expression } = params || {};
if (!expression) return input;
const isMatch = new Function('input', `return ${expression}`)(input);
return isMatch; // 返回 true/false
}
case 'knowledge_base': {
if (!userId) throw new Error('未授权,无法访问知识库');
const query = typeof input === 'string' ? input : JSON.stringify(input);
const kbId = params?.kbId;
const result = await AIService.chatWithKnowledge(userId, query, kbId);
let assistantResponse = '';
for await (const chunk of result.stream) {
assistantResponse += chunk.choices[0]?.delta?.content || '';
}
return assistantResponse;
}
default:
return input;
}
} catch (err: any) {
console.warn(`[Workflow] 节点 ${node.id} 执行出错 (剩余重试 ${retries - 1}):`, err.message);
lastError = err;
retries--;
if (retries === 0) throw lastError;
await new Promise(resolve => setTimeout(resolve, 1000)); // 等待 1 秒后重试
}
}
}
}