From 4dab4cad2c3b9b165d6118636a179b5443e50442 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Weslley=20Ara=C3=BAjo?= <46850407+wellwelwel@users.noreply.github.com> Date: Thu, 30 May 2024 15:48:32 -0300 Subject: [PATCH] fix(stream): reads should emit the dataset number for each dataset (#2628) * :bug: streaming reads should emit the dataset number for each dataset * :ok_hand: attend to PR commentary * :ok_hand: rename test fixture file, as per PR request * :rotating_light: run prettier -w on test-multi-result-streaming.test.cjs * :alembic: try latest mysql * :alembic: enable GHA for this branch, hopefully * :bug: no need to process.exit(0) in the test if the connection is properly destroyed * :rewind: revert addition of this branch as a trigger * :rewind: revert mysql version update * :rotating_light: pacify the linter * ci: debug multilpe stream test order --------- Co-authored-by: Davyd McColl --- lib/commands/query.js | 6 +- .../test-multi-result-streaming.test.cjs | 55 +++++++++++++++++++ 2 files changed, 58 insertions(+), 3 deletions(-) create mode 100644 test/integration/test-multi-result-streaming.test.cjs diff --git a/lib/commands/query.js b/lib/commands/query.js index 67386bbfba..e8cd215bb0 100644 --- a/lib/commands/query.js +++ b/lib/commands/query.js @@ -251,7 +251,7 @@ class Query extends Command { if (this.onResult) { this._rows[this._resultIndex].push(row); } else { - this.emit('result', row); + this.emit('result', row, this._resultIndex); } return Query.prototype.row; } @@ -268,11 +268,11 @@ class Query extends Command { stream._read = () => { this._connection && this._connection.resume(); }; - this.on('result', row => { + this.on('result', (row, resultSetIndex) => { if (!stream.push(row)) { this._connection.pause(); } - stream.emit('result', row); // replicate old emitter + stream.emit('result', row, resultSetIndex); // replicate old emitter }); this.on('error', err => { stream.emit('error', err); // Pass on any errors diff --git a/test/integration/test-multi-result-streaming.test.cjs b/test/integration/test-multi-result-streaming.test.cjs new file mode 100644 index 0000000000..81fc2dcc1f --- /dev/null +++ b/test/integration/test-multi-result-streaming.test.cjs @@ -0,0 +1,55 @@ +'use strict'; + +const { assert } = require('poku'); +const { createConnection } = require('../common.test.cjs'); + +(async () => { + const conn = createConnection({ multipleStatements: true }); + const captured1 = []; + const captured2 = []; + const sql1 = + 'select * from information_schema.columns order by table_schema, table_name, column_name limit 1;'; + const sql2 = + 'select * from information_schema.columns order by table_schema, table_name, ordinal_position limit 1;'; + + await conn.promise().query('set global max_allowed_packet=524288000'); + + const compare1 = await conn.promise().query(sql1); + const compare2 = await conn.promise().query(sql2); + + if (!compare1 || compare1.length < 1) { + assert.fail('no results for comparison 1'); + } + if (!compare2 || compare2.length < 1) { + assert.fail('no results for comparison 2'); + } + + const stream = conn.query(`${sql1}\n${sql2}`).stream(); + stream.on('result', (row, datasetIndex) => { + if (datasetIndex === 0) { + captured1.push(row); + } else { + captured2.push(row); + } + }); + // note: this is very important: + // after each result set is complete, + // the stream will emit "readable" and if we don't + // read then 'end' won't be emitted and the + // test will hang. + stream.on('readable', () => { + stream.read(); + }); + + await new Promise((resolve, reject) => { + stream.on('error', (e) => reject(e)); + stream.on('end', () => resolve()); + }); + + assert.equal(captured1.length, 1); + assert.equal(captured2.length, 1); + assert.deepEqual(captured1[0], compare1[0][0]); + assert.deepEqual(captured2[0], compare2[0][0]); + + conn.end(); +})();