Buckets:
| const EE = require('node:events') | |
| const { pipeline, PassThrough } = require('node:stream') | |
| const pino = require('../pino.js') | |
| const build = require('pino-abstract-transport') | |
| const loadTransportStreamBuilder = require('./transport-stream') | |
| // This file is not checked by the code coverage tool, | |
| // as it is not reliable. | |
| /* istanbul ignore file */ | |
| /* | |
| * > Multiple targets & pipelines | |
| * | |
| * | |
| * ┌─────────────────────────────────────────────────┐ ┌─────┐ | |
| * │ │ │ p │ | |
| * │ │ │ i │ | |
| * │ target │ │ n │ | |
| * │ │ ────────────────────────────────┼────┤ o │ | |
| * │ targets │ target │ │ . │ | |
| * │ ────────────► │ ────────────────────────────────┼────┤ m │ source | |
| * │ │ target │ │ u │ │ | |
| * │ │ ────────────────────────────────┼────┤ l │ │write | |
| * │ │ │ │ t │ ▼ | |
| * │ │ pipeline ┌───────────────┐ │ │ i │ ┌────────┐ | |
| * │ │ ──────────► │ PassThrough ├───┼────┤ s ├──────┤ │ | |
| * │ │ └───────────────┘ │ │ t │ write│ Thread │ | |
| * │ │ │ │ r │◄─────┤ Stream │ | |
| * │ │ pipeline ┌───────────────┐ │ │ e │ │ │ | |
| * │ │ ──────────► │ PassThrough ├───┼────┤ a │ └────────┘ | |
| * │ └───────────────┘ │ │ m │ | |
| * │ │ │ │ | |
| * └─────────────────────────────────────────────────┘ └─────┘ | |
| * | |
| * | |
| * | |
| * > One single pipeline or target | |
| * | |
| * | |
| * source | |
| * │ | |
| * ┌────────────────────────────────────────────────┐ │write | |
| * │ │ ▼ | |
| * │ │ ┌────────┐ | |
| * │ targets │ target │ │ │ | |
| * │ ────────────► │ ──────────────────────────────┤ │ │ | |
| * │ │ │ │ │ | |
| * │ ├──────┤ │ | |
| * │ │ │ │ | |
| * │ │ │ │ | |
| * │ OR │ │ │ | |
| * │ │ │ │ | |
| * │ │ │ │ | |
| * │ ┌──────────────┐ │ │ │ | |
| * │ targets │ pipeline │ │ │ │ Thread │ | |
| * │ ────────────► │ ────────────►│ PassThrough ├─┤ │ Stream │ | |
| * │ │ │ │ │ │ │ | |
| * │ └──────────────┘ │ │ │ | |
| * │ │ │ │ | |
| * │ OR │ write│ │ | |
| * │ │◄─────┤ │ | |
| * │ │ │ │ | |
| * │ ┌──────────────┐ │ │ │ | |
| * │ pipeline │ │ │ │ │ | |
| * │ ──────────────►│ PassThrough ├────────────────┤ │ │ | |
| * │ │ │ │ │ │ | |
| * │ └──────────────┘ │ └────────┘ | |
| * │ │ | |
| * │ │ | |
| * └────────────────────────────────────────────────┘ | |
| */ | |
| module.exports = async function ({ targets, pipelines, levels, dedupe }) { | |
| const targetStreams = [] | |
| // Process targets | |
| if (targets && targets.length) { | |
| targets = await Promise.all(targets.map(async (t) => { | |
| const fn = await loadTransportStreamBuilder(t.target) | |
| const stream = await fn(t.options) | |
| return { | |
| level: t.level, | |
| stream | |
| } | |
| })) | |
| targetStreams.push(...targets) | |
| } | |
| // Process pipelines | |
| if (pipelines && pipelines.length) { | |
| pipelines = await Promise.all( | |
| pipelines.map(async (p) => { | |
| let level | |
| const pipeDests = await Promise.all( | |
| p.map(async (t) => { | |
| // level assigned to pipeline is duplicated over all its targets, just store it | |
| level = t.level | |
| const fn = await loadTransportStreamBuilder(t.target) | |
| const stream = await fn(t.options) | |
| return stream | |
| } | |
| )) | |
| return { | |
| level, | |
| stream: createPipeline(pipeDests) | |
| } | |
| }) | |
| ) | |
| targetStreams.push(...pipelines) | |
| } | |
| // Skip building the multistream step if either one single pipeline or target is defined and | |
| // return directly the stream instance back to TreadStream. | |
| // This is equivalent to define either: | |
| // | |
| // pino.transport({ target: ... }) | |
| // | |
| // OR | |
| // | |
| // pino.transport({ pipeline: ... }) | |
| if (targetStreams.length === 1) { | |
| return targetStreams[0].stream | |
| } else { | |
| return build(process, { | |
| parse: 'lines', | |
| metadata: true, | |
| close (err, cb) { | |
| let expected = 0 | |
| for (const transport of targetStreams) { | |
| expected++ | |
| transport.stream.on('close', closeCb) | |
| transport.stream.end() | |
| } | |
| function closeCb () { | |
| if (--expected === 0) { | |
| cb(err) | |
| } | |
| } | |
| } | |
| }) | |
| } | |
| // TODO: Why split2 was not used for pipelines? | |
| function process (stream) { | |
| const multi = pino.multistream(targetStreams, { levels, dedupe }) | |
| // TODO manage backpressure | |
| stream.on('data', function (chunk) { | |
| const { lastTime, lastMsg, lastObj, lastLevel } = this | |
| multi.lastLevel = lastLevel | |
| multi.lastTime = lastTime | |
| multi.lastMsg = lastMsg | |
| multi.lastObj = lastObj | |
| // TODO handle backpressure | |
| multi.write(chunk + '\n') | |
| }) | |
| } | |
| /** | |
| * Creates a pipeline using the provided streams and return an instance of `PassThrough` stream | |
| * as a source for the pipeline. | |
| * | |
| * @param {(TransformStream|WritableStream)[]} streams An array of streams. | |
| * All intermediate streams in the array *MUST* be `Transform` streams and only the last one `Writable`. | |
| * @returns A `PassThrough` stream instance representing the source stream of the pipeline | |
| */ | |
| function createPipeline (streams) { | |
| const ee = new EE() | |
| const stream = new PassThrough({ | |
| autoDestroy: true, | |
| destroy (_, cb) { | |
| ee.on('error', cb) | |
| ee.on('closed', cb) | |
| } | |
| }) | |
| pipeline(stream, ...streams, function (err) { | |
| if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { | |
| ee.emit('error', err) | |
| return | |
| } | |
| ee.emit('closed') | |
| }) | |
| return stream | |
| } | |
| } | |
Xet Storage Details
- Size:
- 8.96 kB
- Xet hash:
- be9640e5482c4dbc42026a8df89212de2cda2a1aad79f38574380f9f66f52310
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.