File size: 4,416 Bytes
e706de2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
import RunnableConfig from "./context.js";
import {CallbackManager} from "../utils/index.js";
/**
* Runnable - Base class for all composable components
*
* Every Runnable must implement the _call() method.
* This base class provides invoke, stream, batch, and pipe.
*/
export class Runnable {
constructor() {
this.name = this.constructor.name;
}
/**
* Main execution method - processes a single input
*
* @param {any} input - The input to process
* @param {Object} config - Optional configuration
* @returns {Promise<any>} The processed output
*/
async invoke(input, config = {}) {
// Normalize config to RunnableConfig instance
const runnableConfig = config instanceof RunnableConfig
? config
: new RunnableConfig(config);
// Create callback manager
const callbackManager = new CallbackManager(runnableConfig.callbacks);
try {
// Notify callbacks: starting
await callbackManager.handleStart(this, input, runnableConfig);
// Execute the runnable
const output = await this._call(input, runnableConfig);
// Notify callbacks: success
await callbackManager.handleEnd(this, output, runnableConfig);
return output;
} catch (error) {
// Notify callbacks: error
await callbackManager.handleError(this, error, runnableConfig);
throw error;
}
}
/**
* Internal method that subclasses must implement
*
* @param {any} input - The input to process
* @param {Object} config - Optional configuration
* @returns {Promise<any>} The processed output
*/
async _call(input, config) {
throw new Error(
`${this.name} must implement _call() method`
);
}
/**
* Stream output in chunks
*
* @param {any} input - The input to process
* @param {Object} config - Optional configuration
* @yields {any} Output chunks
*/
async* stream(input, config = {}) {
// Default implementation: just yield the full result
// Subclasses can override for true streaming
const result = await this.invoke(input, config);
yield result;
}
/**
* Internal streaming method for subclasses
* Override this for custom streaming behavior
*/
async* _stream(input, config) {
yield await this._call(input, config);
}
/**
* Process multiple inputs in parallel
*
* @param {Array<any>} inputs - Array of inputs to process
* @param {Object} config - Optional configuration
* @returns {Promise<Array<any>>} Array of outputs
*/
async batch(inputs, config = {}) {
// Use Promise.all for parallel execution
return await Promise.all(
inputs.map(input => this.invoke(input, config))
);
}
/**
* Compose this Runnable with another
* Creates a new Runnable that runs both in sequence
*
* @param {Runnable} other - The Runnable to pipe to
* @returns {RunnableSequence} A new composed Runnable
*/
pipe(other) {
return new RunnableSequence([this, other]);
}
}
/**
* RunnableSequence - Chains multiple Runnables together
*
* Output of one becomes input of the next
*/
export class RunnableSequence extends Runnable {
constructor(steps) {
super();
this.steps = steps; // Array of Runnables
}
async _call(input, config) {
let output = input;
// Run through each step sequentially
for (const step of this.steps) {
output = await step.invoke(output, config);
}
return output;
}
async *_stream(input, config) {
let output = input;
// Stream through all steps
for (let i = 0; i < this.steps.length - 1; i++) {
output = await this.steps[i].invoke(output, config);
}
// Only stream the last step
yield* this.steps[this.steps.length - 1].stream(output, config);
}
// pipe() returns a new sequence with the added step
pipe(other) {
return new RunnableSequence([...this.steps, other]);
}
} |