Skip to content

Commit

Permalink
Introduce readable stream controller class
Browse files Browse the repository at this point in the history
Instead of creating enqueue, close, and error functions per-stream, and passing (some subset of them) to the underlying source's start() and pull() methods, we instead create an instance of the ReadableStreamController class, which has methods enqueue(), close(), and error().

This closes #309, which contains more discussion and justification. It sets the stage for #301 (which will work by adding another property to the ReadableStreamController class).
  • Loading branch information
domenic committed Mar 31, 2015
1 parent 8549ce0 commit 4a9b2ec
Show file tree
Hide file tree
Showing 17 changed files with 619 additions and 490 deletions.
284 changes: 166 additions & 118 deletions index.bs

Large diffs are not rendered by default.

232 changes: 134 additions & 98 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@ export default class ReadableStream {
this._pullingPromise = undefined;
this._storedError = undefined;

this._enqueue = CreateReadableStreamEnqueueFunction(this);
this._close = CreateReadableStreamCloseFunction(this);
this._error = CreateReadableStreamErrorFunction(this);
this._controller = new ReadableStreamController(this);

const startResult = InvokeOrNoop(underlyingSource, 'start', [this._enqueue, this._close, this._error]);
const startResult = InvokeOrNoop(underlyingSource, 'start', [this._controller]);
Promise.resolve(startResult).then(
() => {
this._started = true;
CallReadableStreamPull(this);
},
r => this._error(r)
r => ErrorReadableStream(this, r)
);
}

Expand Down Expand Up @@ -254,6 +252,111 @@ class ReadableStreamReader {
}
}

class ReadableStreamController {
constructor(stream) {
if (IsReadableStream(stream) === false) {
throw new TypeError('ReadableStreamController can only be constructed with a ReadableStream instance');
}

if (stream._controller !== undefined) {
throw new TypeError('ReadableStreamController instances can only be created by the ReadableStream constructor');
}

this._controlledReadableStream = stream;
}

close() {
if (IsReadableStreamController(this) === false) {
throw new TypeError('ReadableStreamController.prototype.close can only be used on a ReadableStreamController');
}

if (this._controlledReadableStream._closeRequested === true) {
throw new TypeError('The stream has already been closed; do not close it again!');
}
if (this._controlledReadableStream._state === 'errored') {
throw new TypeError('The stream is in an errored state and cannot be closed');
}

if (this._controlledReadableStream._state === 'closed') {
// This will happen if the stream was closed without close() being called, i.e. by a call to stream.cancel()
return undefined;
}

this._controlledReadableStream._closeRequested = true;

if (this._controlledReadableStream._queue.length === 0) {
return CloseReadableStream(this._controlledReadableStream);
}
}

enqueue(chunk) {
if (IsReadableStreamController(this) === false) {
throw new TypeError('ReadableStreamController.prototype.enqueue can only be used on a ReadableStreamController');
}

if (this._controlledReadableStream._state === 'errored') {
throw this._controlledReadableStream._storedError;
}

if (this._controlledReadableStream._state === 'closed') {
throw new TypeError('stream is closed');
}

if (this._controlledReadableStream._closeRequested === true) {
throw new TypeError('stream is draining');
}

if (IsReadableStreamLocked(this._controlledReadableStream) === true &&
this._controlledReadableStream._reader._readRequests.length > 0) {
const readRequest = this._controlledReadableStream._reader._readRequests.shift();
readRequest._resolve(CreateIterResultObject(chunk, false));
} else {
let chunkSize = 1;

let strategy;
try {
strategy = this._controlledReadableStream._underlyingSource.strategy;
} catch (strategyE) {
ErrorReadableStream(this._controlledReadableStream, strategyE);
throw strategyE;
}

if (strategy !== undefined) {
try {
chunkSize = strategy.size(chunk);
} catch (chunkSizeE) {
ErrorReadableStream(this._controlledReadableStream, chunkSizeE);
throw chunkSizeE;
}
}

try {
EnqueueValueWithSize(this._controlledReadableStream._queue, chunk, chunkSize);
} catch (enqueueE) {
ErrorReadableStream(this._controlledReadableStream, enqueueE);
throw enqueueE;
}
}

CallReadableStreamPull(this._controlledReadableStream);

const shouldApplyBackpressure = ShouldReadableStreamApplyBackpressure(this._controlledReadableStream);
if (shouldApplyBackpressure === true) {
return false;
}
return true;
}

error(e) {
if (IsReadableStreamController(this) === false) {
throw new TypeError('ReadableStreamController.prototype.error can only be used on a ReadableStreamController');
}

return ErrorReadableStream(this._controlledReadableStream, e);
}
}


function AcquireReadableStreamReader(stream) {
return new ReadableStreamReader(stream);
}
Expand All @@ -279,10 +382,10 @@ function CallReadableStreamPull(stream) {
return undefined;
}

stream._pullingPromise = PromiseInvokeOrNoop(stream._underlyingSource, 'pull', [stream._enqueue, stream._close]);
stream._pullingPromise = PromiseInvokeOrNoop(stream._underlyingSource, 'pull', [stream._controller]);
stream._pullingPromise.then(
() => { stream._pullingPromise = undefined; },
e => { stream._error(e); }
e => ErrorReadableStream(stream, e)
);

return undefined;
Expand Down Expand Up @@ -315,97 +418,18 @@ function CloseReadableStream(stream) {
return undefined;
}

function CreateReadableStreamCloseFunction(stream) {
return () => {
if (stream._closeRequested === true) {
throw new TypeError('The stream has already been closed; do not close it again!');
}
if (stream._state === 'errored') {
throw new TypeError('The stream is in an errored state and cannot be closed');
}

if (stream._state === 'closed') {
// This will happen if the stream was closed without close() being called, i.e. by a call to stream.cancel()
return undefined;
}

stream._closeRequested = true;

if (stream._queue.length === 0) {
return CloseReadableStream(stream);
}
};
}

function CreateReadableStreamEnqueueFunction(stream) {
return chunk => {
if (stream._state === 'errored') {
throw stream._storedError;
}

if (stream._state === 'closed') {
throw new TypeError('stream is closed');
}

if (stream._closeRequested === true) {
throw new TypeError('stream is draining');
}

if (IsReadableStreamLocked(stream) === true && stream._reader._readRequests.length > 0) {
const readRequest = stream._reader._readRequests.shift();
readRequest._resolve(CreateIterResultObject(chunk, false));
} else {
let chunkSize = 1;

let strategy;
try {
strategy = stream._underlyingSource.strategy;
} catch (strategyE) {
stream._error(strategyE);
throw strategyE;
}

if (strategy !== undefined) {
try {
chunkSize = strategy.size(chunk);
} catch (chunkSizeE) {
stream._error(chunkSizeE);
throw chunkSizeE;
}
}

try {
EnqueueValueWithSize(stream._queue, chunk, chunkSize);
} catch (enqueueE) {
stream._error(enqueueE);
throw enqueueE;
}
}

CallReadableStreamPull(stream);

const shouldApplyBackpressure = ShouldReadableStreamApplyBackpressure(stream);
if (shouldApplyBackpressure === true) {
return false;
}
return true;
};
}

function CreateReadableStreamErrorFunction(stream) {
return e => {
if (stream._state !== 'readable') {
throw new TypeError(`The stream is ${stream._state} and so cannot be errored`);
}
function ErrorReadableStream(stream, e) {
if (stream._state !== 'readable') {
throw new TypeError(`The stream is ${stream._state} and so cannot be errored`);
}

stream._queue = [];
stream._storedError = e;
stream._state = 'errored';
stream._queue = [];
stream._storedError = e;
stream._state = 'errored';

if (IsReadableStreamLocked(stream) === true) {
return ReleaseReadableStreamReader(stream._reader);
}
};
if (IsReadableStreamLocked(stream) === true) {
return ReleaseReadableStreamReader(stream._reader);
}
}

function IsReadableStream(x) {
Expand All @@ -430,6 +454,18 @@ function IsReadableStreamLocked(stream) {
return true;
}

function IsReadableStreamController(x) {
if (!typeIsObject(x)) {
return false;
}

if (!Object.prototype.hasOwnProperty.call(x, '_controlledReadableStream')) {
return false;
}

return true;
}

function IsReadableStreamReader(x) {
if (!typeIsObject(x)) {
return false;
Expand Down Expand Up @@ -477,15 +513,15 @@ function ShouldReadableStreamApplyBackpressure(stream) {
try {
strategy = stream._underlyingSource.strategy;
} catch (strategyE) {
stream._error(strategyE);
ErrorReadableStream(stream, strategyE);
throw strategyE;
}

if (strategy !== undefined) {
try {
shouldApplyBackpressure = Boolean(strategy.shouldApplyBackpressure(queueSize));
} catch (shouldApplyBackpressureE) {
stream._error(shouldApplyBackpressureE);
ErrorReadableStream(stream, shouldApplyBackpressureE);
throw shouldApplyBackpressureE;
}
}
Expand Down
8 changes: 4 additions & 4 deletions reference-implementation/lib/transform-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ export default class TransformStream {

let enqueueInReadable, closeReadable, errorReadable;
const readable = this.readable = new ReadableStream({
start(enqueue, close, error) {
enqueueInReadable = enqueue;
closeReadable = close;
errorReadable = error;
start(c) {
enqueueInReadable = c.enqueue.bind(c);
closeReadable = c.close.bind(c);
errorReadable = c.error.bind(c);
},
pull() {
if (chunkWrittenButNotYetTransformed === true) {
Expand Down
Loading

0 comments on commit 4a9b2ec

Please sign in to comment.