|
|
const { FLOW_TYPES } = require("./flowTypes"); |
|
|
const executeApiCall = require("./executors/api-call"); |
|
|
const executeLLMInstruction = require("./executors/llm-instruction"); |
|
|
const executeWebScraping = require("./executors/web-scraping"); |
|
|
const { Telemetry } = require("../../models/telemetry"); |
|
|
const { safeJsonParse } = require("../http"); |
|
|
|
|
|
class FlowExecutor { |
|
|
constructor() { |
|
|
this.variables = {}; |
|
|
this.introspect = (...args) => console.log("[introspect] ", ...args); |
|
|
this.logger = console.info; |
|
|
this.aibitat = null; |
|
|
} |
|
|
|
|
|
attachLogging(introspectFn = null, loggerFn = null) { |
|
|
this.introspect = |
|
|
introspectFn || ((...args) => console.log("[introspect] ", ...args)); |
|
|
this.logger = loggerFn || console.info; |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
getValueFromPath(obj = {}, path = "") { |
|
|
if (typeof obj === "string") obj = safeJsonParse(obj, {}); |
|
|
|
|
|
if ( |
|
|
!obj || |
|
|
!path || |
|
|
typeof obj !== "object" || |
|
|
Object.keys(obj).length === 0 || |
|
|
typeof path !== "string" |
|
|
) |
|
|
return ""; |
|
|
|
|
|
|
|
|
const parts = []; |
|
|
let currentPart = ""; |
|
|
let inBrackets = false; |
|
|
|
|
|
for (let i = 0; i < path.length; i++) { |
|
|
const char = path[i]; |
|
|
if (char === "[") { |
|
|
inBrackets = true; |
|
|
if (currentPart) { |
|
|
parts.push(currentPart); |
|
|
currentPart = ""; |
|
|
} |
|
|
currentPart += char; |
|
|
} else if (char === "]") { |
|
|
inBrackets = false; |
|
|
currentPart += char; |
|
|
parts.push(currentPart); |
|
|
currentPart = ""; |
|
|
} else if (char === "." && !inBrackets) { |
|
|
if (currentPart) { |
|
|
parts.push(currentPart); |
|
|
currentPart = ""; |
|
|
} |
|
|
} else { |
|
|
currentPart += char; |
|
|
} |
|
|
} |
|
|
|
|
|
if (currentPart) parts.push(currentPart); |
|
|
let current = obj; |
|
|
|
|
|
for (const part of parts) { |
|
|
if (current === null || typeof current !== "object") return undefined; |
|
|
|
|
|
|
|
|
if (part.startsWith("[") && part.endsWith("]")) { |
|
|
const key = part.slice(1, -1); |
|
|
const cleanKey = key.replace(/^['"]|['"]$/g, ""); |
|
|
|
|
|
if (!isNaN(cleanKey)) { |
|
|
if (!Array.isArray(current)) return undefined; |
|
|
current = current[parseInt(cleanKey)]; |
|
|
} else { |
|
|
if (!(cleanKey in current)) return undefined; |
|
|
current = current[cleanKey]; |
|
|
} |
|
|
} else { |
|
|
|
|
|
if (!(part in current)) return undefined; |
|
|
current = current[part]; |
|
|
} |
|
|
|
|
|
if (current === undefined || current === null) return undefined; |
|
|
} |
|
|
|
|
|
return typeof current === "object" ? JSON.stringify(current) : current; |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
replaceVariables(config) { |
|
|
const deepReplace = (obj) => { |
|
|
if (typeof obj === "string") { |
|
|
return obj.replace(/\${([^}]+)}/g, (match, varName) => { |
|
|
const value = this.getValueFromPath(this.variables, varName); |
|
|
return value !== undefined ? value : match; |
|
|
}); |
|
|
} |
|
|
|
|
|
if (Array.isArray(obj)) return obj.map((item) => deepReplace(item)); |
|
|
|
|
|
if (obj && typeof obj === "object") { |
|
|
const result = {}; |
|
|
for (const [key, value] of Object.entries(obj)) { |
|
|
result[key] = deepReplace(value); |
|
|
} |
|
|
return result; |
|
|
} |
|
|
return obj; |
|
|
}; |
|
|
|
|
|
return deepReplace(config); |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async executeStep(step) { |
|
|
const config = this.replaceVariables(step.config); |
|
|
let result; |
|
|
|
|
|
const context = { |
|
|
introspect: this.introspect, |
|
|
variables: this.variables, |
|
|
logger: this.logger, |
|
|
aibitat: this.aibitat, |
|
|
}; |
|
|
|
|
|
switch (step.type) { |
|
|
case FLOW_TYPES.START.type: |
|
|
|
|
|
if (config.variables) { |
|
|
config.variables.forEach((v) => { |
|
|
if (v.name && !this.variables[v.name]) { |
|
|
this.variables[v.name] = v.value || ""; |
|
|
} |
|
|
}); |
|
|
} |
|
|
result = this.variables; |
|
|
break; |
|
|
case FLOW_TYPES.API_CALL.type: |
|
|
result = await executeApiCall(config, context); |
|
|
break; |
|
|
case FLOW_TYPES.LLM_INSTRUCTION.type: |
|
|
result = await executeLLMInstruction(config, context); |
|
|
break; |
|
|
case FLOW_TYPES.WEB_SCRAPING.type: |
|
|
result = await executeWebScraping(config, context); |
|
|
break; |
|
|
default: |
|
|
throw new Error(`Unknown flow type: ${step.type}`); |
|
|
} |
|
|
|
|
|
|
|
|
if (config.resultVariable || config.responseVariable) { |
|
|
const varName = config.resultVariable || config.responseVariable; |
|
|
this.variables[varName] = result; |
|
|
} |
|
|
|
|
|
|
|
|
if (config.directOutput) result = { directOutput: true, result }; |
|
|
return result; |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async executeFlow(flow, initialVariables = {}, aibitat) { |
|
|
await Telemetry.sendTelemetry("agent_flow_execution_started"); |
|
|
|
|
|
|
|
|
this.variables = { |
|
|
...( |
|
|
flow.config.steps.find((s) => s.type === "start")?.config?.variables || |
|
|
[] |
|
|
).reduce((acc, v) => ({ ...acc, [v.name]: v.value }), {}), |
|
|
...initialVariables, |
|
|
}; |
|
|
|
|
|
this.aibitat = aibitat; |
|
|
this.attachLogging(aibitat?.introspect, aibitat?.handlerProps?.log); |
|
|
const results = []; |
|
|
let directOutputResult = null; |
|
|
|
|
|
for (const step of flow.config.steps) { |
|
|
try { |
|
|
const result = await this.executeStep(step); |
|
|
|
|
|
|
|
|
|
|
|
if (result?.directOutput) { |
|
|
directOutputResult = result.result; |
|
|
break; |
|
|
} |
|
|
|
|
|
results.push({ success: true, result }); |
|
|
} catch (error) { |
|
|
results.push({ success: false, error: error.message }); |
|
|
break; |
|
|
} |
|
|
} |
|
|
|
|
|
return { |
|
|
success: results.every((r) => r.success), |
|
|
results, |
|
|
variables: this.variables, |
|
|
directOutput: directOutputResult, |
|
|
}; |
|
|
} |
|
|
} |
|
|
|
|
|
module.exports = { |
|
|
FlowExecutor, |
|
|
FLOW_TYPES, |
|
|
}; |
|
|
|