File size: 3,229 Bytes
00df61d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 | 'use strict'
var PassThrough = require('readable-stream').PassThrough
var inherits = require('inherits')
var p = require('process-nextick-args')
function Cloneable (stream, opts) {
if (!(this instanceof Cloneable)) {
return new Cloneable(stream, opts)
}
var objectMode = stream._readableState.objectMode
this._original = stream
this._clonesCount = 1
opts = opts || {}
opts.objectMode = objectMode
PassThrough.call(this, opts)
forwardDestroy(stream, this)
this.on('newListener', onData)
this.once('resume', onResume)
this._hasListener = true
}
inherits(Cloneable, PassThrough)
function onData (event, listener) {
if (event === 'data' || event === 'readable') {
this._hasListener = false
this.removeListener('newListener', onData)
this.removeListener('resume', onResume)
p.nextTick(clonePiped, this)
}
}
function onResume () {
this._hasListener = false
this.removeListener('newListener', onData)
p.nextTick(clonePiped, this)
}
Cloneable.prototype.clone = function () {
if (!this._original) {
throw new Error('already started')
}
this._clonesCount++
// the events added by the clone should not count
// for starting the flow
this.removeListener('newListener', onData)
var clone = new Clone(this)
if (this._hasListener) {
this.on('newListener', onData)
}
return clone
}
Cloneable.prototype._destroy = function (err, cb) {
if (!err) {
this.push(null)
this.end()
this.emit('close')
}
p.nextTick(cb, err)
}
function forwardDestroy (src, dest) {
src.on('error', destroy)
src.on('close', onClose)
function destroy (err) {
src.removeListener('close', onClose)
dest.destroy(err)
}
function onClose () {
dest.end()
}
}
function clonePiped (that) {
if (--that._clonesCount === 0 && !that._readableState.destroyed) {
that._original.pipe(that)
that._original = undefined
}
}
function Clone (parent, opts) {
if (!(this instanceof Clone)) {
return new Clone(parent, opts)
}
var objectMode = parent._readableState.objectMode
opts = opts || {}
opts.objectMode = objectMode
this.parent = parent
PassThrough.call(this, opts)
forwardDestroy(parent, this)
parent.pipe(this)
// the events added by the clone should not count
// for starting the flow
// so we add the newListener handle after we are done
this.on('newListener', onDataClone)
this.on('resume', onResumeClone)
}
function onDataClone (event, listener) {
// We start the flow once all clones are piped or destroyed
if (event === 'data' || event === 'readable' || event === 'close') {
p.nextTick(clonePiped, this.parent)
this.removeListener('newListener', onDataClone)
}
}
function onResumeClone () {
this.removeListener('newListener', onDataClone)
p.nextTick(clonePiped, this.parent)
}
inherits(Clone, PassThrough)
Clone.prototype.clone = function () {
return this.parent.clone()
}
Cloneable.isCloneable = function (stream) {
return stream instanceof Cloneable || stream instanceof Clone
}
Clone.prototype._destroy = function (err, cb) {
if (!err) {
this.push(null)
this.end()
this.emit('close')
}
p.nextTick(cb, err)
}
module.exports = Cloneable
|