File size: 11,218 Bytes
e706de2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
/**

 * Solution 4: Batch Processing

 *

 * This solution demonstrates:

 * - Parallel execution with batch()

 * - Performance comparison vs sequential

 * - Error handling in parallel operations

 * - Understanding Promise.all() behavior

 */

import { Runnable } from '../../../../src/index.js';

/**

 * DelayedMultiplierRunnable - Multiplies with a delay

 */
class DelayedMultiplierRunnable extends Runnable {
    constructor(factor, delayMs = 100) {
        super();
        this.factor = factor;
        this.delayMs = delayMs;
    }

    async _call(input, config) {
        if (typeof input !== 'number') {
            throw new Error(`Input must be a number, got ${typeof input}`);
        }

        // Log if in debug mode
        if (config?.debug) {
            console.log(`   Processing ${input} (will take ${this.delayMs}ms)...`);
        }

        // Simulate async work (like LLM inference or API call)
        await new Promise(resolve => setTimeout(resolve, this.delayMs));

        const result = input * this.factor;

        if (config?.debug) {
            console.log(`   Completed ${input} β†’ ${result}`);
        }

        return result;
    }

    toString() {
        return `DelayedMultiplier(Γ—${this.factor}, ${this.delayMs}ms)`;
    }
}

/**

 * Process inputs sequentially (one at a time)

 */
async function processSequentially(runnable, inputs) {
    const results = [];

    for (const input of inputs) {
        const result = await runnable.invoke(input);
        results.push(result);
    }

    return results;
}

/**

 * Process inputs in parallel (all at once)

 */
async function processInParallel(runnable, inputs) {
    // This is exactly what batch() does internally
    return await runnable.batch(inputs);
}

/**

 * Bonus: Process in chunks (controlled parallelism)

 */
async function processInChunks(runnable, inputs, chunkSize = 3) {
    const results = [];

    // Process in chunks to avoid overwhelming the system
    for (let i = 0; i < inputs.length; i += chunkSize) {
        const chunk = inputs.slice(i, i + chunkSize);
        const chunkResults = await runnable.batch(chunk);
        results.push(...chunkResults);
    }

    return results;
}

// ============================================================================
// Tests
// ============================================================================

async function runTests() {
    console.log('πŸ§ͺ Testing Batch Processing Solution...\n');

    try {
        // Test 1: Basic batch processing
        console.log('Test 1: Basic batch processing');
        const multiplier = new DelayedMultiplierRunnable(2, 100);
        const inputs = [1, 2, 3, 4, 5];

        const startTime = Date.now();
        const results = await multiplier.batch(inputs);
        const duration = Date.now() - startTime;

        console.log(`   Inputs:  [${inputs.join(', ')}]`);
        console.log(`   Results: [${results.join(', ')}]`);
        console.log(`   Time:    ${duration}ms`);
        console.assert(results.length === 5, 'Should have 5 results');
        console.assert(results[0] === 2, 'First result should be 2');
        console.assert(results[4] === 10, 'Last result should be 10');
        console.assert(duration < 200, 'Should complete in ~100ms (parallel), not 500ms (sequential)');
        console.log('βœ… Batch processing works!\n');

        // Test 2: Compare sequential vs parallel
        console.log('Test 2: Sequential vs Parallel comparison');
        const runnable = new DelayedMultiplierRunnable(3, 100);
        const testInputs = [1, 2, 3, 4, 5];

        console.log('   Processing sequentially...');
        const seqStart = Date.now();
        const seqResults = await processSequentially(runnable, testInputs);
        const seqDuration = Date.now() - seqStart;
        console.log(`   Sequential: ${seqDuration}ms (${testInputs.length} Γ— 100ms)`);

        console.log('   Processing in parallel...');
        const parStart = Date.now();
        const parResults = await processInParallel(runnable, testInputs);
        const parDuration = Date.now() - parStart;
        console.log(`   Parallel:   ${parDuration}ms (max of all operations)`);

        const speedup = (seqDuration / parDuration).toFixed(1);
        console.log(`   Speedup:    ${speedup}x faster πŸš€`);

        console.assert(
            JSON.stringify(seqResults) === JSON.stringify(parResults),
            'Results should be identical'
        );
        console.assert(parDuration < seqDuration / 2, 'Parallel should be much faster');
        console.log('βœ… Parallel is significantly faster!\n');

        // Test 3: Large batch
        console.log('Test 3: Large batch (10 items)');
        const largeBatch = Array.from({ length: 10 }, (_, i) => i + 1);
        const startLarge = Date.now();
        const largeResults = await multiplier.batch(largeBatch);
        const durationLarge = Date.now() - startLarge;

        console.log(`   Input: [1, 2, 3, ..., 10]`);
        console.log(`   Processed ${largeBatch.length} items in ${durationLarge}ms`);
        console.log(`   Sequential would take: ~${largeBatch.length * 100}ms`);
        console.log(`   Actual time: ${durationLarge}ms`);
        console.assert(largeResults.length === 10, 'Should process all items');
        console.assert(durationLarge < 200, 'Should complete quickly due to parallelism');
        console.log('βœ… Large batch works!\n');

        // Test 4: Batch with errors
        console.log('Test 4: Error handling in batch');
        const mixedInputs = [1, 2, 'invalid', 4, 5];

        try {
            await multiplier.batch(mixedInputs);
            console.log('❌ Should have thrown an error');
        } catch (error) {
            console.log(`   Caught error: ${error.message}`);
            console.log(`   Note: When one fails, all fail (Promise.all behavior)`);
            console.log('βœ… Errors are caught in batch processing!\n');
        }

        // Test 5: Empty batch
        console.log('Test 5: Empty batch');
        const emptyResults = await multiplier.batch([]);
        console.assert(emptyResults.length === 0, 'Empty batch should return empty array');
        console.log('βœ… Empty batch handled correctly!\n');

        // Test 6: Batch with pipeline
        console.log('Test 6: Batch through a pipeline');
        class AddConstant extends Runnable {
            constructor(constant) {
                super();
                this.constant = constant;
            }
            async _call(input) {
                await new Promise(resolve => setTimeout(resolve, 50));
                return input + this.constant;
            }
        }

        const pipeline = new DelayedMultiplierRunnable(2, 50)
            .pipe(new AddConstant(10));

        const pipelineInputs = [1, 2, 3];
        const startPipeline = Date.now();
        const pipelineResults = await pipeline.batch(pipelineInputs);
        const durationPipeline = Date.now() - startPipeline;

        console.log(`   Inputs:     [${pipelineInputs.join(', ')}]`);
        console.log(`   Results:    [${pipelineResults.join(', ')}]`);
        console.log(`   Expected:   [12, 14, 16] (Γ—2, then +10)`);
        console.log(`   Time:       ${durationPipeline}ms`);
        console.log(`   Sequential: would take ~${pipelineInputs.length * 100}ms`);
        console.assert(pipelineResults[0] === 12, 'First should be 12');
        console.assert(pipelineResults[2] === 16, 'Last should be 16');
        console.log('βœ… Batch works through pipelines!\n');

        // Test 7: Chunked processing
        console.log('Test 7: Chunked processing (controlled parallelism)');
        const manyInputs = Array.from({ length: 12 }, (_, i) => i + 1);

        console.log('   Processing 12 items in chunks of 3...');
        const startChunked = Date.now();
        const chunkedResults = await processInChunks(
            new DelayedMultiplierRunnable(2, 100),
            manyInputs,
            3 // chunk size
        );
        const durationChunked = Date.now() - startChunked;

        console.log(`   Time: ${durationChunked}ms`);
        console.log(`   Expected: ~400ms (4 chunks Γ— 100ms each)`);
        console.log(`   All parallel would be: ~100ms`);
        console.log(`   Sequential would be: ~1200ms`);
        console.assert(chunkedResults.length === 12, 'Should process all items');
        console.assert(
            durationChunked > 300 && durationChunked < 600,
            'Should take time for 4 chunks'
        );
        console.log('βœ… Chunked processing works!\n');

        // Test 8: Debug mode
        console.log('Test 8: Debug mode to see parallel execution');
        const debugMultiplier = new DelayedMultiplierRunnable(5, 100);
        console.log('   Watch items process in parallel:');
        await debugMultiplier.batch([1, 2, 3], { debug: true });
        console.log('βœ… Debug mode shows parallel execution!\n');

        // Test 9: Performance metrics
        console.log('Test 9: Detailed performance analysis');
        const sizes = [1, 2, 5, 10];
        const delay = 100;

        console.log('   Items | Sequential | Parallel | Speedup');
        console.log('   ------|------------|----------|--------');

        for (const size of sizes) {
            const inputs = Array.from({ length: size }, (_, i) => i + 1);
            const perf = new DelayedMultiplierRunnable(2, delay);

            const seqStart = Date.now();
            await processSequentially(perf, inputs);
            const seqTime = Date.now() - seqStart;

            const parStart = Date.now();
            await processInParallel(perf, inputs);
            const parTime = Date.now() - parStart;

            const speedup = (seqTime / parTime).toFixed(1);
            console.log(`   ${size.toString().padStart(5)} | ${seqTime.toString().padStart(10)}ms | ${parTime.toString().padStart(8)}ms | ${speedup.padStart(6)}x`);
        }
        console.log('βœ… Performance scales with parallelism!\n');

        console.log('πŸŽ‰ All tests passed!');
        console.log('\nπŸ’‘ Key Learnings:');
        console.log('   β€’ batch() uses Promise.all() for parallel execution');
        console.log('   β€’ N items with 100ms delay: sequential = NΓ—100ms, parallel = 100ms');
        console.log('   β€’ One error in batch causes all to fail');
        console.log('   β€’ Chunked processing balances speed and resource usage');
        console.log('   β€’ Pipelines work with batch processing');
        console.log('   β€’ Perfect for processing multiple LLM requests simultaneously');
    } catch (error) {
        console.error('❌ Test failed:', error.message);
        console.error(error.stack);
    }
}

// Run tests
if (import.meta.url === `file://${process.argv[1]}`) {
    runTests();
}

export {
    DelayedMultiplierRunnable,
    processSequentially,
    processInParallel,
    processInChunks
};