Skip to content

Commit

Permalink
Change the default HWM for transform stream readableStrategy to 0
Browse files Browse the repository at this point in the history
This reduces "buffer bloat" in pipes. The most significant change is that
now by default transform() is called until something is read. In normal
use, TransformStream will be used in a pipe and a transform() will be
triggered immediately to fill the queue in the following writable. So the
difference from the point of view of everyday use is minimal.

If the highWaterMark passed to the TransformStream constructor is
undefined (or not present) it will be changed to zero before passing it
on to the ReadableStream constructor.

Some tests assumed that transform() would be called without anything
being read and so have been fixed. In most cases the tests were fixed by
supplying an explicit highWaterMark to the constructor.

Verify that default strategies have the expected values.

Closes #777.
  • Loading branch information
ricea authored and domenic committed Sep 26, 2017
1 parent 567d320 commit fd17cf0
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 19 deletions.
4 changes: 2 additions & 2 deletions reference-implementation/lib/transform-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const { WritableStream, WritableStreamDefaultControllerErrorIfNeeded } = require
// Class TransformStream

class TransformStream {
constructor(transformer = {}, writableStrategy = undefined, readableStrategy = undefined) {
constructor(transformer = {}, writableStrategy = undefined, { size, highWaterMark = 0 } = {}) {
this._transformer = transformer;

this._transformStreamController = undefined;
Expand All @@ -28,7 +28,7 @@ class TransformStream {

const source = new TransformStreamDefaultSource(this, startPromise);

this._readable = new ReadableStream(source, readableStrategy);
this._readable = new ReadableStream(source, { size, highWaterMark });

const sink = new TransformStreamDefaultSink(this, startPromise);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,24 @@ error1.name = 'error1';
promise_test(() => {
const ts = recordingTransformStream();
const writer = ts.writable.getWriter();
// This call never resolves.
writer.write('a');
return flushAsyncEvents().then(() => {
assert_array_equals(ts.events, [], 'transform should not be called');
});
}, 'backpressure allows no transforms with a default identity transform and no reader');

promise_test(() => {
const ts = recordingTransformStream({}, undefined, { highWaterMark: 1 });
const writer = ts.writable.getWriter();
// This call to write() resolves asynchronously.
writer.write('a');
// This call to write() waits for backpressure that is never relieved and never calls transform().
writer.write('b');
return delay(0).then(() => {
return flushAsyncEvents().then(() => {
assert_array_equals(ts.events, ['transform', 'a'], 'transform should be called once');
});
}, 'backpressure only allows one transform() with a default identity transform and no reader');
}, 'backpressure only allows one transform() with a identity transform with a readable HWM of 1 and no reader');

promise_test(() => {
// Without a transform() implementation, recordingTransformStream() never enqueues anything.
Expand All @@ -28,7 +38,7 @@ promise_test(() => {
// Discard all chunks. As a result, the readable side is never full enough to exert backpressure and transform()
// keeps being called.
}
});
}, undefined, { highWaterMark: 1 });
const writer = ts.writable.getWriter();
const writePromises = [];
for (let i = 0; i < 4; ++i) {
Expand All @@ -41,7 +51,7 @@ promise_test(() => {
}, 'transform() should keep being called as long as there is no backpressure');

promise_test(() => {
const ts = new TransformStream();
const ts = new TransformStream({}, undefined, { highWaterMark: 1 });
const writer = ts.writable.getWriter();
const reader = ts.readable.getReader();
const events = [];
Expand Down Expand Up @@ -88,7 +98,7 @@ promise_test(() => {
controller.enqueue(chunk);
return reader.read();
}
});
}, undefined, { highWaterMark: 1 });
const writer = ts.writable.getWriter();
reader = ts.readable.getReader();
return writer.write('a');
Expand Down Expand Up @@ -132,10 +142,18 @@ promise_test(t => {
ts.readable.cancel(error1);
return promise_rejects(t, error1, ts.writable.getWriter().closed, 'closed should reject');
});
}, 'writer.closed should resolve after readable is canceled with backpressure');

promise_test(t => {
const ts = new TransformStream({}, undefined, { highWaterMark: 1 });
return delay(0).then(() => {
ts.readable.cancel(error1);
return promise_rejects(t, error1, ts.writable.getWriter().closed, 'closed should reject');
});
}, 'writer.closed should resolve after readable is canceled with no backpressure');

promise_test(() => {
const ts = new TransformStream();
const ts = new TransformStream({}, undefined, { highWaterMark: 1 });
const writer = ts.writable.getWriter();
return delay(0).then(() => {
const writePromise = writer.write('a');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ promise_test(t => {
controller.close();
throw thrownError;
}
});
}, undefined, { highWaterMark: 1 });
const writePromise = ts.writable.getWriter().write('a');
const closedPromise = ts.readable.getReader().closed;
return Promise.all([
Expand Down Expand Up @@ -218,7 +218,7 @@ promise_test(t => {
transform() {
return transformPromise;
}
}, { highWaterMark: 2 });
}, undefined, { highWaterMark: 2 });
const writer = ts.writable.getWriter();
return delay(0).then(() => {
const writePromise = writer.write();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ promise_test(() => {
'result from reading the readable is the same as was written to writable');
assert_false(result.done, 'stream should not be done');

return writer.ready.then(() => {
assert_equals(writer.desiredSize, 1, 'desiredSize should be 1 again');
});
return delay(0).then(() => assert_equals(writer.desiredSize, 1, 'desiredSize should be 1 again'));
});
}, 'Identity TransformStream: can read from readable what is put into writable');

Expand Down Expand Up @@ -180,11 +178,15 @@ promise_test(() => {
}, 'TransformStream: by default, closing the writable closes the readable (when there are no queued writes)');

promise_test(() => {
let transformResolve;
const transformPromise = new Promise(resolve => {
transformResolve = resolve;
});
const ts = new TransformStream({
transform() {
return delay(50);
return transformPromise;
}
});
}, undefined, { highWaterMark: 1 });

const writer = ts.writable.getWriter();
writer.write('a');
Expand All @@ -197,6 +199,7 @@ promise_test(() => {

return delay(0).then(() => {
assert_equals(rsClosed, false, 'readable is not closed after a tick');
transformResolve();

return writer.closed.then(() => {
// TODO: Is this expectation correct?
Expand Down Expand Up @@ -333,7 +336,7 @@ promise_test(t => {
});
});
}
});
}, undefined, { highWaterMark: 1 });

assert_true(startCalled, 'start() should be called synchronously');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ if (self.importScripts) {
test(() => {
const ts = new TransformStream({}, { highWaterMark: 17 });
assert_equals(ts.writable.getWriter().desiredSize, 17, 'desiredSize should be 17');
}, 'writableStrategy highWaterMark works');
}, 'writableStrategy highWaterMark should work');

promise_test(() => {
const ts = recordingTransformStream({}, undefined, { highWaterMark: 9 });
Expand All @@ -27,7 +27,7 @@ promise_test(() => {
'transform', 5, 'transform', 6, 'transform', 7, 'transform', 8],
'transform() should have been called 9 times');
});
}, 'readableStrategy highWaterMark works');
}, 'readableStrategy highWaterMark should work');

promise_test(t => {
let writableSizeCalled = false;
Expand Down Expand Up @@ -61,6 +61,32 @@ promise_test(t => {
return ts.writable.getWriter().write().then(() => {
assert_true(transformCalled, 'transform() should be called');
});
}, 'writable has the correct size() function');
}, 'writable should have the correct size() function');

test(() => {
const ts = new TransformStream();
const writer = ts.writable.getWriter();
assert_equals(writer.desiredSize, 1, 'default writable HWM is 1');
// There should be no size function, but a size function that always returns 1 is indistinguishable.
writer.write(undefined);
assert_equals(writer.desiredSize, 0, 'default chunk size is 1');
}, 'default writable strategy should be equivalent to { highWaterMark: 1 }');

promise_test(t => {
const ts = new TransformStream({
transform(chunk, controller) {
return t.step(() => {
assert_equals(controller.desiredSize, 0, 'desiredSize should be 0');
controller.enqueue(undefined);
// The first chunk enqueued is consumed by the pending read().
assert_equals(controller.desiredSize, 0, 'desiredSize should still be 0');
controller.enqueue(undefined);
assert_equals(controller.desiredSize, -1, 'desiredSize should be -1');
});
}
});
const writePromise = ts.writable.getWriter().write();
return ts.readable.getReader().read().then(() => writePromise);
}, 'default readable strategy should be equivalent to { highWaterMark: 0 }');

done();

0 comments on commit fd17cf0

Please sign in to comment.