diff --git a/README.md b/README.md index 8e0e1ceee..ffb2fa283 100644 --- a/README.md +++ b/README.md @@ -113,7 +113,7 @@ Both `start` and `pull` are given the ability to manipulate the stream's interna 1. If `this.[[draining]]` is **false**, 1. Set `this.[[state]]` to `"waiting"`. 1. Let `this.[[waitPromise]]` be a new promise. - 1. Call `this.[[callOrSchedulePull]]()`. +1. Call `this.[[callOrSchedulePull]]()`. 1. Return `chunk`. ##### wait() @@ -163,16 +163,15 @@ For now, please consider the reference implementation normative: [reference-impl 1. Call `this.[[error]]`(_chunkSize_.[[value]]). 1. Return **false**. 1. EnqueueValueWithSize(`this.[[queue]]`, `chunk`, _chunkSize_.[[value]]). -1. Set `this.[[pulling]]` to **false**. -1. Let _queueSize_ be GetTotalQueueSize(`this.[[queue]]`). -1. Let _needsMore_ be ToBoolean(Invoke(`this.[[strategy]]`, `"needsMore"`, (_queueSize_))). +1. Let _needsMore_ be the result of calling **this**.\[\[getNeedsMore\]\](). 1. If _needsMore_ is an abrupt completion, - 1. Call `this.[[error]]`(_needsMore_.[[value]]). 1. Return **false**. +1. Let _needsMore_ be _needsMore_.[[value]]. +1. If _needsMore_ is **false**, set `this.[[pulling]]` to **false**. 1. If `this.[[state]]` is `"waiting"`, 1. Set `this.[[state]]` to `"readable"`. 1. Resolve `this.[[waitPromise]]` with **undefined**. -1. Return _needsMore_.[[value]]. +1. Return _needsMore_. ##### `[[close]]()` @@ -197,9 +196,20 @@ For now, please consider the reference implementation normative: [reference-impl 1. Let `this.[[waitPromise]]` be a new promise rejected with `e`. 1. Reject `this.[[closedPromise]]` with `e`. +##### `[[getNeedsMore]]()` + +1. Let _queueSize_ be GetTotalQueueSize(`this.[[queue]]`). +1. Return ToBoolean(Invoke(`this.[[strategy]]`, `"needsMore"`, (_queueSize_))). +1. If _needsMore_ is an abrupt completion, + 1. Call the **this**.\[\[error\]\](_needsMore_.[[value]]). +1. Return _needsMore_. + ##### `[[callOrSchedulePull]]()` 1. If `this.[[pulling]]` is **true**, return. +1. Let _needsMore_ be the result of calling **this**.\[\[getNeedsMore\]\](). +1. ReturnIfAbrupt(_needsMore_). +1. If _needsMore_ is **false**, return. 1. Set `this.[[pulling]]` to **true**. 1. If `this.[[started]]` is **false**, 1. Upon fulfillment of `this.[[startedPromise]]`, call `this.[[callPull]]`. diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 1a0994adf..4f6c37faa 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -87,10 +87,10 @@ export default class ReadableStream { this._waitPromise_resolve = resolve; this._waitPromise_reject = reject; }); - this._callOrSchedulePull(); } } + this._callOrSchedulePull(); return chunk; } @@ -201,6 +201,16 @@ export default class ReadableStream { return output; } + _getNeedsMore() { + var queueSize = helpers.getTotalQueueSize(this._queue); + try { + return Boolean(this._strategy.needsMore(queueSize)); + } catch (error) { + this._error(error); + throw error; + } + } + _enqueue(chunk) { if (this._state === 'errored' || this._state === 'closed') { return false; @@ -219,24 +229,25 @@ export default class ReadableStream { } helpers.enqueueValueWithSize(this._queue, chunk, chunkSize); - this._pulling = false; - var queueSize = helpers.getTotalQueueSize(this._queue); var needsMore; try { - needsMore = Boolean(this._strategy.needsMore(queueSize)); + needsMore = this._getNeedsMore(); } catch (error) { - this._error(error); return false; } + if (needsMore === false) { + this._pulling = false; + } + if (this._state === 'waiting') { this._state = 'readable'; this._waitPromise_resolve(undefined); } return needsMore; - } + } _close() { if (this._state === 'waiting') { @@ -272,6 +283,15 @@ export default class ReadableStream { if (this._pulling === true) { return; } + + try { + if (this._getNeedsMore() === false) { + return; + } + } catch (error) { + return; + } + this._pulling = true; if (this._started === false) { diff --git a/reference-implementation/test/pipe-through.js b/reference-implementation/test/pipe-through.js index 442a8535b..523a9b0ec 100644 --- a/reference-implementation/test/pipe-through.js +++ b/reference-implementation/test/pipe-through.js @@ -3,6 +3,7 @@ var test = require('tape'); import sequentialReadableStream from './utils/sequential-rs'; import duckTypedPassThroughTransform from './utils/duck-typed-pass-through-transform'; import readableStreamToArray from './utils/readable-stream-to-array'; +import CountQueuingStrategy from '../lib/count-queuing-strategy'; import ReadableStream from '../lib/readable-stream'; import WritableStream from '../lib/writable-stream'; import TransformStream from '../lib/transform-stream'; @@ -32,7 +33,8 @@ test('Piping through an identity transform stream will close the destination whe transform(chunk, enqueue, done) { enqueue(chunk); done(); - } + }, + outputStrategy: new CountQueuingStrategy({ highWaterMark: 1 }) }); var ws = new WritableStream({ @@ -69,7 +71,8 @@ test('Piping through a zero-HWM transform stream immediately causes backpressure transform(chunk, enqueue, done) { enqueue(chunk); done(); - } + }, + outputStrategy: new CountQueuingStrategy({ highWaterMark: 1 }) }); var writtenValues = []; diff --git a/reference-implementation/test/pipe-to.js b/reference-implementation/test/pipe-to.js index 6f84b55a4..bd34375bd 100644 --- a/reference-implementation/test/pipe-to.js +++ b/reference-implementation/test/pipe-to.js @@ -31,19 +31,20 @@ test('Piping from a ReadableStream from which lots of data are readable synchron }); test('Piping from a ReadableStream in readable state to a WritableStream in closing state', t => { - var pullCount = 0; var cancelCalled = false; var rs = new ReadableStream({ start(enqueue, close) { - enqueue("Hello"); + t.assert(enqueue("Hello")); }, pull() { - ++pullCount; + t.fail('Unexpected pull call'); + t.end(); }, cancel() { t.assert(!cancelCalled); cancelCalled = true; - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); t.equal(rs.state, 'readable'); @@ -68,22 +69,23 @@ test('Piping from a ReadableStream in readable state to a WritableStream in clos }); test('Piping from a ReadableStream in readable state to a WritableStream in errored state', t => { - var pullCount = 0; var cancelCalled = false; var passedError = new Error('horrible things'); var rs = new ReadableStream({ start(enqueue, close) { - enqueue("Hello"); + t.assert(enqueue("Hello")); }, pull() { - ++pullCount; + t.fail('Unexpected pull call'); + t.end(); }, cancel(reason) { t.assert(!cancelCalled, 'cancel must not be called more than once'); cancelCalled = true; t.strictEqual(reason, passedError); - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); t.equal(rs.state, 'readable'); @@ -136,7 +138,8 @@ test('Piping from a ReadableStream in closed state to a WritableStream in writab cancel(reason) { t.fail('Unexpected cancel call'); t.end(); - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); t.equal(rs.state, 'closed'); @@ -179,7 +182,8 @@ test('Piping from a ReadableStream in errored state to a WritableStream in writa cancel(reason) { t.fail('Unexpected cancel call'); t.end(); - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); t.equal(rs.state, 'errored'); @@ -229,7 +233,8 @@ test(`Piping from a ReadableStream in readable state which becomes closed after cancel() { t.fail('Unexpected cancel call'); t.end(); - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); t.equal(rs.state, 'readable'); @@ -273,7 +278,7 @@ test(`Piping from a ReadableStream in readable state which becomes errored after var pullCount = 0; var rs = new ReadableStream({ start(enqueue, close, error) { - enqueue("Hello"); + t.assert(enqueue("Hello")); errorReadableStream = error; }, pull() { @@ -282,7 +287,8 @@ test(`Piping from a ReadableStream in readable state which becomes errored after cancel() { t.fail('Unexpected cancel call'); t.end(); - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); t.equal(rs.state, 'readable'); @@ -336,15 +342,15 @@ test(`Piping from a ReadableStream in waiting state which becomes readable after cancel() { t.fail('Unexpected cancel call'); t.end(); - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); var ws = new WritableStream({ write(chunk) { t.equal(chunk, 'Hello'); - // Includes pull invoked inside read() - t.equal(pullCount, 2); + t.equal(pullCount, 1); t.end(); }, @@ -362,7 +368,7 @@ test(`Piping from a ReadableStream in waiting state which becomes readable after t.equal(rs.state, 'waiting'); t.equal(ws.state, 'writable'); - enqueue('Hello'); + t.assert(enqueue('Hello')); }); test(`Piping from a ReadableStream in waiting state which becomes errored after pipeTo call to a WritableStream in @@ -379,7 +385,8 @@ test(`Piping from a ReadableStream in waiting state which becomes errored after cancel() { t.fail('Unexpected cancel call'); t.end(); - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); var passedError = new Error('horrible things'); @@ -422,7 +429,8 @@ test(`Piping from a ReadableStream in waiting state to a WritableStream in writa t.equal(pullCount, 1); t.assert(writeCalled); t.end(); - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); var errorWritableStream; @@ -468,7 +476,7 @@ test(`Piping from a ReadableStream in readable state to a WritableStream in wait var pullCount = 0; var rs = new ReadableStream({ start(enqueue) { - enqueue("World"); + t.assert(enqueue("World")); }, pull() { ++pullCount; @@ -476,7 +484,8 @@ test(`Piping from a ReadableStream in readable state to a WritableStream in wait cancel() { t.fail('Unexpected cancel call'); t.end(); - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); t.equal(rs.state, 'readable'); @@ -525,7 +534,7 @@ test(`Piping from a ReadableStream in readable state to a WritableStream in wait var enqueue; var rs = new ReadableStream({ start(enqueue) { - enqueue("World"); + t.assert(enqueue("World")); }, pull() { t.fail('Unexpected pull call'); @@ -535,7 +544,8 @@ test(`Piping from a ReadableStream in readable state to a WritableStream in wait t.assert(writeCalled); t.end(); - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); t.equal(rs.state, 'readable'); @@ -576,7 +586,7 @@ test(`Piping from a ReadableStream in readable state which becomes errored after var errorReadableStream; var rs = new ReadableStream({ start(enqueue, close, error) { - enqueue("World"); + t.assert(enqueue("World")); errorReadableStream = error; }, pull() { @@ -586,7 +596,8 @@ test(`Piping from a ReadableStream in readable state which becomes errored after cancel() { t.fail('Unexpected cancel call'); t.end(); - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); t.equal(rs.state, 'readable'); @@ -635,7 +646,8 @@ test(`Piping from a ReadableStream in waiting state to a WritableStream in waiti cancel() { t.fail('Unexpected cancel call'); t.end(); - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); var checkSecondWrite = false; @@ -671,7 +683,7 @@ test(`Piping from a ReadableStream in waiting state to a WritableStream in waiti rs.pipeTo(ws); - enqueue('Goodbye'); + t.assert(enqueue('Goodbye')); // Check that nothing happens before calling done(), and then call done() // to check that pipeTo is woken up. @@ -695,7 +707,8 @@ test(`Piping from a ReadableStream in waiting state to a WritableStream in waiti cancel() { t.fail('Unexpected cancel call'); t.end(); - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); var done; @@ -749,7 +762,8 @@ test(`Piping from a ReadableStream in waiting state which becomes closed after p cancel() { t.fail('Unexpected cancel call'); t.end(); - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); var writeCalled = false; @@ -807,7 +821,8 @@ test(`Piping from a ReadableStream in waiting state which becomes errored after cancel() { t.fail('Unexpected cancel call'); t.end(); - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); var writeCalled = false; diff --git a/reference-implementation/test/readable-stream-cancel.js b/reference-implementation/test/readable-stream-cancel.js index d28e4bca4..09fece4e9 100644 --- a/reference-implementation/test/readable-stream-cancel.js +++ b/reference-implementation/test/readable-stream-cancel.js @@ -1,5 +1,6 @@ var test = require('tape'); +import CountQueuingStrategy from '../lib/count-queuing-strategy'; import ReadableStream from '../lib/readable-stream'; import RandomPushSource from './utils/random-push-source'; import readableStreamToArray from './utils/readable-stream-to-array'; @@ -28,7 +29,9 @@ test('ReadableStream canceling an infinite stream', t => { cancelationFinished = true; resolve(); }, 50)); - } + }, + + strategy: new CountQueuingStrategy({ highWaterMark: 1 }) }); readableStreamToArray(rs).then( diff --git a/reference-implementation/test/readable-stream.js b/reference-implementation/test/readable-stream.js index 3b02e845e..ec096529c 100644 --- a/reference-implementation/test/readable-stream.js +++ b/reference-implementation/test/readable-stream.js @@ -97,10 +97,10 @@ test('ReadableStream avoid redundant pull call', t => { pull() { pullCount++; }, - cancel() { t.fail('cancel should not be called'); - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) }); rs.wait(); @@ -114,6 +114,33 @@ test('ReadableStream avoid redundant pull call', t => { }, 50); }); +test('ReadableStream pull is not called until enqueue returns false', t => { + var enqueue; + var pullCount = 0; + var rs = new ReadableStream({ + start(enqueue_) { + enqueue = enqueue_; + }, + pull() { + pullCount++; + }, + cancel() { + t.fail('cancel should not be called'); + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) + }); + + // Wait for rs to start. + setTimeout(() => { + rs.wait(); + t.equal(pullCount, 1); + t.equal(enqueue('a'), true); + rs.wait(); + t.equal(pullCount, 1, 'another pull should not be called unless we hit highWaterMark'); + t.end(); + }, 0); +}); + test('ReadableStream start throws an error', t => { t.plan(1); @@ -129,7 +156,12 @@ test('ReadableStream pull throws an error', t => { t.plan(4); var error = new Error('aaaugh!!'); - var rs = new ReadableStream({ pull() { throw error; } }); + var rs = new ReadableStream({ + pull() { + throw error; + }, + strategy: new CountQueuingStrategy({ highWaterMark: 10 }) + }); rs.wait().then(() => { t.fail('waiting should fail'); @@ -181,7 +213,9 @@ test('ReadableStream adapting a push source', t => { } randomSource.readStart(); - } + }, + + strategy: new CountQueuingStrategy({ highWaterMark: 1 }) }); readableStreamToArray(rs).then(chunks => { @@ -228,7 +262,8 @@ test('ReadableStream is able to pull data repeatedly if it\'s available synchron } else { close(); } - } + }, + strategy: new CountQueuingStrategy({ highWaterMark: 1 }) }); rs.wait().then(() => { diff --git a/reference-implementation/test/utils/sequential-rs.js b/reference-implementation/test/utils/sequential-rs.js index 169c1f41b..2dfdeb4e9 100644 --- a/reference-implementation/test/utils/sequential-rs.js +++ b/reference-implementation/test/utils/sequential-rs.js @@ -1,5 +1,6 @@ import ReadableStream from '../../lib/readable-stream'; import SequentialPullSource from './sequential-pull-source'; +import CountQueuingStrategy from '../../lib/count-queuing-strategy'; export default function sequentialReadableStream(limit, options) { var sequentialSource = new SequentialPullSource(limit, options); @@ -31,7 +32,9 @@ export default function sequentialReadableStream(limit, options) { enqueue(chunk); } }); - } + }, + + strategy: new CountQueuingStrategy({ highWaterMark: 1 }) }); stream.source = sequentialSource;