Buckets:
| var PassThrough = require('readable-stream').PassThrough | |
| var inherits = require('inherits') | |
| var p = require('process-nextick-args') | |
/**
 * Wraps a readable stream in a PassThrough from which clones can be created.
 *
 * Can be called with or without `new`. The wrapper copies the source's
 * `objectMode`. The source is not piped in here; `clonePiped` does that once
 * `_clonesCount` has dropped to zero.
 *
 * @param {Object} stream - source stream; must expose `_readableState.objectMode`.
 * @param {Object} [opts] - options forwarded to PassThrough. `objectMode` is
 *   always overridden with the source's value. The object is copied, so the
 *   caller's instance is never modified.
 * @returns {Cloneable} the new instance when invoked without `new`.
 */
function Cloneable (stream, opts) {
  if (!(this instanceof Cloneable)) {
    return new Cloneable(stream, opts)
  }

  var objectMode = stream._readableState.objectMode
  this._original = stream
  // The wrapper itself counts as one consumer that has yet to start reading.
  this._clonesCount = 1

  // Shallow-copy the options so forcing objectMode does not leak into the
  // object the caller passed in.
  var options = {}
  for (var key in opts) {
    options[key] = opts[key]
  }
  options.objectMode = objectMode

  PassThrough.call(this, options)

  forwardDestroy(stream, this)

  this.on('newListener', onData)
  this.once('resume', onResume)

  // True while the 'newListener' hook is armed; clone() reads this flag to
  // decide whether to re-attach the hook after building a clone.
  this._hasListener = true
}

inherits(Cloneable, PassThrough)
/**
 * 'newListener' handler attached to a Cloneable.
 *
 * When a 'data' or 'readable' listener is added, it clears `_hasListener`,
 * detaches itself and the 'resume' hook, and schedules `clonePiped` for this
 * stream on the next tick. Other event names are ignored.
 *
 * @param {string} event - name of the event a listener is being added for.
 * @param {Function} listener - the listener being added (unused).
 */
function onData (event, listener) {
  if (event === 'data' || event === 'readable') {
    this._hasListener = false
    this.removeListener('newListener', onData)
    this.removeListener('resume', onResume)
    p.nextTick(clonePiped, this)
  }
}
/**
 * 'resume' handler on a Cloneable (the constructor registers it with `once`).
 *
 * Clears `_hasListener`, detaches the 'newListener' hook and schedules
 * `clonePiped` for this stream on the next tick.
 */
function onResume () {
  this._hasListener = false
  this.removeListener('newListener', onData)
  p.nextTick(clonePiped, this)
}
/**
 * Creates a new Clone fed by this stream.
 *
 * @returns {Clone} the new clone.
 * @throws {Error} 'already started' once `_original` has been cleared
 *   (`clonePiped` clears it when it pipes the source in).
 */
Cloneable.prototype.clone = function () {
  if (!this._original) {
    throw new Error('already started')
  }

  this._clonesCount++

  // the events added by the clone should not count
  // for starting the flow
  this.removeListener('newListener', onData)
  var clone = new Clone(this)
  // Re-arm the hook only if it has not fired yet.
  if (this._hasListener) {
    this.on('newListener', onData)
  }

  return clone
}
/**
 * Destroy hook for Cloneable.
 *
 * With no error, it pushes `null`, ends the stream and emits 'close'.
 * In every case `cb` is called with `err` on the next tick.
 *
 * @param {?Error} err - error passed to destroy, if any.
 * @param {Function} cb - completion callback, invoked as `cb(err)`.
 */
Cloneable.prototype._destroy = function (err, cb) {
  if (!err) {
    this.push(null)
    this.end()
    this.emit('close')
  }

  p.nextTick(cb, err)
}
/**
 * Propagates termination of `src` to `dest`.
 *
 * - 'error' on `src` destroys `dest` with the same error and stops
 *   forwarding 'close'.
 * - 'close' on `src` ends `dest`.
 *
 * @param {Object} src - emitter to observe (`on` / `removeListener`).
 * @param {Object} dest - stream to terminate (`destroy` / `end`).
 */
function forwardDestroy (src, dest) {
  src.on('error', destroy)
  src.on('close', onClose)

  function destroy (err) {
    // After an error, a later 'close' must not also call dest.end().
    src.removeListener('close', onClose)
    dest.destroy(err)
  }

  function onClose () {
    dest.end()
  }
}
/**
 * Records that one more consumer of `that` is ready.
 *
 * Decrements `that._clonesCount`. When it reaches zero and the stream has
 * not been destroyed, pipes the original source into `that` and drops the
 * reference to it; from then on `clone()` throws 'already started'.
 *
 * @param {Cloneable} that - the Cloneable whose counter is decremented.
 */
function clonePiped (that) {
  if (--that._clonesCount === 0 && !that._readableState.destroyed) {
    that._original.pipe(that)
    that._original = undefined
  }
}
/**
 * A PassThrough that receives everything its parent stream emits.
 *
 * Can be called with or without `new`. Copies the parent's `objectMode`,
 * mirrors the parent's error/close via `forwardDestroy`, and pipes the
 * parent into itself.
 *
 * @param {Object} parent - stream to read from; must expose
 *   `_readableState.objectMode` and `pipe`.
 * @param {Object} [opts] - options forwarded to PassThrough. `objectMode` is
 *   always overridden with the parent's value. The object is copied, so the
 *   caller's instance is never modified.
 * @returns {Clone} the new instance when invoked without `new`.
 */
function Clone (parent, opts) {
  if (!(this instanceof Clone)) {
    return new Clone(parent, opts)
  }

  var objectMode = parent._readableState.objectMode

  // Shallow-copy the options so forcing objectMode does not leak into the
  // object the caller passed in.
  var options = {}
  for (var key in opts) {
    options[key] = opts[key]
  }
  options.objectMode = objectMode

  this.parent = parent

  PassThrough.call(this, options)

  forwardDestroy(parent, this)

  parent.pipe(this)

  // the events added by the clone should not count
  // for starting the flow
  // so we add the newListener handle after we are done
  this.on('newListener', onDataClone)
  this.on('resume', onResumeClone)
}
/**
 * 'newListener' handler attached to a Clone.
 *
 * When a 'data', 'readable' or 'close' listener is added, it schedules
 * `clonePiped` for the parent on the next tick and detaches itself.
 * Other event names are ignored.
 *
 * @param {string} event - name of the event a listener is being added for.
 * @param {Function} listener - the listener being added (unused).
 */
function onDataClone (event, listener) {
  // We start the flow once all clones are piped or destroyed
  if (event === 'data' || event === 'readable' || event === 'close') {
    p.nextTick(clonePiped, this.parent)
    this.removeListener('newListener', onDataClone)
  }
}
/**
 * 'resume' handler attached to a Clone.
 *
 * Detaches the clone's 'newListener' hook and schedules `clonePiped` for the
 * parent on the next tick.
 */
function onResumeClone () {
  this.removeListener('newListener', onDataClone)
  p.nextTick(clonePiped, this.parent)
}
// Prototype chain is set up before any method is added to Clone.prototype.
inherits(Clone, PassThrough)

/**
 * Creates a sibling clone by delegating to the parent's `clone()`.
 *
 * @returns {*} whatever `this.parent.clone()` returns.
 */
Clone.prototype.clone = function () {
  return this.parent.clone()
}
/**
 * Tells whether `stream` is a Cloneable or one of its Clones.
 *
 * @param {*} stream - value to test.
 * @returns {boolean} true when `stream` is an instance of Cloneable or Clone.
 */
Cloneable.isCloneable = function (stream) {
  return stream instanceof Cloneable || stream instanceof Clone
}
/**
 * Destroy hook for Clone.
 *
 * With no error, it pushes `null`, ends the stream and emits 'close'.
 * In every case `cb` is called with `err` on the next tick.
 *
 * @param {?Error} err - error passed to destroy, if any.
 * @param {Function} cb - completion callback, invoked as `cb(err)`.
 */
Clone.prototype._destroy = function (err, cb) {
  if (!err) {
    this.push(null)
    this.end()
    this.emit('close')
  }

  p.nextTick(cb, err)
}
| module.exports = Cloneable | |
Xet Storage Details
- Size:
- 3.23 kB
- Xet hash:
- bdb521f48859278580a9d0ef21245b1f0a63769110c00e5b3621f27a00abf8d7
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.