From 3bd436c5e1e40c65298294f8c1424ecd2cae7118 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 4 Jan 2022 17:25:06 +0100 Subject: [PATCH 1/3] perf: try to avoid buffer allocations --- lib/sender.js | 83 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 54 insertions(+), 29 deletions(-) diff --git a/lib/sender.js b/lib/sender.js index 2417656d7..c149b7852 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -46,7 +46,7 @@ class Sender { /** * Frames a piece of data according to the HyBi WebSocket protocol. * - * @param {Buffer} data The data to frame + * @param {Buffer|String} data The data to frame * @param {Object} options Options object * @param {Boolean} [options.fin=false] Specifies whether or not to set the * FIN bit @@ -58,7 +58,7 @@ class Sender { * key * @param {Number} options.opcode The opcode * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be - * modified + * modified. Only relevant if data is a `Buffer`. * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the * RSV1 bit * @return {Buffer[]} The framed data as a list of `Buffer` instances @@ -80,12 +80,31 @@ class Sender { } skipMasking = (mask[0] | mask[1] | mask[2] | mask[3]) === 0; - if (options.readOnly && !skipMasking) merge = true; offset = 6; } - let payloadLength = data.length; + let dataLength = options.payloadLength; + + if (typeof data === 'string') { + if (!options.mask || skipMasking) { + if (dataLength == null) { + dataLength = Buffer.byteLength(data); + } + } else { + data = toBuffer(data); + dataLength = data.length; + } + } else if (Buffer.isBuffer(data)) { + dataLength = data.length; + merge = options.mask && options.readOnly && !skipMasking; + } else { + data = toBuffer(data); + dataLength = data.length; + merge = options.mask && toBuffer.readOnly && !skipMasking; + } + + let payloadLength = dataLength; if (data.length >= 65536) { offset += 8; @@ -95,7 +114,7 @@ class Sender { payloadLength = 126; } - const target = Buffer.allocUnsafe(merge ? data.length + offset : offset); + const target = Buffer.allocUnsafe(merge ? dataLength + offset : offset); target[0] = options.fin ? options.opcode | 0x80 : options.opcode; if (options.rsv1) target[0] |= 0x40; @@ -103,10 +122,10 @@ class Sender { target[1] = payloadLength; if (payloadLength === 126) { - target.writeUInt16BE(data.length, 2); + target.writeUInt16BE(dataLength, 2); } else if (payloadLength === 127) { target[2] = target[3] = 0; - target.writeUIntBE(data.length, 4, 6); + target.writeUIntBE(dataLength, 4, 6); } if (!options.mask) return [target, data]; @@ -188,7 +207,7 @@ class Sender { mask, maskBuffer: this._maskBuffer, generateMask: this._generateMask, - readOnly: false + readOnly: true }), cb ); @@ -203,16 +222,16 @@ class Sender { * @public */ ping(data, mask, cb) { - const buf = toBuffer(data); + const payloadLength = Buffer.byteLength(data); - if (buf.length > 125) { + if (payloadLength > 125) { throw new RangeError('The data size must not be greater than 125 bytes'); } if (this._deflating) { - this.enqueue([this.doPing, buf, mask, toBuffer.readOnly, cb]); + this.enqueue([this.doPing, data, mask, payloadLength, cb]); } else { - this.doPing(buf, mask, toBuffer.readOnly, cb); + this.doPing(data, mask, payloadLength, cb); } } @@ -221,11 +240,10 @@ class Sender { * * @param {Buffer} data The message to send * @param {Boolean} [mask=false] Specifies whether or not to mask `data` - * @param {Boolean} [readOnly=false] Specifies whether `data` can be modified * @param {Function} [cb] Callback * @private */ - doPing(data, mask, readOnly, cb) { + doPing(data, mask, payloadLength, cb) { this.sendFrame( Sender.frame(data, { fin: true, @@ -234,7 +252,8 @@ class Sender { mask, maskBuffer: this._maskBuffer, generateMask: this._generateMask, - readOnly + payloadLength, + readOnly: true }), cb ); @@ -249,16 +268,16 @@ class Sender { * @public */ pong(data, mask, cb) { - const buf = toBuffer(data); + const payloadLength = Buffer.byteLength(data); - if (buf.length > 125) { + if (payloadLength > 125) { throw new RangeError('The data size must not be greater than 125 bytes'); } if (this._deflating) { - this.enqueue([this.doPong, buf, mask, toBuffer.readOnly, cb]); + this.enqueue([this.doPong, data, mask, payloadLength, cb]); } else { - this.doPong(buf, mask, toBuffer.readOnly, cb); + this.doPong(data, mask, payloadLength, cb); } } @@ -267,11 +286,10 @@ class Sender { * * @param {Buffer} data The message to send * @param {Boolean} [mask=false] Specifies whether or not to mask `data` - * @param {Boolean} [readOnly=false] Specifies whether `data` can be modified * @param {Function} [cb] Callback * @private */ - doPong(data, mask, readOnly, cb) { + doPong(data, mask, payloadLength, cb) { this.sendFrame( Sender.frame(data, { fin: true, @@ -280,7 +298,8 @@ class Sender { mask, maskBuffer: this._maskBuffer, generateMask: this._generateMask, - readOnly + payloadLength, + readOnly: true }), cb ); @@ -303,11 +322,12 @@ class Sender { * @public */ send(data, options, cb) { - const buf = toBuffer(data); const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName]; let opcode = options.binary ? 2 : 1; let rsv1 = options.compress; + let payloadLength = options.payloadLength; + if (this._firstFragment) { this._firstFragment = false; if ( @@ -319,7 +339,10 @@ class Sender { : 'client_no_context_takeover' ] ) { - rsv1 = buf.length >= perMessageDeflate._threshold; + if (payloadLength == null) { + payloadLength = Buffer.byteLength(data); + } + rsv1 = payloadLength >= perMessageDeflate._threshold; } this._compress = rsv1; } else { @@ -337,24 +360,26 @@ class Sender { mask: options.mask, maskBuffer: this._maskBuffer, generateMask: this._generateMask, - readOnly: toBuffer.readOnly + payloadLength, + readOnly: true }; if (this._deflating) { - this.enqueue([this.dispatch, buf, this._compress, opts, cb]); + this.enqueue([this.dispatch, data, this._compress, opts, cb]); } else { - this.dispatch(buf, this._compress, opts, cb); + this.dispatch(data, this._compress, opts, cb); } } else { this.sendFrame( - Sender.frame(buf, { + Sender.frame(data, { fin: options.fin, rsv1: false, opcode, mask: options.mask, maskBuffer: this._maskBuffer, generateMask: this._generateMask, - readOnly: toBuffer.readOnly + payloadLength, + readOnly: true }), cb ); From 02d4afce7626c905c2b8ed0f48bdd703b0308dc7 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 6 Jan 2022 14:20:33 +0100 Subject: [PATCH 2/3] refactor: single return --- lib/sender.js | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/lib/sender.js b/lib/sender.js index c149b7852..5d9a0679b 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -117,7 +117,9 @@ class Sender { const target = Buffer.allocUnsafe(merge ? dataLength + offset : offset); target[0] = options.fin ? options.opcode | 0x80 : options.opcode; - if (options.rsv1) target[0] |= 0x40; + if (options.rsv1) { + target[0] |= 0x40; + } target[1] = payloadLength; @@ -128,23 +130,24 @@ class Sender { target.writeUIntBE(dataLength, 4, 6); } - if (!options.mask) return [target, data]; - - target[1] |= 0x80; - target[offset - 4] = mask[0]; - target[offset - 3] = mask[1]; - target[offset - 2] = mask[2]; - target[offset - 1] = mask[3]; - - if (skipMasking) return [target, data]; - - if (merge) { - applyMask(data, mask, target, offset, data.length); - return [target]; + if (options.mask) { + target[1] |= 0x80; + target[offset - 4] = mask[0]; + target[offset - 3] = mask[1]; + target[offset - 2] = mask[2]; + target[offset - 1] = mask[3]; + + if (!skipMasking) { + if (merge) { + applyMask(data, mask, target, offset, data.length); + data = null; + } else { + applyMask(data, mask, data, 0, data.length); + } + } } - applyMask(data, mask, data, 0, data.length); - return [target, data]; + return data ? [target, data] : [target]; } /** From 7b034f13e11ff3d196ae5542460e380fad9a433e Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 6 Jan 2022 14:23:45 +0100 Subject: [PATCH 3/3] perf: avoid sendFrame --- lib/sender.js | 51 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/lib/sender.js b/lib/sender.js index 5d9a0679b..936383e09 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -64,7 +64,7 @@ class Sender { * @return {Buffer[]} The framed data as a list of `Buffer` instances * @public */ - static frame(data, options) { + static frame(data, options, socket, cb) { let mask; let merge = false; let offset = 2; @@ -147,7 +147,18 @@ class Sender { } } - return data ? [target, data] : [target]; + if (socket) { + if (data) { + socket.cork(); + socket.write(target); + socket.write(data, cb); + socket.uncork(); + } else { + socket.write(target, cb); + } + } else { + return data ? [target, data] : [target]; + } } /** @@ -202,8 +213,9 @@ class Sender { * @private */ doClose(data, mask, cb) { - this.sendFrame( - Sender.frame(data, { + Sender.frame( + data, + { fin: true, rsv1: false, opcode: 0x08, @@ -211,7 +223,8 @@ class Sender { maskBuffer: this._maskBuffer, generateMask: this._generateMask, readOnly: true - }), + }, + this.socket, cb ); } @@ -247,8 +260,9 @@ class Sender { * @private */ doPing(data, mask, payloadLength, cb) { - this.sendFrame( - Sender.frame(data, { + Sender.frame( + data, + { fin: true, rsv1: false, opcode: 0x09, @@ -257,7 +271,8 @@ class Sender { generateMask: this._generateMask, payloadLength, readOnly: true - }), + }, + this.socket, cb ); } @@ -293,8 +308,9 @@ class Sender { * @private */ doPong(data, mask, payloadLength, cb) { - this.sendFrame( - Sender.frame(data, { + Sender.frame( + data, + { fin: true, rsv1: false, opcode: 0x0a, @@ -303,7 +319,8 @@ class Sender { generateMask: this._generateMask, payloadLength, readOnly: true - }), + }, + this.socket, cb ); } @@ -373,8 +390,9 @@ class Sender { this.dispatch(data, this._compress, opts, cb); } } else { - this.sendFrame( - Sender.frame(data, { + Sender.frame( + data, + { fin: options.fin, rsv1: false, opcode, @@ -383,7 +401,8 @@ class Sender { generateMask: this._generateMask, payloadLength, readOnly: true - }), + }, + this.socket, cb ); } @@ -414,7 +433,7 @@ class Sender { */ dispatch(data, compress, options, cb) { if (!compress) { - this.sendFrame(Sender.frame(data, options), cb); + Sender.frame(data, options, this.socket, cb); return; } @@ -442,7 +461,7 @@ class Sender { this._bufferedBytes -= data.length; this._deflating = false; options.readOnly = false; - this.sendFrame(Sender.frame(buf, options), cb); + Sender.frame(buf, options, this.socket, cb); this.dequeue(); }); }