Buckets:
| const { realImport, realRequire } = require('real-require') | |
| const { workerData, parentPort } = require('worker_threads') | |
| const { WRITE_INDEX, READ_INDEX } = require('./indexes') | |
| const { waitDiff } = require('./wait') | |
| const { | |
| dataBuf, | |
| filename, | |
| stateBuf | |
| } = workerData | |
| let destination | |
| const state = new Int32Array(stateBuf) | |
| const data = Buffer.from(dataBuf) | |
| // Keep the event loop alive - Atomics.waitAsync promises don't prevent worker exit | |
| const keepAlive = setInterval(() => {}, 60 * 60 * 1000) | |
| async function start () { | |
| let worker | |
| try { | |
| worker = (await realImport(filename)) | |
| } catch (error) { | |
| // A yarn user that tries to start a ThreadStream for an external module | |
| // provides a filename pointing to a zip file. | |
| // eg. require.resolve('pino-elasticsearch') // returns /foo/pino-elasticsearch-npm-6.1.0-0c03079478-6915435172.zip/bar.js | |
| // The `import` will fail to try to load it. | |
| // This catch block executes the `require` fallback to load the module correctly. | |
| // In fact, yarn modifies the `require` function to manage the zipped path. | |
| // More details at https://github.com/pinojs/pino/pull/1113 | |
| // The error codes may change based on the node.js version (ENOTDIR > 12, ERR_MODULE_NOT_FOUND <= 12 ) | |
| if ((error.code === 'ENOTDIR' || error.code === 'ERR_MODULE_NOT_FOUND') && | |
| filename.startsWith('file://')) { | |
| worker = realRequire(decodeURIComponent(filename.replace('file://', ''))) | |
| } else if (error.code === undefined || error.code === 'ERR_VM_DYNAMIC_IMPORT_CALLBACK_MISSING') { | |
| // When bundled with pkg, an undefined error is thrown when called with realImport | |
| // When bundled with pkg and using node v20, an ERR_VM_DYNAMIC_IMPORT_CALLBACK_MISSING error is thrown when called with realImport | |
| // More info at: https://github.com/pinojs/thread-stream/issues/143 | |
| try { | |
| worker = realRequire(decodeURIComponent(filename.replace(process.platform === 'win32' ? 'file:///' : 'file://', ''))) | |
| } catch { | |
| throw error | |
| } | |
| } else if (filename.endsWith('.ts') || filename.endsWith('.cts')) { | |
| // Native TypeScript import failed (type stripping not enabled). | |
| // Fall back to ts-node for TypeScript files. | |
| try { | |
| if (!process[Symbol.for('ts-node.register.instance')]) { | |
| realRequire('ts-node/register') | |
| } else if (process.env.TS_NODE_DEV) { | |
| realRequire('ts-node-dev') | |
| } | |
| worker = realRequire(decodeURIComponent(filename.replace(process.platform === 'win32' ? 'file:///' : 'file://', ''))) | |
| } catch { | |
| throw error | |
| } | |
| } else { | |
| throw error | |
| } | |
| } | |
| // Depending on how the default export is performed, and on how the code is | |
| // transpiled, we may find cases of two nested "default" objects. | |
| // See https://github.com/pinojs/pino/issues/1243#issuecomment-982774762 | |
| if (typeof worker === 'object') worker = worker.default | |
| if (typeof worker === 'object') worker = worker.default | |
| destination = await worker(workerData.workerData) | |
| destination.on('error', function (err) { | |
| Atomics.store(state, WRITE_INDEX, -2) | |
| Atomics.notify(state, WRITE_INDEX) | |
| Atomics.store(state, READ_INDEX, -2) | |
| Atomics.notify(state, READ_INDEX) | |
| parentPort.postMessage({ | |
| code: 'ERROR', | |
| err | |
| }) | |
| }) | |
| destination.on('close', function () { | |
| // process._rawDebug('worker close emitted') | |
| const end = Atomics.load(state, WRITE_INDEX) | |
| Atomics.store(state, READ_INDEX, end) | |
| Atomics.notify(state, READ_INDEX) | |
| clearInterval(keepAlive) | |
| setImmediate(() => { | |
| process.exit(0) | |
| }) | |
| }) | |
| } | |
| // No .catch() handler, | |
| // in case there is an error it goes | |
| // to unhandledRejection | |
| start().then(function () { | |
| parentPort.postMessage({ | |
| code: 'READY' | |
| }) | |
| process.nextTick(run) | |
| }) | |
| function run () { | |
| const current = Atomics.load(state, READ_INDEX) | |
| const end = Atomics.load(state, WRITE_INDEX) | |
| // process._rawDebug(`pre state ${current} ${end}`) | |
| if (end === current) { | |
| if (end === data.length) { | |
| waitDiff(state, READ_INDEX, end, Infinity, run) | |
| } else { | |
| waitDiff(state, WRITE_INDEX, end, Infinity, run) | |
| } | |
| return | |
| } | |
| // process._rawDebug(`post state ${current} ${end}`) | |
| if (end === -1) { | |
| // process._rawDebug('end') | |
| destination.end() | |
| return | |
| } | |
| const toWrite = data.toString('utf8', current, end) | |
| // process._rawDebug('worker writing: ' + toWrite) | |
| const res = destination.write(toWrite) | |
| if (res) { | |
| Atomics.store(state, READ_INDEX, end) | |
| Atomics.notify(state, READ_INDEX) | |
| setImmediate(run) | |
| } else { | |
| destination.once('drain', function () { | |
| Atomics.store(state, READ_INDEX, end) | |
| Atomics.notify(state, READ_INDEX) | |
| run() | |
| }) | |
| } | |
| } | |
| process.on('unhandledRejection', function (err) { | |
| parentPort.postMessage({ | |
| code: 'ERROR', | |
| err | |
| }) | |
| process.exit(1) | |
| }) | |
| process.on('uncaughtException', function (err) { | |
| parentPort.postMessage({ | |
| code: 'ERROR', | |
| err | |
| }) | |
| process.exit(1) | |
| }) | |
| process.once('exit', exitCode => { | |
| if (exitCode !== 0) { | |
| process.exit(exitCode) | |
| return | |
| } | |
| if (destination?.writableNeedDrain && !destination?.writableEnded) { | |
| parentPort.postMessage({ | |
| code: 'WARNING', | |
| err: new Error('ThreadStream: process exited before destination stream was drained. this may indicate that the destination stream try to write to a another missing stream') | |
| }) | |
| } | |
| process.exit(0) | |
| }) | |
Xet Storage Details
- Size:
- 5.49 kB
- Xet hash:
- eef804ba0c41af7eeb8f880c4fe0d118ebd5bd5957402a46ad5024a5389c39a1
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.