410 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			410 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| /* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^net|tls$" }] */
 | |
| 
 | |
| 'use strict';
 | |
| 
 | |
| const net = require('net');
 | |
| const tls = require('tls');
 | |
| const { randomFillSync } = require('crypto');
 | |
| 
 | |
| const PerMessageDeflate = require('./permessage-deflate');
 | |
| const { EMPTY_BUFFER } = require('./constants');
 | |
| const { isValidStatusCode } = require('./validation');
 | |
| const { mask: applyMask, toBuffer } = require('./buffer-util');
 | |
| 
 | |
| const mask = Buffer.alloc(4);
 | |
| 
 | |
| /**
 | |
|  * HyBi Sender implementation.
 | |
|  */
 | |
| class Sender {
 | |
|   /**
 | |
|    * Creates a Sender instance.
 | |
|    *
 | |
|    * @param {(net.Socket|tls.Socket)} socket The connection socket
 | |
|    * @param {Object} [extensions] An object containing the negotiated extensions
 | |
|    */
 | |
|   constructor(socket, extensions) {
 | |
|     this._extensions = extensions || {};
 | |
|     this._socket = socket;
 | |
| 
 | |
|     this._firstFragment = true;
 | |
|     this._compress = false;
 | |
| 
 | |
|     this._bufferedBytes = 0;
 | |
|     this._deflating = false;
 | |
|     this._queue = [];
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * Frames a piece of data according to the HyBi WebSocket protocol.
 | |
|    *
 | |
|    * @param {Buffer} data The data to frame
 | |
|    * @param {Object} options Options object
 | |
|    * @param {Number} options.opcode The opcode
 | |
|    * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
 | |
|    *     modified
 | |
|    * @param {Boolean} [options.fin=false] Specifies whether or not to set the
 | |
|    *     FIN bit
 | |
|    * @param {Boolean} [options.mask=false] Specifies whether or not to mask
 | |
|    *     `data`
 | |
|    * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
 | |
|    *     RSV1 bit
 | |
|    * @return {Buffer[]} The framed data as a list of `Buffer` instances
 | |
|    * @public
 | |
|    */
 | |
|   static frame(data, options) {
 | |
|     const merge = options.mask && options.readOnly;
 | |
|     let offset = options.mask ? 6 : 2;
 | |
|     let payloadLength = data.length;
 | |
| 
 | |
|     if (data.length >= 65536) {
 | |
|       offset += 8;
 | |
|       payloadLength = 127;
 | |
|     } else if (data.length > 125) {
 | |
|       offset += 2;
 | |
|       payloadLength = 126;
 | |
|     }
 | |
| 
 | |
|     const target = Buffer.allocUnsafe(merge ? data.length + offset : offset);
 | |
| 
 | |
|     target[0] = options.fin ? options.opcode | 0x80 : options.opcode;
 | |
|     if (options.rsv1) target[0] |= 0x40;
 | |
| 
 | |
|     target[1] = payloadLength;
 | |
| 
 | |
|     if (payloadLength === 126) {
 | |
|       target.writeUInt16BE(data.length, 2);
 | |
|     } else if (payloadLength === 127) {
 | |
|       target.writeUInt32BE(0, 2);
 | |
|       target.writeUInt32BE(data.length, 6);
 | |
|     }
 | |
| 
 | |
|     if (!options.mask) return [target, data];
 | |
| 
 | |
|     randomFillSync(mask, 0, 4);
 | |
| 
 | |
|     target[1] |= 0x80;
 | |
|     target[offset - 4] = mask[0];
 | |
|     target[offset - 3] = mask[1];
 | |
|     target[offset - 2] = mask[2];
 | |
|     target[offset - 1] = mask[3];
 | |
| 
 | |
|     if (merge) {
 | |
|       applyMask(data, mask, target, offset, data.length);
 | |
|       return [target];
 | |
|     }
 | |
| 
 | |
|     applyMask(data, mask, data, 0, data.length);
 | |
|     return [target, data];
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * Sends a close message to the other peer.
 | |
|    *
 | |
|    * @param {Number} [code] The status code component of the body
 | |
|    * @param {String} [data] The message component of the body
 | |
|    * @param {Boolean} [mask=false] Specifies whether or not to mask the message
 | |
|    * @param {Function} [cb] Callback
 | |
|    * @public
 | |
|    */
 | |
|   close(code, data, mask, cb) {
 | |
|     let buf;
 | |
| 
 | |
|     if (code === undefined) {
 | |
|       buf = EMPTY_BUFFER;
 | |
|     } else if (typeof code !== 'number' || !isValidStatusCode(code)) {
 | |
|       throw new TypeError('First argument must be a valid error code number');
 | |
|     } else if (data === undefined || data === '') {
 | |
|       buf = Buffer.allocUnsafe(2);
 | |
|       buf.writeUInt16BE(code, 0);
 | |
|     } else {
 | |
|       const length = Buffer.byteLength(data);
 | |
| 
 | |
|       if (length > 123) {
 | |
|         throw new RangeError('The message must not be greater than 123 bytes');
 | |
|       }
 | |
| 
 | |
|       buf = Buffer.allocUnsafe(2 + length);
 | |
|       buf.writeUInt16BE(code, 0);
 | |
|       buf.write(data, 2);
 | |
|     }
 | |
| 
 | |
|     if (this._deflating) {
 | |
|       this.enqueue([this.doClose, buf, mask, cb]);
 | |
|     } else {
 | |
|       this.doClose(buf, mask, cb);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * Frames and sends a close message.
 | |
|    *
 | |
|    * @param {Buffer} data The message to send
 | |
|    * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
 | |
|    * @param {Function} [cb] Callback
 | |
|    * @private
 | |
|    */
 | |
|   doClose(data, mask, cb) {
 | |
|     this.sendFrame(
 | |
|       Sender.frame(data, {
 | |
|         fin: true,
 | |
|         rsv1: false,
 | |
|         opcode: 0x08,
 | |
|         mask,
 | |
|         readOnly: false
 | |
|       }),
 | |
|       cb
 | |
|     );
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * Sends a ping message to the other peer.
 | |
|    *
 | |
|    * @param {*} data The message to send
 | |
|    * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
 | |
|    * @param {Function} [cb] Callback
 | |
|    * @public
 | |
|    */
 | |
|   ping(data, mask, cb) {
 | |
|     const buf = toBuffer(data);
 | |
| 
 | |
|     if (buf.length > 125) {
 | |
|       throw new RangeError('The data size must not be greater than 125 bytes');
 | |
|     }
 | |
| 
 | |
|     if (this._deflating) {
 | |
|       this.enqueue([this.doPing, buf, mask, toBuffer.readOnly, cb]);
 | |
|     } else {
 | |
|       this.doPing(buf, mask, toBuffer.readOnly, cb);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * Frames and sends a ping message.
 | |
|    *
 | |
|    * @param {Buffer} data The message to send
 | |
|    * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
 | |
|    * @param {Boolean} [readOnly=false] Specifies whether `data` can be modified
 | |
|    * @param {Function} [cb] Callback
 | |
|    * @private
 | |
|    */
 | |
|   doPing(data, mask, readOnly, cb) {
 | |
|     this.sendFrame(
 | |
|       Sender.frame(data, {
 | |
|         fin: true,
 | |
|         rsv1: false,
 | |
|         opcode: 0x09,
 | |
|         mask,
 | |
|         readOnly
 | |
|       }),
 | |
|       cb
 | |
|     );
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * Sends a pong message to the other peer.
 | |
|    *
 | |
|    * @param {*} data The message to send
 | |
|    * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
 | |
|    * @param {Function} [cb] Callback
 | |
|    * @public
 | |
|    */
 | |
|   pong(data, mask, cb) {
 | |
|     const buf = toBuffer(data);
 | |
| 
 | |
|     if (buf.length > 125) {
 | |
|       throw new RangeError('The data size must not be greater than 125 bytes');
 | |
|     }
 | |
| 
 | |
|     if (this._deflating) {
 | |
|       this.enqueue([this.doPong, buf, mask, toBuffer.readOnly, cb]);
 | |
|     } else {
 | |
|       this.doPong(buf, mask, toBuffer.readOnly, cb);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * Frames and sends a pong message.
 | |
|    *
 | |
|    * @param {Buffer} data The message to send
 | |
|    * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
 | |
|    * @param {Boolean} [readOnly=false] Specifies whether `data` can be modified
 | |
|    * @param {Function} [cb] Callback
 | |
|    * @private
 | |
|    */
 | |
|   doPong(data, mask, readOnly, cb) {
 | |
|     this.sendFrame(
 | |
|       Sender.frame(data, {
 | |
|         fin: true,
 | |
|         rsv1: false,
 | |
|         opcode: 0x0a,
 | |
|         mask,
 | |
|         readOnly
 | |
|       }),
 | |
|       cb
 | |
|     );
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * Sends a data message to the other peer.
 | |
|    *
 | |
|    * @param {*} data The message to send
 | |
|    * @param {Object} options Options object
 | |
|    * @param {Boolean} [options.compress=false] Specifies whether or not to
 | |
|    *     compress `data`
 | |
|    * @param {Boolean} [options.binary=false] Specifies whether `data` is binary
 | |
|    *     or text
 | |
|    * @param {Boolean} [options.fin=false] Specifies whether the fragment is the
 | |
|    *     last one
 | |
|    * @param {Boolean} [options.mask=false] Specifies whether or not to mask
 | |
|    *     `data`
 | |
|    * @param {Function} [cb] Callback
 | |
|    * @public
 | |
|    */
 | |
|   send(data, options, cb) {
 | |
|     const buf = toBuffer(data);
 | |
|     const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
 | |
|     let opcode = options.binary ? 2 : 1;
 | |
|     let rsv1 = options.compress;
 | |
| 
 | |
|     if (this._firstFragment) {
 | |
|       this._firstFragment = false;
 | |
|       if (rsv1 && perMessageDeflate) {
 | |
|         rsv1 = buf.length >= perMessageDeflate._threshold;
 | |
|       }
 | |
|       this._compress = rsv1;
 | |
|     } else {
 | |
|       rsv1 = false;
 | |
|       opcode = 0;
 | |
|     }
 | |
| 
 | |
|     if (options.fin) this._firstFragment = true;
 | |
| 
 | |
|     if (perMessageDeflate) {
 | |
|       const opts = {
 | |
|         fin: options.fin,
 | |
|         rsv1,
 | |
|         opcode,
 | |
|         mask: options.mask,
 | |
|         readOnly: toBuffer.readOnly
 | |
|       };
 | |
| 
 | |
|       if (this._deflating) {
 | |
|         this.enqueue([this.dispatch, buf, this._compress, opts, cb]);
 | |
|       } else {
 | |
|         this.dispatch(buf, this._compress, opts, cb);
 | |
|       }
 | |
|     } else {
 | |
|       this.sendFrame(
 | |
|         Sender.frame(buf, {
 | |
|           fin: options.fin,
 | |
|           rsv1: false,
 | |
|           opcode,
 | |
|           mask: options.mask,
 | |
|           readOnly: toBuffer.readOnly
 | |
|         }),
 | |
|         cb
 | |
|       );
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * Dispatches a data message.
 | |
|    *
 | |
|    * @param {Buffer} data The message to send
 | |
|    * @param {Boolean} [compress=false] Specifies whether or not to compress
 | |
|    *     `data`
 | |
|    * @param {Object} options Options object
 | |
|    * @param {Number} options.opcode The opcode
 | |
|    * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
 | |
|    *     modified
 | |
|    * @param {Boolean} [options.fin=false] Specifies whether or not to set the
 | |
|    *     FIN bit
 | |
|    * @param {Boolean} [options.mask=false] Specifies whether or not to mask
 | |
|    *     `data`
 | |
|    * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
 | |
|    *     RSV1 bit
 | |
|    * @param {Function} [cb] Callback
 | |
|    * @private
 | |
|    */
 | |
|   dispatch(data, compress, options, cb) {
 | |
|     if (!compress) {
 | |
|       this.sendFrame(Sender.frame(data, options), cb);
 | |
|       return;
 | |
|     }
 | |
| 
 | |
|     const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
 | |
| 
 | |
|     this._bufferedBytes += data.length;
 | |
|     this._deflating = true;
 | |
|     perMessageDeflate.compress(data, options.fin, (_, buf) => {
 | |
|       if (this._socket.destroyed) {
 | |
|         const err = new Error(
 | |
|           'The socket was closed while data was being compressed'
 | |
|         );
 | |
| 
 | |
|         if (typeof cb === 'function') cb(err);
 | |
| 
 | |
|         for (let i = 0; i < this._queue.length; i++) {
 | |
|           const callback = this._queue[i][4];
 | |
| 
 | |
|           if (typeof callback === 'function') callback(err);
 | |
|         }
 | |
| 
 | |
|         return;
 | |
|       }
 | |
| 
 | |
|       this._bufferedBytes -= data.length;
 | |
|       this._deflating = false;
 | |
|       options.readOnly = false;
 | |
|       this.sendFrame(Sender.frame(buf, options), cb);
 | |
|       this.dequeue();
 | |
|     });
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * Executes queued send operations.
 | |
|    *
 | |
|    * @private
 | |
|    */
 | |
|   dequeue() {
 | |
|     while (!this._deflating && this._queue.length) {
 | |
|       const params = this._queue.shift();
 | |
| 
 | |
|       this._bufferedBytes -= params[1].length;
 | |
|       Reflect.apply(params[0], this, params.slice(1));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * Enqueues a send operation.
 | |
|    *
 | |
|    * @param {Array} params Send operation parameters.
 | |
|    * @private
 | |
|    */
 | |
|   enqueue(params) {
 | |
|     this._bufferedBytes += params[1].length;
 | |
|     this._queue.push(params);
 | |
|   }
 | |
| 
 | |
|   /**
 | |
|    * Sends a frame.
 | |
|    *
 | |
|    * @param {Buffer[]} list The frame to send
 | |
|    * @param {Function} [cb] Callback
 | |
|    * @private
 | |
|    */
 | |
|   sendFrame(list, cb) {
 | |
|     if (list.length === 2) {
 | |
|       this._socket.cork();
 | |
|       this._socket.write(list[0]);
 | |
|       this._socket.write(list[1], cb);
 | |
|       this._socket.uncork();
 | |
|     } else {
 | |
|       this._socket.write(list[0], cb);
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| module.exports = Sender;
 |