-
Notifications
You must be signed in to change notification settings - Fork 164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Call pull when needsMore is true #169
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -87,10 +87,10 @@ export default class ReadableStream { | |
this._waitPromise_resolve = resolve; | ||
this._waitPromise_reject = reject; | ||
}); | ||
this._callOrSchedulePull(); | ||
} | ||
} | ||
|
||
this._callOrSchedulePull(); | ||
return chunk; | ||
} | ||
|
||
|
@@ -201,6 +201,16 @@ export default class ReadableStream { | |
return output; | ||
} | ||
|
||
_getNeedsMore() { | ||
var queueSize = helpers.getTotalQueueSize(this._queue); | ||
try { | ||
return Boolean(this._strategy.needsMore(queueSize)); | ||
} catch (error) { | ||
this._error(error); | ||
throw error; | ||
} | ||
} | ||
|
||
_enqueue(chunk) { | ||
if (this._state === 'errored' || this._state === 'closed') { | ||
return false; | ||
|
@@ -219,24 +229,25 @@ export default class ReadableStream { | |
} | ||
|
||
helpers.enqueueValueWithSize(this._queue, chunk, chunkSize); | ||
this._pulling = false; | ||
|
||
var queueSize = helpers.getTotalQueueSize(this._queue); | ||
var needsMore; | ||
try { | ||
needsMore = Boolean(this._strategy.needsMore(queueSize)); | ||
needsMore = this._getNeedsMore(); | ||
} catch (error) { | ||
this._error(error); | ||
return false; | ||
} | ||
|
||
if (needsMore === false) { | ||
this._pulling = false; | ||
} | ||
|
||
if (this._state === 'waiting') { | ||
this._state = 'readable'; | ||
this._waitPromise_resolve(undefined); | ||
} | ||
|
||
return needsMore; | ||
} | ||
} | ||
|
||
_close() { | ||
if (this._state === 'waiting') { | ||
|
@@ -272,6 +283,15 @@ export default class ReadableStream { | |
if (this._pulling === true) { | ||
return; | ||
} | ||
|
||
try { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't want to try/catch here; ReturnIfAbrupt just means letting the error bubble. Will fix while merging. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. Thanks |
||
if (this._getNeedsMore() === false) { | ||
return; | ||
} | ||
} catch (error) { | ||
return; | ||
} | ||
|
||
this._pulling = true; | ||
|
||
if (this._started === false) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ var test = require('tape'); | |
import sequentialReadableStream from './utils/sequential-rs'; | ||
import duckTypedPassThroughTransform from './utils/duck-typed-pass-through-transform'; | ||
import readableStreamToArray from './utils/readable-stream-to-array'; | ||
import CountQueuingStrategy from '../lib/count-queuing-strategy'; | ||
import ReadableStream from '../lib/readable-stream'; | ||
import WritableStream from '../lib/writable-stream'; | ||
import TransformStream from '../lib/transform-stream'; | ||
|
@@ -32,7 +33,8 @@ test('Piping through an identity transform stream will close the destination whe | |
transform(chunk, enqueue, done) { | ||
enqueue(chunk); | ||
done(); | ||
} | ||
}, | ||
outputStrategy: new CountQueuingStrategy({ highWaterMark: 1 }) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, something is broken if we need to add this for the streaming to work. Commenting it out results in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIUC, Anyway, It seems weird that TransformStream is watching output.wait().then(maybeDoTransform) to if (output.state === 'waiting') {
maybeDoTransform();
} I guess the right fix is using a special There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Good question. I kind of just let things fall out of using the existing ReadableStream API to implement it, and as part of that, the configurable outputStrategy appeared. We need to think a bit about what the desired backpressure is for
So I think I see the logic behind your prototyped changes. The Although, I wonder if, instead of reusing
Hmm this seems like a problem. HWM=0 was supposed to be "once you're above zero, stop pulling", not "never pull". Maybe the problem is that the default strategy is not HWM=0, but is instead needsMore() always returns false. Still, it seems weird that you can totally break a stream in this way by adding a strategy whose needsMore() always returns false. But perhaps it's just like adding one whose needsMore() throws?
I can't remember why I did that :(. I guess in light of my first few paragraphs, output.state should not matter. |
||
}); | ||
|
||
var ws = new WritableStream({ | ||
|
@@ -69,7 +71,8 @@ test('Piping through a zero-HWM transform stream immediately causes backpressure | |
transform(chunk, enqueue, done) { | ||
enqueue(chunk); | ||
done(); | ||
} | ||
}, | ||
outputStrategy: new CountQueuingStrategy({ highWaterMark: 1 }) | ||
}); | ||
|
||
var writtenValues = []; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can move the "If needsMore is an abrupt completion, call
this.[[error]]
(needsMore.[[value]])" into this operation as step 3, as long as we then still "return needsMore" as step 4.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see! Done