// File size: 7,433 Bytes
// Revision: e706de2
/**
* Exercise 4: Implement Batch Processing
*
* Goal: Test and understand parallel execution with batch()
*
* Requirements:
* - Process multiple inputs in parallel using batch()
* - Measure performance difference vs sequential
* - Handle errors in batch processing
* - Understand Promise.all() behavior
*
* Example:
* const results = await runnable.batch([1, 2, 3, 4, 5]);
* // All 5 inputs process simultaneously
*/
import { pathToFileURL } from 'node:url';

import { Runnable } from '../../../../src/index.js';
/**
 * DelayedMultiplierRunnable - multiplies a number by a fixed factor after a delay.
 *
 * The delay simulates an async operation (like an API call or LLM inference)
 * that takes time to complete, which makes the difference between sequential
 * and batched execution measurable.
 */
class DelayedMultiplierRunnable extends Runnable {
  /**
   * @param {number} factor - Value every input is multiplied by.
   * @param {number} [delayMs=100] - Artificial delay, in milliseconds, before each result is returned.
   */
  constructor(factor, delayMs = 100) {
    super();
    this.factor = factor;
    this.delayMs = delayMs;
  }

  /**
   * Multiplies `input` by the configured factor once the delay has elapsed.
   *
   * NOTE(review): presumably called by the base class's invoke()/batch();
   * Runnable is imported and not visible here — confirm.
   *
   * @param {number} input - Number to multiply.
   * @param {object} [config] - Accepted for signature compatibility; not read by this method.
   * @returns {Promise<number>} `input * factor`.
   * @throws {Error} If `input` is not of type 'number' (checked before the delay starts).
   */
  async _call(input, config) {
    if (typeof input !== 'number') {
      throw new Error('Input must be a number');
    }

    // Simulate async work
    await new Promise((resolve) => setTimeout(resolve, this.delayMs));

    return input * this.factor;
  }

  /**
   * @returns {string} Short description, e.g. `DelayedMultiplier(×2, 100ms)`.
   */
  toString() {
    return `DelayedMultiplier(×${this.factor}, ${this.delayMs}ms)`;
  }
}
/**
 * Processes inputs one at a time (not in parallel).
 *
 * Each input is passed to `runnable.invoke()` and awaited before the next
 * one starts, so total time is roughly the sum of the individual call times.
 *
 * @param {{ invoke: (input: any) => Promise<any> }} runnable - Object exposing an async `invoke()` method.
 * @param {Iterable<any>} inputs - Values to process, in order.
 * @returns {Promise<any[]>} Results in the same order as `inputs`; empty when `inputs` is empty.
 */
async function processSequentially(runnable, inputs) {
  const results = [];

  for (const input of inputs) {
    // Awaiting inside the loop is deliberate: it forces strictly serial execution.
    results.push(await runnable.invoke(input));
  }

  return results;
}
/**
 * Processes all inputs with a single `runnable.batch()` call.
 *
 * Per this exercise's description, batch() is expected to start every input
 * at the same time, so total time should be close to the slowest single call.
 * The concurrency itself is implemented by `runnable`, not by this function.
 *
 * @param {{ batch: (inputs: any[]) => Promise<any[]> }} runnable - Object exposing an async `batch()` method.
 * @param {any[]} inputs - Values to process.
 * @returns {Promise<any[]>} Whatever `runnable.batch(inputs)` resolves to.
 */
async function processInParallel(runnable, inputs) {
  const results = await runnable.batch(inputs);
  return results;
}
// ============================================================================
// Tests
// ============================================================================
/**
 * Runs the batch-processing exercise checks and prints a report to the console.
 *
 * Scenarios, in order: basic batch, sequential vs parallel timing, a 10-item
 * batch, a batch containing an invalid input, an empty batch, and a batch
 * through a two-step pipeline.
 *
 * console.assert() only logs a failed assertion and never throws, so failures
 * are counted locally and the final summary reflects them.
 *
 * @returns {Promise<void>} Resolves after all scenarios ran; an unexpected
 *   error is caught and logged instead of being rethrown.
 */
async function runTests() {
  let failures = 0;

  // Logs like console.assert, but also records the failure for the summary.
  const check = (condition, message) => {
    if (!condition) {
      failures += 1;
    }
    console.assert(condition, message);
  };

  console.log('🧪 Testing Batch Processing...\n');

  try {
    // Test 1: Basic batch processing
    console.log('Test 1: Basic batch processing');
    const multiplier = new DelayedMultiplierRunnable(2, 100);
    const inputs = [1, 2, 3, 4, 5];
    const startTime = Date.now();
    const results = await multiplier.batch(inputs);
    const duration = Date.now() - startTime;
    console.log(` Inputs: [${inputs.join(', ')}]`);
    console.log(` Results: [${results.join(', ')}]`);
    console.log(` Time: ${duration}ms`);
    check(results.length === 5, 'Should have 5 results');
    check(results[0] === 2, 'First result should be 2');
    check(results[4] === 10, 'Last result should be 10');
    check(duration < 200, 'Should complete in ~100ms (parallel), not 500ms (sequential)');
    console.log('✅ Batch processing works!\n');

    // Test 2: Compare sequential vs parallel
    console.log('Test 2: Sequential vs Parallel comparison');
    const runnable = new DelayedMultiplierRunnable(3, 100);
    const testInputs = [1, 2, 3, 4, 5];
    const expectedTriples = testInputs.map((value) => value * 3);
    const matchesExpected = (actual) =>
      Array.isArray(actual) &&
      actual.length === expectedTriples.length &&
      expectedTriples.every((value, index) => actual[index] === value);

    console.log(' Processing sequentially...');
    const seqStart = Date.now();
    const seqResults = await processSequentially(runnable, testInputs);
    const seqDuration = Date.now() - seqStart;
    console.log(` Sequential: ${seqDuration}ms`);

    console.log(' Processing in parallel...');
    const parStart = Date.now();
    const parResults = await processInParallel(runnable, testInputs);
    const parDuration = Date.now() - parStart;
    console.log(` Parallel: ${parDuration}ms`);

    // Clamp the divisor so a 0ms measurement does not print "Infinity"/"NaN".
    console.log(` Speedup: ${(seqDuration / Math.max(parDuration, 1)).toFixed(1)}x faster`);
    check(matchesExpected(seqResults), 'Sequential results should be each input multiplied by 3');
    check(matchesExpected(parResults), 'Parallel results should be each input multiplied by 3');
    check(parDuration < seqDuration / 2, 'Parallel should be much faster');
    console.log('✅ Parallel is significantly faster!\n');

    // Test 3: Large batch
    console.log('Test 3: Large batch (10 items)');
    const largeBatch = Array.from({ length: 10 }, (_, i) => i + 1);
    const startLarge = Date.now();
    const largeResults = await multiplier.batch(largeBatch);
    const durationLarge = Date.now() - startLarge;
    console.log(` Processed ${largeBatch.length} items in ${durationLarge}ms`);
    check(largeResults.length === 10, 'Should process all items');
    check(durationLarge < 200, 'Should complete quickly due to parallelism');
    console.log('✅ Large batch works!\n');

    // Test 4: Batch with errors
    console.log('Test 4: Error handling in batch');
    const mixedInputs = [1, 2, 'invalid', 4, 5];
    let batchThrew = false;
    try {
      await multiplier.batch(mixedInputs);
    } catch (error) {
      batchThrew = true;
      console.log(` Caught error: ${error.message}`);
      console.log('✅ Errors are caught in batch processing!\n');
    }
    if (!batchThrew) {
      // A silently successful batch is a failed expectation, not just a log line.
      failures += 1;
      console.log('❌ Should have thrown an error');
    }

    // Test 5: Empty batch
    console.log('Test 5: Empty batch');
    const emptyResults = await multiplier.batch([]);
    check(emptyResults.length === 0, 'Empty batch should return empty array');
    console.log('✅ Empty batch handled correctly!\n');

    // Test 6: Batch with pipeline
    console.log('Test 6: Batch through a pipeline');

    // Second pipeline step: adds a constant after a short delay.
    class AddConstant extends Runnable {
      constructor(constant) {
        super();
        this.constant = constant;
      }

      async _call(input) {
        await new Promise((resolve) => setTimeout(resolve, 50));
        return input + this.constant;
      }
    }

    const pipeline = new DelayedMultiplierRunnable(2, 50)
      .pipe(new AddConstant(10));
    const pipelineInputs = [1, 2, 3];
    const startPipeline = Date.now();
    const pipelineResults = await pipeline.batch(pipelineInputs);
    const durationPipeline = Date.now() - startPipeline;
    console.log(` Inputs: [${pipelineInputs.join(', ')}]`);
    console.log(` Results: [${pipelineResults.join(', ')}]`);
    console.log(` Expected: [12, 14, 16] (multiply by 2, then add 10)`);
    console.log(` Time: ${durationPipeline}ms`);
    check(pipelineResults[0] === 12, 'First should be 12');
    check(pipelineResults[2] === 16, 'Last should be 16');
    console.log('✅ Batch works through pipelines!\n');

    if (failures === 0) {
      console.log('🎉 All tests passed!');
    } else {
      console.error(`❌ ${failures} check(s) failed`);
    }

    console.log('\n💡 Key Learnings:');
    console.log(' • batch() processes inputs in parallel');
    console.log(' • Much faster than sequential processing');
    console.log(' • Uses Promise.all() under the hood');
    console.log(' • All inputs must succeed (or all fail)');
    console.log(' • Works with pipelines too!');
  } catch (error) {
    console.error('❌ Test failed:', error.message);
    console.error(error.stack);
  }
}
// Run the tests only when this file is executed directly (`node <file>`), not
// when it is imported. pathToFileURL() builds the URL the same way Node does
// for import.meta.url, so Windows paths and characters that need
// percent-encoding compare correctly; string-concatenating `file://` does not.
if (process.argv[1] && import.meta.url === pathToFileURL(process.argv[1]).href) {
  runTests().catch((error) => {
    // runTests() handles its own errors; this is a last-resort safety net so
    // the promise is never left floating.
    console.error(error);
    process.exitCode = 1;
  });
}
export {
DelayedMultiplierRunnable,
processSequentially,
processInParallel
}; |