Spaces:
Sleeping
Sleeping
| ; | |
| import stream from 'stream'; | |
| import utils from '../utils.js'; | |
| const kInternals = Symbol('internals'); | |
| class AxiosTransformStream extends stream.Transform{ | |
| constructor(options) { | |
| options = utils.toFlatObject(options, { | |
| maxRate: 0, | |
| chunkSize: 64 * 1024, | |
| minChunkSize: 100, | |
| timeWindow: 500, | |
| ticksRate: 2, | |
| samplesCount: 15 | |
| }, null, (prop, source) => { | |
| return !utils.isUndefined(source[prop]); | |
| }); | |
| super({ | |
| readableHighWaterMark: options.chunkSize | |
| }); | |
| const internals = this[kInternals] = { | |
| timeWindow: options.timeWindow, | |
| chunkSize: options.chunkSize, | |
| maxRate: options.maxRate, | |
| minChunkSize: options.minChunkSize, | |
| bytesSeen: 0, | |
| isCaptured: false, | |
| notifiedBytesLoaded: 0, | |
| ts: Date.now(), | |
| bytes: 0, | |
| onReadCallback: null | |
| }; | |
| this.on('newListener', event => { | |
| if (event === 'progress') { | |
| if (!internals.isCaptured) { | |
| internals.isCaptured = true; | |
| } | |
| } | |
| }); | |
| } | |
| _read(size) { | |
| const internals = this[kInternals]; | |
| if (internals.onReadCallback) { | |
| internals.onReadCallback(); | |
| } | |
| return super._read(size); | |
| } | |
| _transform(chunk, encoding, callback) { | |
| const internals = this[kInternals]; | |
| const maxRate = internals.maxRate; | |
| const readableHighWaterMark = this.readableHighWaterMark; | |
| const timeWindow = internals.timeWindow; | |
| const divider = 1000 / timeWindow; | |
| const bytesThreshold = (maxRate / divider); | |
| const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0; | |
| const pushChunk = (_chunk, _callback) => { | |
| const bytes = Buffer.byteLength(_chunk); | |
| internals.bytesSeen += bytes; | |
| internals.bytes += bytes; | |
| internals.isCaptured && this.emit('progress', internals.bytesSeen); | |
| if (this.push(_chunk)) { | |
| process.nextTick(_callback); | |
| } else { | |
| internals.onReadCallback = () => { | |
| internals.onReadCallback = null; | |
| process.nextTick(_callback); | |
| }; | |
| } | |
| } | |
| const transformChunk = (_chunk, _callback) => { | |
| const chunkSize = Buffer.byteLength(_chunk); | |
| let chunkRemainder = null; | |
| let maxChunkSize = readableHighWaterMark; | |
| let bytesLeft; | |
| let passed = 0; | |
| if (maxRate) { | |
| const now = Date.now(); | |
| if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) { | |
| internals.ts = now; | |
| bytesLeft = bytesThreshold - internals.bytes; | |
| internals.bytes = bytesLeft < 0 ? -bytesLeft : 0; | |
| passed = 0; | |
| } | |
| bytesLeft = bytesThreshold - internals.bytes; | |
| } | |
| if (maxRate) { | |
| if (bytesLeft <= 0) { | |
| // next time window | |
| return setTimeout(() => { | |
| _callback(null, _chunk); | |
| }, timeWindow - passed); | |
| } | |
| if (bytesLeft < maxChunkSize) { | |
| maxChunkSize = bytesLeft; | |
| } | |
| } | |
| if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) { | |
| chunkRemainder = _chunk.subarray(maxChunkSize); | |
| _chunk = _chunk.subarray(0, maxChunkSize); | |
| } | |
| pushChunk(_chunk, chunkRemainder ? () => { | |
| process.nextTick(_callback, null, chunkRemainder); | |
| } : _callback); | |
| }; | |
| transformChunk(chunk, function transformNextChunk(err, _chunk) { | |
| if (err) { | |
| return callback(err); | |
| } | |
| if (_chunk) { | |
| transformChunk(_chunk, transformNextChunk); | |
| } else { | |
| callback(null); | |
| } | |
| }); | |
| } | |
| } | |
| export default AxiosTransformStream; | |