| | var util = require('util'); |
| | var Stream = require('stream').Stream; |
| | var DelayedStream = require('delayed-stream'); |
| |
|
| | module.exports = CombinedStream; |
| | function CombinedStream() { |
| | this.writable = false; |
| | this.readable = true; |
| | this.dataSize = 0; |
| | this.maxDataSize = 2 * 1024 * 1024; |
| | this.pauseStreams = true; |
| |
|
| | this._released = false; |
| | this._streams = []; |
| | this._currentStream = null; |
| | this._insideLoop = false; |
| | this._pendingNext = false; |
| | } |
| | util.inherits(CombinedStream, Stream); |
| |
|
| | CombinedStream.create = function(options) { |
| | var combinedStream = new this(); |
| |
|
| | options = options || {}; |
| | for (var option in options) { |
| | combinedStream[option] = options[option]; |
| | } |
| |
|
| | return combinedStream; |
| | }; |
| |
|
| | CombinedStream.isStreamLike = function(stream) { |
| | return (typeof stream !== 'function') |
| | && (typeof stream !== 'string') |
| | && (typeof stream !== 'boolean') |
| | && (typeof stream !== 'number') |
| | && (!Buffer.isBuffer(stream)); |
| | }; |
| |
|
| | CombinedStream.prototype.append = function(stream) { |
| | var isStreamLike = CombinedStream.isStreamLike(stream); |
| |
|
| | if (isStreamLike) { |
| | if (!(stream instanceof DelayedStream)) { |
| | var newStream = DelayedStream.create(stream, { |
| | maxDataSize: Infinity, |
| | pauseStream: this.pauseStreams, |
| | }); |
| | stream.on('data', this._checkDataSize.bind(this)); |
| | stream = newStream; |
| | } |
| |
|
| | this._handleErrors(stream); |
| |
|
| | if (this.pauseStreams) { |
| | stream.pause(); |
| | } |
| | } |
| |
|
| | this._streams.push(stream); |
| | return this; |
| | }; |
| |
|
| | CombinedStream.prototype.pipe = function(dest, options) { |
| | Stream.prototype.pipe.call(this, dest, options); |
| | this.resume(); |
| | return dest; |
| | }; |
| |
|
| | CombinedStream.prototype._getNext = function() { |
| | this._currentStream = null; |
| |
|
| | if (this._insideLoop) { |
| | this._pendingNext = true; |
| | return; |
| | } |
| |
|
| | this._insideLoop = true; |
| | try { |
| | do { |
| | this._pendingNext = false; |
| | this._realGetNext(); |
| | } while (this._pendingNext); |
| | } finally { |
| | this._insideLoop = false; |
| | } |
| | }; |
| |
|
| | CombinedStream.prototype._realGetNext = function() { |
| | var stream = this._streams.shift(); |
| |
|
| |
|
| | if (typeof stream == 'undefined') { |
| | this.end(); |
| | return; |
| | } |
| |
|
| | if (typeof stream !== 'function') { |
| | this._pipeNext(stream); |
| | return; |
| | } |
| |
|
| | var getStream = stream; |
| | getStream(function(stream) { |
| | var isStreamLike = CombinedStream.isStreamLike(stream); |
| | if (isStreamLike) { |
| | stream.on('data', this._checkDataSize.bind(this)); |
| | this._handleErrors(stream); |
| | } |
| |
|
| | this._pipeNext(stream); |
| | }.bind(this)); |
| | }; |
| |
|
| | CombinedStream.prototype._pipeNext = function(stream) { |
| | this._currentStream = stream; |
| |
|
| | var isStreamLike = CombinedStream.isStreamLike(stream); |
| | if (isStreamLike) { |
| | stream.on('end', this._getNext.bind(this)); |
| | stream.pipe(this, {end: false}); |
| | return; |
| | } |
| |
|
| | var value = stream; |
| | this.write(value); |
| | this._getNext(); |
| | }; |
| |
|
| | CombinedStream.prototype._handleErrors = function(stream) { |
| | var self = this; |
| | stream.on('error', function(err) { |
| | self._emitError(err); |
| | }); |
| | }; |
| |
|
| | CombinedStream.prototype.write = function(data) { |
| | this.emit('data', data); |
| | }; |
| |
|
| | CombinedStream.prototype.pause = function() { |
| | if (!this.pauseStreams) { |
| | return; |
| | } |
| |
|
| | if(this.pauseStreams && this._currentStream && typeof(this._currentStream.pause) == 'function') this._currentStream.pause(); |
| | this.emit('pause'); |
| | }; |
| |
|
| | CombinedStream.prototype.resume = function() { |
| | if (!this._released) { |
| | this._released = true; |
| | this.writable = true; |
| | this._getNext(); |
| | } |
| |
|
| | if(this.pauseStreams && this._currentStream && typeof(this._currentStream.resume) == 'function') this._currentStream.resume(); |
| | this.emit('resume'); |
| | }; |
| |
|
| | CombinedStream.prototype.end = function() { |
| | this._reset(); |
| | this.emit('end'); |
| | }; |
| |
|
| | CombinedStream.prototype.destroy = function() { |
| | this._reset(); |
| | this.emit('close'); |
| | }; |
| |
|
| | CombinedStream.prototype._reset = function() { |
| | this.writable = false; |
| | this._streams = []; |
| | this._currentStream = null; |
| | }; |
| |
|
| | CombinedStream.prototype._checkDataSize = function() { |
| | this._updateDataSize(); |
| | if (this.dataSize <= this.maxDataSize) { |
| | return; |
| | } |
| |
|
| | var message = |
| | 'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.'; |
| | this._emitError(new Error(message)); |
| | }; |
| |
|
| | CombinedStream.prototype._updateDataSize = function() { |
| | this.dataSize = 0; |
| |
|
| | var self = this; |
| | this._streams.forEach(function(stream) { |
| | if (!stream.dataSize) { |
| | return; |
| | } |
| |
|
| | self.dataSize += stream.dataSize; |
| | }); |
| |
|
| | if (this._currentStream && this._currentStream.dataSize) { |
| | this.dataSize += this._currentStream.dataSize; |
| | } |
| | }; |
| |
|
| | CombinedStream.prototype._emitError = function(err) { |
| | this._reset(); |
| | this.emit('error', err); |
| | }; |
| |
|