Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call pull when needsMore is true #169

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ Both `start` and `pull` are given the ability to manipulate the stream's interna
1. If `this.[[draining]]` is **false**,
1. Set `this.[[state]]` to `"waiting"`.
1. Let `this.[[waitPromise]]` be a new promise.
1. Call `this.[[callOrSchedulePull]]()`.
1. Call `this.[[callOrSchedulePull]]()`.
1. Return `chunk`.

##### wait()
Expand Down Expand Up @@ -163,16 +163,15 @@ For now, please consider the reference implementation normative: [reference-impl
1. Call `this.[[error]]`(_chunkSize_.[[value]]).
1. Return **false**.
1. EnqueueValueWithSize(`this.[[queue]]`, `chunk`, _chunkSize_.[[value]]).
1. Set `this.[[pulling]]` to **false**.
1. Let _queueSize_ be GetTotalQueueSize(`this.[[queue]]`).
1. Let _needsMore_ be ToBoolean(Invoke(`this.[[strategy]]`, `"needsMore"`, (_queueSize_))).
1. Let _needsMore_ be the result of calling **this**.\[\[getNeedsMore\]\]().
1. If _needsMore_ is an abrupt completion,
1. Call `this.[[error]]`(_needsMore_.[[value]]).
1. Return **false**.
1. Let _needsMore_ be _needsMore_.[[value]].
1. If _needsMore_ is **false**, set `this.[[pulling]]` to **false**.
1. If `this.[[state]]` is `"waiting"`,
1. Set `this.[[state]]` to `"readable"`.
1. Resolve `this.[[waitPromise]]` with **undefined**.
1. Return _needsMore_.[[value]].
1. Return _needsMore_.

##### `[[close]]()`

Expand All @@ -197,9 +196,20 @@ For now, please consider the reference implementation normative: [reference-impl
1. Let `this.[[waitPromise]]` be a new promise rejected with `e`.
1. Reject `this.[[closedPromise]]` with `e`.

##### `[[getNeedsMore]]()`

1. Let _queueSize_ be GetTotalQueueSize(`this.[[queue]]`).
1. Return ToBoolean(Invoke(`this.[[strategy]]`, `"needsMore"`, (_queueSize_))).
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can move the "If needsMore is an abrupt completion, call this.[[error]](needsMore.[[value]])" into this operation as step 3, as long as we then still "return needsMore" as step 4.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see! Done

1. If _needsMore_ is an abrupt completion,
1. Call the **this**.\[\[error\]\](_needsMore_.[[value]]).
1. Return _needsMore_.

##### `[[callOrSchedulePull]]()`

1. If `this.[[pulling]]` is **true**, return.
1. Let _needsMore_ be the result of calling **this**.\[\[getNeedsMore\]\]().
1. ReturnIfAbrupt(_needsMore_).
1. If _needsMore_ is **false**, return.
1. Set `this.[[pulling]]` to **true**.
1. If `this.[[started]]` is **false**,
1. Upon fulfillment of `this.[[startedPromise]]`, call `this.[[callPull]]`.
Expand Down
32 changes: 26 additions & 6 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ export default class ReadableStream {
this._waitPromise_resolve = resolve;
this._waitPromise_reject = reject;
});
this._callOrSchedulePull();
}
}

this._callOrSchedulePull();
return chunk;
}

Expand Down Expand Up @@ -201,6 +201,16 @@ export default class ReadableStream {
return output;
}

_getNeedsMore() {
var queueSize = helpers.getTotalQueueSize(this._queue);
try {
return Boolean(this._strategy.needsMore(queueSize));
} catch (error) {
this._error(error);
throw error;
}
}

_enqueue(chunk) {
if (this._state === 'errored' || this._state === 'closed') {
return false;
Expand All @@ -219,24 +229,25 @@ export default class ReadableStream {
}

helpers.enqueueValueWithSize(this._queue, chunk, chunkSize);
this._pulling = false;

var queueSize = helpers.getTotalQueueSize(this._queue);
var needsMore;
try {
needsMore = Boolean(this._strategy.needsMore(queueSize));
needsMore = this._getNeedsMore();
} catch (error) {
this._error(error);
return false;
}

if (needsMore === false) {
this._pulling = false;
}

if (this._state === 'waiting') {
this._state = 'readable';
this._waitPromise_resolve(undefined);
}

return needsMore;
}
}

_close() {
if (this._state === 'waiting') {
Expand Down Expand Up @@ -272,6 +283,15 @@ export default class ReadableStream {
if (this._pulling === true) {
return;
}

try {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't want to try/catch here; ReturnIfAbrupt just means letting the error bubble. Will fix while merging.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Thanks

if (this._getNeedsMore() === false) {
return;
}
} catch (error) {
return;
}

this._pulling = true;

if (this._started === false) {
Expand Down
7 changes: 5 additions & 2 deletions reference-implementation/test/pipe-through.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ var test = require('tape');
import sequentialReadableStream from './utils/sequential-rs';
import duckTypedPassThroughTransform from './utils/duck-typed-pass-through-transform';
import readableStreamToArray from './utils/readable-stream-to-array';
import CountQueuingStrategy from '../lib/count-queuing-strategy';
import ReadableStream from '../lib/readable-stream';
import WritableStream from '../lib/writable-stream';
import TransformStream from '../lib/transform-stream';
Expand Down Expand Up @@ -32,7 +33,8 @@ test('Piping through an identity transform stream will close the destination whe
transform(chunk, enqueue, done) {
enqueue(chunk);
done();
}
},
outputStrategy: new CountQueuingStrategy({ highWaterMark: 1 })
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, something is broken if we need to add this for the streaming to work. Commenting it out results in ws never becoming closed. Why? What went wrong?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, output in pipeThrough() is basically not expected to behave as a queue but just to expose ReadableStream interface. But you've made strategies of output and input configurable. Maybe you expect them to work as queues. Right? I want to confirm.

Anyway, output must forward pulling signal from the destination of pipeThrough() to the core of TransformStream. TransformStream's maybeDoTransform() is invoked when output.state is "waiting" or when output calls pull(). Before the change, the destination sent the pulling signal to output by draining all data (ReadableStream was calling pull() whenever the [[queue]] becomes empty regardless of whatever strategy.needsMore says). Now, ReadableStream with HWM=0 never calls pull().

It seems weird that TransformStream is watching output.state, but thanks to that, TransformStream works even when pull() is not invoked. But we need to add

output.wait().then(maybeDoTransform)

to write(). Otherwise, the next write() call happens synchronously to writeDone() call, and the following line is processed before output.[[queue]] gets drained. So, maybeDoTransform() is never called.

if (output.state === 'waiting') {
  maybeDoTransform();
}

I guess the right fix is using a special ReadableStream for TransformStream.output which calls pull() iff its [[queue]] is empty (it must pass pulling signal but also must not buffer data to exert back pressure). That's I guess equivalent to watching at output.state.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, output in pipeThrough() is basically not expected to behave as a queue but just to expose ReadableStream interface. But you've made strategies of output and input configurable. Maybe you expect them to work as queues. Right? I want to confirm.

Good question. I kind of just let things fall out of using the existing ReadableStream API to implement it, and as part of that, the configurable outputStrategy appeared.

We need to think a bit about what the desired backpressure is for rs.pipeThrough(t).pipeTo(ws). If rs is fast and ws is faster, then whether there's backpressure should be up to how long the transform for t takes. If it is slow then backpressure should be exerted; if it's at least as fast as rs then no backpressure should be exerted.

t exerts backpressure by making its writable side, t.input, move from "writable" to "waiting". The strategy for the readable side, t.output, does not impact the outside interface, since the only thing that it affects is the return value for enqueue, which is only called by the internals of TransformStream.

So I think I see the logic behind your prototyped changes. The inputStrategy (which we should probably just rename to strategy) allows the transform stream itself to have a queue on that side so that if it doesn't process things quickly enough, backpressure is delayed for a bit. But the outputStrategy is not useful.

Although, I wonder if, instead of reusing ReadableStream and WritableStream, we just tried to duplicate their interfaces, and if we got rid of the inputStrategy too, we'd be able to simplify the internal code for TransformStream a lot.

Before the change, the destination sent the pulling signal to output by draining all data (ReadableStream was calling pull() whenever the [[queue]] becomes empty regardless of whatever strategy.needsMore says). Now, ReadableStream with HWM=0 never calls pull().

Hmm this seems like a problem. HWM=0 was supposed to be "once you're above zero, stop pulling", not "never pull". Maybe the problem is that the default strategy is not HWM=0, but is instead needsMore() always returns false.

Still, it seems weird that you can totally break a stream in this way by adding a strategy whose needsMore() always returns false. But perhaps it's just like adding one whose needsMore() throws?

It seems weird that TransformStream is watching output.state

I can't remember why I did that :(. I guess in light of my first few paragraphs, output.state should not matter.

});

var ws = new WritableStream({
Expand Down Expand Up @@ -69,7 +71,8 @@ test('Piping through a zero-HWM transform stream immediately causes backpressure
transform(chunk, enqueue, done) {
enqueue(chunk);
done();
}
},
outputStrategy: new CountQueuingStrategy({ highWaterMark: 1 })
});

var writtenValues = [];
Expand Down
Loading