File size: 4,416 Bytes
e706de2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import RunnableConfig from "./context.js";
import {CallbackManager} from "../utils/index.js";

/**
 * Runnable - Base class for all composable components.
 *
 * Subclasses must implement _call(); they may also override _stream()
 * to emit output incrementally. On top of those two hooks this base
 * class provides invoke, stream, batch, and pipe.
 */
export class Runnable {
    constructor() {
        // Resolves to the concrete subclass name; used in error messages.
        this.name = this.constructor.name;
    }

    /**
     * Main execution method - processes a single input.
     *
     * Wraps _call() with callback notifications: handleStart before the
     * call, handleEnd after a successful call, handleError on failure.
     *
     * @param {any} input - The input to process
     * @param {Object} config - Optional configuration; a plain object is
     *     wrapped in a RunnableConfig, an existing RunnableConfig is used as-is
     * @returns {Promise<any>} The processed output
     * @throws Rethrows any error raised inside the try block (by _call() or
     *     by the start/end callbacks) after reporting it through handleError
     */
    async invoke(input, config = {}) {
        // Normalize config to a RunnableConfig instance
        const runnableConfig = config instanceof RunnableConfig
            ? config
            : new RunnableConfig(config);

        const callbackManager = new CallbackManager(runnableConfig.callbacks);

        try {
            await callbackManager.handleStart(this, input, runnableConfig);

            const output = await this._call(input, runnableConfig);

            await callbackManager.handleEnd(this, output, runnableConfig);

            return output;
        } catch (error) {
            await callbackManager.handleError(this, error, runnableConfig);
            throw error;
        }
    }

    /**
     * Internal method that subclasses must implement.
     *
     * @param {any} input - The input to process
     * @param {Object} config - Configuration passed down from invoke()/stream()
     * @returns {Promise<any>} The processed output
     * @throws {Error} Always, in this base implementation
     */
    async _call(input, config) {
        throw new Error(
            `${this.name} must implement _call() method`
        );
    }

    /**
     * Stream output in chunks.
     *
     * Delegates to _stream(), so a subclass that overrides _stream() gets
     * true streaming. With the default _stream() this yields exactly one
     * chunk: the full result of _call().
     *
     * Callbacks mirror invoke(): handleStart before the first chunk,
     * handleEnd once the stream is exhausted, handleError on failure.
     * handleEnd receives the chunk itself when exactly one chunk was
     * produced, otherwise the array of all chunks in order.
     *
     * @param {any} input - The input to process
     * @param {Object} config - Optional configuration
     * @yields {any} Output chunks
     */
    async* stream(input, config = {}) {
        const runnableConfig = config instanceof RunnableConfig
            ? config
            : new RunnableConfig(config);

        const callbackManager = new CallbackManager(runnableConfig.callbacks);

        // Collected only so handleEnd can be told what was produced.
        const chunks = [];

        try {
            await callbackManager.handleStart(this, input, runnableConfig);

            for await (const chunk of this._stream(input, runnableConfig)) {
                chunks.push(chunk);
                yield chunk;
            }

            // Single-chunk streams report the bare chunk, matching what
            // invoke() reports for the same runnable.
            const output = chunks.length === 1 ? chunks[0] : chunks;
            await callbackManager.handleEnd(this, output, runnableConfig);
        } catch (error) {
            await callbackManager.handleError(this, error, runnableConfig);
            throw error;
        }
    }

    /**
     * Internal streaming method for subclasses.
     * Override this for custom streaming behavior; the default yields the
     * complete _call() result as a single chunk.
     *
     * @param {any} input - The input to process
     * @param {Object} config - Configuration passed down from stream()
     * @yields {any} Output chunks
     */
    async* _stream(input, config) {
        yield await this._call(input, config);
    }

    /**
     * Process multiple inputs concurrently.
     *
     * Each input goes through invoke() with the same config. Results keep
     * the order of the inputs; the returned promise rejects as soon as any
     * single invocation rejects (Promise.all semantics).
     *
     * @param {Array<any>} inputs - Array of inputs to process
     * @param {Object} config - Optional configuration
     * @returns {Promise<Array<any>>} Array of outputs
     */
    async batch(inputs, config = {}) {
        return await Promise.all(
            inputs.map(input => this.invoke(input, config))
        );
    }

    /**
     * Compose this Runnable with another.
     * Creates a new Runnable that runs both in sequence.
     *
     * @param {Runnable} other - The Runnable to pipe to
     * @returns {RunnableSequence} A new composed Runnable
     */
    pipe(other) {
        return new RunnableSequence([this, other]);
    }
}

/**
 * RunnableSequence - Chains multiple Runnables together.
 *
 * The output of each step becomes the input of the next. A sequence with
 * no steps passes its input through unchanged.
 */
export class RunnableSequence extends Runnable {
    /**
     * @param {Array<Runnable>} [steps=[]] - Steps to run, in order
     */
    constructor(steps = []) {
        super();
        // Copy so later changes to the caller's array cannot alter this
        // sequence; pipe() likewise builds a new sequence instead of mutating.
        this.steps = [...steps];
    }

    /**
     * Run every step in order via its invoke() method.
     *
     * @param {any} input - Input for the first step
     * @param {Object} config - Configuration forwarded to every step
     * @returns {Promise<any>} Output of the last step (the input itself
     *     when there are no steps)
     */
    async _call(input, config) {
        let output = input;

        for (const step of this.steps) {
            output = await step.invoke(output, config);
        }

        return output;
    }

    /**
     * Streaming counterpart of _call().
     *
     * All steps except the last are invoked to completion; only the last
     * step is streamed, and its chunks are forwarded as they arrive.
     *
     * @param {any} input - Input for the first step
     * @param {Object} config - Configuration forwarded to every step
     * @yields {any} Chunks produced by the last step (the input itself
     *     when there are no steps)
     */
    async* _stream(input, config) {
        // Match _call(): an empty sequence is the identity, not a crash on
        // an undefined last step.
        if (this.steps.length === 0) {
            yield input;
            return;
        }

        const lastIndex = this.steps.length - 1;
        let output = input;

        // Intermediate steps must finish before the next one can start.
        for (let i = 0; i < lastIndex; i++) {
            output = await this.steps[i].invoke(output, config);
        }

        yield* this.steps[lastIndex].stream(output, config);
    }

    /**
     * Append a step, returning a new flat sequence; this one is not modified.
     *
     * @param {Runnable} other - The Runnable to append
     * @returns {RunnableSequence} A new sequence containing all steps
     */
    pipe(other) {
        return new RunnableSequence([...this.steps, other]);
    }
}