diff --git a/src/server/event-stream.test.ts b/src/server/event-stream.test.ts index 8d78b2a..0285656 100644 --- a/src/server/event-stream.test.ts +++ b/src/server/event-stream.test.ts @@ -54,6 +54,19 @@ describe(eventStream.name, () => { expect(done).toBe(true); }); + test("accepts custom queuing strategy", () => { + expect(() => + eventStream( + new AbortController().signal, + (_, abort) => { + return () => abort(); + }, + undefined, + new CountQueuingStrategy({ highWaterMark: 2 }), + ), + ).not.toThrow(); + }); + describe("Headers Overrides", () => { test("overrrides Content-Type header", () => { // biome-ignore lint/suspicious/noEmptyBlockStatements: Test diff --git a/src/server/event-stream.ts b/src/server/event-stream.ts index 7cce44e..fa05e9b 100644 --- a/src/server/event-stream.ts +++ b/src/server/event-stream.ts @@ -27,41 +27,45 @@ export function eventStream( signal: AbortSignal, init: InitFunction, options: ResponseInit = {}, + strategy?: QueuingStrategy, ) { - let stream = new ReadableStream({ - start(controller) { - let encoder = new TextEncoder(); - let closed = false; + let stream = new ReadableStream( + { + start(controller) { + let encoder = new TextEncoder(); + let closed = false; - function send({ event = "message", data }: SendFunctionArgs) { - if (closed) return; // If already closed, not enqueue anything - controller.enqueue(encoder.encode(`event: ${event}\n`)); - - if (closed) return; // If already closed, not enqueue anything + function send({ event = "message", data }: SendFunctionArgs) { + if (closed) return; // If already closed, not enqueue anything + controller.enqueue(encoder.encode(`event: ${event}\n`)); - data.split("\n").forEach((line, index, array) => { if (closed) return; // If already closed, not enqueue anything - let value = `data: ${line}\n`; - if (index === array.length - 1) value += "\n"; - controller.enqueue(encoder.encode(value)); - }); - } - let cleanup = init(send, close); + data.split("\n").forEach((line, index, array) => { + if (closed) return; // If already closed, not enqueue anything + let value = `data: ${line}\n`; + if (index === array.length - 1) value += "\n"; + controller.enqueue(encoder.encode(value)); + }); + } + + let cleanup = init(send, close); - function close() { - if (closed) return; - cleanup(); - closed = true; - signal.removeEventListener("abort", close); - controller.close(); - } + function close() { + if (closed) return; + cleanup(); + closed = true; + signal.removeEventListener("abort", close); + controller.close(); + } - signal.addEventListener("abort", close); + signal.addEventListener("abort", close); - if (signal.aborted) return close(); + if (signal.aborted) return close(); + }, }, - }); + strategy, + ); let headers = new Headers(options.headers);