| import { createLoggerWithContext } from "@midday/logger"; |
|
|
| const logger = createLoggerWithContext("worker:batch"); |
|
|
| |
| |
| |
| |
| export async function processBatch<T, R>( |
| items: T[], |
| batchSize: number, |
| processor: (batch: T[]) => Promise<R[]>, |
| ): Promise<R[]> { |
| const results: R[] = []; |
|
|
| for (let i = 0; i < items.length; i += batchSize) { |
| const batch = items.slice(i, i + batchSize); |
| try { |
| const batchResults = await processor(batch); |
| results.push(...batchResults); |
| } catch (error) { |
| |
| |
| logger.error(`Batch processing failed at index ${i}`, { |
| error: error instanceof Error ? error.message : String(error), |
| }); |
| throw error; |
| } |
| } |
|
|
| return results; |
| } |
|
|
| |
| |
| |
| |
| export async function processBatchWithErrorIsolation<T, R>( |
| items: T[], |
| batchSize: number, |
| processor: (batch: T[]) => Promise<R[]>, |
| ): Promise<{ |
| results: R[]; |
| errors: Array<{ index: number; error: unknown }>; |
| }> { |
| const results: R[] = []; |
| const errors: Array<{ index: number; error: unknown }> = []; |
|
|
| for (let i = 0; i < items.length; i += batchSize) { |
| const batch = items.slice(i, i + batchSize); |
| const batchIndex = Math.floor(i / batchSize); |
|
|
| try { |
| const batchResults = await processor(batch); |
| results.push(...batchResults); |
| } catch (error) { |
| errors.push({ index: batchIndex, error }); |
| |
| } |
| } |
|
|
| return { results, errors }; |
| } |
|
|
| |
| |
| |
| |
| export async function processBatchParallel<T, R>( |
| items: T[], |
| batchSize: number, |
| concurrency: number, |
| processor: (batch: T[]) => Promise<R[]>, |
| ): Promise<R[]> { |
| const results: R[] = []; |
| const batches: T[][] = []; |
|
|
| |
| for (let i = 0; i < items.length; i += batchSize) { |
| batches.push(items.slice(i, i + batchSize)); |
| } |
|
|
| |
| for (let i = 0; i < batches.length; i += concurrency) { |
| const concurrentBatches = batches.slice(i, i + concurrency); |
| const batchResults = await Promise.all( |
| concurrentBatches.map((batch) => processor(batch)), |
| ); |
| results.push(...batchResults.flat()); |
| } |
|
|
| return results; |
| } |
|
|