Skip to content

Commit

Permalink
Initial pass at ReadableByteStream in the spec
Browse files Browse the repository at this point in the history
Also includes a quick explainer at docs/ReadableByteStream.md.

Track overall progress at whatwg#300.
  • Loading branch information
tyoshino authored and domenic committed May 3, 2015
1 parent c80e15e commit 5b47faa
Show file tree
Hide file tree
Showing 8 changed files with 749 additions and 1,309 deletions.
198 changes: 1 addition & 197 deletions BinaryExtension.md
Original file line number Diff line number Diff line change
@@ -1,197 +1 @@
# Streams API Binary Extension

## Readable Byte Stream API

This extended API allows efficient access to a data source available for read via the POSIX non-blocking `read(2)` API or similar.

We map the non-blocking `read(2)` to a non-blocking method `ReadableByteStream.prototype.readInto()`, and asynchronous notification of I/O events to the state and Promises.

`ReadableByteStream` has the same states as the `ReadableStream`. State transition happens on `notifyReady()` call and on calling `readInto()` of the underlying source.

When a `ReadableByteStream` is constructed, the underlying source watches for events on the file descriptor from which data is read using some I/O event loop built with `select(2)`, `poll(2)`, etc. `ReadableByteStream` provides a function `notifyReady()` to the underlying source. It is intended to be called when the file descriptor is ready. It moves the stream into the `"readable"` state. The `"readable"` state doesn't necessarily mean some bytes are available for read. It just means that `ReadableByteStream.prototype.readInto()` can be called. We need to call `read(2)` to know what kind of event has actually happened (new data available for read or the EOF or an error), so we enter `"readable"` and let the user call `readInto()`.

`ReadableByteStream`'s constructor takes `readInto()` function from the underlying source instead of taking `pull()` and providing `[[enqueue]]`. A user calls `ReadableByteStream.prototype.readInto()` with an ArrayBuffer prepared by the user to get available data written into the ArrayBuffer. The method calls the `readInto()` of the underlying source with the ArrayBuffer. `readInto()` calls `read(2)` to write read data directly onto the ArrayBuffer. The stream translates the return value of the function into the next state of the stream and returns the number of bytes written to the given ArrayBuffer as follows:

- If there are bytes available for read,
- The underlying source's `readInto()` should write the bytes into the ArrayBuffer and return the number of bytes written.
- Then, the stream stays in the `"readable"` state and `stream.readInto()` will return the number of bytes written.
- If the file descriptor has nothing available for non-blocking read, e.g. `read(2)` returning `EAGAIN`, `EWOULDBLOCK`, etc.,
- The underlying source's `readInto()` should write nothing into the ArrayBuffer and return -2.
- Then, the stream enters the `"waiting"` state and `stream.readInto()` will return 0.
- If the file descriptor reached the EOF,
- The underlying source's `readInto()` should write nothing into the ArrayBuffer and return -1.
- Then, the stream enters the `"closed"` state and `stream.readInto()` will return 0.
- If the `read(2)` fails,
- The underlying source's `readInto()` should write nothing into the ArrayBuffer and throw.
- Then, the stream enters the `"errored"` state and `stream.readInto()` will return 0.

By admitting returning the ArrayBuffer with no data written on it, `ReadableByteStream` satisfies the semantics of `ReadableStream`.

Underlying sources that can determine how many bytes can be read synchronously may optinally implement a getter `readableAmount()` to allow the user of the `ReadableByteStream` to adjust the size of an ArrayBuffer to pass to `readInto()`.

### ReadableByteStream

```
class ReadableByteStream {
constructor(underlyingByteSource = {})
get closed()
get readableAmount()
get ready()
get state()
cancel(reason)
pipeThrough({ writable, readable }, options)
pipeTo(dest, { preventClose, preventAbort, preventCancel } = {})
read()
readInto(arrayBuffer, offset, size)
// Internal slots
[[state]] = "waiting"
[[storedError]]
[[readyPromise]]
[[closedPromise]]
// Holders for stuff given by the underlying source
[[onReadInto]]
[[onCancel]]
// Internal methods for use by the underlying source
[[notifyReady]]()
[[error]](any e)
}
```

`underlyingByteSource` has optional methods `start`, `readInto` and `cancel`, and an optional property `readBufferSize`.

#### Abstract Operations For ReadableByteStream Objects

##### Notify Ready Function

A notify ready function is an anonymous built-in function that has [[Stream]] internal slot.

When a notify ready function _F_ is called, the following steps are taken:

1. Let _stream_ be the value of _F_'s [[Stream]] internal slot.
1. If _stream_.[[state]] is not `"waiting"`, return.
1. Set _stream_.[[state]] to `"readable"`.
1. Resolve _stream_.[[readyPromise]] with **undefined**.

##### ErrorReadableByteStream( stream, error )

1. If _stream_.[[state]] is `"errored"` or `"closed"`, return.
1. If _stream_.[[state]] is `"waiting"`, reject _stream_.[[readyPromise]] with _error_.
1. If _stream_.[[state]] is `"readable"`, let _stream_.[[readyPromise]] be a new promise rejected with _error_.
1. Set _stream_.[[state]] to `"errored"`.
1. Set _stream_.[[storedError]] to _error_.
1. Reject _stream_.[[closedPromise]] with _error_.

##### Error Function

An error function is an anonymous built-in function that has [[Stream]] internal slot.

When an error function _F_ is called with argument _error_, the following steps are taken:

1. Let _stream_ be the value of _F_'s [[Stream]] internal slot.
1. ErrorReadableByteStream(_stream_, _error_).

#### Properties of the ReadableByteStream Prototype Object

##### constructor({ start, readInto, cancel, readBufferSize })

1. Let _stream_ be the **this** value.
1. If IsCallable(_start_) is false, then throw a **TypeError** exception.
1. If IsCallable(_readInto_) is false, then throw a **TypeError** exception.
1. If IsCallable(_cancel_) is false, then throw a **TypeError** exception.
1. If _readBufferSize_ is not **undefined**,
1. Let _readBufferSize_ be ToInteger(_readBufferSize_).
1. If _readBufferSize_ < 0, throw a **RangeError** exception.
1. Set _stream_.[[onReadInto]] to _readInto_.
1. Set _stream_.[[onCancel]] to _cancel_.
1. Set _stream_.[[readBufferSize]] to _readBufferSize_.
1. Let _stream_.[[readyPromise]] be a new promise.
1. Let _stream_.[[closedPromise]] be a new promise.
1. Let _stream_.[[notifyReady]] be a new built-in function object as defined in Notify Ready Function with [[Stream]] internal slot set to _stream_.
1. Let _stream_.[[error]] be a new built-in function object as defined in Error Function with [[Stream]] internal slot set to _stream_.
1. Let _startResult_ be the result of calling the [[Call]] internal method of _start_ with **undefined** as _thisArgument_ and (_stream_.[[notifyReady]], _stream_.[[error]]) as _argumentList_.
1. ReturnIfAbrupt(_startResult_).

##### ReadableByteStream.prototype.read ()

1. If **this**.[[readBufferSize]] is **undefined**, throw a **TypeError** exception.
1. Let _arrayBuffer_ be a new array buffer with length equals to **this**.[[readBufferSize]].
1. Let _bytesRead_ be Invoke(**this**, `"readInto"`, (_arrayBuffer_, 0, **this**.[[readBufferSize]])).
1. Let _resizedArrayBuffer_ be a new array buffer with length equal to _bytesRead_ created by transferring _arrayBuffer_ using the semantics of [the proposed `ArrayBuffer.transfer`](https://gist.github.com/andhow/95fb9e49996615764eff).
1. Return _resizedArrayBuffer_.

##### ReadableByteStream.prototype.readInto ( arrayBuffer, offset, size )

1. If **this**.[[state]] is `"waiting"` or `"closed"`, throw a **TypeError** exception.
1. If **this**.[[state]] is `"errored"`, throw **this**.[[storedError]].
1. Assert: **this**.[[state]] is `"readable"`.
1. If Type(_arrayBuffer_) is not Object, throw a **TypeError** exception.
1. If _arrayBuffer_ does not have an [[ArrayBufferData]] internal slot, throw a **TypeError** exception.
1. If the value of _arrayBuffer_'s [[ArrayBufferData]] internal slot is **undefined**, then throw a **TypeError** exception.
1. If IsNeuteredBuffer(_arrayBuffer_) is **true**, then throw a **TypeError** exception.
1. Let _bufferLength_ be the value of _arrayBuffer_'s [[ArrayBufferByteLength]] internal slot.
1. If _offset_ is **undefined**, let _offset_ be 0.
1. Otherwise,
1. Let _offset_ be ToInteger(_offset_).
1. ReturnIfAbrupt(_offset_).
1. If _offset_ < 0, throw a **RangeError** exception.
1. If _size_ is **undefined**, let _size_ be _bufferLength_ - _offset_.
1. Otherwise,
1. Let _size_ be ToInteger(_size_).
1. ReturnIfAbrupt(_size_).
1. If _size_ < 0 or _offset_ + _size_ > _bufferLength_, throw a **RangeError** exception.
1. Let _bytesRead_ be the result of calling the [[Call]] internal method of **this**.[[onReadInto]] with **undefined** as _thisArgument_ and (_arrayBuffer_, _offset_, _size_) as _argumentsList_.
1. If _bytesRead_ is an abrupt completion,
1. ErrorReadableByteStream(**this**, _bytesRead_.[[value]]).
1. Return _bytesRead_.
1. Let _bytesRead_ be ToNumber(_bytesRead_).
1. If _bytesRead_ is **NaN** or _bytesRead_ < -2 or _bytesRead_ > _bufferLength_,
1. Let _error_ be a **RangeError** exception.
1. ErrorReadableByteStream(**this**, _error_).
1. Throw _error_.
1. If _bytesRead_ is -2,
1. Set **this**.[[state]] to `"waiting"`.
1. Let **this**.[[readyPromise]] be a new promise.
1. Return 0.
1. If _bytesRead_ is -1,
1. Set **this**.[[state]] to `"closed"`
1. Resolve **this**.[[closedPromise]] with **undefined**.
1. Return 0.
1. Return _bytesRead_.

##### ReadableByteStream.prototype.cancel ( reason )

1. If `this.[[state]]` is `"closed"`, return a new promise resolved with **undefined**.
1. If `this.[[state]]` is `"errored"`, return a new promise rejected with `this.[[storedError]]`.
1. If `this.[[state]]` is `"waiting"`, resolve `this.[[readyPromise]]` with **undefined**.
1. Set `this.[[state]]` to `"closed"`.
1. Resolve `this.[[closedPromise]]` with **undefined**.
1. Let _cancelPromise_ be a new promise.
1. Let _sourceCancelPromise_ be the result of promise-calling **this**.\[\[onCancel]](_reason_).
1. Upon fulfillment of _sourceCancelPromise_, resolve _cancelPromise_ with **undefined**.
1. Upon rejection of _sourceCancelPromise_ with reason _r_, reject _cancelPromise_ with _r_.
1. Return _cancelPromise_.

##### get ReadableByteStream.prototype.state

1. Let _stream_ be the **this** value.
1. Return _stream_.[[state]].

##### get ReadableByteStream.prototype.ready

1. Let _stream_ be the **this** value.
1. Return _stream_.[[readyPromise]].

##### get ReadableByteStream.prototype.readableAmount

1. Return InvokeOrNoop(**this**@[[underlyingSink]], `"readableAmount"`, «»).

##### get ReadableByteStream.prototype.closed

1. Let _stream_ be the **this** value.
1. Return _stream_.[[closedPromise]].
See https://github.com/whatwg/streams/pull/343 for normative definition of the readable byte stream.
157 changes: 157 additions & 0 deletions docs/ReadableByteStream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# ReadableByteStream Docs

In document, the role and semantics of the `ReadableByteStream`-related APIs are explained briefly. We may eventually
move this documentation to somewhere like MDN.

This document is non-normative. Please see the specification for normative definition.

## ReadableByteStream

`ReadableByteStream` has a hidden state. The state can be one of the following:
- `"readable"`: Data may be readable
- `"closed"`: No more data is readable
- `"errored"`: The stream has been errored

The state is not exposed but is observable by calling the methods on the stream or reader.

### getByobReader()

**WARNING: Unstable API**

See [#294](https://github.com/whatwg/streams/issues/294) about method naming.

The `getByobReader` method creates a byob byte stream reader and locks the byte stream to the new reader.

### getReader()

The `getReader` method creates a readable stream reader and locks the byte stream to the new reader.

The reader's `read()` method returns a Uint8Array.

## ReadableByteStreamReader

### get closed()

Used for getting notified that the stream is closed or errored.

If the promise returned by this getter:
- fulfills, that means either of:
- the stream has been closed
- the reader has been released while the stream was readable
- rejects, that means either of:
- the stream has been errored

### cancel(reason)

Tells the byte stream to stop generating or buffering data.

- _reason_: An object indicating the reason why the consumer lost interest

#### Return value

If the returned promise:
- fulfills, that means either of:
- the stream has been already closed
- the reader has been already released while the stream was readable
- the stream was successfully cancelled for this `cancel()` call. In this case, the stream becomes `"closed"`.
- rejects, that means either of:
- the stream has been already errored
- the stream was cancelled for this `cancel()` call but the cancellation finished uncleanly. In this case, the stream becomes `"closed"`.

### read()

Used for reading bytes as an `ArrayBufferView` and also for getting notified that the stream is closed or errored.

#### Return value

If the return promise:
- fulfills with _fulfillmentValue_,
- if _fulfillmentValue_.done is set,
- that means either of:
- the stream has been closed
- the reader has been already released while the stream was readable
- _fulfillmentValue_.value is set to **undefined**
- otherwise,
- that means that bytes were successfully read. The bytes are stored in the region specified by _fulfillmentValue_.value which is an `Uint8Array`
- rejects, that means either of:
- the stream has been errored

### releaseLock()

Detaches the reader from the stream.

#### Return value and exception

The return value of this method is void (always **undefined** if successful).

If this method returns without throwing, that means either of:
- the reader was released successfully
- the reader has already been released

If this method throws,
- that means that some of `read()` calls haven't yet been completed
- the failure doesn't affect the state of the stream or reader

## ByobByteStreamReader

**WARNING: Unstable API**

### Class Definition

```
class ByobByteStreamReader {
constructor(byteStream)
get closed()
cancel(reason)
read(view)
releaseLock()
}
```

### read(view)

Used for reading bytes into `view` and also for getting notified that the stream is closed or errored.

- _view_: An `ArrayBufferView` to which the reader stores the bytes read from the stream

#### Return value

If the return promise:
- fulfills with _fulfillmentValue_,
- if _fulfillmentValue_.done is set,
- that means either of:
- the stream has been closed
- the reader has been already released while the stream was readable
- _fulfillmentValue_.value is set to an `ArrayBufferView` of the same type as _view_ with `byteLength` set to 0 and `byteOffset` set to the same value as _view_
- otherwise,
- that means that bytes were successfully read. The bytes are stored in the region specified by _fulfillmentValue_.value which is an `ArrayBufferView` of the same type as _view_ with `byteOffset` set to the same value as _view_
- rejects, that means either of:
- the stream has been errored

#### Algorithm draft

1. If IsReadableByteStreamReader(*this*) is *false*, throw a **TypeError** exception.
1. If **this**@[[state]] is "closed", return a new promise resolved with CreateIterResultObject(_view_, **true**).
1. If **this**@[[state]] is "errored", return a new promise rejected with **this**@[[storedError]].
1. Assert: **this**@[[stream]] is not **undefined**.
1. Assert: **this**@[[stream]]@[[state]] is "readable".
1. If **this**@[[stream]]'s buffer is not empty,
1. Fill _view_ with the bytes in **this**@[[stream]]'s buffer.
1. Let _bytesFilled_ be the number of the bytes written to _view_ in the last step, and pop the bytes that have been consumed from **this**@[[stream]]'s buffer in the last step.
1. If **this**@[[stream]] can generate bytes to return synchronously,
1. Generate bytes into _view_.
1. If non-zero bytes have been written to _view_,
1. If the bytes are the final bytes to return, call-with-rethrow CloseReadableByteStream(**this**@[[stream]]).
1. Return a new promise resolved with CreateIterResultObject(_view_, **false**).
1. Otherwise,
1. Detach the ArrayBuffer object pointed by _view_ from _view_.
1. Let _view_ be a new reference pointing the ArrayBuffer.
1. Let _readRequestPromise_ be a new promise.
1. Append _readRequestPromise_ as the last element of *this*@[[readRequests]].
1. Run the steps below asynchronously,
1. Generate bytes into _view_.
1. Let _newView_ be a new `ArrayBufferView` of the same type whose `buffer` is _view_.buffer, `byteLength` is _view_.byteLength, and `byteOffset` is _view_.byteOffset - _bytesFilled_.
1. Resolve _readRequestPromise_ with CreateIterResultObject(_newView_, *false*).
1. Return _readRequestPromise_.
Loading

0 comments on commit 5b47faa

Please sign in to comment.