|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import { Runnable } from '../../../../src/index.js';
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * A Runnable that multiplies a numeric input by a fixed factor after an
 * artificial delay — used to demonstrate sequential vs parallel batching.
 */
class DelayedMultiplierRunnable extends Runnable {
  /**
   * @param {number} factor - Multiplier applied to each input.
   * @param {number} [delayMs=100] - Simulated processing delay in milliseconds.
   */
  constructor(factor, delayMs = 100) {
    super();
    this.factor = factor;
    this.delayMs = delayMs;
  }

  /**
   * Multiply `input` by `this.factor` after waiting `this.delayMs` ms.
   * @param {number} input - Value to multiply; must be a number.
   * @param {{debug?: boolean}} [config] - When `debug` is true, logs progress.
   * @returns {Promise<number>} The product `input * this.factor`.
   * @throws {Error} If `input` is not a number.
   */
  async _call(input, config) {
    if (typeof input !== 'number') {
      throw new Error(`Input must be a number, got ${typeof input}`);
    }

    if (config?.debug) {
      console.log(`  Processing ${input} (will take ${this.delayMs}ms)...`);
    }

    // Simulate slow work (e.g. a network/LLM call).
    await new Promise(resolve => setTimeout(resolve, this.delayMs));

    const result = input * this.factor;

    if (config?.debug) {
      console.log(`  Completed ${input} → ${result}`);
    }

    return result;
  }

  /** @returns {string} Human-readable description, e.g. "DelayedMultiplier(×2, 100ms)". */
  toString() {
    return `DelayedMultiplier(×${this.factor}, ${this.delayMs}ms)`;
  }
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Runs `runnable.invoke` on each input one at a time, awaiting each result
 * before starting the next. Total time is therefore the sum of all calls.
 * @param {object} runnable - Object exposing an async `invoke(input)` method.
 * @param {Array} inputs - Values to process, in order.
 * @returns {Promise<Array>} Results in the same order as `inputs`.
 */
async function processSequentially(runnable, inputs) {
  const collected = [];

  for (const item of inputs) {
    collected.push(await runnable.invoke(item));
  }

  return collected;
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Processes every input concurrently by delegating to `runnable.batch`.
 * Total time is roughly the slowest single operation, not the sum.
 * @param {object} runnable - Object exposing an async `batch(inputs)` method.
 * @param {Array} inputs - Values to process.
 * @returns {Promise<Array>} Results in input order.
 */
async function processInParallel(runnable, inputs) {
  return runnable.batch(inputs);
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Processes inputs in fixed-size chunks: items within a chunk run in
 * parallel via `runnable.batch`, but chunks run one after another. This
 * bounds concurrency (useful for rate limits or resource constraints).
 * @param {object} runnable - Object exposing an async `batch(inputs)` method.
 * @param {Array} inputs - Values to process.
 * @param {number} [chunkSize=3] - Max items processed concurrently; must be a positive integer.
 * @returns {Promise<Array>} Results in input order.
 * @throws {RangeError} If `chunkSize` is not a positive integer.
 */
async function processInChunks(runnable, inputs, chunkSize = 3) {
  // Guard: a chunk size < 1 would make `i += chunkSize` below loop forever.
  if (!Number.isInteger(chunkSize) || chunkSize < 1) {
    throw new RangeError(`chunkSize must be a positive integer, got ${chunkSize}`);
  }

  const results = [];

  for (let i = 0; i < inputs.length; i += chunkSize) {
    const chunk = inputs.slice(i, i + chunkSize);
    const chunkResults = await runnable.batch(chunk);
    results.push(...chunkResults);
  }

  return results;
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * End-to-end test suite for batch processing: verifies parallel `batch()`,
 * compares sequential vs parallel timing, checks error propagation, empty
 * batches, pipelines, chunked (bounded-parallelism) processing, debug
 * logging, and a small performance sweep. Checks use `console.assert`;
 * any thrown error is caught and reported at the bottom.
 */
async function runTests() {
  console.log('🧪 Testing Batch Processing Solution...\n');

  try {
    // Test 1: batch() should run all inputs in parallel.
    console.log('Test 1: Basic batch processing');
    const multiplier = new DelayedMultiplierRunnable(2, 100);
    const inputs = [1, 2, 3, 4, 5];

    const startTime = Date.now();
    const results = await multiplier.batch(inputs);
    const duration = Date.now() - startTime;

    console.log(`  Inputs: [${inputs.join(', ')}]`);
    console.log(`  Results: [${results.join(', ')}]`);
    console.log(`  Time: ${duration}ms`);
    console.assert(results.length === 5, 'Should have 5 results');
    console.assert(results[0] === 2, 'First result should be 2');
    console.assert(results[4] === 10, 'Last result should be 10');
    // All five 100ms delays overlap, so total should be ~100ms, not 500ms.
    console.assert(duration < 200, 'Should complete in ~100ms (parallel), not 500ms (sequential)');
    console.log('✅ Batch processing works!\n');

    // Test 2: time the same workload sequentially and in parallel.
    console.log('Test 2: Sequential vs Parallel comparison');
    const runnable = new DelayedMultiplierRunnable(3, 100);
    const testInputs = [1, 2, 3, 4, 5];

    console.log('  Processing sequentially...');
    const seqStart = Date.now();
    const seqResults = await processSequentially(runnable, testInputs);
    const seqDuration = Date.now() - seqStart;
    console.log(`  Sequential: ${seqDuration}ms (${testInputs.length} × 100ms)`);

    console.log('  Processing in parallel...');
    const parStart = Date.now();
    const parResults = await processInParallel(runnable, testInputs);
    const parDuration = Date.now() - parStart;
    console.log(`  Parallel: ${parDuration}ms (max of all operations)`);

    const speedup = (seqDuration / parDuration).toFixed(1);
    console.log(`  Speedup: ${speedup}x faster 🚀`);

    console.assert(
      JSON.stringify(seqResults) === JSON.stringify(parResults),
      'Results should be identical'
    );
    console.assert(parDuration < seqDuration / 2, 'Parallel should be much faster');
    console.log('✅ Parallel is significantly faster!\n');

    // Test 3: larger batch should still finish in ~one delay period.
    console.log('Test 3: Large batch (10 items)');
    const largeBatch = Array.from({ length: 10 }, (_, i) => i + 1);
    const startLarge = Date.now();
    const largeResults = await multiplier.batch(largeBatch);
    const durationLarge = Date.now() - startLarge;

    console.log(`  Input: [1, 2, 3, ..., 10]`);
    console.log(`  Processed ${largeBatch.length} items in ${durationLarge}ms`);
    console.log(`  Sequential would take: ~${largeBatch.length * 100}ms`);
    console.log(`  Actual time: ${durationLarge}ms`);
    console.assert(largeResults.length === 10, 'Should process all items');
    console.assert(durationLarge < 200, 'Should complete quickly due to parallelism');
    console.log('✅ Large batch works!\n');

    // Test 4: one bad input should reject the whole batch (Promise.all).
    console.log('Test 4: Error handling in batch');
    const mixedInputs = [1, 2, 'invalid', 4, 5];

    try {
      await multiplier.batch(mixedInputs);
      console.log('❌ Should have thrown an error');
    } catch (error) {
      console.log(`  Caught error: ${error.message}`);
      console.log(`  Note: When one fails, all fail (Promise.all behavior)`);
      console.log('✅ Errors are caught in batch processing!\n');
    }

    // Test 5: empty input should yield an empty result array.
    console.log('Test 5: Empty batch');
    const emptyResults = await multiplier.batch([]);
    console.assert(emptyResults.length === 0, 'Empty batch should return empty array');
    console.log('✅ Empty batch handled correctly!\n');

    // Test 6: batch() should also work through a piped pipeline.
    console.log('Test 6: Batch through a pipeline');
    // Small helper runnable: adds a constant after a short delay.
    class AddConstant extends Runnable {
      constructor(constant) {
        super();
        this.constant = constant;
      }
      async _call(input) {
        await new Promise(resolve => setTimeout(resolve, 50));
        return input + this.constant;
      }
    }

    const pipeline = new DelayedMultiplierRunnable(2, 50)
      .pipe(new AddConstant(10));

    const pipelineInputs = [1, 2, 3];
    const startPipeline = Date.now();
    const pipelineResults = await pipeline.batch(pipelineInputs);
    const durationPipeline = Date.now() - startPipeline;

    console.log(`  Inputs: [${pipelineInputs.join(', ')}]`);
    console.log(`  Results: [${pipelineResults.join(', ')}]`);
    console.log(`  Expected: [12, 14, 16] (×2, then +10)`);
    console.log(`  Time: ${durationPipeline}ms`);
    console.log(`  Sequential: would take ~${pipelineInputs.length * 100}ms`);
    console.assert(pipelineResults[0] === 12, 'First should be 12');
    console.assert(pipelineResults[2] === 16, 'Last should be 16');
    console.log('✅ Batch works through pipelines!\n');

    // Test 7: chunked processing bounds concurrency; 12 items in chunks
    // of 3 → 4 sequential chunks of ~100ms each.
    console.log('Test 7: Chunked processing (controlled parallelism)');
    const manyInputs = Array.from({ length: 12 }, (_, i) => i + 1);

    console.log('  Processing 12 items in chunks of 3...');
    const startChunked = Date.now();
    const chunkedResults = await processInChunks(
      new DelayedMultiplierRunnable(2, 100),
      manyInputs,
      3
    );
    const durationChunked = Date.now() - startChunked;

    console.log(`  Time: ${durationChunked}ms`);
    console.log(`  Expected: ~400ms (4 chunks × 100ms each)`);
    console.log(`  All parallel would be: ~100ms`);
    console.log(`  Sequential would be: ~1200ms`);
    console.assert(chunkedResults.length === 12, 'Should process all items');
    console.assert(
      durationChunked > 300 && durationChunked < 600,
      'Should take time for 4 chunks'
    );
    console.log('✅ Chunked processing works!\n');

    // Test 8: debug logging makes the interleaved parallel execution visible.
    console.log('Test 8: Debug mode to see parallel execution');
    const debugMultiplier = new DelayedMultiplierRunnable(5, 100);
    console.log('  Watch items process in parallel:');
    await debugMultiplier.batch([1, 2, 3], { debug: true });
    console.log('✅ Debug mode shows parallel execution!\n');

    // Test 9: speedup should grow with batch size (sequential scales
    // linearly with N, parallel stays roughly flat).
    console.log('Test 9: Detailed performance analysis');
    const sizes = [1, 2, 5, 10];
    const delay = 100;

    console.log('  Items | Sequential | Parallel | Speedup');
    console.log('  ------|------------|----------|--------');

    for (const size of sizes) {
      const inputs = Array.from({ length: size }, (_, i) => i + 1);
      const perf = new DelayedMultiplierRunnable(2, delay);

      const seqStart = Date.now();
      await processSequentially(perf, inputs);
      const seqTime = Date.now() - seqStart;

      const parStart = Date.now();
      await processInParallel(perf, inputs);
      const parTime = Date.now() - parStart;

      const speedup = (seqTime / parTime).toFixed(1);
      console.log(`  ${size.toString().padStart(5)} | ${seqTime.toString().padStart(10)}ms | ${parTime.toString().padStart(8)}ms | ${speedup.padStart(6)}x`);
    }
    console.log('✅ Performance scales with parallelism!\n');

    console.log('🎉 All tests passed!');
    console.log('\n💡 Key Learnings:');
    console.log('  • batch() uses Promise.all() for parallel execution');
    console.log('  • N items with 100ms delay: sequential = N×100ms, parallel = 100ms');
    console.log('  • One error in batch causes all to fail');
    console.log('  • Chunked processing balances speed and resource usage');
    console.log('  • Pipelines work with batch processing');
    console.log('  • Perfect for processing multiple LLM requests simultaneously');
  } catch (error) {
    console.error('❌ Test failed:', error.message);
    console.error(error.stack);
  }
}
|
|
|
|
|
|
|
|
|
// Run the test suite only when this file is executed directly
// (`node <file>`), not when it is imported as a module.
if (import.meta.url === `file://${process.argv[1]}`) {
  runTests();
}
|
|
|
|
|
|
// Public API: the demo runnable plus the three processing strategies.
export {
  DelayedMultiplierRunnable,
  processSequentially,
  processInParallel,
  processInChunks
};