Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): plan asof join #18683

Merged
merged 11 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/dashboard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: 20
node-version: 18
- uses: arduino/setup-protoc@v3
with:
version: "3.x"
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,7 @@ mod stream_group_topn;
mod stream_hash_agg;
mod stream_hash_join;
mod stream_hop_window;
mod stream_join_common;
mod stream_local_approx_percentile;
mod stream_materialize;
mod stream_now;
Expand Down Expand Up @@ -1012,6 +1013,7 @@ pub use stream_group_topn::StreamGroupTopN;
pub use stream_hash_agg::StreamHashAgg;
pub use stream_hash_join::StreamHashJoin;
pub use stream_hop_window::StreamHopWindow;
use stream_join_common::StreamJoinCommon;
pub use stream_local_approx_percentile::StreamLocalApproxPercentile;
pub use stream_materialize::StreamMaterialize;
pub use stream_now::StreamNow;
Expand Down
73 changes: 18 additions & 55 deletions src/frontend/src/optimizer/plan_node/stream_asof_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::bail;
use risingwave_pb::expr::expr_node::PbType;
Expand All @@ -28,15 +27,16 @@ use super::stream::prelude::*;
use super::utils::{
childless_record, plan_node_name, watermark_pretty, Distill, TableCatalogBuilder,
};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode};
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamJoinCommon, StreamNode,
};
use crate::error::{ErrorCode, Result};
use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::optimizer::property::MonotonicityMap;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::ColIndexMappingRewriteExt;
use crate::TableCatalog;

/// [`StreamAsOfJoin`] implements [`super::LogicalJoin`] with hash tables.
Expand All @@ -63,13 +63,19 @@ impl StreamAsOfJoin {
eq_join_predicate: EqJoinPredicate,
inequality_desc: AsOfJoinDesc,
) -> Self {
assert!(core.join_type == JoinType::AsofInner || core.join_type == JoinType::AsofLeftOuter);

// Inner join won't change the append-only behavior of the stream. The rest might.
let append_only = match core.join_type {
JoinType::Inner => core.left.append_only() && core.right.append_only(),
_ => false,
};

let dist = Self::derive_dist(core.left.distribution(), core.right.distribution(), &core);
let dist = StreamJoinCommon::derive_dist(
core.left.distribution(),
core.right.distribution(),
&core,
);

// TODO: derive watermarks
let watermark_columns = FixedBitSet::with_capacity(core.schema().len());
Expand Down Expand Up @@ -136,66 +142,23 @@ impl StreamAsOfJoin {
self.core.join_type
}

/// Get a reference to the batch hash join's eq join predicate.
/// Get a reference to the `AsOf` join's eq join predicate.
pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
&self.eq_join_predicate
}

pub(super) fn derive_dist(
left: &Distribution,
right: &Distribution,
logical: &generic::Join<PlanRef>,
) -> Distribution {
match (left, right) {
(Distribution::Single, Distribution::Single) => Distribution::Single,
(Distribution::HashShard(_), Distribution::HashShard(_)) => match logical.join_type {
JoinType::Unspecified
| JoinType::FullOuter
| JoinType::Inner
| JoinType::LeftOuter
| JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::RightSemi
| JoinType::RightAnti
| JoinType::RightOuter => unreachable!(),
JoinType::AsofInner | JoinType::AsofLeftOuter => {
let l2o = logical
.l2i_col_mapping()
.composite(&logical.i2o_col_mapping());
l2o.rewrite_provided_distribution(left)
}
},
(_, _) => unreachable!(
"suspicious distribution: left: {:?}, right: {:?}",
left, right
),
}
}

pub fn derive_dist_key_in_join_key(&self) -> Vec<usize> {
let left_dk_indices = self.left().distribution().dist_column_indices().to_vec();
let right_dk_indices = self.right().distribution().dist_column_indices().to_vec();
let left_jk_indices = self.eq_join_predicate.left_eq_indexes();
let right_jk_indices = self.eq_join_predicate.right_eq_indexes();

assert_eq!(left_jk_indices.len(), right_jk_indices.len());

let mut dk_indices_in_jk = vec![];

for (l_dk_idx, r_dk_idx) in left_dk_indices.iter().zip_eq_fast(right_dk_indices.iter()) {
for dk_idx_in_jk in left_jk_indices.iter().positions(|idx| idx == l_dk_idx) {
if right_jk_indices[dk_idx_in_jk] == *r_dk_idx {
dk_indices_in_jk.push(dk_idx_in_jk);
break;
}
}
}

assert_eq!(dk_indices_in_jk.len(), left_dk_indices.len());
dk_indices_in_jk
StreamJoinCommon::get_dist_key_in_join_key(
&left_dk_indices,
&right_dk_indices,
self.eq_join_predicate(),
)
}

/// Return stream hash join internal table catalog.
/// Return stream asof join internal table catalog.
pub fn infer_internal_table_catalog<I: StreamPlanRef>(
input: I,
join_key_indices: Vec<usize>,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl StreamDeltaJoin {
}
}

/// Get a reference to the batch hash join's eq join predicate.
/// Get a reference to the delta hash join's eq join predicate.
pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
&self.eq_join_predicate
}
Expand Down
73 changes: 13 additions & 60 deletions src/frontend/src/optimizer/plan_node/stream_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DeltaExpression, HashJoinNode, PbInequalityPair};

use super::generic::Join;
use super::stream::prelude::*;
use super::stream_join_common::StreamJoinCommon;
use super::utils::{childless_record, plan_node_name, watermark_pretty, Distill};
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, StreamNode,
Expand All @@ -30,7 +30,7 @@ use crate::expr::{Expr, ExprDisplay, ExprRewriter, ExprVisitor, InequalityInputP
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::optimizer::property::MonotonicityMap;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::ColIndexMappingRewriteExt;

Expand Down Expand Up @@ -72,7 +72,11 @@ impl StreamHashJoin {
_ => false,
};

let dist = Self::derive_dist(core.left.distribution(), core.right.distribution(), &core);
let dist = StreamJoinCommon::derive_dist(
core.left.distribution(),
core.right.distribution(),
&core,
);

let mut inequality_pairs = vec![];
let mut clean_left_state_conjunction_idx = None;
Expand Down Expand Up @@ -215,50 +219,11 @@ impl StreamHashJoin {
self.core.join_type
}

/// Get a reference to the batch hash join's eq join predicate.
/// Get a reference to the hash join's eq join predicate.
pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
&self.eq_join_predicate
}

pub(super) fn derive_dist(
left: &Distribution,
right: &Distribution,
logical: &generic::Join<PlanRef>,
) -> Distribution {
match (left, right) {
(Distribution::Single, Distribution::Single) => Distribution::Single,
(Distribution::HashShard(_), Distribution::HashShard(_)) => {
// we can not derive the hash distribution from the side where outer join can
// generate a NULL row
match logical.join_type {
JoinType::Unspecified | JoinType::AsofInner | JoinType::AsofLeftOuter => {
unreachable!()
}
JoinType::FullOuter => Distribution::SomeShard,
JoinType::Inner
| JoinType::LeftOuter
| JoinType::LeftSemi
| JoinType::LeftAnti => {
let l2o = logical
.l2i_col_mapping()
.composite(&logical.i2o_col_mapping());
l2o.rewrite_provided_distribution(left)
}
JoinType::RightSemi | JoinType::RightAnti | JoinType::RightOuter => {
let r2o = logical
.r2i_col_mapping()
.composite(&logical.i2o_col_mapping());
r2o.rewrite_provided_distribution(right)
}
}
}
(_, _) => unreachable!(
"suspicious distribution: left: {:?}, right: {:?}",
left, right
),
}
}

/// Convert this hash join to a delta join plan
pub fn into_delta_join(self) -> StreamDeltaJoin {
StreamDeltaJoin::new(self.core, self.eq_join_predicate)
Expand All @@ -267,24 +232,12 @@ impl StreamHashJoin {
pub fn derive_dist_key_in_join_key(&self) -> Vec<usize> {
let left_dk_indices = self.left().distribution().dist_column_indices().to_vec();
let right_dk_indices = self.right().distribution().dist_column_indices().to_vec();
let left_jk_indices = self.eq_join_predicate.left_eq_indexes();
let right_jk_indices = self.eq_join_predicate.right_eq_indexes();

assert_eq!(left_jk_indices.len(), right_jk_indices.len());

let mut dk_indices_in_jk = vec![];

for (l_dk_idx, r_dk_idx) in left_dk_indices.iter().zip_eq_fast(right_dk_indices.iter()) {
for dk_idx_in_jk in left_jk_indices.iter().positions(|idx| idx == l_dk_idx) {
if right_jk_indices[dk_idx_in_jk] == *r_dk_idx {
dk_indices_in_jk.push(dk_idx_in_jk);
break;
}
}
}

assert_eq!(dk_indices_in_jk.len(), left_dk_indices.len());
dk_indices_in_jk
StreamJoinCommon::get_dist_key_in_join_key(
&left_dk_indices,
&right_dk_indices,
self.eq_join_predicate(),
)
}

pub fn inequality_pairs(&self) -> &Vec<(bool, InequalityInputPair)> {
Expand Down
88 changes: 88 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_join_common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::plan_common::JoinType;

use super::{generic, EqJoinPredicate};
use crate::optimizer::property::Distribution;
use crate::utils::ColIndexMappingRewriteExt;
use crate::PlanRef;

pub struct StreamJoinCommon;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh seems a module is enough.


impl StreamJoinCommon {
pub(super) fn get_dist_key_in_join_key(
left_dk_indices: &[usize],
right_dk_indices: &[usize],
eq_join_predicate: &EqJoinPredicate,
) -> Vec<usize> {
let left_jk_indices = eq_join_predicate.left_eq_indexes();
let right_jk_indices = &eq_join_predicate.right_eq_indexes();
assert_eq!(left_jk_indices.len(), right_jk_indices.len());
let mut dk_indices_in_jk = vec![];
for (l_dk_idx, r_dk_idx) in left_dk_indices.iter().zip_eq_fast(right_dk_indices.iter()) {
for dk_idx_in_jk in left_jk_indices.iter().positions(|idx| idx == l_dk_idx) {
if right_jk_indices[dk_idx_in_jk] == *r_dk_idx {
dk_indices_in_jk.push(dk_idx_in_jk);
break;
}
}
}
assert_eq!(dk_indices_in_jk.len(), left_dk_indices.len());
dk_indices_in_jk
}

pub(super) fn derive_dist(
left: &Distribution,
right: &Distribution,
logical: &generic::Join<PlanRef>,
) -> Distribution {
match (left, right) {
(Distribution::Single, Distribution::Single) => Distribution::Single,
(Distribution::HashShard(_), Distribution::HashShard(_)) => {
// we can not derive the hash distribution from the side where outer join can
// generate a NULL row
match logical.join_type {
JoinType::Unspecified => {
unreachable!()
}
JoinType::FullOuter => Distribution::SomeShard,
JoinType::Inner
| JoinType::LeftOuter
| JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::AsofInner
| JoinType::AsofLeftOuter => {
let l2o = logical
.l2i_col_mapping()
.composite(&logical.i2o_col_mapping());
l2o.rewrite_provided_distribution(left)
}
JoinType::RightSemi | JoinType::RightAnti | JoinType::RightOuter => {
let r2o = logical
.r2i_col_mapping()
.composite(&logical.i2o_col_mapping());
r2o.rewrite_provided_distribution(right)
}
}
}
(_, _) => unreachable!(
"suspicious distribution: left: {:?}, right: {:?}",
left, right
),
}
}
}
Loading