Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ext/node): shared global buffer unlock correctness fix #20314

Merged
merged 1 commit into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 39 additions & 31 deletions cli/tests/unit_node/net_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
assert,
assertEquals,
} from "../../../test_util/std/testing/asserts.ts";
import { deferred } from "../../../test_util/std/async/deferred.ts";
import { Deferred, deferred } from "../../../test_util/std/async/deferred.ts";
import * as path from "../../../test_util/std/path/mod.ts";
import * as http from "node:http";

Expand Down Expand Up @@ -131,17 +131,18 @@ Deno.test("[node/net] connection event has socket value", async () => {
await Promise.all([p, p2]);
});

/// We need to make sure that any shared buffers are never used concurrently by two reads.
// https://github.com/denoland/deno/issues/20188
Deno.test("[node/net] multiple Sockets should get correct server data", async () => {
const p = deferred();
const p2 = deferred();
const socketCount = 9;

const dataReceived1 = deferred();
const dataReceived2 = deferred();

const events1: string[] = [];
const events2: string[] = [];
class TestSocket {
dataReceived: Deferred<undefined> = deferred();
events: string[] = [];
socket: net.Socket | undefined;
}

const finished = deferred();
const server = net.createServer();
server.on("connection", (socket) => {
assert(socket !== undefined);
Expand All @@ -150,40 +151,47 @@ Deno.test("[node/net] multiple Sockets should get correct server data", async ()
});
});

const sockets: TestSocket[] = [];
for (let i = 0; i < socketCount; i++) {
sockets[i] = new TestSocket();
}

server.listen(async () => {
// deno-lint-ignore no-explicit-any
const { port } = server.address() as any;

const socket1 = net.createConnection(port);
const socket2 = net.createConnection(port);

socket1.on("data", (data) => {
events1.push(new TextDecoder().decode(data));
dataReceived1.resolve();
});

socket2.on("data", (data) => {
events2.push(new TextDecoder().decode(data));
dataReceived2.resolve();
});
for (let i = 0; i < socketCount; i++) {
const socket = sockets[i].socket = net.createConnection(port);
socket.on("data", (data) => {
const count = sockets[i].events.length;
sockets[i].events.push(new TextDecoder().decode(data));
if (count === 0) {
// Trigger an immediate second write
sockets[i].socket?.write(`${i}`.repeat(3));
} else {
sockets[i].dataReceived.resolve();
}
});
}

socket1.write("111");
socket2.write("222");
for (let i = 0; i < socketCount; i++) {
sockets[i].socket?.write(`${i}`.repeat(3));
}

await Promise.all([dataReceived1, dataReceived2]);
await Promise.all(sockets.map((socket) => socket.dataReceived));

socket1.end();
socket2.end();
for (let i = 0; i < socketCount; i++) {
sockets[i].socket?.end();
}

server.close(() => {
p.resolve();
finished.resolve();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that in the old code, p2 was guaranteed to fire before p so there wasn't much sense in waiting on both.

});

p2.resolve();
});

await Promise.all([p, p2]);
await finished;

assertEquals(events1, ["111"]);
assertEquals(events2, ["222"]);
for (let i = 0; i < socketCount; i++) {
assertEquals(sockets[i].events, [`${i}`.repeat(3), `${i}`.repeat(3)]);
}
});
41 changes: 34 additions & 7 deletions ext/node/polyfills/internal_binding/stream_wrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,21 +311,37 @@ export class LibuvStreamWrap extends HandleWrap {

/** Internal method for reading from the attached stream. */
async #read() {
const isOwnedBuf = bufLocked;
let buf = bufLocked ? new Uint8Array(SUGGESTED_SIZE) : BUF;
bufLocked = true;
// Lock safety: We must hold this lock until we are certain that buf is no longer used
// This setup code is a little verbose, but we need to be careful about buffer management
let buf, locked = false;
if (bufLocked) {
// Already locked, allocate
buf = new Uint8Array(SUGGESTED_SIZE);
} else {
// Not locked, take the buffer + lock
buf = BUF;
locked = bufLocked = true;
Comment on lines +317 to +323
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't the code be much simpler if you always allocate buf = new Uint8Array(SUGGESTED_SIZE);? locked, bufLocked will be deemed useless.

Is the current shared buffer motivated only by a performance gain? @mmastrac

}
try {
let nread: number | null;
const ridBefore = this[kStreamBaseField]!.rid;
try {
nread = await this[kStreamBaseField]!.read(buf);
} catch (e) {
// Lock safety: we know that the buffer will not be used in this function again
// All exits from this block either return or re-assign buf to a different value
if (locked) {
bufLocked = locked = false;
}

// Try to read again if the underlying stream resource
// changed. This can happen during TLS upgrades (eg. STARTTLS)
if (ridBefore != this[kStreamBaseField]!.rid) {
return this.#read();
}

buf = new Uint8Array(0);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this line for clarity


if (
e instanceof Deno.errors.Interrupted ||
e instanceof Deno.errors.BadResource
Expand All @@ -339,8 +355,6 @@ export class LibuvStreamWrap extends HandleWrap {
} else {
nread = codeMap.get("UNKNOWN")!;
}

buf = new Uint8Array(0);
}

nread ??= codeMap.get("EOF")!;
Expand All @@ -351,7 +365,17 @@ export class LibuvStreamWrap extends HandleWrap {
this.bytesRead += nread;
}

buf = isOwnedBuf ? buf.subarray(0, nread) : buf.slice(0, nread);
// We release the lock early so a re-entrant read can make use of the shared buffer, but
// we need to make a copy of the data in the shared buffer.
if (locked) {
// Lock safety: we know that the buffer will not be used in this function again
// We're making a copy of data that lives in the shared buffer
buf = buf.slice(0, nread);
bufLocked = locked = false;
} else {
// The buffer isn't owned, so let's create a subarray view
buf = buf.subarray(0, nread);
}

streamBaseState[kArrayBufferOffset] = 0;

Expand All @@ -365,7 +389,10 @@ export class LibuvStreamWrap extends HandleWrap {
this.#read();
}
} finally {
bufLocked = false;
// Lock safety: we know that the buffer will not be used in this function again
if (locked) {
bufLocked = locked = false;
}
}
}

Expand Down