144 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			144 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
'use strict';
 | 
						|
 | 
						|
import stream from 'stream';
 | 
						|
import utils from '../utils.js';
 | 
						|
 | 
						|
// Symbol key under which each AxiosTransformStream instance stores its
// bookkeeping state (rate-limit counters, progress totals, pending read
// callback), keeping that state off the stream's string-keyed properties.
const kInternals = Symbol('internals');
 | 
						|
 | 
						|
/**
 * Pass-through Transform stream that can throttle throughput and report
 * progress.
 *
 * Data is forwarded unchanged. When `maxRate` is truthy, the number of bytes
 * pushed per `timeWindow` is capped and excess data is delayed with timers.
 * Once a `'progress'` listener has been attached, every pushed piece emits
 * `'progress'` with the cumulative number of bytes pushed so far.
 */
class AxiosTransformStream extends stream.Transform {
  /**
   * @param {Object} [options]
   * @param {number} [options.maxRate=0] Byte budget per 1000 ms; a falsy value disables throttling.
   * @param {number} [options.chunkSize=65536] Used as the stream's `readableHighWaterMark`.
   * @param {number|false} [options.minChunkSize=100] Lower bound for the remainder left by a split;
   *   `false` removes the bound.
   * @param {number} [options.timeWindow=500] Length of one throttling window, in milliseconds.
   * @param {number} [options.ticksRate=2] Accepted and defaulted, but not read anywhere in this class.
   * @param {number} [options.samplesCount=15] Accepted and defaulted, but not read anywhere in this class.
   */
  constructor(options) {
    const defaults = {
      maxRate: 0,
      chunkSize: 64 * 1024,
      minChunkSize: 100,
      timeWindow: 500,
      ticksRate: 2,
      samplesCount: 15
    };

    // Property filter handed to toFlatObject: only defined option values pass.
    const isDefined = (prop, source) => !utils.isUndefined(source[prop]);

    // NOTE(review): presumably returns the defaults overlaid with the caller's
    // defined options — confirm against utils.toFlatObject.
    const settings = utils.toFlatObject(options, defaults, null, isDefined);

    super({
      readableHighWaterMark: settings.chunkSize
    });

    const state = {
      timeWindow: settings.timeWindow,
      chunkSize: settings.chunkSize,
      maxRate: settings.maxRate,
      minChunkSize: settings.minChunkSize,
      bytesSeen: 0,
      isCaptured: false,
      notifiedBytesLoaded: 0,
      ts: Date.now(),
      bytes: 0,
      onReadCallback: null
    };

    this[kInternals] = state;

    // Progress events are only emitted after somebody subscribes to them.
    this.on('newListener', (eventName) => {
      if (eventName === 'progress' && !state.isCaptured) {
        state.isCaptured = true;
      }
    });
  }

  /**
   * Runs the continuation parked by a back-pressured push (if any), then
   * defers to the base Transform implementation.
   *
   * @param {number} size Forwarded unchanged to `super._read`.
   * @returns {*} Whatever `super._read` returns.
   */
  _read(size) {
    const state = this[kInternals];

    if (state.onReadCallback) {
      state.onReadCallback();
    }

    return super._read(size);
  }

  /**
   * Forwards `chunk` downstream, cutting it into pieces limited by the
   * readable high-water mark and, when throttling is on, by the bytes still
   * allowed in the current time window.
   *
   * @param {Buffer} chunk Incoming data; sliced with `subarray` when it has to be split.
   * @param {string} encoding Unused.
   * @param {Function} callback Called with `null` once the whole chunk has been pushed,
   *   or with an error passed along by an intermediate step.
   */
  _transform(chunk, encoding, callback) {
    const state = this[kInternals];
    const { maxRate, timeWindow } = state;
    const highWaterMark = this.readableHighWaterMark;

    // Scale the per-second rate down to a per-window byte budget.
    const windowsPerSecond = 1000 / timeWindow;
    const windowBudget = maxRate / windowsPerSecond;

    // A split is only worth doing if the leftover exceeds this many bytes.
    let smallestRemainder = 0;
    if (state.minChunkSize !== false) {
      smallestRemainder = Math.max(state.minChunkSize, windowBudget * 0.01);
    }

    // Account for one piece, announce progress, push it, and continue either
    // on the next tick or - under back-pressure - on the next _read().
    const emitPiece = (piece, done) => {
      const pieceBytes = Buffer.byteLength(piece);
      state.bytesSeen += pieceBytes;
      state.bytes += pieceBytes;

      if (state.isCaptured) {
        this.emit('progress', state.bytesSeen);
      }

      if (this.push(piece)) {
        process.nextTick(done);
        return;
      }

      state.onReadCallback = () => {
        state.onReadCallback = null;
        process.nextTick(done);
      };
    };

    // Push as much of `piece` as currently allowed; whatever is left over is
    // handed back through `done(null, remainder)`.
    const processPiece = (piece, done) => {
      const pieceBytes = Buffer.byteLength(piece);
      let sliceLimit = highWaterMark;
      let elapsed = 0;

      if (maxRate) {
        const now = Date.now();
        const windowStart = state.ts;

        if (windowStart) {
          elapsed = now - windowStart;
        }

        if (!windowStart || elapsed >= timeWindow) {
          // Open a new window, carrying over any bytes that overshot the old one.
          state.ts = now;
          const unused = windowBudget - state.bytes;
          state.bytes = unused < 0 ? -unused : 0;
          elapsed = 0;
        }

        const budgetLeft = windowBudget - state.bytes;

        if (budgetLeft <= 0) {
          // next time window
          return setTimeout(() => {
            done(null, piece);
          }, timeWindow - elapsed);
        }

        if (budgetLeft < sliceLimit) {
          sliceLimit = budgetLeft;
        }
      }

      let head = piece;
      let tail = null;

      if (sliceLimit && pieceBytes > sliceLimit && (pieceBytes - sliceLimit) > smallestRemainder) {
        tail = piece.subarray(sliceLimit);
        head = piece.subarray(0, sliceLimit);
      }

      let afterPush = done;
      if (tail) {
        afterPush = () => {
          process.nextTick(done, null, tail);
        };
      }

      emitPiece(head, afterPush);
    };

    // Keep feeding remainders back in until nothing is left, then signal
    // completion to the stream machinery.
    const step = (err, remainder) => {
      if (err) {
        return callback(err);
      }

      if (remainder) {
        processPiece(remainder, step);
      } else {
        callback(null);
      }
    };

    processPiece(chunk, step);
  }
}
 | 
						|
 | 
						|
export default AxiosTransformStream;
 |