Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(hana): Remove encoding from hana-client streams #623

Merged
merged 5 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 10 additions & 19 deletions hana/lib/drivers/hana-client.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const { Readable, Stream } = require('stream')

const hdb = require('@sap/hana-client')
const { StringDecoder } = require('string_decoder')
const { driver, prom, handleLevel } = require('./base')

const streamUnsafe = false
Expand Down Expand Up @@ -97,7 +96,7 @@ class HANAClientDriver extends driver {
row[col] = i > 3 ?
rs.isNull(i)
? null
: Readable.from(streamBlob(rsStreams, rs._rowPosition, i, 'binary'))
: Readable.from(streamBlob(rsStreams, rs._rowPosition, i), { objectMode: false })
: values[i]
}

Expand Down Expand Up @@ -153,7 +152,7 @@ class HANAClientDriver extends driver {
if (rs.getRowCount() === 0) return null
await prom(rs, 'next')()
if (rs.isNull(0)) return null
return Readable.from(streamBlob(rs, undefined, 0, 'binary'), { objectMode: false })
return Readable.from(streamBlob(rs, undefined, 0), { objectMode: false })
}
return Readable.from(rsIterator(rs, one), { objectMode: false })
}
Expand Down Expand Up @@ -217,7 +216,8 @@ class HANAClientDriver extends driver {
if (!curStream) continue
for await (const chunk of curStream) {
curStream.pause()
await sendParameterData(i, Buffer.from(chunk))
const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)
if (buffer.length) await sendParameterData(i, buffer)
curStream.resume()
}
await sendParameterData(i, null)
Expand Down Expand Up @@ -293,7 +293,9 @@ async function* rsIterator(rs, one) {
yield buffer
buffer = ''

for await (const chunk of streamBlob(rs, undefined, columnIndex, 'base64', binaryBuffer)) {
const stream = Readable.from(streamBlob(rs, undefined, columnIndex, binaryBuffer), { objectMode: false })
stream.setEncoding('base64')
for await (const chunk of stream) {
yield chunk
}
buffer += '"'
Expand All @@ -316,7 +318,7 @@ async function* rsIterator(rs, one) {
yield buffer
}

async function* streamBlob(rs, rowIndex = -1, columnIndex, encoding, binaryBuffer = Buffer.allocUnsafe(1 << 16)) {
async function* streamBlob(rs, rowIndex = -1, columnIndex, binaryBuffer = Buffer.allocUnsafe(1 << 16)) {
const promChain = {
resolve: () => { },
reject: () => { }
Expand Down Expand Up @@ -357,29 +359,18 @@ async function* streamBlob(rs, rowIndex = -1, columnIndex, encoding, binaryBuffe

const getData = prom(rs, 'getData')

let decoder = new StringDecoder(encoding)

let blobPosition = 0

while (true) {
// REVISIT: Ensure that the data read is divisible by 3 as that allows for base64 encoding
let start = 0
const read = await getData(columnIndex, blobPosition, binaryBuffer, 0, binaryBuffer.byteLength)
if (blobPosition === 0 && binaryBuffer.slice(0, 7).toString() === 'base64,') {
decoder = {
write: encoding === 'base64' ? c => c : chunk => Buffer.from(chunk.toString(), 'base64'),
end: () => Buffer.allocUnsafe(0),
}
start = 7
}
blobPosition += read
if (read < binaryBuffer.byteLength) {
yield decoder.write(binaryBuffer.slice(start, read))
yield binaryBuffer.slice(0, read)
break
}
yield decoder.write(binaryBuffer.slice(start).toString('base64'))
yield binaryBuffer
}
yield decoder.end()
} catch (e) {
promChain.reject(e)
} finally {
Expand Down
11 changes: 11 additions & 0 deletions sqlite/test/general/stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,17 @@ describe('streaming', () => {
await checkSize(stream2_)
}))

test('WRITE stream property from READ stream', async () => cds.tx(async () => {
const { Images } = cds.entities('test')
const { data: stream } = await SELECT.one.from(Images).columns('data').where({ ID: 1 })

const changes = await UPDATE(Images).with({ data2: stream }).where({ ID: 3 })
expect(changes).toEqual(1)

const [{ data2: stream_ }] = await SELECT.from(Images).columns('data2').where({ ID: 3 })
await checkSize(stream_)
}))

test('WRITE multiple blob properties', async () => cds.tx(async () => {
const { Images } = cds.entities('test')
const blob1 = fs.readFileSync(path.join(__dirname, 'samples/test.jpg'))
Expand Down