410 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			410 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^net|tls$" }] */
 | 
						|
 | 
						|
'use strict';
 | 
						|
 | 
						|
const net = require('net');
 | 
						|
const tls = require('tls');
 | 
						|
const { randomFillSync } = require('crypto');
 | 
						|
 | 
						|
const PerMessageDeflate = require('./permessage-deflate');
 | 
						|
const { EMPTY_BUFFER } = require('./constants');
 | 
						|
const { isValidStatusCode } = require('./validation');
 | 
						|
const { mask: applyMask, toBuffer } = require('./buffer-util');
 | 
						|
 | 
						|
// Scratch buffer holding the 4-byte masking key. It is shared by every
// `Sender.frame()` call and refilled with random bytes for each masked frame,
// so its contents are only meaningful within a single `frame()` invocation.
const mask = Buffer.alloc(4);
 | 
						|
 | 
						|
/**
 | 
						|
 * HyBi Sender implementation.
 | 
						|
 */
 | 
						|
class Sender {
  /**
   * Creates a Sender instance.
   *
   * @param {(net.Socket|tls.Socket)} socket The connection socket
   * @param {Object} [extensions] An object containing the negotiated extensions
   */
  constructor(socket, extensions) {
    this._extensions = extensions || {};
    this._socket = socket;

    // Fragmentation state: whether the next `send()` starts a new message and
    // whether the message currently in progress is compressed.
    this._firstFragment = true;
    this._compress = false;

    // Compression state: while `_deflating` is true every operation is put in
    // `_queue` so that frames are written in the order they were requested.
    // `_bufferedBytes` counts the payload bytes waiting in that pipeline.
    this._bufferedBytes = 0;
    this._deflating = false;
    this._queue = [];
  }

  /**
   * Frames a piece of data according to the HyBi WebSocket protocol.
   *
   * When `options.mask` is set and `options.readOnly` is not, `data` is masked
   * in place. When both are set, the masked payload is written into the same
   * buffer as the header and `data` is left untouched.
   *
   * @param {Buffer} data The data to frame
   * @param {Object} options Options object
   * @param {Number} options.opcode The opcode
   * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
   *     modified
   * @param {Boolean} [options.fin=false] Specifies whether or not to set the
   *     FIN bit
   * @param {Boolean} [options.mask=false] Specifies whether or not to mask
   *     `data`
   * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
   *     RSV1 bit
   * @return {Buffer[]} The framed data as a list of `Buffer` instances
   * @public
   */
  static frame(data, options) {
    const merge = options.mask && options.readOnly;
    let offset = options.mask ? 6 : 2;
    let payloadLength = data.length;

    if (data.length >= 65536) {
      offset += 8;
      payloadLength = 127;
    } else if (data.length > 125) {
      offset += 2;
      payloadLength = 126;
    }

    const target = Buffer.allocUnsafe(merge ? data.length + offset : offset);

    target[0] = options.fin ? options.opcode | 0x80 : options.opcode;
    if (options.rsv1) target[0] |= 0x40;

    target[1] = payloadLength;

    if (payloadLength === 126) {
      target.writeUInt16BE(data.length, 2);
    } else if (payloadLength === 127) {
      // 64-bit extended payload length. The two most significant bytes are
      // always zero and the remaining 48 bits are written in one call, so
      // lengths that do not fit in 32 bits no longer throw.
      target[2] = target[3] = 0;
      target.writeUIntBE(data.length, 4, 6);
    }

    if (!options.mask) return [target, data];

    randomFillSync(mask, 0, 4);

    target[1] |= 0x80;
    target[offset - 4] = mask[0];
    target[offset - 3] = mask[1];
    target[offset - 2] = mask[2];
    target[offset - 1] = mask[3];

    if (merge) {
      // `data` must not be modified: write the masked copy after the header.
      applyMask(data, mask, target, offset, data.length);
      return [target];
    }

    applyMask(data, mask, data, 0, data.length);
    return [target, data];
  }

  /**
   * Sends a close message to the other peer.
   *
   * @param {Number} [code] The status code component of the body
   * @param {String} [data] The message component of the body
   * @param {Boolean} [mask=false] Specifies whether or not to mask the message
   * @param {Function} [cb] Callback
   * @throws {TypeError} If `code` is defined but is not a valid status code
   * @throws {RangeError} If `data` is longer than 123 bytes
   * @public
   */
  close(code, data, mask, cb) {
    let buf;

    if (code === undefined) {
      buf = EMPTY_BUFFER;
    } else if (typeof code !== 'number' || !isValidStatusCode(code)) {
      throw new TypeError('First argument must be a valid error code number');
    } else if (data === undefined || data === '') {
      buf = Buffer.allocUnsafe(2);
      buf.writeUInt16BE(code, 0);
    } else {
      const length = Buffer.byteLength(data);

      // 2 bytes of status code + 123 bytes of reason = 125, the size limit
      // enforced for ping and pong payloads as well.
      if (length > 123) {
        throw new RangeError('The message must not be greater than 123 bytes');
      }

      buf = Buffer.allocUnsafe(2 + length);
      buf.writeUInt16BE(code, 0);
      buf.write(data, 2);
    }

    if (this._deflating) {
      this.enqueue([this.doClose, buf, mask, cb]);
    } else {
      this.doClose(buf, mask, cb);
    }
  }

  /**
   * Frames and sends a close message.
   *
   * @param {Buffer} data The message to send
   * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
   * @param {Function} [cb] Callback
   * @private
   */
  doClose(data, mask, cb) {
    this.sendFrame(
      Sender.frame(data, {
        fin: true,
        rsv1: false,
        opcode: 0x08,
        mask,
        readOnly: false
      }),
      cb
    );
  }

  /**
   * Sends a ping message to the other peer.
   *
   * @param {*} data The message to send
   * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
   * @param {Function} [cb] Callback
   * @throws {RangeError} If the payload is longer than 125 bytes
   * @public
   */
  ping(data, mask, cb) {
    const buf = toBuffer(data);

    if (buf.length > 125) {
      throw new RangeError('The data size must not be greater than 125 bytes');
    }

    if (this._deflating) {
      this.enqueue([this.doPing, buf, mask, toBuffer.readOnly, cb]);
    } else {
      this.doPing(buf, mask, toBuffer.readOnly, cb);
    }
  }

  /**
   * Frames and sends a ping message.
   *
   * @param {Buffer} data The message to send
   * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
   * @param {Boolean} [readOnly=false] Specifies whether `data` can be modified
   * @param {Function} [cb] Callback
   * @private
   */
  doPing(data, mask, readOnly, cb) {
    this.sendFrame(
      Sender.frame(data, {
        fin: true,
        rsv1: false,
        opcode: 0x09,
        mask,
        readOnly
      }),
      cb
    );
  }

  /**
   * Sends a pong message to the other peer.
   *
   * @param {*} data The message to send
   * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
   * @param {Function} [cb] Callback
   * @throws {RangeError} If the payload is longer than 125 bytes
   * @public
   */
  pong(data, mask, cb) {
    const buf = toBuffer(data);

    if (buf.length > 125) {
      throw new RangeError('The data size must not be greater than 125 bytes');
    }

    if (this._deflating) {
      this.enqueue([this.doPong, buf, mask, toBuffer.readOnly, cb]);
    } else {
      this.doPong(buf, mask, toBuffer.readOnly, cb);
    }
  }

  /**
   * Frames and sends a pong message.
   *
   * @param {Buffer} data The message to send
   * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
   * @param {Boolean} [readOnly=false] Specifies whether `data` can be modified
   * @param {Function} [cb] Callback
   * @private
   */
  doPong(data, mask, readOnly, cb) {
    this.sendFrame(
      Sender.frame(data, {
        fin: true,
        rsv1: false,
        opcode: 0x0a,
        mask,
        readOnly
      }),
      cb
    );
  }

  /**
   * Sends a data message to the other peer.
   *
   * @param {*} data The message to send
   * @param {Object} options Options object
   * @param {Boolean} [options.compress=false] Specifies whether or not to
   *     compress `data`
   * @param {Boolean} [options.binary=false] Specifies whether `data` is binary
   *     or text
   * @param {Boolean} [options.fin=false] Specifies whether the fragment is the
   *     last one
   * @param {Boolean} [options.mask=false] Specifies whether or not to mask
   *     `data`
   * @param {Function} [cb] Callback
   * @public
   */
  send(data, options, cb) {
    const buf = toBuffer(data);
    const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
    let opcode = options.binary ? 2 : 1;
    let rsv1 = options.compress;

    if (this._firstFragment) {
      this._firstFragment = false;
      if (rsv1 && perMessageDeflate) {
        // Only compress when the first fragment reaches the threshold.
        rsv1 = buf.length >= perMessageDeflate._threshold;
      }
      // The decision made on the first fragment applies to the whole message.
      this._compress = rsv1;
    } else {
      // Continuation frame: opcode 0 and RSV1 cleared.
      rsv1 = false;
      opcode = 0;
    }

    if (options.fin) this._firstFragment = true;

    if (perMessageDeflate) {
      const opts = {
        fin: options.fin,
        rsv1,
        opcode,
        mask: options.mask,
        readOnly: toBuffer.readOnly
      };

      if (this._deflating) {
        this.enqueue([this.dispatch, buf, this._compress, opts, cb]);
      } else {
        this.dispatch(buf, this._compress, opts, cb);
      }
    } else {
      this.sendFrame(
        Sender.frame(buf, {
          fin: options.fin,
          rsv1: false,
          opcode,
          mask: options.mask,
          readOnly: toBuffer.readOnly
        }),
        cb
      );
    }
  }

  /**
   * Dispatches a data message.
   *
   * @param {Buffer} data The message to send
   * @param {Boolean} [compress=false] Specifies whether or not to compress
   *     `data`
   * @param {Object} options Options object
   * @param {Number} options.opcode The opcode
   * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
   *     modified
   * @param {Boolean} [options.fin=false] Specifies whether or not to set the
   *     FIN bit
   * @param {Boolean} [options.mask=false] Specifies whether or not to mask
   *     `data`
   * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
   *     RSV1 bit
   * @param {Function} [cb] Callback
   * @private
   */
  dispatch(data, compress, options, cb) {
    if (!compress) {
      this.sendFrame(Sender.frame(data, options), cb);
      return;
    }

    const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];

    this._bufferedBytes += data.length;
    this._deflating = true;
    perMessageDeflate.compress(data, options.fin, (_, buf) => {
      if (this._socket.destroyed) {
        const err = new Error(
          'The socket was closed while data was being compressed'
        );

        if (typeof cb === 'function') cb(err);

        for (let i = 0; i < this._queue.length; i++) {
          // Queued entries have different lengths (`close()` queues 4
          // elements, the other operations 5) but the callback slot is always
          // the last one, so it must not be read from a fixed index.
          const params = this._queue[i];
          const callback = params[params.length - 1];

          if (typeof callback === 'function') callback(err);
        }

        return;
      }

      this._bufferedBytes -= data.length;
      this._deflating = false;
      // `buf` is the compressor's output, so it can be masked in place.
      options.readOnly = false;
      this.sendFrame(Sender.frame(buf, options), cb);
      this.dequeue();
    });
  }

  /**
   * Executes queued send operations.
   *
   * Stops as soon as one of the operations starts compressing again; the
   * remaining entries are processed when that compression completes.
   *
   * @private
   */
  dequeue() {
    while (!this._deflating && this._queue.length) {
      const params = this._queue.shift();

      this._bufferedBytes -= params[1].length;
      Reflect.apply(params[0], this, params.slice(1));
    }
  }

  /**
   * Enqueues a send operation.
   *
   * @param {Array} params Send operation parameters: the method to call,
   *     followed by its arguments (payload first, callback last).
   * @private
   */
  enqueue(params) {
    this._bufferedBytes += params[1].length;
    this._queue.push(params);
  }

  /**
   * Sends a frame.
   *
   * @param {Buffer[]} list The frame to send
   * @param {Function} [cb] Callback
   * @private
   */
  sendFrame(list, cb) {
    if (list.length === 2) {
      // Header and payload are separate buffers: cork the socket while both
      // are written.
      this._socket.cork();
      this._socket.write(list[0]);
      this._socket.write(list[1], cb);
      this._socket.uncork();
    } else {
      this._socket.write(list[0], cb);
    }
  }
}
 | 
						|
 | 
						|
// The `Sender` class is the only export of this module.
module.exports = Sender;
 |