GH-45611: [C++][Acero] Improve Swiss join build performance by partitioning batches ahead to reduce contention #45612
base: main

Conversation
I'm pretty excited by this optimization. @pitrou @westonpace would you mind taking a look? Thanks.
Does this also remove the spin locks?
```cpp
std::vector<uint32_t> hashes;
std::vector<uint16_t> prtn_ranges;
std::vector<uint16_t> prtn_row_ids;
```
Are these one value per row?
Yes, this is what I meant in the Overhead section of the PR description (quoted below).
... and worsen the memory profile by 6 bytes per row (4 bytes for hash and 2 bytes for row id in partition).
Some more details you may also want to know:
- The `prtn_ranges` is one element per partition.
- This `BatchState` struct is per batch.

Both have lower space complexity than per-row storage.
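As an illustration of that space accounting, here is a minimal sketch; the struct and helper below are hypothetical (loosely modeled on the fields quoted above, not the actual Arrow code), showing that the per-row cost is 6 bytes while `prtn_ranges` only scales with the partition count:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical sketch of the preserved per-batch partition info.
struct BatchStateSketch {
  std::vector<uint32_t> hashes;        // one per row: 4 bytes/row
  std::vector<uint16_t> prtn_row_ids;  // one per row: 2 bytes/row
  std::vector<uint16_t> prtn_ranges;   // one per partition boundary: O(N)

  // Approximate extra bytes preserved for one batch: 6 bytes per row plus
  // a small per-partition term, matching the overhead quoted above.
  static std::size_t ExtraBytes(std::size_t num_rows, std::size_t num_prtns) {
    return num_rows * (sizeof(uint32_t) + sizeof(uint16_t)) +
           (num_prtns + 1) * sizeof(uint16_t);
  }
};
```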
Besides the two comments above, I don't feel competent to review this, sorry.
This removes the spin lock usage in Swiss join build, but the spin lock itself, namely
No problem, I appreciate the comments anyway!
```diff
@@ -1112,7 +1113,7 @@ Status SwissTableForJoinBuild::Init(SwissTableForJoin* target, int dop, int64_t
   // Make sure that we do not use many partitions if there are not enough rows.
   //
-  constexpr int64_t min_num_rows_per_prtn = 1 << 18;
+  constexpr int64_t min_num_rows_per_prtn = 1 << 12;
```
Since the contention is eliminated, we can be a little more aggressive on the parallelism.
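To make the effect of this threshold concrete, here is a hedged sketch; the function name and formula are illustrative only (the actual Arrow code differs, e.g. it works with partition counts in a different form), but it captures the idea of capping partitions by row count:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Illustrative only: cap the partition count so each partition gets at
// least `min_rows_per_prtn` rows, as the comment in the diff describes.
// Lowering the threshold lets smaller inputs keep more partitions, i.e.
// more parallelism, now that the lock contention is gone.
inline int64_t CapNumPartitions(int64_t requested, int64_t num_rows,
                                int64_t min_rows_per_prtn) {
  int64_t limit = std::max<int64_t>(int64_t{1}, num_rows / min_rows_per_prtn);
  return std::min(requested, limit);
}
```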
Rationale for this change

High contention is observed in the Swiss join build phase, as shown in #45611.

A little background about the contention. To build the hash table in parallel, we first build `N` partitioned hash tables (the "build" stage), then merge them together into the final hash table (the "merge" stage, less interesting in this PR). In the build stage, each of the exec batches from the build-side table is distributed to one of the `M` threads. Each such thread processes each of its assigned batches by:
1. partitioning the rows of the batch into `N` partitions;
2. inserting the rows of each of the `N` partitions into the corresponding one of the `N` partitioned hash tables.

Because each batch contains arbitrary data, all `M` threads will write to all `N` partitioned hash tables simultaneously. So we use (spin) locks on these partitioned hash tables, hence the contention.
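The contended scheme just described can be sketched as follows; this is a simplification with hypothetical names, using a `std::mutex` where Arrow uses a spin lock and a plain vector in place of a partitioned hash table:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <vector>

// Sketch of the old build stage: each worker partitions a batch and
// immediately inserts into the shared partitioned tables, so every table
// needs its own lock. With M workers and arbitrary batch contents, all
// workers may hit the same lock at once -- hence the contention.
struct LockedPartition {
  std::mutex mu;
  std::vector<uint32_t> rows;  // stand-in for a partitioned hash table
};

inline void BuildBatchOld(const std::vector<uint32_t>& batch,
                          std::vector<LockedPartition>& prtns) {
  const std::size_t n = prtns.size();
  for (uint32_t row : batch) {
    std::size_t p = row % n;  // stand-in for hash-based partitioning
    std::lock_guard<std::mutex> lock(prtns[p].mu);  // contended across workers
    prtns[p].rows.push_back(row);
  }
}
```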
What changes are included in this PR?

Instead of all `M` threads writing to all `N` partitioned hash tables simultaneously, we can further split the build stage into two:
1. `M` threads, each only partitions the batches and preserves the partition info of each batch;
2. `N` threads, each builds one of the `N` partitioned hash tables. Every thread iterates over all the batches and inserts only the rows of each batch that belong to its assigned partition into its assigned hash table.
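The two-stage scheme above can be sketched like this; names and data layout are hypothetical (the real code preserves hashes and row ids per the struct reviewed earlier), but the key property holds: in stage 2 exactly one thread writes to each table, so no locks are needed:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical preserved per-batch partition info.
struct PartitionedBatch {
  std::vector<uint32_t> rows;      // the batch data (stand-in)
  std::vector<std::size_t> prtns;  // preserved partition id per row
};

// Stage 1 (run by one of the M threads): partition one batch and
// preserve the partition info instead of inserting right away.
inline PartitionedBatch PartitionBatch(std::vector<uint32_t> batch,
                                       std::size_t num_prtns) {
  PartitionedBatch pb;
  pb.rows = std::move(batch);
  pb.prtns.reserve(pb.rows.size());
  for (uint32_t row : pb.rows) pb.prtns.push_back(row % num_prtns);
  return pb;
}

// Stage 2 (run by the one thread assigned partition `p`): scan every
// batch's preserved info and insert only this partition's rows, lock-free.
inline std::vector<uint32_t> BuildPartition(
    const std::vector<PartitionedBatch>& batches, std::size_t p) {
  std::vector<uint32_t> table;  // stand-in for the partitioned hash table
  for (const auto& b : batches)
    for (std::size_t i = 0; i < b.rows.size(); ++i)
      if (b.prtns[i] == p) table.push_back(b.rows[i]);
  return table;
}
```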
Performance

Take this benchmark, which is dedicated to the performance of the parallel build. The result shows that by eliminating the contention, we can achieve up to a 10x (on Arm) and 5x (on Intel) performance boost for the Swiss join build. I picked `krows=64` and `krows=512` and made a chart.

Note that single-thread performance is actually down a little (reasons detailed later). But IMO this is quite trivial compared to the total win in the multi-threaded cases.

Detailed benchmark numbers (on Arm) follow.
Overhead

This change does introduce some overhead. First, in the old implementation the partition info is used right away after partitioning the batch, whereas the new implementation preserves the partition info and uses it in the next stage (potentially on another thread). This may be less cache friendly. Second, preserving the partition info requires more memory: the increased allocation may hurt performance a bit, and it worsens the memory profile by 6 bytes per row (4 bytes for the hash and 2 bytes for the row id in the partition).

But as mentioned above, almost all multi-threaded cases are winning. Even nicer, the increased memory footprint spans only a short period and doesn't really increase the peak memory: the peak always comes in the merge stage, and by that time the preserved partition info for all batches has already been released. I verified this by printing the memory pool stats while benchmarking locally.
Are these changes tested?
Yes. Existing tests suffice.
Are there any user-facing changes?
None.