Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce readable stream controller class #310

Merged
merged 1 commit into from
Mar 31, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 166 additions & 118 deletions index.bs

Large diffs are not rendered by default.

232 changes: 134 additions & 98 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@ export default class ReadableStream {
this._pullingPromise = undefined;
this._storedError = undefined;

this._enqueue = CreateReadableStreamEnqueueFunction(this);
this._close = CreateReadableStreamCloseFunction(this);
this._error = CreateReadableStreamErrorFunction(this);
this._controller = new ReadableStreamController(this);

const startResult = InvokeOrNoop(underlyingSource, 'start', [this._enqueue, this._close, this._error]);
const startResult = InvokeOrNoop(underlyingSource, 'start', [this._controller]);
Promise.resolve(startResult).then(
() => {
this._started = true;
CallReadableStreamPull(this);
},
r => this._error(r)
r => ErrorReadableStream(this, r)
);
}

Expand Down Expand Up @@ -254,6 +252,111 @@ class ReadableStreamReader {
}
}

class ReadableStreamController {
constructor(stream) {
if (IsReadableStream(stream) === false) {
throw new TypeError('ReadableStreamController can only be constructed with a ReadableStream instance');
}

if (stream._controller !== undefined) {
throw new TypeError('ReadableStreamController instances can only be created by the ReadableStream constructor');
}

this._controlledReadableStream = stream;
}

close() {
if (IsReadableStreamController(this) === false) {
throw new TypeError('ReadableStreamController.prototype.close can only be used on a ReadableStreamController');
}

if (this._controlledReadableStream._closeRequested === true) {
throw new TypeError('The stream has already been closed; do not close it again!');
}
if (this._controlledReadableStream._state === 'errored') {
throw new TypeError('The stream is in an errored state and cannot be closed');
}

if (this._controlledReadableStream._state === 'closed') {
// This will happen if the stream was closed without close() being called, i.e. by a call to stream.cancel()
return undefined;
}

this._controlledReadableStream._closeRequested = true;

if (this._controlledReadableStream._queue.length === 0) {
return CloseReadableStream(this._controlledReadableStream);
}
}

enqueue(chunk) {
if (IsReadableStreamController(this) === false) {
throw new TypeError('ReadableStreamController.prototype.enqueue can only be used on a ReadableStreamController');
}

if (this._controlledReadableStream._state === 'errored') {
throw this._controlledReadableStream._storedError;
}

if (this._controlledReadableStream._state === 'closed') {
throw new TypeError('stream is closed');
}

if (this._controlledReadableStream._closeRequested === true) {
throw new TypeError('stream is draining');
}

if (IsReadableStreamLocked(this._controlledReadableStream) === true &&
this._controlledReadableStream._reader._readRequests.length > 0) {
const readRequest = this._controlledReadableStream._reader._readRequests.shift();
readRequest._resolve(CreateIterResultObject(chunk, false));
} else {
let chunkSize = 1;

let strategy;
try {
strategy = this._controlledReadableStream._underlyingSource.strategy;
} catch (strategyE) {
ErrorReadableStream(this._controlledReadableStream, strategyE);
throw strategyE;
}

if (strategy !== undefined) {
try {
chunkSize = strategy.size(chunk);
} catch (chunkSizeE) {
ErrorReadableStream(this._controlledReadableStream, chunkSizeE);
throw chunkSizeE;
}
}

try {
EnqueueValueWithSize(this._controlledReadableStream._queue, chunk, chunkSize);
} catch (enqueueE) {
ErrorReadableStream(this._controlledReadableStream, enqueueE);
throw enqueueE;
}
}

CallReadableStreamPull(this._controlledReadableStream);

const shouldApplyBackpressure = ShouldReadableStreamApplyBackpressure(this._controlledReadableStream);
if (shouldApplyBackpressure === true) {
return false;
}
return true;
}

error(e) {
if (IsReadableStreamController(this) === false) {
throw new TypeError('ReadableStreamController.prototype.error can only be used on a ReadableStreamController');
}

return ErrorReadableStream(this._controlledReadableStream, e);
}
}


function AcquireReadableStreamReader(stream) {
return new ReadableStreamReader(stream);
}
Expand All @@ -279,10 +382,10 @@ function CallReadableStreamPull(stream) {
return undefined;
}

stream._pullingPromise = PromiseInvokeOrNoop(stream._underlyingSource, 'pull', [stream._enqueue, stream._close]);
stream._pullingPromise = PromiseInvokeOrNoop(stream._underlyingSource, 'pull', [stream._controller]);
stream._pullingPromise.then(
() => { stream._pullingPromise = undefined; },
e => { stream._error(e); }
e => ErrorReadableStream(stream, e)
);

return undefined;
Expand Down Expand Up @@ -315,97 +418,18 @@ function CloseReadableStream(stream) {
return undefined;
}

function CreateReadableStreamCloseFunction(stream) {
return () => {
if (stream._closeRequested === true) {
throw new TypeError('The stream has already been closed; do not close it again!');
}
if (stream._state === 'errored') {
throw new TypeError('The stream is in an errored state and cannot be closed');
}

if (stream._state === 'closed') {
// This will happen if the stream was closed without close() being called, i.e. by a call to stream.cancel()
return undefined;
}

stream._closeRequested = true;

if (stream._queue.length === 0) {
return CloseReadableStream(stream);
}
};
}

function CreateReadableStreamEnqueueFunction(stream) {
return chunk => {
if (stream._state === 'errored') {
throw stream._storedError;
}

if (stream._state === 'closed') {
throw new TypeError('stream is closed');
}

if (stream._closeRequested === true) {
throw new TypeError('stream is draining');
}

if (IsReadableStreamLocked(stream) === true && stream._reader._readRequests.length > 0) {
const readRequest = stream._reader._readRequests.shift();
readRequest._resolve(CreateIterResultObject(chunk, false));
} else {
let chunkSize = 1;

let strategy;
try {
strategy = stream._underlyingSource.strategy;
} catch (strategyE) {
stream._error(strategyE);
throw strategyE;
}

if (strategy !== undefined) {
try {
chunkSize = strategy.size(chunk);
} catch (chunkSizeE) {
stream._error(chunkSizeE);
throw chunkSizeE;
}
}

try {
EnqueueValueWithSize(stream._queue, chunk, chunkSize);
} catch (enqueueE) {
stream._error(enqueueE);
throw enqueueE;
}
}

CallReadableStreamPull(stream);

const shouldApplyBackpressure = ShouldReadableStreamApplyBackpressure(stream);
if (shouldApplyBackpressure === true) {
return false;
}
return true;
};
}

function CreateReadableStreamErrorFunction(stream) {
return e => {
if (stream._state !== 'readable') {
throw new TypeError(`The stream is ${stream._state} and so cannot be errored`);
}
function ErrorReadableStream(stream, e) {
if (stream._state !== 'readable') {
throw new TypeError(`The stream is ${stream._state} and so cannot be errored`);
}

stream._queue = [];
stream._storedError = e;
stream._state = 'errored';
stream._queue = [];
stream._storedError = e;
stream._state = 'errored';

if (IsReadableStreamLocked(stream) === true) {
return ReleaseReadableStreamReader(stream._reader);
}
};
if (IsReadableStreamLocked(stream) === true) {
return ReleaseReadableStreamReader(stream._reader);
}
}

function IsReadableStream(x) {
Expand All @@ -430,6 +454,18 @@ function IsReadableStreamLocked(stream) {
return true;
}

function IsReadableStreamController(x) {
if (!typeIsObject(x)) {
return false;
}

if (!Object.prototype.hasOwnProperty.call(x, '_controlledReadableStream')) {
return false;
}

return true;
}

function IsReadableStreamReader(x) {
if (!typeIsObject(x)) {
return false;
Expand Down Expand Up @@ -477,15 +513,15 @@ function ShouldReadableStreamApplyBackpressure(stream) {
try {
strategy = stream._underlyingSource.strategy;
} catch (strategyE) {
stream._error(strategyE);
ErrorReadableStream(stream, strategyE);
throw strategyE;
}

if (strategy !== undefined) {
try {
shouldApplyBackpressure = Boolean(strategy.shouldApplyBackpressure(queueSize));
} catch (shouldApplyBackpressureE) {
stream._error(shouldApplyBackpressureE);
ErrorReadableStream(stream, shouldApplyBackpressureE);
throw shouldApplyBackpressureE;
}
}
Expand Down
8 changes: 4 additions & 4 deletions reference-implementation/lib/transform-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ export default class TransformStream {

let enqueueInReadable, closeReadable, errorReadable;
const readable = this.readable = new ReadableStream({
start(enqueue, close, error) {
enqueueInReadable = enqueue;
closeReadable = close;
errorReadable = error;
start(c) {
enqueueInReadable = c.enqueue.bind(c);
closeReadable = c.close.bind(c);
errorReadable = c.error.bind(c);
},
pull() {
if (chunkWrittenButNotYetTransformed === true) {
Expand Down
Loading