Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: try to avoid buffer allocations #1998

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 106 additions & 59 deletions lib/sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class Sender {
/**
* Frames a piece of data according to the HyBi WebSocket protocol.
*
* @param {Buffer} data The data to frame
* @param {Buffer|String} data The data to frame
* @param {Object} options Options object
* @param {Boolean} [options.fin=false] Specifies whether or not to set the
* FIN bit
Expand All @@ -58,13 +58,13 @@ class Sender {
* key
* @param {Number} options.opcode The opcode
* @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
* modified
* modified. Only relevant if data is a `Buffer`.
* @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
* RSV1 bit
* @return {Buffer[]} The framed data as a list of `Buffer` instances
* @public
*/
static frame(data, options) {
static frame(data, options, socket, cb) {
let mask;
let merge = false;
let offset = 2;
Expand All @@ -80,12 +80,31 @@ class Sender {
}

skipMasking = (mask[0] | mask[1] | mask[2] | mask[3]) === 0;
if (options.readOnly && !skipMasking) merge = true;

offset = 6;
}

let payloadLength = data.length;
let dataLength = options.payloadLength;

if (typeof data === 'string') {
if (!options.mask || skipMasking) {
if (dataLength == null) {
dataLength = Buffer.byteLength(data);
}
} else {
data = toBuffer(data);
dataLength = data.length;
}
} else if (Buffer.isBuffer(data)) {
dataLength = data.length;
merge = options.mask && options.readOnly && !skipMasking;
} else {
data = toBuffer(data);
dataLength = data.length;
merge = options.mask && toBuffer.readOnly && !skipMasking;
}

let payloadLength = dataLength;

if (data.length >= 65536) {
offset += 8;
Expand All @@ -95,37 +114,51 @@ class Sender {
payloadLength = 126;
}

const target = Buffer.allocUnsafe(merge ? data.length + offset : offset);
const target = Buffer.allocUnsafe(merge ? dataLength + offset : offset);

target[0] = options.fin ? options.opcode | 0x80 : options.opcode;
if (options.rsv1) target[0] |= 0x40;
if (options.rsv1) {
target[0] |= 0x40;
}

target[1] = payloadLength;

if (payloadLength === 126) {
target.writeUInt16BE(data.length, 2);
target.writeUInt16BE(dataLength, 2);
} else if (payloadLength === 127) {
target[2] = target[3] = 0;
target.writeUIntBE(data.length, 4, 6);
target.writeUIntBE(dataLength, 4, 6);
}

if (!options.mask) return [target, data];

target[1] |= 0x80;
target[offset - 4] = mask[0];
target[offset - 3] = mask[1];
target[offset - 2] = mask[2];
target[offset - 1] = mask[3];

if (skipMasking) return [target, data];

if (merge) {
applyMask(data, mask, target, offset, data.length);
return [target];
if (options.mask) {
target[1] |= 0x80;
target[offset - 4] = mask[0];
target[offset - 3] = mask[1];
target[offset - 2] = mask[2];
target[offset - 1] = mask[3];

if (!skipMasking) {
if (merge) {
applyMask(data, mask, target, offset, data.length);
data = null;
} else {
applyMask(data, mask, data, 0, data.length);
}
}
}

applyMask(data, mask, data, 0, data.length);
return [target, data];
if (socket) {
if (data) {
socket.cork();
socket.write(target);
socket.write(data, cb);
socket.uncork();
} else {
socket.write(target, cb);
}
} else {
return data ? [target, data] : [target];
}
}

/**
Expand Down Expand Up @@ -180,16 +213,18 @@ class Sender {
* @private
*/
doClose(data, mask, cb) {
this.sendFrame(
Sender.frame(data, {
Sender.frame(
data,
{
fin: true,
rsv1: false,
opcode: 0x08,
mask,
maskBuffer: this._maskBuffer,
generateMask: this._generateMask,
readOnly: false
}),
readOnly: true
},
this.socket,
cb
);
}
Expand All @@ -203,16 +238,16 @@ class Sender {
* @public
*/
ping(data, mask, cb) {
const buf = toBuffer(data);
const payloadLength = Buffer.byteLength(data);

if (buf.length > 125) {
if (payloadLength > 125) {
throw new RangeError('The data size must not be greater than 125 bytes');
}

if (this._deflating) {
this.enqueue([this.doPing, buf, mask, toBuffer.readOnly, cb]);
this.enqueue([this.doPing, data, mask, payloadLength, cb]);
} else {
this.doPing(buf, mask, toBuffer.readOnly, cb);
this.doPing(data, mask, payloadLength, cb);
}
}

Expand All @@ -221,21 +256,23 @@ class Sender {
*
* @param {Buffer} data The message to send
* @param {Boolean} [mask=false] Specifies whether or not to mask `data`
* @param {Boolean} [readOnly=false] Specifies whether `data` can be modified
* @param {Function} [cb] Callback
* @private
*/
doPing(data, mask, readOnly, cb) {
this.sendFrame(
Sender.frame(data, {
doPing(data, mask, payloadLength, cb) {
Sender.frame(
data,
{
fin: true,
rsv1: false,
opcode: 0x09,
mask,
maskBuffer: this._maskBuffer,
generateMask: this._generateMask,
readOnly
}),
payloadLength,
readOnly: true
},
this.socket,
cb
);
}
Expand All @@ -249,16 +286,16 @@ class Sender {
* @public
*/
pong(data, mask, cb) {
const buf = toBuffer(data);
const payloadLength = Buffer.byteLength(data);

if (buf.length > 125) {
if (payloadLength > 125) {
throw new RangeError('The data size must not be greater than 125 bytes');
}

if (this._deflating) {
this.enqueue([this.doPong, buf, mask, toBuffer.readOnly, cb]);
this.enqueue([this.doPong, data, mask, payloadLength, cb]);
} else {
this.doPong(buf, mask, toBuffer.readOnly, cb);
this.doPong(data, mask, payloadLength, cb);
}
}

Expand All @@ -267,21 +304,23 @@ class Sender {
*
* @param {Buffer} data The message to send
* @param {Boolean} [mask=false] Specifies whether or not to mask `data`
* @param {Boolean} [readOnly=false] Specifies whether `data` can be modified
* @param {Function} [cb] Callback
* @private
*/
doPong(data, mask, readOnly, cb) {
this.sendFrame(
Sender.frame(data, {
doPong(data, mask, payloadLength, cb) {
Sender.frame(
data,
{
fin: true,
rsv1: false,
opcode: 0x0a,
mask,
maskBuffer: this._maskBuffer,
generateMask: this._generateMask,
readOnly
}),
payloadLength,
readOnly: true
},
this.socket,
cb
);
}
Expand All @@ -303,11 +342,12 @@ class Sender {
* @public
*/
send(data, options, cb) {
const buf = toBuffer(data);
const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
let opcode = options.binary ? 2 : 1;
let rsv1 = options.compress;

let payloadLength = options.payloadLength;

if (this._firstFragment) {
this._firstFragment = false;
if (
Expand All @@ -319,7 +359,10 @@ class Sender {
: 'client_no_context_takeover'
]
) {
rsv1 = buf.length >= perMessageDeflate._threshold;
if (payloadLength == null) {
payloadLength = Buffer.byteLength(data);
}
rsv1 = payloadLength >= perMessageDeflate._threshold;
}
this._compress = rsv1;
} else {
Expand All @@ -337,25 +380,29 @@ class Sender {
mask: options.mask,
maskBuffer: this._maskBuffer,
generateMask: this._generateMask,
readOnly: toBuffer.readOnly
payloadLength,
readOnly: true
};

if (this._deflating) {
this.enqueue([this.dispatch, buf, this._compress, opts, cb]);
this.enqueue([this.dispatch, data, this._compress, opts, cb]);
} else {
this.dispatch(buf, this._compress, opts, cb);
this.dispatch(data, this._compress, opts, cb);
}
} else {
this.sendFrame(
Sender.frame(buf, {
Sender.frame(
data,
{
fin: options.fin,
rsv1: false,
opcode,
mask: options.mask,
maskBuffer: this._maskBuffer,
generateMask: this._generateMask,
readOnly: toBuffer.readOnly
}),
payloadLength,
readOnly: true
},
this.socket,
cb
);
}
Expand Down Expand Up @@ -386,7 +433,7 @@ class Sender {
*/
dispatch(data, compress, options, cb) {
if (!compress) {
this.sendFrame(Sender.frame(data, options), cb);
Sender.frame(data, options, this.socket, cb);
return;
}

Expand Down Expand Up @@ -414,7 +461,7 @@ class Sender {
this._bufferedBytes -= data.length;
this._deflating = false;
options.readOnly = false;
this.sendFrame(Sender.frame(buf, options), cb);
Sender.frame(buf, options, this.socket, cb);
this.dequeue();
});
}
Expand Down