| | import { promisify } from 'node:util' |
| | import { InvariantError } from '../../shared/lib/invariant-error' |
| | import { bindSnapshot } from '../app-render/async-local-storage' |
| |
|
/**
 * Lifecycle of a single capture session.
 *
 * The numeric values are spelled out because the enum's reverse mapping
 * (`ExecutionState[state]`) is used to render state names in error messages.
 */
enum ExecutionState {
  /** Idle between work items; the initial state of a new execution. */
  Waiting = 1,
  /** A work step is currently running. */
  Working = 2,
  /** The queue was drained and capturing was stopped normally. */
  Finished = 3,
  /** The execution was aborted via `bail`. */
  Abandoned = 4,
}

/** Bookkeeping for one capture session. */
type Execution = {
  /** Where the session currently is in its lifecycle. */
  state: ExecutionState
  /** Immediates captured during the session, in scheduling order. */
  queuedImmediates: QueueItem[]
}
| |
|
/**
 * Flipped to `true` the first time capturing starts and never reset.
 * `patchedClearImmediate` only looks for our own immediate objects once this
 * is set.
 */
let wasEnabledAtLeastOnce = false

/** Number of `process.nextTick` callbacks scheduled through the patch that have not run yet. */
let pendingNextTicks = 0
/** The capture session currently in progress, if any. */
let currentExecution: Execution | null = null

// Grab the native implementations before `install()` replaces them.
const {
  setImmediate: originalSetImmediate,
  clearImmediate: originalClearImmediate,
} = globalThis
const originalNextTick = process.nextTick

export { originalSetImmediate as unpatchedSetImmediate }
| |
|
/**
 * Swaps the global and `node:timers` immediate APIs, the promise-based
 * `setImmediate` from `node:timers/promises`, and `process.nextTick` for the
 * patched versions defined in this module.
 *
 * Does nothing in the edge runtime.
 */
function install() {
  // NOTE(review): the if/else shape (rather than a bare early return) is
  // presumably what lets bundlers drop the Node-only branch, including its
  // `require` calls, from edge builds. Confirm before flattening.
  if (process.env.NEXT_RUNTIME === 'edge') {
    return
  } else {
    debug?.('installing fast setImmediate patch')

    const timers = require('node:timers') as typeof import('node:timers')
    const setImmediateReplacement =
      patchedSetImmediate as unknown as typeof setImmediate
    // Module export first, then the global: the same order the chained
    // assignment would produce.
    timers.setImmediate = setImmediateReplacement
    globalThis.setImmediate = setImmediateReplacement
    timers.clearImmediate = patchedClearImmediate
    globalThis.clearImmediate = patchedClearImmediate

    const timersPromises =
      require('node:timers/promises') as typeof import('node:timers/promises')
    timersPromises.setImmediate =
      patchedSetImmediatePromise as typeof import('node:timers/promises').setImmediate

    process.nextTick = patchedNextTick
  }
}
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Starts a capture session: from now on, calls to the patched `setImmediate`
 * are queued on the session instead of being handed to Node, and the queue is
 * processed by the scheduler once pending microtasks and patched nextTicks
 * have run.
 *
 * @throws InvariantError in the edge runtime, when a previous session is
 *   still active, or when scheduling the first work step fails.
 */
export function DANGEROUSLY_runPendingImmediatesAfterCurrentTask() {
  if (process.env.NEXT_RUNTIME === 'edge') {
    throw new InvariantError(
      'DANGEROUSLY_runPendingImmediatesAfterCurrentTask cannot be called in the edge runtime'
    )
  } else {
    const execution = startCapturingImmediates()

    try {
      scheduleWorkAfterNextTicksAndMicrotasks(execution)
    } catch (err) {
      if (execution.state !== ExecutionState.Abandoned) {
        // Not bailed out yet: abandon the session now. `bail` always throws,
        // so the rethrow below is only reached when already abandoned.
        bail(
          execution,
          new InvariantError(
            'An unexpected error occurred while starting to capture immediates',
            {
              cause: err,
            }
          )
        )
      }
      // The session was already abandoned, so `err` is the bail error itself.
      throw err
    }
  }
}
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Asserts that no capture session is still in progress.
 *
 * If one is, it is abandoned via `bail` (which hands its remaining immediates
 * back to Node) and an `InvariantError` is thrown.
 *
 * @throws InvariantError in the edge runtime, or when a session is active.
 */
export function expectNoPendingImmediates() {
  if (process.env.NEXT_RUNTIME === 'edge') {
    throw new InvariantError(
      'expectNoPendingImmediates cannot be called in the edge runtime'
    )
  } else {
    const unfinished = currentExecution
    if (unfinished === null) {
      return
    }

    bail(
      unfinished,
      new InvariantError(
        `Expected all captured immediates to have been executed (state: ${ExecutionState[unfinished.state]})`
      )
    )
  }
}
| |
|
| | |
| | |
| | |
| | |
/**
 * Schedules the next work step for `execution`.
 *
 * The step is deferred through a microtask and then through the unpatched
 * `process.nextTick`. When it finally runs it:
 * - does nothing if the execution was abandoned or is no longer current,
 * - reschedules itself while patched nextTicks are still pending, and
 * - otherwise hands over to `performWork`.
 *
 * @param execution The capture session to advance; must be `Waiting`.
 * @throws InvariantError if `execution` is not in the `Waiting` state.
 */
function scheduleWorkAfterNextTicksAndMicrotasks(execution: Execution) {
  if (execution.state !== ExecutionState.Waiting) {
    throw new InvariantError(
      `scheduleWorkAfterNextTicksAndMicrotasks can only be called while waiting (state: ${ExecutionState[execution.state]})`
    )
  }

  queueMicrotask(() => {
    // Use the unpatched nextTick so this hop is not counted in
    // `pendingNextTicks`.
    originalNextTick(() => {
      try {
        if (
          execution.state === ExecutionState.Abandoned ||
          currentExecution !== execution
        ) {
          debug?.(`scheduler :: the execution was abandoned`)
          return
        }
        if (pendingNextTicks > 0) {
          // Patched nextTick callbacks are still outstanding; let them run
          // and check again afterwards.
          debug?.(`scheduler :: yielding to ${pendingNextTicks} nextTicks`)
          return scheduleWorkAfterNextTicksAndMicrotasks(execution)
        }

        return performWork(execution)
      } catch (err) {
        // Re-read the state through a widened alias: the checks in the `try`
        // block narrowed `execution.state`, but the work may have changed it.
        const executionAfterWork = execution as Execution
        if (executionAfterWork.state === ExecutionState.Abandoned) {
          // Already bailed out; `err` is propagated as-is.
          throw err
        }

        // Abandon the execution from a fresh microtask; `bail` throws there.
        queueMicrotask(() => {
          bail(
            execution,
            new InvariantError(
              'An unexpected error occurred while executing immediates',
              { cause: err }
            )
          )
        })
      }
    })
  })
}
| |
|
| | |
/**
 * Runs one step of the scheduler: takes the next live immediate off the
 * queue, executes it, and schedules the following step.
 *
 * When the queue holds no live items, capturing is stopped instead. A
 * synchronous error thrown by the callback does not interrupt the scheduler;
 * it is rethrown from a microtask that was queued before the callback ran.
 *
 * @throws InvariantError if the execution is neither `Waiting` nor `Abandoned`.
 */
function performWork(execution: Execution) {
  if (execution.state === ExecutionState.Abandoned) {
    return
  }

  debug?.(`scheduler :: performing work`)

  if (execution.state !== ExecutionState.Waiting) {
    throw new InvariantError(
      `performWork can only be called while waiting (state: ${ExecutionState[execution.state]})`
    )
  }
  execution.state = ExecutionState.Working

  const nextItem = takeNextActiveQueueItem(execution)
  if (nextItem === null) {
    debug?.(`scheduler :: no immediates queued, exiting`)
    stopCapturingImmediates(execution)
    return
  }

  debug?.(`scheduler :: executing queued immediate`)

  // Copy the fields out first: `clearQueueItem` nulls them on the item.
  const { immediateObject, callback, args } = nextItem
  immediateObject[INTERNALS].queueItem = null
  clearQueueItem(nextItem)

  // Queue the rethrow microtask *before* invoking the callback, so it sits
  // ahead of any microtasks the callback itself queues.
  let failure: { thrown: unknown } | null = null
  queueMicrotask(() => {
    if (failure !== null) {
      debug?.('scheduler :: rethrowing sync error from immediate in microtask')
      throw failure.thrown
    }
  })

  try {
    if (args === null) {
      callback()
    } else {
      callback.apply(null, args)
    }
  } catch (err) {
    failure = { thrown: err }
  }

  // Go back to waiting and queue the next step.
  execution.state = ExecutionState.Waiting
  scheduleWorkAfterNextTicksAndMicrotasks(execution)
}
| |
|
/**
 * Removes and returns the first queued immediate that has not been cleared.
 *
 * Cleared items in front of it are dropped from the queue as well. When every
 * item is cleared (or the queue is empty) the queue is emptied and `null` is
 * returned.
 */
function takeNextActiveQueueItem(execution: Execution): ActiveQueueItem | null {
  const queue = execution.queuedImmediates

  for (let index = 0; index < queue.length; index++) {
    const candidate = queue[index]
    if (candidate.isCleared) {
      continue
    }

    // Remove the candidate together with any cleared items before it.
    if (index === 0) {
      queue.shift()
    } else {
      queue.splice(0, index + 1)
    }
    return candidate
  }

  // Nothing left to run; discard leftover cleared entries.
  if (queue.length > 0) {
    queue.length = 0
  }
  return null
}
| |
|
/**
 * Opens a new capture session and makes it the current one.
 *
 * @returns The new execution, in the `Waiting` state with an empty queue.
 * @throws InvariantError (via `bail`) if a session is already active; that
 *   session is abandoned in the process.
 */
function startCapturingImmediates(): Execution {
  const previous = currentExecution
  if (previous !== null) {
    bail(
      previous,
      new InvariantError(
        `Cannot start capturing immediates again without finishing the previous task (state: ${ExecutionState[previous.state]})`
      )
    )
  }
  wasEnabledAtLeastOnce = true

  currentExecution = {
    state: ExecutionState.Waiting,
    queuedImmediates: [],
  }
  return currentExecution
}
| |
|
/**
 * Marks `execution` as finished and, if it is the current session, clears
 * `currentExecution`.
 *
 * An abandoned execution is left untouched.
 *
 * @throws InvariantError if the execution is neither `Working` nor `Abandoned`.
 */
function stopCapturingImmediates(execution: Execution) {
  switch (execution.state) {
    case ExecutionState.Abandoned:
      return
    case ExecutionState.Working:
      break
    default:
      throw new InvariantError(
        `Cannot stop capturing immediates before execution is finished (state: ${ExecutionState[execution.state]})`
      )
  }

  execution.state = ExecutionState.Finished

  if (currentExecution === execution) {
    currentExecution = null
  }
}
| |
|
/**
 * Abandons `execution` and throws `error`.
 *
 * Before throwing, the execution is detached from `currentExecution`, marked
 * `Abandoned`, and every immediate still live in its queue is rescheduled as
 * a native immediate.
 *
 * @param execution The capture session to abandon.
 * @param error The error to throw once cleanup is done.
 * @throws Always throws `error`.
 */
function bail(execution: Execution, error: Error): never {
  if (currentExecution === execution) {
    currentExecution = null
  }

  execution.state = ExecutionState.Abandoned

  // Hand everything that is still pending back to Node.
  const leftovers = execution.queuedImmediates
  for (const queueItem of leftovers) {
    if (!queueItem.isCleared) {
      scheduleQueuedImmediateAsNativeImmediate(queueItem)
    }
  }
  leftovers.length = 0

  throw error
}
| |
|
/**
 * Moves a queued immediate over to Node's native `setImmediate`.
 *
 * The queue item is cleared, the callback is scheduled with the unpatched
 * `setImmediate`, the ref/unref flag recorded on the immediate object is
 * carried over, and the immediate object is switched to proxy to the new
 * native immediate.
 */
function scheduleQueuedImmediateAsNativeImmediate(queueItem: ActiveQueueItem) {
  // Read everything we need first: `clearQueueItem` nulls the item's fields.
  const { immediateObject, callback, args } = queueItem
  const wasRefed = immediateObject[INTERNALS].hasRef

  clearQueueItem(queueItem)

  const nativeImmediate =
    args === null
      ? originalSetImmediate(callback)
      : originalSetImmediate(callback, ...args)

  if (!wasRefed) {
    nativeImmediate.unref()
  }

  proxyQueuedImmediateToNativeImmediate(immediateObject, nativeImmediate)
}
| |
|
/** A queued immediate that is still due to run. */
type ActiveQueueItem = {
  isCleared: false
  callback: (...args: any[]) => any
  /** Extra arguments for `callback`, or `null` when there are none. */
  args: any[] | null
  immediateObject: NextImmediate
}

/** A queue slot whose immediate was cleared or already taken; all references are dropped. */
type ClearedQueueItem = {
  isCleared: true
  callback: null
  args: null
  immediateObject: null
}

/** An entry in an execution's queue, discriminated by `isCleared`. */
type QueueItem = ActiveQueueItem | ClearedQueueItem

/**
 * Turns a queue item into a `ClearedQueueItem` in place by flagging it as
 * cleared and nulling its callback, arguments and immediate object.
 */
function clearQueueItem(originalQueueItem: QueueItem) {
  // The same object changes variant, hence the cast.
  const slot = originalQueueItem as ClearedQueueItem
  slot.isCleared = true
  slot.callback = null
  slot.args = null
  slot.immediateObject = null
}
| |
|
| | |
| |
|
/**
 * Replacement for `process.nextTick`.
 *
 * Outside of a capture session it forwards to the native implementation
 * unchanged. During a session it counts the callback in `pendingNextTicks`
 * and schedules it through `safelyRunNextTickCallback`, which decrements the
 * counter again when the callback runs.
 */
function patchedNextTick<TArgs extends any[]>(
  callback: (...args: TArgs) => void,
  ...args: TArgs
): void
function patchedNextTick() {
  const execution = currentExecution
  if (execution === null) {
    return Reflect.apply(originalNextTick, null, arguments)
  }

  if (arguments.length === 0 || typeof arguments[0] !== 'function') {
    // Let the native implementation reject the bad arguments itself...
    Reflect.apply(originalNextTick, null, arguments)

    // ...and treat it as a broken invariant if it did not throw.
    bail(
      execution,
      new InvariantError(
        'Expected process.nextTick to reject invalid arguments'
      )
    )
  }

  debug?.(
    `scheduler :: process.nextTick called (previous pending: ${pendingNextTicks})`
  )

  const callback: (...args: any[]) => any = arguments[0]
  const hasExtraArgs = arguments.length > 1
  const args: any[] | null = hasExtraArgs
    ? Array.prototype.slice.call(arguments, 1)
    : null

  pendingNextTicks += 1
  return originalNextTick(safelyRunNextTickCallback, callback, args)
}
| |
|
/**
 * Runs a nextTick callback that was scheduled through `patchedNextTick`.
 *
 * The pending counter is decremented before the callback is invoked. A
 * synchronous error from the callback is caught and rethrown from a
 * microtask instead of propagating out of this function.
 *
 * @param callback The user's nextTick callback.
 * @param args Arguments for `callback`, or `null` when there are none.
 */
function safelyRunNextTickCallback(
  callback: (...args: any[]) => any,
  args: any[] | null
) {
  pendingNextTicks -= 1
  debug?.(
    `scheduler :: process.nextTick executing (still pending: ${pendingNextTicks})`
  )

  try {
    if (args === null) {
      callback()
    } else {
      callback.apply(null, args)
    }
  } catch (err) {
    queueMicrotask(() => {
      debug?.(`scheduler :: rethrowing sync error from nextTick in a microtask`)
      throw err
    })
  }
}
| |
|
/**
 * Replacement for `setImmediate`.
 *
 * Outside of a capture session it forwards to the native implementation
 * unchanged. During a session the callback is bound to the current async
 * context via `bindSnapshot`, wrapped in a `NextImmediate`, and appended to
 * the session's queue instead of being handed to Node.
 *
 * @returns The native immediate when not capturing, otherwise the
 *   `NextImmediate` standing in for it.
 */
function patchedSetImmediate<TArgs extends any[]>(
  callback: (...args: TArgs) => void,
  ...args: TArgs
): NodeJS.Immediate
function patchedSetImmediate(callback: (args: void) => void): NodeJS.Immediate
function patchedSetImmediate(): NodeJS.Immediate {
  const execution = currentExecution
  if (execution === null) {
    return Reflect.apply(originalSetImmediate, null, arguments)
  }

  if (arguments.length === 0 || typeof arguments[0] !== 'function') {
    // Let the native implementation reject the bad arguments itself...
    Reflect.apply(originalSetImmediate, null, arguments)

    // ...and treat it as a broken invariant if it did not throw.
    bail(
      execution,
      new InvariantError('Expected setImmediate to reject invalid arguments')
    )
  }

  const callback: (...args: any[]) => any = arguments[0]
  const hasExtraArgs = arguments.length > 1
  const args: any[] | null = hasExtraArgs
    ? Array.prototype.slice.call(arguments, 1)
    : null

  // Bind the async context at scheduling time.
  const boundCallback = bindSnapshot(callback)
  const immediateObject = new NextImmediate()

  const queueItem: ActiveQueueItem = {
    isCleared: false,
    callback: boundCallback,
    args,
    immediateObject,
  }
  execution.queuedImmediates.push(queueItem)

  // Link the handle back to its queue slot so it can be cleared/ref'd later.
  immediateObject[INTERNALS].queueItem = queueItem

  return immediateObject
}
| |
|
/**
 * Replacement for the promise-based `setImmediate` from
 * `node:timers/promises`.
 *
 * Outside of a capture session it delegates to the native promisified
 * implementation. During a session it schedules through `patchedSetImmediate`
 * so the immediate is captured like any other.
 *
 * @param value The value the returned promise resolves with.
 * @param options `ref: false` unrefs the immediate; `signal` cancels it.
 * @returns A promise that resolves with `value` when the immediate runs, or
 *   rejects with `signal.reason` if the signal is (or becomes) aborted.
 */
function patchedSetImmediatePromise<T = void>(
  value: T,
  options?: import('node:timers').TimerOptions
): Promise<T> {
  if (currentExecution === null) {
    const originalPromisify: (typeof setImmediate)['__promisify__'] =
      originalSetImmediate[promisify.custom]
    return originalPromisify(value, options)
  }

  return new Promise<T>((resolve, reject) => {
    const signal = options?.signal
    if (signal && signal.aborted) {
      return reject(signal.reason)
    }

    // Assigned below when a signal is present. Declared up front so the
    // immediate's callback can detach it.
    let abortListener: (() => void) | undefined

    const immediate = patchedSetImmediate((settledValue: T) => {
      // The immediate fired, so cancellation is moot. Detach the listener so
      // a long-lived signal does not accumulate one listener per call.
      if (signal && abortListener) {
        signal.removeEventListener('abort', abortListener)
      }
      resolve(settledValue)
    }, value)

    if (options?.ref === false) {
      immediate.unref()
    }

    if (signal) {
      abortListener = () => {
        patchedClearImmediate(immediate)
        reject(signal.reason)
      }
      signal.addEventListener('abort', abortListener, { once: true })
    }
  })
}

// Make `util.promisify(setImmediate)` resolve to the patched promise version.
patchedSetImmediate[promisify.custom] = patchedSetImmediatePromise
| |
|
/**
 * Replacement for `clearImmediate`.
 *
 * Objects carrying the `INTERNALS` marker are disposed through their own
 * `Symbol.dispose`; everything else goes to the native `clearImmediate`.
 * The marker is only checked once capturing has been enabled at least once.
 */
const patchedClearImmediate = (
  immediateObject: NodeJS.Immediate | undefined
) => {
  const isOwnImmediate =
    wasEnabledAtLeastOnce &&
    immediateObject &&
    typeof immediateObject === 'object' &&
    INTERNALS in immediateObject

  if (!isOwnImmediate) {
    originalClearImmediate(immediateObject)
    return
  }

  ;(immediateObject as NextImmediate)[Symbol.dispose]()
}
| |
|
| | |
| |
|
/**
 * Key under which a `NextImmediate` stores its private state. Registered with
 * `Symbol.for`, so the same symbol is obtained from the global registry every
 * time.
 */
const INTERNALS: unique symbol = Symbol.for('next.Immediate.internals')

/** State of an immediate that this module tracks itself (no native immediate behind it). */
type QueuedImmediateInternals = {
  /** The ref/unref flag recorded for this immediate. */
  hasRef: boolean
  /** The queue slot for this immediate, or `null` once it is no longer queued. */
  queueItem: ActiveQueueItem | null
  nativeImmediate: null
}

/** State of an immediate that has been handed over to a native immediate. */
type NativeBackedImmediateInternals = {
  hasRef: null
  queueItem: null
  nativeImmediate: NodeJS.Immediate
}

type NextImmediateInternals =
  | QueuedImmediateInternals
  | NativeBackedImmediateInternals
| |
|
/**
 * Switches an immediate object over to the native-backed state: its own
 * ref flag and queue slot are dropped and `nativeImmediate` is recorded, so
 * the object's methods delegate to it from then on.
 *
 * The existing internals object is mutated in place.
 */
function proxyQueuedImmediateToNativeImmediate(
  immediateObject: NextImmediate,
  nativeImmediate: NodeJS.Immediate
) {
  const internals = immediateObject[INTERNALS]
  internals.hasRef = null
  internals.queueItem = null
  internals.nativeImmediate = nativeImmediate
}
| |
|
| | |
/** Alias of Node's `Immediate` shape, used as the contract for `NextImmediate`. */
interface NativeImmediate extends NodeJS.Immediate {}

/**
 * Stand-in for Node's `Immediate` returned by the patched `setImmediate`
 * while capturing.
 *
 * Each method acts on the queued item while one is attached, delegates to the
 * native immediate once the object has been proxied to one, and otherwise
 * does nothing.
 */
class NextImmediate implements NativeImmediate {
  [INTERNALS]: NextImmediateInternals = {
    queueItem: null,
    hasRef: true,
    nativeImmediate: null,
  }

  /** Reports the ref state; `false` when neither queued nor native-backed. */
  hasRef() {
    const state = this[INTERNALS]
    if (state.queueItem) {
      return state.hasRef
    }
    if (state.nativeImmediate) {
      return state.nativeImmediate.hasRef()
    }
    return false
  }

  /** Marks the immediate as ref'd. */
  ref() {
    const state = this[INTERNALS]
    if (state.queueItem) {
      state.hasRef = true
    } else if (state.nativeImmediate) {
      state.nativeImmediate.ref()
    }
    return this
  }

  /** Marks the immediate as unref'd. */
  unref() {
    const state = this[INTERNALS]
    if (state.queueItem) {
      state.hasRef = false
    } else if (state.nativeImmediate) {
      state.nativeImmediate.unref()
    }
    return this
  }

  // NOTE(review): empty placeholder, presumably only present to satisfy the
  // `NodeJS.Immediate` shape. Confirm nothing calls it.
  _onImmediate() {}

  /** Cancels the immediate: clears its queue slot, or disposes the native immediate. */
  [Symbol.dispose]() {
    const state = this[INTERNALS]
    const pendingItem = state.queueItem
    if (pendingItem) {
      // Detach first, then blank the slot so the scheduler skips it.
      state.queueItem = null
      clearQueueItem(pendingItem)
      return
    }
    if (state.nativeImmediate) {
      state.nativeImmediate[Symbol.dispose]()
    }
  }
}
| |
|
| | |
| |
|
/**
 * Debug logger, only defined when `NEXT_DEBUG_IMMEDIATES=1`.
 *
 * Non-string arguments are rendered with `util.inspect`; the line is dimmed
 * with ANSI codes and written synchronously to stdout's file descriptor.
 * Call sites use `debug?.(...)`, so logging costs nothing when disabled.
 */
const debug =
  process.env.NEXT_DEBUG_IMMEDIATES === '1'
    ? (...args: any[]) => {
        if (process.env.NEXT_RUNTIME === 'edge') {
          throw new InvariantError(
            'Fast setImmediate is not available in the edge runtime.'
          )
        } else {
          const { inspect } = require('node:util') as typeof import('node:util')
          const { writeFileSync } =
            require('node:fs') as typeof import('node:fs')

          const rendered = args
            .map((arg) =>
              typeof arg === 'string' ? arg : inspect(arg, { colors: true })
            )
            .join(' ')

          // Dim on, message plus newline, dim off.
          writeFileSync(
            process.stdout.fd,
            '\x1B[2m' + rendered + '\n' + '\x1B[22m'
          )
        }
      }
    : undefined

// Apply the patches as a side effect of loading this module.
install()
| |
|