-
Notifications
You must be signed in to change notification settings - Fork 603
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(util-stream-node): provide handling utilities for Node.js stream (…
…#3778) * feat(util-sdk-stream): implement sdk stream utility mixin * feat: types of the SDK Stream feat(util-stream-node): merge util-sdk-stream into util-stream-node(browser) feeat(util-stream-node): rename transformToBuffer to transformToByteArray; unit test * feat(util-stream-node): update sdkStreamMixin input to unknown
- Loading branch information
1 parent
5b2dc89
commit 0ef4af6
Showing
4 changed files
with
223 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,2 @@ | ||
export * from "./getAwsChunkedEncodingStream"; | ||
export * from "./sdk-stream-mixin"; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
import { SdkStreamMixin } from "@aws-sdk/types"; | ||
import { fromArrayBuffer } from "@aws-sdk/util-buffer-from"; | ||
import { PassThrough, Readable, Writable } from "stream"; | ||
|
||
import { sdkStreamMixin } from "./sdk-stream-mixin"; | ||
|
||
jest.mock("@aws-sdk/util-buffer-from"); | ||
|
||
describe(sdkStreamMixin.name, () => { | ||
const writeDataToStream = (stream: Writable, data: Array<ArrayBufferLike>): Promise<void> => | ||
new Promise((resolve, reject) => { | ||
data.forEach((chunk) => { | ||
stream.write(chunk, (err) => { | ||
if (err) reject(err); | ||
}); | ||
}); | ||
stream.end(resolve); | ||
}); | ||
const byteArrayFromBuffer = (buf: Buffer) => new Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength); | ||
let passThrough: PassThrough; | ||
const expectAllTransformsToFail = async (sdkStream: SdkStreamMixin) => { | ||
const transformMethods: Array<keyof SdkStreamMixin> = [ | ||
"transformToByteArray", | ||
"transformToString", | ||
"transformToWebStream", | ||
]; | ||
for (const method of transformMethods) { | ||
try { | ||
await sdkStream[method](); | ||
fail(new Error("expect subsequent tranform to fail")); | ||
} catch (error) { | ||
expect(error.message).toContain("The stream has already been transformed"); | ||
} | ||
} | ||
}; | ||
|
||
beforeEach(() => { | ||
passThrough = new PassThrough(); | ||
}); | ||
|
||
it("should throw if unexpected stream implementation is supplied", () => { | ||
try { | ||
const payload = {}; | ||
sdkStreamMixin(payload); | ||
fail("should throw when unexpected stream is supplied"); | ||
} catch (error) { | ||
expect(error.message).toContain("Unexpected stream implementation"); | ||
} | ||
}); | ||
|
||
describe("transformToByteArray", () => { | ||
it("should transform binary stream to byte array", async () => { | ||
const mockData = [Buffer.from("foo"), Buffer.from("bar"), Buffer.from("buzz")]; | ||
const expected = byteArrayFromBuffer(Buffer.from("foobarbuzz")); | ||
const sdkStream = sdkStreamMixin(passThrough); | ||
await writeDataToStream(passThrough, mockData); | ||
expect(await sdkStream.transformToByteArray()).toEqual(expected); | ||
}); | ||
|
||
it("should fail any subsequent tranform calls", async () => { | ||
const sdkStream = sdkStreamMixin(passThrough); | ||
await writeDataToStream(passThrough, [Buffer.from("abc")]); | ||
expect(await sdkStream.transformToByteArray()).toEqual(byteArrayFromBuffer(Buffer.from("abc"))); | ||
await expectAllTransformsToFail(sdkStream); | ||
}); | ||
}); | ||
|
||
describe("transformToString", () => { | ||
const toStringMock = jest.fn(); | ||
beforeAll(() => { | ||
jest.resetAllMocks(); | ||
}); | ||
|
||
it("should transform the stream to string with utf-8 encoding by default", async () => { | ||
(fromArrayBuffer as jest.Mock).mockImplementation( | ||
jest.requireActual("@aws-sdk/util-buffer-from").fromArrayBuffer | ||
); | ||
const sdkStream = sdkStreamMixin(passThrough); | ||
await writeDataToStream(passThrough, [Buffer.from("foo")]); | ||
const transformed = await sdkStream.transformToString(); | ||
expect(transformed).toEqual("foo"); | ||
}); | ||
|
||
it.each([undefined, "utf-8", "ascii", "base64", "latin1", "binary"])( | ||
"should transform the stream to string with %s encoding", | ||
async (encoding) => { | ||
(fromArrayBuffer as jest.Mock).mockReturnValue({ toString: toStringMock }); | ||
const sdkStream = sdkStreamMixin(passThrough); | ||
await writeDataToStream(passThrough, [Buffer.from("foo")]); | ||
await sdkStream.transformToString(encoding); | ||
expect(toStringMock).toBeCalledWith(encoding); | ||
} | ||
); | ||
|
||
it("should fail any subsequent tranform calls", async () => { | ||
const sdkStream = sdkStreamMixin(passThrough); | ||
await writeDataToStream(passThrough, [Buffer.from("foo")]); | ||
await sdkStream.transformToString(); | ||
await expectAllTransformsToFail(sdkStream); | ||
}); | ||
}); | ||
|
||
describe("transformToWebStream", () => { | ||
it("should throw if any event listener is attached on the underlying stream", async () => { | ||
passThrough.on("data", console.log); | ||
const sdkStream = sdkStreamMixin(passThrough); | ||
try { | ||
sdkStream.transformToWebStream(); | ||
fail(new Error("expect web stream transformation to fail")); | ||
} catch (error) { | ||
expect(error.message).toContain("The stream has been consumed by other callbacks"); | ||
} | ||
}); | ||
|
||
describe("when Readable.toWeb() is not supported", () => { | ||
// @ts-expect-error | ||
const originalToWebImpl = Readable.toWeb; | ||
beforeAll(() => { | ||
// @ts-expect-error | ||
Readable.toWeb = undefined; | ||
}); | ||
afterAll(() => { | ||
// @ts-expect-error | ||
Readable.toWeb = originalToWebImpl; | ||
}); | ||
|
||
it("should throw", async () => { | ||
const sdkStream = sdkStreamMixin(passThrough); | ||
try { | ||
sdkStream.transformToWebStream(); | ||
fail(new Error("expect web stream transformation to fail")); | ||
} catch (error) { | ||
expect(error.message).toContain("Readable.toWeb() is not supported"); | ||
} | ||
}); | ||
}); | ||
|
||
describe("when Readable.toWeb() is supported", () => { | ||
// @ts-expect-error | ||
const originalToWebImpl = Readable.toWeb; | ||
beforeAll(() => { | ||
// @ts-expect-error | ||
Readable.toWeb = jest.fn().mockReturnValue("A web stream"); | ||
}); | ||
|
||
afterAll(() => { | ||
// @ts-expect-error | ||
Readable.toWeb = originalToWebImpl; | ||
}); | ||
|
||
it("should tranform Node stream to web stream", async () => { | ||
const sdkStream = sdkStreamMixin(passThrough); | ||
sdkStream.transformToWebStream(); | ||
// @ts-expect-error | ||
expect(Readable.toWeb).toBeCalled(); | ||
}); | ||
|
||
it("should fail any subsequent tranform calls", async () => { | ||
const sdkStream = sdkStreamMixin(passThrough); | ||
await writeDataToStream(passThrough, [Buffer.from("foo")]); | ||
await sdkStream.transformToWebStream(); | ||
await expectAllTransformsToFail(sdkStream); | ||
}); | ||
}); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
import { streamCollector } from "@aws-sdk/node-http-handler"; | ||
import { SdkStream, SdkStreamMixin } from "@aws-sdk/types"; | ||
import { fromArrayBuffer } from "@aws-sdk/util-buffer-from"; | ||
import { Readable } from "stream"; | ||
|
||
const ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED = "The stream has already been transformed."; | ||
|
||
/** | ||
* The function that mixes in the utility functions to help consuming runtime-specific payload stream. | ||
* | ||
* @internal | ||
*/ | ||
export const sdkStreamMixin = (stream: unknown): SdkStream<Readable> => { | ||
if (!(stream instanceof Readable)) { | ||
// @ts-ignore | ||
const name = stream?.__proto__?.constructor?.name || stream; | ||
throw new Error(`Unexpected stream implementation, expect Stream.Readable instance, got ${name}`); | ||
} | ||
|
||
let transformed = false; | ||
const transformToByteArray = async () => { | ||
if (transformed) { | ||
throw new Error(ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED); | ||
} | ||
transformed = true; | ||
return await streamCollector(stream); | ||
}; | ||
|
||
return Object.assign<Readable, SdkStreamMixin>(stream, { | ||
transformToByteArray, | ||
transformToString: async (encoding?: string) => { | ||
const buf = await transformToByteArray(); | ||
return fromArrayBuffer(buf.buffer, buf.byteOffset, buf.byteLength).toString(encoding); | ||
}, | ||
transformToWebStream: () => { | ||
if (transformed) { | ||
throw new Error(ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED); | ||
} | ||
if (stream.readableFlowing !== null) { | ||
// Prevent side effect of consuming webstream. | ||
throw new Error("The stream has been consumed by other callbacks."); | ||
} | ||
// @ts-expect-error toWeb() is only available in Node.js >= 17.0.0 | ||
if (typeof Readable.toWeb !== "function") { | ||
throw new Error( | ||
"Readable.toWeb() is not supported. Please make sure you are using Node.js >= 17.0.0, or polyfill is available." | ||
); | ||
} | ||
transformed = true; | ||
// @ts-expect-error toWeb() is only available in Node.js >= 17.0.0 | ||
return Readable.toWeb(stream); | ||
}, | ||
}); | ||
}; |