144 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			144 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| 'use strict';
 | |
| 
 | |
| import stream from 'stream';
 | |
| import utils from '../utils.js';
 | |
| 
 | |
| const kInternals = Symbol('internals');
 | |
| 
 | |
/**
 * Pass-through Transform stream that can throttle throughput and report
 * progress.
 *
 * Data is forwarded unchanged. When `maxRate` is non-zero, at most
 * `maxRate * timeWindow / 1000` bytes are forwarded per `timeWindow`
 * milliseconds; larger chunks are sliced and the remainder is delayed until
 * the next window. Once a `progress` listener has been attached, a `progress`
 * event carrying the total number of bytes forwarded so far is emitted each
 * time a chunk is pushed.
 */
class AxiosTransformStream extends stream.Transform {
  /**
   * @param {Object} [options]
   * @param {number} [options.maxRate=0] Maximum bytes per second; a falsy
   *   value disables throttling.
   * @param {number} [options.chunkSize=65536] Used as the readable
   *   high-water mark, which also caps the size of a pushed slice.
   * @param {number|false} [options.minChunkSize=100] A chunk is only split
   *   when the remainder would exceed this size (or 1% of the per-window
   *   budget, whichever is larger); `false` removes the lower bound.
   * @param {number} [options.timeWindow=500] Length of one throttling window
   *   in milliseconds.
   * @param {number} [options.ticksRate=2] Accepted and defaulted, but not
   *   read by this class.
   * @param {number} [options.samplesCount=15] Accepted and defaulted, but not
   *   read by this class.
   */
  constructor(options) {
    // Overlay only the explicitly defined caller options onto the defaults.
    options = utils.toFlatObject(options, {
      maxRate: 0,
      chunkSize: 64 * 1024,
      minChunkSize: 100,
      timeWindow: 500,
      ticksRate: 2,
      samplesCount: 15
    }, null, (prop, source) => {
      return !utils.isUndefined(source[prop]);
    });

    super({
      readableHighWaterMark: options.chunkSize
    });

    const internals = this[kInternals] = {
      timeWindow: options.timeWindow,
      chunkSize: options.chunkSize,
      maxRate: options.maxRate,
      minChunkSize: options.minChunkSize,
      bytesSeen: 0,            // total bytes pushed downstream
      isCaptured: false,       // true once a 'progress' listener exists
      notifiedBytesLoaded: 0,
      ts: Date.now(),          // start of the current throttling window
      bytes: 0,                // bytes charged against the current window
      onReadCallback: null     // continuation parked while the consumer is saturated
    };

    // Only pay for 'progress' emission once somebody actually listens.
    this.on('newListener', event => {
      if (event === 'progress') {
        if (!internals.isCaptured) {
          internals.isCaptured = true;
        }
      }
    });
  }

  /**
   * Resumes a continuation parked by a previous `push()` that returned
   * `false`, then defers to the default Transform read logic.
   *
   * @param {number} size
   */
  _read(size) {
    const internals = this[kInternals];

    if (internals.onReadCallback) {
      internals.onReadCallback();
    }

    return super._read(size);
  }

  /**
   * Forwards `chunk` downstream, slicing and delaying it as needed to honour
   * the high-water mark and the configured rate limit.
   *
   * @param {Buffer} chunk Incoming data; sliced with `subarray` when it has
   *   to be split.
   * @param {string} encoding Unused.
   * @param {Function} callback Invoked once the whole chunk has been pushed.
   */
  _transform(chunk, encoding, callback) {
    const internals = this[kInternals];
    const maxRate = internals.maxRate;

    const readableHighWaterMark = this.readableHighWaterMark;

    const timeWindow = internals.timeWindow;

    // Byte budget of a single time window; may be fractional.
    const divider = 1000 / timeWindow;
    const bytesThreshold = (maxRate / divider);
    const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;

    // Push one slice, account for it, and continue either on the next tick or,
    // if the readable side is saturated, on the next _read() call.
    const pushChunk = (_chunk, _callback) => {
      const bytes = Buffer.byteLength(_chunk);
      internals.bytesSeen += bytes;
      internals.bytes += bytes;

      internals.isCaptured && this.emit('progress', internals.bytesSeen);

      if (this.push(_chunk)) {
        process.nextTick(_callback);
      } else {
        internals.onReadCallback = () => {
          internals.onReadCallback = null;
          process.nextTick(_callback);
        };
      }
    };

    // Push as much of _chunk as the current window allows; any remainder is
    // handed back through _callback(null, remainder).
    const transformChunk = (_chunk, _callback) => {
      const chunkSize = Buffer.byteLength(_chunk);
      let chunkRemainder = null;
      let maxChunkSize = readableHighWaterMark;

      if (maxRate) {
        const now = Date.now();
        let passed = 0;

        if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) {
          // Start a new window, carrying over whatever the previous one overshot.
          internals.ts = now;
          const overshoot = internals.bytes - bytesThreshold;
          internals.bytes = overshoot > 0 ? overshoot : 0;
          passed = 0;
        }

        const bytesLeft = bytesThreshold - internals.bytes;

        if (bytesLeft <= 0) {
          // Budget exhausted: retry the same chunk in the next time window.
          return setTimeout(() => {
            _callback(null, _chunk);
          }, timeWindow - passed);
        }

        if (bytesLeft < maxChunkSize) {
          // subarray() truncates fractional offsets, so a budget between 0 and 1
          // would slice off an empty chunk and re-queue the whole input in a
          // tight nextTick loop. Always allow at least one whole byte; the
          // overshoot is carried into the next window above.
          maxChunkSize = Math.max(1, Math.floor(bytesLeft));
        }
      }

      if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) {
        chunkRemainder = _chunk.subarray(maxChunkSize);
        _chunk = _chunk.subarray(0, maxChunkSize);
      }

      pushChunk(_chunk, chunkRemainder ? () => {
        process.nextTick(_callback, null, chunkRemainder);
      } : _callback);
    };

    transformChunk(chunk, function transformNextChunk(err, _chunk) {
      if (err) {
        return callback(err);
      }

      if (_chunk) {
        transformChunk(_chunk, transformNextChunk);
      } else {
        callback(null);
      }
    });
  }
}
 | |
| 
 | |
| export default AxiosTransformStream;
 |