Spaces:
Runtime error
Runtime error
| ; | |
| import stream from 'stream'; | |
| import utils from '../utils.js'; | |
| import throttle from './throttle.js'; | |
| import speedometer from './speedometer.js'; | |
// Symbol key under which each stream instance keeps its private bookkeeping object.
const kInternals = Symbol('internals');

/**
 * Pass-through Transform stream that forwards data unchanged while
 *  - optionally limiting throughput to `maxRate` bytes per second, and
 *  - emitting `'progress'` events once a `'progress'` listener has been added.
 *
 * Each `'progress'` event carries
 * `{loaded, total, progress, bytes, rate, estimated}`; `total` comes from
 * `options.length` or `setLength()`.
 */
class AxiosTransformStream extends stream.Transform {
  /**
   * @param {Object} [options]
   * @param {number} [options.maxRate=0] Byte budget per 1000 ms; a falsy value disables rate limiting.
   * @param {number} [options.chunkSize=65536] Used as the readable high-water mark and as the largest slice pushed at once.
   * @param {number|false} [options.minChunkSize=100] A chunk is only split when the remainder would exceed this many bytes; `false` removes the floor.
   * @param {number} [options.timeWindow=500] Length in milliseconds of one rate-limiting window.
   * @param {number} [options.ticksRate=2] Passed to `throttle()` for progress notifications.
   * @param {number} [options.samplesCount=15] Multiplied by `ticksRate` and passed to `speedometer()`.
   * @param {number} [options.length] Expected total number of bytes, reported as `total`.
   */
  constructor(options) {
    // NOTE(review): presumably copies every defined property of `options` over the
    // defaults object and returns it - confirm against utils.toFlatObject.
    options = utils.toFlatObject(options, {
      maxRate: 0,
      chunkSize: 64 * 1024,
      minChunkSize: 100,
      timeWindow: 500,
      ticksRate: 2,
      samplesCount: 15
    }, null, (prop, source) => {
      return !utils.isUndefined(source[prop]);
    });

    super({
      readableHighWaterMark: options.chunkSize
    });

    const self = this;

    const internals = this[kInternals] = {
      length: options.length,
      timeWindow: options.timeWindow,
      ticksRate: options.ticksRate,
      chunkSize: options.chunkSize,
      maxRate: options.maxRate,
      minChunkSize: options.minChunkSize,
      bytesSeen: 0, // total bytes pushed downstream so far
      isCaptured: false, // becomes true once a 'progress' listener is attached
      notifiedBytesLoaded: 0,
      ts: Date.now(), // start of the current rate-limiting window
      bytes: 0, // bytes charged against the current window
      onReadCallback: null // resumes a paused transform on the next _read()
    };

    const _speedometer = speedometer(internals.ticksRate * options.samplesCount, internals.timeWindow);

    // Progress bookkeeping is only triggered per chunk once somebody listens for it.
    this.on('newListener', event => {
      if (event === 'progress' && !internals.isCaptured) {
        internals.isCaptured = true;
      }
    });

    let bytesNotified = 0;

    internals.updateProgress = throttle(function throttledHandler() {
      const totalBytes = internals.length;
      const bytesTransferred = internals.bytesSeen;
      const progressBytes = bytesTransferred - bytesNotified;

      // Nothing new to report, or the stream is already torn down.
      if (!progressBytes || self.destroyed) return;

      const rate = _speedometer(progressBytes);

      bytesNotified = bytesTransferred;

      // Emit asynchronously so listeners never run inside the push path.
      process.nextTick(() => {
        self.emit('progress', {
          'loaded': bytesTransferred,
          'total': totalBytes,
          'progress': totalBytes ? (bytesTransferred / totalBytes) : undefined,
          'bytes': progressBytes,
          'rate': rate ? rate : undefined,
          'estimated': rate && totalBytes && bytesTransferred <= totalBytes ?
            (totalBytes - bytesTransferred) / rate : undefined
        });
      });
    }, internals.ticksRate);

    // NOTE(review): the `true` argument presumably forces the throttled handler to
    // run immediately so the final byte count is reported - confirm against throttle().
    const onFinish = () => {
      internals.updateProgress(true);
    };

    this.once('end', onFinish);
    this.once('error', onFinish);
  }

  /**
   * Resumes a transform that paused because `push()` reported back-pressure,
   * then defers to the default Transform read logic.
   *
   * @param {number} size
   */
  _read(size) {
    const internals = this[kInternals];

    if (internals.onReadCallback) {
      internals.onReadCallback();
    }

    return super._read(size);
  }

  /**
   * Forwards `chunk` downstream, slicing it so that no slice exceeds the
   * readable high-water mark or the bytes still allowed in the current
   * rate-limiting window. When the window's budget is used up, processing
   * is postponed until the window ends.
   *
   * @param {Buffer} chunk
   * @param {string} encoding
   * @param {Function} callback Invoked once the whole chunk has been pushed.
   */
  _transform(chunk, encoding, callback) {
    const self = this;
    const internals = this[kInternals];
    const maxRate = internals.maxRate;

    const readableHighWaterMark = this.readableHighWaterMark;

    const timeWindow = internals.timeWindow;

    // Byte budget for one time window; may be fractional (e.g. an odd maxRate
    // with a 500 ms window).
    const divider = 1000 / timeWindow;
    const bytesThreshold = (maxRate / divider);
    const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;

    // Push one slice downstream and continue once the readable side has room.
    function pushChunk(_chunk, _callback) {
      const bytes = Buffer.byteLength(_chunk);
      internals.bytesSeen += bytes;
      internals.bytes += bytes;

      if (internals.isCaptured) {
        internals.updateProgress();
      }

      if (self.push(_chunk)) {
        process.nextTick(_callback);
      } else {
        // Back-pressure: wait for the consumer's next _read() before continuing.
        internals.onReadCallback = () => {
          internals.onReadCallback = null;
          process.nextTick(_callback);
        };
      }
    }

    // Process one (possibly partial) chunk; any remainder is handed back through
    // `_callback(null, remainder)` so that it is processed in a later step.
    const transformChunk = (_chunk, _callback) => {
      const chunkSize = Buffer.byteLength(_chunk);
      let chunkRemainder = null;
      let maxChunkSize = readableHighWaterMark;

      if (maxRate) {
        const now = Date.now();
        let passed = 0;
        let bytesLeft;

        if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) {
          // Start a new window, carrying any overshoot of the previous one over.
          internals.ts = now;
          bytesLeft = bytesThreshold - internals.bytes;
          internals.bytes = bytesLeft < 0 ? -bytesLeft : 0;
          passed = 0;
        }

        bytesLeft = bytesThreshold - internals.bytes;

        if (bytesLeft <= 0) {
          // next time window
          return setTimeout(() => {
            _callback(null, _chunk);
          }, timeWindow - passed);
        }

        if (bytesLeft < maxChunkSize) {
          // Slice on whole bytes and always allow at least one. A fractional
          // budget below one byte would otherwise yield zero-length slices and
          // a process.nextTick busy loop that never makes progress within the
          // window. The overshoot of at most one byte is charged to the next
          // window by the carry-over above.
          maxChunkSize = Math.max(1, Math.floor(bytesLeft));
        }
      }

      if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) {
        chunkRemainder = _chunk.subarray(maxChunkSize);
        _chunk = _chunk.subarray(0, maxChunkSize);
      }

      pushChunk(_chunk, chunkRemainder ? () => {
        process.nextTick(_callback, null, chunkRemainder);
      } : _callback);
    };

    transformChunk(chunk, function transformNextChunk(err, _chunk) {
      if (err) {
        return callback(err);
      }

      if (_chunk) {
        transformChunk(_chunk, transformNextChunk);
      } else {
        callback(null);
      }
    });
  }

  /**
   * Sets the expected total byte count reported as `total` in progress events.
   *
   * @param {number|string} length Coerced to a number with unary plus.
   * @returns {AxiosTransformStream} This stream, for chaining.
   */
  setLength(length) {
    this[kInternals].length = +length;
    return this;
  }
}

export default AxiosTransformStream;