feat(7181): Encapsulate cursor handling within the SortOrderBuilder. #7842
* CursorStream is a crate pub type
* BatchCursor is now in a batch submod
* Add the cascade merge skeleton, which is currently a multithread-safe wrapper around a single merge node
…cur at cascade root
…node as only the loser tree
* This sets up the SortOrderBuilder as owning (and slicing) the cursors.
match slot.as_mut() {
    Some(c) => {
        if c.cursor.is_finished() {
            return false;
This is slightly different from the code removed from the merge node, which on finish sets *slot = None.
Instead, we do not dump the batch_cursor; we return false prior to advancement, which results in the same boolean being returned the next time advance_cursor() is checked.
Note that this means that the lifetime/cycle of a self.cursors[stream_idx] slot is:
- Option == None. For that stream_idx, no ongoing cursor exists yet.
- Some(cursor) after push_batch().
- several push_row()
- merge/loser tree node checks that cursor is finished: !cursor_in_progress()
- polls for next CursorValues
- push_batch()
  - push_batch saves the completed BatchCursor (to sorted_batches) and adds the new BatchCursor.
After the next PR, we will then see:
- SortOrderBuilder::build_batch() will change to SortOrderBuilder::yield_sort_order()
- on yield, it will do:
  - fully yielded BatchCursors will have a new (at the start) Cursor with the full CursorValues
  - fully retained BatchCursors will have no change. (remain in ongoing self.cursors[stream_idx])
  - partial yielded BatchCursor:
    - will slice the underlying CursorValues and have new cursors in both (sliced) parts
@@ -42,15 +37,15 @@ pub struct BatchBuilder {
     reservation: MemoryReservation,

     /// The current [`BatchCursor`] for each stream
-    cursors: Vec<BatchCursor>,
+    cursors: Vec<Option<BatchCursor<C>>>,
This is indexed per stream. As such, we don't always have an ongoing cursor per each stream => hence the option.
It seems like this structure now does some of the same things as SortPreservingMergeStream: https://github.com/apache/arrow-datafusion/blob/37d6bf08c948418fe6c72d072d988c2875d81e02/datafusion/physical-plan/src/sorts/merge.rs#L210-L222
Is there any way to avoid a second copy?
There is no second copy. (That one is deleted.)
All of the cursor management/ownership is moved into the SortOrderBuilder, and removed from the SortPreservingMergeStream. The division of concerns is:
- SortPreservingMergeStream == only care about merging (via loser tree)
- SortOrderBuilder == holds the cursor (being advanced), and the sort_order (being built).
This clearly demonstrates why I need to add more docs to the BatchCursor. 😅 Doing so.
pub(crate) struct BatchCursor<C: CursorValues> {
    /// The index into SortOrderBuilder::batches
    /// TODO: this will become a BatchId, for record batch collected (and not passed across streams)
    pub batch_idx: usize,
Note: the BatchId is required for the batch tracking. Therefore, we have to yield something (beyond just the CursorValues) which contains this tracking id.
I decided to have the same abstraction (a.k.a. BatchCursor) be used for both batches currently being sorted (in a merge/loser tree), and the batches being yielded to the next merge/loser tree node (in the cascade tree). But we could revisit this design.
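As a rough sketch of the tracking idea, assuming a tracker that hands out sequential ids: the record batches stay in one place, and only a small value carrying the id and the sort keys travels between merge nodes. ToyBatch, ToyBatchTracker, and ToyBatchCursor are hypothetical names for illustration; the real BatchTracker and BatchCursor may look quite different.

```rust
use std::collections::HashMap;

/// Assumed id type; the PR only says a unique BatchId will exist.
type BatchId = u64;

/// Hypothetical stand-in for an Arrow RecordBatch.
#[derive(Debug, Clone, PartialEq)]
struct ToyBatch(Vec<i32>);

/// Collects batches and assigns each one a unique id.
#[derive(Debug, Default)]
struct ToyBatchTracker {
    next_id: BatchId,
    batches: HashMap<BatchId, ToyBatch>,
}

impl ToyBatchTracker {
    /// Stores the batch and returns the id that cursors will carry.
    fn add_batch(&mut self, batch: ToyBatch) -> BatchId {
        let id = self.next_id;
        self.next_id += 1;
        self.batches.insert(id, batch);
        id
    }

    /// Looks a batch up again when the final output is assembled.
    fn get(&self, id: BatchId) -> Option<&ToyBatch> {
        self.batches.get(&id)
    }
}

/// What is yielded between merge nodes: sort keys plus the tracking id,
/// but not the batch itself.
#[derive(Debug)]
struct ToyBatchCursor {
    batch_id: BatchId,
    sort_keys: Vec<i32>,
}

/// Registers a batch and builds the cursor that refers to it.
fn track(tracker: &mut ToyBatchTracker, batch: ToyBatch) -> ToyBatchCursor {
    let sort_keys = batch.0.clone();
    let batch_id = tracker.add_batch(batch);
    ToyBatchCursor { batch_id, sort_keys }
}
```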
I spent a good while studying this code this morning. Thank you @wiedld
The code on main I think looks something like:
- Original RecordBatches with input rows are stored in BatchBuilder
- The Cursors (not BatchCursor!) are stored in SortPreservingMergeStream
- The SortPreservingMergeStream manages the cursors and the BatchBuilder manages creating output rows.
This PR somewhat combines these concerns: SortOrderBuilder now has Cursors (but they are also still in the SortPreservingMergeStream).
I think I understand the need to keep the RecordBatch alongside the cursors as they pass through the merge tree. However, keeping two (possibly more) structures with the same RecordBatch synchronized during execution seems like it will be fraught with potential hard-to-debug errors.
Rather than duplicate the logic, would it be possible to transfer the RecordBatch ownership entirely to the Cursors? The BatchBuilder would still need to track the output being built (batch_id, row_idx) but the batches could come from the cursors themselves 🤔
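One way to picture that suggestion, purely as a sketch under assumptions: each cursor holds a shared handle to its batch, and the builder records only (batch_id, row_idx) pairs, learning about a batch the first time a row from it is pushed. RowBatch, OwningCursor, and OutputBuilder are hypothetical names, and Arc is just one possible ownership choice.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a RecordBatch: (sort key, payload) rows.
#[derive(Debug)]
struct RowBatch {
    rows: Vec<(i32, &'static str)>,
}

/// A cursor that owns a shared handle to the batch it walks.
#[derive(Debug)]
struct OwningCursor {
    batch: Arc<RowBatch>,
    offset: usize,
}

/// The builder tracks only which rows to emit, as (batch_id, row_idx).
#[derive(Debug, Default)]
struct OutputBuilder {
    batches: Vec<Arc<RowBatch>>,
    indices: Vec<(usize, usize)>,
}

impl OutputBuilder {
    /// Records the cursor's current row; the batch comes from the cursor itself.
    fn push_row(&mut self, cursor: &OwningCursor) {
        let existing = self
            .batches
            .iter()
            .position(|b| Arc::ptr_eq(b, &cursor.batch));
        let batch_id = existing.unwrap_or_else(|| {
            self.batches.push(Arc::clone(&cursor.batch));
            self.batches.len() - 1
        });
        self.indices.push((batch_id, cursor.offset));
    }

    /// Materializes the payloads in the recorded order.
    fn build(&self) -> Vec<&'static str> {
        self.indices
            .iter()
            .map(|&(b, r)| self.batches[b].rows[r].1)
            .collect()
    }
}
```

With this shape there is a single owner of each batch handle per cursor, so there is no second structure to keep synchronized; whether that fits the cascade design is an open question in the thread.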
Musings
As I was reading this PR I was reflecting on why it has taken so long to review. One reason, I think, is that I don't have a high-level understanding of the new concepts that are being introduced (e.g. a BatchCursor) and how they interact in the new design. Thus while reviewing this PR I am both trying to understand the code and also trying to reverse engineer the larger design.
For example, it took a while to understand the role a BatchCursor is supposed to play (I can read the code and see what it is doing, but not why it is doing so). To your credit there is some version of it here https://github.com/apache/arrow-datafusion/pull/7379/files#diff-f97f96eb27e6344b0d9de91d7eceb98f2cc4f2099843673270d3698ebbbad4abR43-R97 but I am still struggling to understand it for some reason (maybe the diagram could include the new structures and how they are related 🤔 I am just brainstorming)
    pub batch_idx: usize,

    /// The row index within the given batch
    pub row_idx: usize,
Given the Cursor already has a row offset, why is there another row_idx index in BatchCursor? Is it the same or does it potentially point at a different offset?
This is existing tech debt, as the row_idx already exists (on main) in order to handle cursor advancement beyond row_idx.
This property will be removed in future PRs. I added a code comment to reflect this^^.
use super::cursor::{Cursor, CursorValues};

#[derive(Debug)]
pub(crate) struct BatchCursor<C: CursorValues> {
Perhaps we can add a doc comment here explaining what this structure is for. It seems like the core function is to hold the entire original RecordBatch that the input rows came from (rather than just the columns that are part of the sort key).
It holds partial record batches, once the slicing occurs in the future.
Per your excellent suggestion, I've added documentation to make apparent the goals of each structure. Hopefully, this helps for design discussions.
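To make the planned slicing concrete, here is a small sketch of splitting a partially consumed cursor into a yielded part and a retained part, each restarting at offset 0. It assumes i32 sort keys and a hypothetical SliceCursor type; the start field (row offset within the original batch) is an assumption added so the slices can still be mapped back to their batch.

```rust
/// Hypothetical cursor over a slice of one batch's sort keys.
#[derive(Debug, PartialEq)]
struct SliceCursor {
    batch_id: u64,
    /// Row offset of this slice within the original batch (assumed field).
    start: usize,
    values: Vec<i32>,
    /// Rows before this offset have already been merged.
    offset: usize,
}

impl SliceCursor {
    /// Splits at the current offset into (yielded, retained).
    /// A side with no rows is None, which covers the fully yielded
    /// and fully retained cases.
    fn split_at_offset(self) -> (Option<SliceCursor>, Option<SliceCursor>) {
        let SliceCursor { batch_id, start, mut values, offset } = self;
        let offset = offset.min(values.len());
        let retained_values = values.split_off(offset);

        let yielded = (!values.is_empty()).then(|| SliceCursor {
            batch_id,
            start,
            values,
            offset: 0,
        });
        let retained = (!retained_values.is_empty()).then(|| SliceCursor {
            batch_id,
            start: start + offset,
            values: retained_values,
            offset: 0,
        });
        (yielded, retained)
    }
}
```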
50233de
to
96f72d9
Compare
…nted yield_sort_order()
96f72d9
to
222c12a
Compare
    ///
    #[allow(dead_code)]
    pub fn yield_sort_order(&mut self) -> Result<Option<YieldedSortOrder<C>>> {
        unimplemented!("to implement in future PR");
By having the SortOrderBuilder take full ownership of the cursors, then we can have all batch partials (and awareness of partials) be handled in the SortOrderBuilder. This yield_sort_order() will have an implementation similar to this.
The loser/merge tree will only interface with the cursor through the SortOrderBuilder (e.g. push_batch(), push_row(), advance_cursor()). Please let me know if there is a better design option. 😄

/// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`RecordBatch`]
#[derive(Debug)]
-pub struct BatchBuilder {
+pub struct SortOrderBuilder<C: CursorValues> {
Is the eventual plan to remove batches from this? It seems a little peculiar as a construction as it stands
Yes, exactly. I just pushed a commit with documentation -- to show where this was going.
I was trying to break up the change into 2 parts (see PR description). That decision seems to have created confusion. 😅
I wonder if this should instead be called CursorInterleave or something, as I believe that is the operation it is actually performing?
> That decision seems to have created confusion
Yeah, perhaps you could roll that into this PR; whilst it will be larger, I think it will be easier to review
@@ -48,6 +48,10 @@ pub trait PartitionedStream: std::fmt::Debug + Send {
    ) -> Poll<Option<Self::Output>>;
}

/// A fallible [`PartitionedStream`] of [`Cursor`](super::cursor::Cursor) and [`RecordBatch`]
pub(crate) type CursorStream<C> =
    Box<dyn PartitionedStream<Output = Result<(C, RecordBatch)>>>;
Is the idea this will eventually just be Box<dyn PartitionedStream<Output = Result<C>>> with the RecordBatch handled separately?
/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ | ||
/// │ | ||
/// ▼ | ||
/// BatchCursors |
I'm not sure of a better name, but to me BatchCursor would suggest this construction owns a RecordBatch and is a cursor over it, which isn't the case. Perhaps we might also be able to work the slice aspect in there, as I think that is what they are.
Perhaps we could rename BatchCursors to SortOrder or MergedCursors and have that as the abstraction? Or is per-slice access required somehow?
By trying to break up the latest chunk into 2 PRs, I instead added to the confusion.
Thank you @wiedld -- sorry for the conflicting advice. I do feel like we are making progress though
Closing, as will eventually be replaced by this larger PR + lots of ascii diagrams.
Part of #7181
Rationale for this change
This is the 1st half of the next change, as we start building I/O streams for the merge node.
Goal in this PR is to have a separation of concerns:
SortPreservingMergeStream)
Goal in next PR:
What changes are included in this PR?
- BatchBuilder => SortOrderBuilder.
- sorts/batches.
- BatchCursor, which will (in next PR) contain the unique BatchId and be yielded per merge node.
- BatchTracker which collects the record batches and assigns a unique BatchId, such that the cascading streams only pass around cursors.
- SortPreservingCascadeStream. Does nothing yet.
Are these changes tested?
Passing sort tests.
Let me know if any additional tests should be added.
Are there any user-facing changes?
No.