180 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			180 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
'use strict';

// Scratch binding: holds the object literal for the iterator prototype while
// its computed-key members are attached with _defineProperty.
var _Object$setPrototypeO;
 | |
/**
 * Sets `obj[key] = value`, normalising `key` to a property key first.
 *
 * When the key is already reachable on `obj` (own or inherited) the property
 * is (re)defined as an enumerable, configurable, writable data property;
 * otherwise a plain assignment is used.
 *
 * @param {Object} obj - Target object; mutated in place.
 * @param {*} key - Property key (string, symbol, or anything coercible).
 * @param {*} value - Value to store.
 * @returns {Object} The same `obj`, to allow chaining.
 */
function _defineProperty(obj, key, value) {
  var propertyKey = _toPropertyKey(key);

  if (!(propertyKey in obj)) {
    obj[propertyKey] = value;
    return obj;
  }

  Object.defineProperty(obj, propertyKey, {
    value: value,
    enumerable: true,
    configurable: true,
    writable: true
  });
  return obj;
}
 | |
/**
 * Converts an arbitrary value into a property key.
 *
 * @param {*} arg - Value to convert.
 * @returns {string|symbol} The symbol itself when the primitive form of `arg`
 *   is a symbol, otherwise its string form.
 */
function _toPropertyKey(arg) {
  var primitive = _toPrimitive(arg, "string");
  if (typeof primitive === "symbol") {
    return primitive;
  }
  return String(primitive);
}
 | |
/**
 * Converts `input` to a primitive value.
 *
 * Non-objects (and null) are returned untouched. Objects with a
 * Symbol.toPrimitive method are converted through it; all other objects are
 * passed to String (hint "string") or Number (any other hint).
 *
 * @param {*} input - Value to convert.
 * @param {string} [hint] - Preferred type; "default" is passed to
 *   Symbol.toPrimitive when omitted.
 * @returns {*} The primitive value.
 * @throws {TypeError} If the Symbol.toPrimitive method returns an object
 *   (or null, whose typeof is "object").
 */
function _toPrimitive(input, hint) {
  var isObject = typeof input === "object" && input !== null;
  if (!isObject) {
    return input;
  }

  var exoticToPrim = input[Symbol.toPrimitive];
  if (exoticToPrim === undefined) {
    var convert = hint === "string" ? String : Number;
    return convert(input);
  }

  var result = exoticToPrim.call(input, hint || "default");
  if (typeof result === "object") {
    throw new TypeError("@@toPrimitive must return a primitive value.");
  }
  return result;
}
 | |
| var finished = require('./end-of-stream');
 | |
// Private symbol keys for the iterator's internal state.

// The stream being iterated.
var kStream = Symbol('stream');

// The promise most recently handed out by next(), plus the resolve/reject
// callbacks of a promise that is parked waiting for data.
var kLastPromise = Symbol('lastPromise');
var kLastResolve = Symbol('lastResolve');
var kLastReject = Symbol('lastReject');

// Terminal state recorded when the stream finishes: the error, or the
// "ended" flag.
var kError = Symbol('error');
var kEnded = Symbol('ended');

// Promise executor that reads a chunk or parks the resolve/reject callbacks.
var kHandlePromise = Symbol('handlePromise');
 | |
/**
 * Builds an iterator-result record.
 *
 * @param {*} value - The yielded value (undefined when iteration is over).
 * @param {boolean} done - Whether iteration has finished.
 * @returns {{value: *, done: boolean}} The result object.
 */
function createIterResult(value, done) {
  var result = {};
  result.value = value;
  result.done = done;
  return result;
}
 | |
/**
 * Resolves the parked next() promise with a chunk read from the stream,
 * if a promise is parked and a chunk is available.
 *
 * @param {Object} iter - The async iterator whose pending promise to settle.
 */
function readAndResolve(iter) {
  var pendingResolve = iter[kLastResolve];
  if (pendingResolve === null) {
    // No next() call is waiting for data.
    return;
  }

  var chunk = iter[kStream].read();
  if (chunk === null) {
    // Nothing to hand out yet: keep the promise parked, since an
    // 'end' or 'error' may still be on its way.
    return;
  }

  iter[kLastPromise] = null;
  iter[kLastResolve] = null;
  iter[kLastReject] = null;
  pendingResolve(createIterResult(chunk, false));
}
 | |
/**
 * 'readable' listener body: schedules a read for the given iterator.
 *
 * @param {Object} iter - The async iterator bound to the stream.
 */
function onReadable(iter) {
  // Defer by one tick: the stream might emit an error via
  // process.nextTick, and that has to be observed first.
  process.nextTick(function deferredRead() {
    readAndResolve(iter);
  });
}
 | |
/**
 * Creates a promise executor that waits for `lastPromise` before serving
 * the next read, so that concurrent next() calls are answered in order.
 *
 * @param {Promise} lastPromise - Promise returned by the previous next().
 * @param {Object} iter - The async iterator.
 * @returns {function(Function, Function): void} Executor for `new Promise`.
 */
function wrapForNext(lastPromise, iter) {
  return function executor(resolve, reject) {
    function afterPrevious() {
      if (!iter[kEnded]) {
        iter[kHandlePromise](resolve, reject);
        return;
      }
      // The stream ended while we were queued behind the previous promise.
      resolve(createIterResult(undefined, true));
    }

    // A rejection of the previous promise is propagated to this one.
    lastPromise.then(afterPrevious, reject);
  };
}
 | |
// Prototype object that the stream iterator prototype inherits from.
// NOTE(review): the argument is an ordinary function, so this evaluates to
// Function.prototype; presumably a transpilation stand-in for the
// %AsyncIteratorPrototype% intrinsic - confirm against upstream.
var AsyncIteratorPrototype = Object.getPrototypeOf(function noop() {});
 | |
// Prototype shared by all stream async iterators. It is assembled in steps:
// the plain members first, then the members whose keys need _defineProperty.
_Object$setPrototypeO = {
  // The stream this iterator reads from.
  get stream() {
    return this[kStream];
  },

  // Returns a promise for the next { value, done } result.
  next: function next() {
    var self = this;

    // An error recorded earlier wins: reject straight away.
    var storedError = this[kError];
    if (storedError !== null) {
      return Promise.reject(storedError);
    }

    if (this[kEnded]) {
      return Promise.resolve(createIterResult(undefined, true));
    }

    if (this[kStream].destroyed) {
      // Defer via nextTick: if .destroy(err) was called, the error is
      // emitted via nextTick, so one may still be lingering around
      // waiting to be emitted.
      return new Promise(function (resolve, reject) {
        process.nextTick(function () {
          var lateError = self[kError];
          if (lateError) {
            reject(lateError);
            return;
          }
          resolve(createIterResult(undefined, true));
        });
      });
    }

    // Multiple outstanding next() calls are chained behind the previous
    // promise. The logic is optimized for `for await` loops, where next()
    // is only called once at a time.
    var previous = this[kLastPromise];
    var pending;

    if (!previous) {
      // Fast path, needed to support multiple this.push() calls
      // without triggering the next() queue.
      var chunk = this[kStream].read();
      if (chunk !== null) {
        return Promise.resolve(createIterResult(chunk, false));
      }
      pending = new Promise(this[kHandlePromise]);
    } else {
      pending = new Promise(wrapForNext(previous, this));
    }

    this[kLastPromise] = pending;
    return pending;
  }
};

// The iterator is its own async iterable.
_defineProperty(_Object$setPrototypeO, Symbol.asyncIterator, function () {
  return this;
});

// Ends iteration early by destroying the stream.
_defineProperty(_Object$setPrototypeO, "return", function _return() {
  var self = this;

  // destroy(err, cb) is a private API; it is guaranteed to exist here
  // because we control the Readable class this is attached to.
  return new Promise(function (resolve, reject) {
    self[kStream].destroy(null, function (err) {
      if (!err) {
        resolve(createIterResult(undefined, true));
        return;
      }
      reject(err);
    });
  });
});

var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf(_Object$setPrototypeO, AsyncIteratorPrototype);
 | |
/**
 * Creates an async iterator over a readable stream.
 *
 * The iterator keeps its state under private symbol keys, subscribes to the
 * stream's completion through `finished`, and listens for 'readable' so that
 * a parked next() promise is settled once data arrives.
 *
 * @param {Object} stream - Readable stream; its `_readableState.endEmitted`
 *   flag seeds the iterator's "ended" state.
 * @returns {Object} Iterator inheriting from
 *   ReadableStreamAsyncIteratorPrototype.
 */
var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterator(stream) {
  // Forget the parked promise and its callbacks before settling it.
  function clearPending() {
    iterator[kLastPromise] = null;
    iterator[kLastResolve] = null;
    iterator[kLastReject] = null;
  }

  // Property descriptors for the iterator's internal slots. Only `writable`
  // is set, so the slots are non-enumerable and non-configurable.
  var descriptors = {};
  descriptors[kStream] = {
    value: stream,
    writable: true
  };
  descriptors[kLastResolve] = {
    value: null,
    writable: true
  };
  descriptors[kLastReject] = {
    value: null,
    writable: true
  };
  descriptors[kError] = {
    value: null,
    writable: true
  };
  descriptors[kEnded] = {
    value: stream._readableState.endEmitted,
    writable: true
  };
  descriptors[kHandlePromise] = {
    // Promise executor: hand out a chunk right away if one can be read,
    // otherwise park resolve/reject until 'readable', end, or error.
    value: function value(resolve, reject) {
      var data = iterator[kStream].read();

      // Compare against null rather than testing truthiness: a falsy chunk
      // (0, '', false) has already been taken out of the stream by read()
      // and must be delivered, not dropped. This matches the null checks in
      // readAndResolve and in next().
      if (data !== null) {
        clearPending();
        resolve(createIterResult(data, false));
      } else {
        iterator[kLastResolve] = resolve;
        iterator[kLastReject] = reject;
      }
    },
    writable: true
  };

  var iterator = Object.create(ReadableStreamAsyncIteratorPrototype, descriptors);
  iterator[kLastPromise] = null;

  finished(stream, function (err) {
    if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
      var reject = iterator[kLastReject];

      // Reject if we are waiting for data in the promise returned by
      // next(), and store the error for later next() calls.
      if (reject !== null) {
        clearPending();
        reject(err);
      }

      iterator[kError] = err;
      return;
    }

    // No error, or a premature close: report the end of iteration.
    var resolve = iterator[kLastResolve];

    if (resolve !== null) {
      clearPending();
      resolve(createIterResult(undefined, true));
    }

    iterator[kEnded] = true;
  });

  stream.on('readable', onReadable.bind(null, iterator));
  return iterator;
};
 | |
| module.exports = createReadableStreamAsyncIterator; |