Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: optimize sse chunk reading off-by-one error #1339

Merged
merged 1 commit into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions src/internal/decoders/line.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,34 @@ function findNewlineIndex(

return null;
}

/**
 * Scans `buffer` for the first blank-line terminator: `\n\n`, `\r\r` or `\r\n\r\n`.
 *
 * @param buffer - raw bytes to scan.
 * @returns the index of the byte immediately after the first terminator found,
 *   or `-1` when the buffer contains none of the three patterns.
 */
export function findDoubleNewlineIndex(buffer: Uint8Array): number {
  const LF = 0x0a; // \n
  const CR = 0x0d; // \r
  const size = buffer.length;

  // Every terminator is at least two bytes long, so stop one byte before the end.
  for (let pos = 0; pos + 1 < size; pos += 1) {
    const first = buffer[pos];
    const second = buffer[pos + 1];

    // A terminator can only begin on a line-break byte.
    if (first !== LF && first !== CR) {
      continue;
    }

    // \n\n or \r\r: the same line-break byte twice in a row.
    if (second === first) {
      return pos + 2;
    }

    // \r\n\r\n: needs all four bytes to be present inside the buffer.
    const hasFourBytes = pos + 3 < size;
    if (
      first === CR &&
      second === LF &&
      hasFourBytes &&
      buffer[pos + 2] === CR &&
      buffer[pos + 3] === LF
    ) {
      return pos + 4;
    }
  }

  return -1;
}
48 changes: 1 addition & 47 deletions src/streaming.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ReadableStream, type Response } from './_shims/index';
import { OpenAIError } from './error';
import { LineDecoder } from './internal/decoders/line';
import { findDoubleNewlineIndex, LineDecoder } from './internal/decoders/line';
import { ReadableStreamToAsyncIterable } from './internal/stream-utils';

import { APIError } from './error';
Expand Down Expand Up @@ -243,37 +243,6 @@ async function* iterSSEChunks(iterator: AsyncIterableIterator<Bytes>): AsyncGene
}
}

/**
 * Searches `buffer` for the first blank-line terminator (`\r\r`, `\n\n` or `\r\n\r\n`).
 *
 * @param buffer - raw bytes to scan.
 * @returns the index right after the first occurrence of any pattern,
 *   or `-1` if none of the patterns are found.
 */
function findDoubleNewlineIndex(buffer: Uint8Array): number {
  const newline = 0x0a; // \n
  const carriage = 0x0d; // \r

  // The shortest patterns are two bytes long, so the last valid start position is
  // `length - 2`, i.e. the loop must run while `i < length - 1`. Stopping at
  // `length - 2` would miss a `\n\n` / `\r\r` sitting in the final two bytes.
  for (let i = 0; i < buffer.length - 1; i++) {
    if (buffer[i] === newline && buffer[i + 1] === newline) {
      // \n\n
      return i + 2;
    }
    if (buffer[i] === carriage && buffer[i + 1] === carriage) {
      // \r\r
      return i + 2;
    }
    if (
      buffer[i] === carriage &&
      buffer[i + 1] === newline &&
      // the four-byte pattern needs its own bounds check
      i + 3 < buffer.length &&
      buffer[i + 2] === carriage &&
      buffer[i + 3] === newline
    ) {
      // \r\n\r\n
      return i + 4;
    }
  }

  return -1;
}

class SSEDecoder {
private data: string[];
private event: string | null;
Expand Down Expand Up @@ -329,21 +298,6 @@ class SSEDecoder {
}
}

/**
 * Internal helper that exists only for tests.
 *
 * Runs every chunk, in order, through one fresh `LineDecoder` and gathers the
 * lines it emits. With `flush: true` the decoder is flushed after the last
 * chunk and whatever that returns is appended to the result.
 */
export function _decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
  const lineDecoder = new LineDecoder();
  const collected: string[] = [];

  const collect = (produced: Iterable<string>): void => {
    for (const line of produced) {
      collected.push(line);
    }
  };

  chunks.forEach((piece) => collect(lineDecoder.decode(piece)));

  if (!flush) {
    return collected;
  }

  collect(lineDecoder.flush());
  return collected;
}

function partition(str: string, delimiter: string): [string, string, string] {
const index = str.indexOf(delimiter);
if (index !== -1) {
Expand Down
128 changes: 128 additions & 0 deletions tests/internal/decoders/line.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import { findDoubleNewlineIndex, LineDecoder } from 'openai/internal/decoders/line';

/**
 * Test helper: pushes each chunk, in order, through a single new `LineDecoder`
 * and returns all lines it produced. When `flush` is true the decoder is
 * flushed at the end and the flushed lines are appended.
 */
function decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
  const lineDecoder = new LineDecoder();
  const output: string[] = [];

  for (let idx = 0; idx < chunks.length; idx += 1) {
    for (const line of lineDecoder.decode(chunks[idx])) {
      output.push(line);
    }
  }

  if (!flush) {
    return output;
  }

  for (const line of lineDecoder.flush()) {
    output.push(line);
  }
  return output;
}

// Behavioural tests for LineDecoder, mostly driven through the local decodeChunks
// helper: each case feeds a sequence of chunks and checks which lines come out,
// with and without a final flush.
describe('line decoder', () => {
test('basic', () => {
// baz is not included because the line hasn't ended yet
expect(decodeChunks(['foo', ' bar\nbaz'])).toEqual(['foo bar']);
});

test('basic with \\r', () => {
// \r\n counts as one line break; the unterminated 'baz' only appears after a flush
expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']);
expect(decodeChunks(['foo', ' bar\r\nbaz'], { flush: true })).toEqual(['foo bar', 'baz']);
});

test('trailing new lines', () => {
// text from consecutive chunks is joined until a line break is seen
expect(decodeChunks(['foo', ' bar', 'baz\n', 'thing\n'])).toEqual(['foo barbaz', 'thing']);
});

test('trailing new lines with \\r', () => {
expect(decodeChunks(['foo', ' bar', 'baz\r\n', 'thing\r\n'])).toEqual(['foo barbaz', 'thing']);
});

test('escaped new lines', () => {
// a literal backslash followed by 'n' is ordinary text, not a line break
expect(decodeChunks(['foo', ' bar\\nbaz\n'])).toEqual(['foo bar\\nbaz']);
});

test('escaped new lines with \\r', () => {
// same as above for a literal backslash-r backslash-n sequence
expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']);
});

test('\\r & \\n split across multiple chunks', () => {
// \r ending one chunk and \n starting the next produce a single line break (no empty line)
expect(decodeChunks(['foo\r', '\n', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
});

test('single \\r', () => {
// a lone \r terminates a line on its own
expect(decodeChunks(['foo\r', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
});

test('double \\r', () => {
expect(decodeChunks(['foo\r', 'bar\r'], { flush: true })).toEqual(['foo', 'bar']);
// two consecutive \r produce an empty line between 'foo' and 'bar'
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
// implementation detail that we don't yield the single \r line until a new \r or \n is encountered
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: false })).toEqual(['foo']);
});

test('double \\r then \\r\\n', () => {
// \r, \r, then \r\n (each delivered separately) yields two empty lines before 'bar'
expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
// three \n in a row give the same result
expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
});

test('double newline', () => {
// the result must not depend on where the chunk boundaries fall around \n\n
expect(decodeChunks(['foo\n\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
expect(decodeChunks(['foo', '\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
expect(decodeChunks(['foo\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
expect(decodeChunks(['foo', '\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
});

test('multi-byte characters across chunks', () => {
// uses the decoder directly so raw bytes (Uint8Array) can be fed instead of strings
const decoder = new LineDecoder();

// bytes taken from the string 'известни' and arbitrarily split
// so that some multi-byte characters span multiple chunks
expect(decoder.decode(new Uint8Array([0xd0]))).toHaveLength(0);
expect(decoder.decode(new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]))).toHaveLength(0);
expect(
decoder.decode(new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8])),
).toHaveLength(0);

// no line is produced until the terminating newline byte (0x0a) arrives
const decoded = decoder.decode(new Uint8Array([0xa]));
expect(decoded).toEqual(['известни']);
});

test('flushing trailing newlines', () => {
expect(decodeChunks(['foo\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
});

test('flushing empty buffer', () => {
// flushing a decoder that never received input yields no lines
expect(decodeChunks([], { flush: true })).toEqual([]);
});
});

// Tests for findDoubleNewlineIndex: the expected value is the index just past the
// first \n\n, \r\r or \r\n\r\n in the encoded bytes, or -1 when there is none.
describe('findDoubleNewlineIndex', () => {
test('finds \\n\\n', () => {
// terminator in the middle, at the start, at the very end, and as the whole buffer
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\n\nbar'))).toBe(5);
expect(findDoubleNewlineIndex(new TextEncoder().encode('\n\nbar'))).toBe(2);
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\n\n'))).toBe(5);
expect(findDoubleNewlineIndex(new TextEncoder().encode('\n\n'))).toBe(2);
});

test('finds \\r\\r', () => {
// same four positions as above, for \r\r
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\rbar'))).toBe(5);
expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\rbar'))).toBe(2);
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\r'))).toBe(5);
expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\r'))).toBe(2);
});

test('finds \\r\\n\\r\\n', () => {
// the four-byte terminator advances the returned index by 4 instead of 2
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r\nbar'))).toBe(7);
expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\n\r\nbar'))).toBe(4);
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r\n'))).toBe(7);
expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\n\r\n'))).toBe(4);
});

test('returns -1 when no double newline found', () => {
// single line breaks of any flavour, and the empty buffer, are not terminators
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\nbar'))).toBe(-1);
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\rbar'))).toBe(-1);
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\nbar'))).toBe(-1);
expect(findDoubleNewlineIndex(new TextEncoder().encode(''))).toBe(-1);
});

test('handles incomplete patterns', () => {
// a \r\n\r\n that is cut off before its last byte(s) must not match
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r'))).toBe(-1);
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n'))).toBe(-1);
});
});
81 changes: 1 addition & 80 deletions tests/streaming.test.ts
Original file line number Diff line number Diff line change
@@ -1,86 +1,7 @@
import { Response } from 'node-fetch';
import { PassThrough } from 'stream';
import assert from 'assert';
import { _iterSSEMessages, _decodeChunks as decodeChunks } from 'openai/streaming';
import { LineDecoder } from 'openai/internal/decoders/line';

// Behavioural tests for LineDecoder, mostly driven through decodeChunks: each case
// feeds a sequence of chunks and checks which lines come out, with and without a
// final flush.
describe('line decoder', () => {
test('basic', () => {
// baz is not included because the line hasn't ended yet
expect(decodeChunks(['foo', ' bar\nbaz'])).toEqual(['foo bar']);
});

test('basic with \\r', () => {
// \r\n counts as one line break; the unterminated 'baz' only appears after a flush
expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']);
expect(decodeChunks(['foo', ' bar\r\nbaz'], { flush: true })).toEqual(['foo bar', 'baz']);
});

test('trailing new lines', () => {
// text from consecutive chunks is joined until a line break is seen
expect(decodeChunks(['foo', ' bar', 'baz\n', 'thing\n'])).toEqual(['foo barbaz', 'thing']);
});

test('trailing new lines with \\r', () => {
expect(decodeChunks(['foo', ' bar', 'baz\r\n', 'thing\r\n'])).toEqual(['foo barbaz', 'thing']);
});

test('escaped new lines', () => {
// a literal backslash followed by 'n' is ordinary text, not a line break
expect(decodeChunks(['foo', ' bar\\nbaz\n'])).toEqual(['foo bar\\nbaz']);
});

test('escaped new lines with \\r', () => {
// same as above for a literal backslash-r backslash-n sequence
expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']);
});

test('\\r & \\n split across multiple chunks', () => {
// \r ending one chunk and \n starting the next produce a single line break (no empty line)
expect(decodeChunks(['foo\r', '\n', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
});

test('single \\r', () => {
// a lone \r terminates a line on its own
expect(decodeChunks(['foo\r', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
});

test('double \\r', () => {
expect(decodeChunks(['foo\r', 'bar\r'], { flush: true })).toEqual(['foo', 'bar']);
// two consecutive \r produce an empty line between 'foo' and 'bar'
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
// implementation detail that we don't yield the single \r line until a new \r or \n is encountered
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: false })).toEqual(['foo']);
});

test('double \\r then \\r\\n', () => {
// \r, \r, then \r\n (each delivered separately) yields two empty lines before 'bar'
expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
// three \n in a row give the same result
expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
});

test('double newline', () => {
// the result must not depend on where the chunk boundaries fall around \n\n
expect(decodeChunks(['foo\n\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
expect(decodeChunks(['foo', '\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
expect(decodeChunks(['foo\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
expect(decodeChunks(['foo', '\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
});

test('multi-byte characters across chunks', () => {
// uses the decoder directly so raw bytes (Uint8Array) can be fed instead of strings
const decoder = new LineDecoder();

// bytes taken from the string 'известни' and arbitrarily split
// so that some multi-byte characters span multiple chunks
expect(decoder.decode(new Uint8Array([0xd0]))).toHaveLength(0);
expect(decoder.decode(new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]))).toHaveLength(0);
expect(
decoder.decode(new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8])),
).toHaveLength(0);

// no line is produced until the terminating newline byte (0x0a) arrives
const decoded = decoder.decode(new Uint8Array([0xa]));
expect(decoded).toEqual(['известни']);
});

test('flushing trailing newlines', () => {
expect(decodeChunks(['foo\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
});

test('flushing empty buffer', () => {
// flushing a decoder that never received input yields no lines
expect(decodeChunks([], { flush: true })).toEqual([]);
});
});
import { _iterSSEMessages } from 'openai/streaming';

describe('streaming decoding', () => {
test('basic', async () => {
Expand Down