feat: add opt-in fallback support (to roughly old impl) for readablestream errors.
cjpillsbury committed Apr 22, 2024
1 parent 717e50e commit f01282e
Showing 1 changed file with 146 additions and 17 deletions.
163 changes: 146 additions & 17 deletions src/upchunk.ts
@@ -50,10 +50,18 @@ export type ChunkedStreamIterableOptions = {
maxChunkSize?: number;
};

export interface ChunkedIterable extends AsyncIterable<Blob> {
chunkSize: number;
readonly chunkByteSize: number;
readonly minChunkSize: number;
readonly maxChunkSize: number;
readonly error: Error | undefined;
}

// An Iterable that accepts a ReadableStream of binary data (Blob | Uint8Array) and provides
// an async iterator which yields Blob values of the current chunkSize until done. Note that
// chunkSize may change between iterations.
-export class ChunkedStreamIterable implements AsyncIterable<Blob> {
+export class ChunkedStreamIterable implements ChunkedIterable {
protected _chunkSize: number | undefined;
protected defaultChunkSize: number;
protected _error: Error | undefined;
@@ -150,6 +158,95 @@ export class ChunkedStreamIterable implements AsyncIterable<Blob> {
}
}

export class ChunkedFileIterable implements ChunkedIterable {
protected _chunkSize: number | undefined;
protected defaultChunkSize: number;
protected _error: Error | undefined;
public readonly minChunkSize: number;
public readonly maxChunkSize: number;

constructor(
protected file: File,
options: ChunkedStreamIterableOptions = {}
) {
if (!isValidChunkSize(options.defaultChunkSize, options)) {
throw getChunkSizeError(options.defaultChunkSize, options);
}
this.defaultChunkSize = options.defaultChunkSize ?? DEFAULT_CHUNK_SIZE;
this.minChunkSize = options.minChunkSize ?? DEFAULT_MIN_CHUNK_SIZE;
this.maxChunkSize = options.maxChunkSize ?? DEFAULT_MAX_CHUNK_SIZE;
}

get chunkSize() {
return this._chunkSize ?? this.defaultChunkSize;
}

set chunkSize(value) {
if (!isValidChunkSize(value, this)) {
throw getChunkSizeError(value, this);
}
this._chunkSize = value;
}

get chunkByteSize() {
return this.chunkSize * 1024;
}

get error() {
return this._error;
}

async *[Symbol.asyncIterator](): AsyncIterator<Blob> {
const reader = new FileReader();
let nextChunkRangeStart = 0;
/**
* Get the next portion of the file, up to chunkByteSize bytes.
*/
const getChunk = () => {
return new Promise<Blob | undefined>((resolve, reject) => {
if (nextChunkRangeStart >= this.file.size) {
resolve(undefined);
return;
}
// We either want to slice a "chunkByteSize-worth" of the file or
// slice to the end of the file (if less than a "chunkByteSize-worth" is left)
const length = Math.min(
this.chunkByteSize,
this.file.size - nextChunkRangeStart
);
reader.onload = () => {
if (reader.result !== null) {
resolve(
new Blob([reader.result], {
type: 'application/octet-stream',
})
);
} else {
resolve(undefined);
}
};

// Without an error handler the promise would never settle on a failed
// read; reject so the iterator's catch below can record the error.
reader.onerror = () => {
reject(reader.error ?? new Error('Failed to read file chunk'));
};

reader.readAsArrayBuffer(
this.file.slice(nextChunkRangeStart, nextChunkRangeStart + length)
);
});
};
try {
while (true) {
const nextChunk = await getChunk();
if (nextChunk) {
nextChunkRangeStart += nextChunk.size;
yield nextChunk;
} else {
break;
}
}
} catch (e) {
this._error = e instanceof Error ? e : new Error(String(e));
}
}
}
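For reference, a minimal editorial sketch (not part of this commit) of consuming the new iterable directly; `file` is a placeholder `File` (e.g. from an `<input type="file">`), and chunk sizes are in KB, so each yielded Blob is at most `defaultChunkSize * 1024` bytes:

  const iterable = new ChunkedFileIterable(file, { defaultChunkSize: 512 });
  for await (const chunk of iterable) {
    // Each yielded value is a Blob sliced from the file.
    console.log(`read a chunk of ${chunk.size} bytes`);
  }
  if (iterable.error) {
    // A failed read ends iteration early rather than throwing, leaving
    // the error on the iterable for the caller to inspect.
    console.error('chunked read failed:', iterable.error);
  }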

const SUCCESSFUL_CHUNK_UPLOAD_CODES = [200, 201, 202, 204, 308];
const TEMPORARY_ERROR_CODES = [408, 502, 503, 504]; // These error codes imply a chunk may be retried
const RESUME_INCOMPLETE_CODES = [308];
@@ -234,6 +331,7 @@ export interface UpChunkOptions {
dynamicChunkSize?: boolean;
maxChunkSize?: number;
minChunkSize?: number;
useFileSliceFallback?: boolean;
}

export class UpChunk {
@@ -249,8 +347,8 @@ export class UpChunk {
public delayBeforeAttempt: number;
public retryCodes: number[];
public dynamicChunkSize: boolean;
-protected chunkedStreamIterable: ChunkedStreamIterable;
-protected chunkedStreamIterator;
+protected chunkedIterable: ChunkedIterable;
+protected chunkedIterator;

protected pendingChunk?: Blob;
private chunkCount: number;
@@ -268,6 +366,8 @@ export class UpChunk {
private eventTarget: EventTarget<Record<EventName, UpchunkEvent>>;

constructor(options: UpChunkOptions) {
+this.eventTarget = new EventTarget();

this.endpoint = options.endpoint;
this.file = options.file;

@@ -290,20 +390,46 @@ export class UpChunk {
this.success = false;
this.nextChunkRangeStart = 0;

if (options.useFileSliceFallback) {
const readableStreamErrorCallback = (event: CustomEvent) => {
// In this case, assume the error is a result of file reading via ReadableStream.
// Retry using ChunkedFileIterable, which reads the file into memory instead
// of a stream.
if (this.chunkedIterable.error) {
console.warn(
`Unable to read file of size ${this.file.size} bytes via a ReadableStream. Falling back to in-memory FileReader!`
);
event.stopImmediatePropagation();

// Re-set everything up with the fallback iterable and corresponding
// iterator
this.chunkedIterable = new ChunkedFileIterable(this.file, {
...options,
defaultChunkSize: options.chunkSize,
});
this.chunkedIterator = this.chunkedIterable[Symbol.asyncIterator]();
this.getEndpoint().then(() => {
this.sendChunks();
});
this.off('error', readableStreamErrorCallback);
}
};
this.on('error', readableStreamErrorCallback);
}

// Types appear to be getting confused in env setup, using the overloaded NodeJS Blob definition, which uses NodeJS.ReadableStream instead
// of the DOM type definitions. For definitions, see consumers.d.ts vs. lib.dom.d.ts. (CJP)
-this.chunkedStreamIterable = new ChunkedStreamIterable(
+this.chunkedIterable = new ChunkedStreamIterable(
this.file.stream() as unknown as ReadableStream<Uint8Array>,
{ ...options, defaultChunkSize: options.chunkSize }
);
-this.chunkedStreamIterator =
-this.chunkedStreamIterable[Symbol.asyncIterator]();
+this.chunkedIterator = this.chunkedIterable[Symbol.asyncIterator]();

// NOTE: Since some of upchunk's properties defer "source of truth" to
// chunkedIterable, we need to do these after it's been created (CJP).
this.totalChunks = Math.ceil(this.file.size / this.chunkByteSize);
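// e.g. (editorial note): chunkSize 5120 (KB) gives chunkByteSize 5,242,880 bytes,
// so a 10 MiB (10,485,760 byte) file yields Math.ceil(10485760 / 5242880) = 2 chunks.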

-this.eventTarget = new EventTarget();

this.validateOptions();

this.getEndpoint().then(() => this.sendChunks());

// restart sync when back online
@@ -327,23 +453,23 @@
}

protected get maxChunkSize() {
-return this.chunkedStreamIterable?.maxChunkSize ?? DEFAULT_MAX_CHUNK_SIZE;
+return this.chunkedIterable?.maxChunkSize ?? DEFAULT_MAX_CHUNK_SIZE;
}

protected get minChunkSize() {
-return this.chunkedStreamIterable?.minChunkSize ?? DEFAULT_MIN_CHUNK_SIZE;
+return this.chunkedIterable?.minChunkSize ?? DEFAULT_MIN_CHUNK_SIZE;
}

public get chunkSize() {
-return this.chunkedStreamIterable?.chunkSize ?? DEFAULT_CHUNK_SIZE;
+return this.chunkedIterable?.chunkSize ?? DEFAULT_CHUNK_SIZE;
}

public set chunkSize(value) {
-this.chunkedStreamIterable.chunkSize = value;
+this.chunkedIterable.chunkSize = value;
}

public get chunkByteSize() {
-return this.chunkedStreamIterable.chunkByteSize;
+return this.chunkedIterable.chunkByteSize;
}

public get totalChunkSize() {
@@ -706,17 +832,20 @@
}

while (!(this.success || this._paused || this.offline)) {
-const { value: chunk, done } = await this.chunkedStreamIterator.next();
+const { value: chunk, done } = await this.chunkedIterator.next();
// NOTE: When `done`, `chunk` is undefined, so default `chunkUploadSuccess`
// to be `true` on this condition, otherwise `false`.
let chunkUploadSuccess = !chunk && done;
if (chunk) {
chunkUploadSuccess = await this.sendChunkWithRetries(chunk);
}

-if (this.chunkedStreamIterable.error) {
-this.dispatch('error', { message: `Unable to read file of size ${this.file.size} bytes. Try loading from another browser.` });
+if (this.chunkedIterable.error) {
+chunkUploadSuccess = false;
+this.dispatch('error', {
+message: `Unable to read file of size ${this.file.size} bytes. Try loading from another browser.`,
+});
return;
}
// NOTE: Need to disambiguate "last chunk to upload" (done) vs. "successfully
// uploaded last chunk to upload" (depends on status of sendChunkWithRetries),
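For completeness, a hedged usage sketch of the new opt-in flag (the endpoint URL, `someFile`, and the listener body are placeholders, not part of this commit; `on` and the `CustomEvent` shape follow the internal usage shown above):

  const uploader = new UpChunk({
    endpoint: 'https://example.com/upload', // placeholder upload URL
    file: someFile,                         // placeholder File instance
    chunkSize: 5120,                        // in KB
    useFileSliceFallback: true,             // the flag added in this commit
  });

  uploader.on('error', (event: CustomEvent) => {
    // With the fallback enabled, a ReadableStream read failure is intercepted
    // (stopImmediatePropagation) and retried via ChunkedFileIterable before it
    // would reach a listener like this one.
    console.error(event.detail);
  });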
