Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support cross-db query in frontend and meta #20261

Merged
merged 23 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8a97831
feat(snapshot-backfill): implement executor to consume upstream table
wenym1 Jan 14, 2025
cd91f29
use box future to avoid cargo doc panic
wenym1 Jan 20, 2025
008d2e7
feat(snapshot-backfill): support cross db snapshot backfill
wenym1 Jan 18, 2025
5b12393
feat: support cross-db query in frontend and meta
yezizp2012 Jan 22, 2025
c96d15d
comment some assertion
yezizp2012 Jan 22, 2025
35c9b3f
comment check for consumed row progress check
yezizp2012 Feb 10, 2025
745894e
feat(snapshot-backfill): support cross db snapshot backfill
wenym1 Jan 18, 2025
59dc0ee
Merge branch 'yiming/cross-db-backfill-executor' into zp/cross-db-fro…
yezizp2012 Feb 11, 2025
7e6cc48
fix
wenym1 Feb 11, 2025
c0b6f2e
Merge branch 'yiming/cross-db-backfill-executor' into zp/cross-db-fro…
yezizp2012 Feb 11, 2025
2d75de0
fix finish condition and add debug log
wenym1 Feb 11, 2025
853d20d
Merge branch 'yiming/cross-db-backfill-executor' into zp/cross-db-fro…
yezizp2012 Feb 11, 2025
2e4342c
fix progress update and add some e2e
yezizp2012 Feb 11, 2025
64bc8fa
fix slt
yezizp2012 Feb 11, 2025
a8cf023
Merge branch 'main' into zp/cross-db-frontend
yezizp2012 Feb 12, 2025
952d515
fix
yezizp2012 Feb 12, 2025
e7e4df7
fix slt and address comments
yezizp2012 Feb 12, 2025
a1947b8
rename
yezizp2012 Feb 12, 2025
3c2a2a8
Merge branch 'main' into zp/cross-db-frontend
yezizp2012 Feb 12, 2025
a202989
add cross-db backfill check for drop subscription
yezizp2012 Feb 13, 2025
2169061
fix
yezizp2012 Feb 13, 2025
d68a365
Merge branch 'main' into zp/cross-db-frontend
yezizp2012 Feb 13, 2025
9779d78
comments
yezizp2012 Feb 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions e2e_test/streaming/cross_db_mv.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# e2e test for cross-database materialized views: an MV in database d2
# backfills from a table in database d1. Cross-db backfill requires an
# active subscription (log store) on the upstream table.
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create database d1;

statement ok
create database d2;

statement ok
set database to d1;

statement ok
create table t1(v1 int);

statement ok
insert into t1 select * from generate_series(1, 10000);

statement ok
set database to d2;

# Without any subscription on d1.t1 there is no log store yet, so creating
# a cross-db MV must fail.
# requires log store to support cross-db mv
statement error
create materialized view mv1 as select * from d1.public.t1;

statement ok
set database to d1;

# Create two subscriptions so that one can later be dropped while the
# other keeps the log store alive.
statement ok
create subscription sub from t1 with(retention = '1D');

statement ok
create subscription sub2 from t1 with(retention = '1D');

statement ok
set database to d2;

# Cross-db batch query works regardless of subscriptions.
query ok
select count(*) from d1.public.t1;
----
10000

# With a subscription in place, the cross-db MV can now be created and
# must be fully backfilled.
statement ok
create materialized view mv1 as select * from d1.public.t1;

query ok
select count(*) from mv1;
----
10000

statement ok
create table t2(v2 int);

statement ok
insert into t2 select * from generate_series(1, 100);

# Cross-db join: upstream table from d1 joined with a local table in d2.
statement ok
create materialized view mv2 as select t2.v2 from d1.public.t1 join t2 on t1.v1 = t2.v2;

query ok
select count(*) from mv2;
----
100

statement ok
set database to d1;

# Dropping the upstream table must be rejected while cross-db MVs in
# another database still depend on it (cascade cannot cross databases).
# need to drop cross-db mv first
statement error
drop table t1 cascade;

# Dropping one of the two subscriptions is fine...
statement ok
drop subscription sub2;

# ...but the last one is protected while cross-db MVs still consume it.
# requires at least one active subscription to maintain log store.
statement error
drop subscription sub;

statement ok
set database to d2;

# Tear down the dependent MVs first, then the upstream objects.
statement ok
drop materialized view mv1;

statement ok
drop materialized view mv2;

statement ok
set database to d1;

statement ok
drop table t1 cascade;

statement ok
drop database d2;

# d1 is the current database of this connection, so drop it from another
# connection.
connection other
statement ok
drop database d1;
2 changes: 1 addition & 1 deletion src/frontend/src/binder/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ impl Binder {
CteInner::ChangeLog(from_table_name) => {
self.push_context();
let from_table_relation =
self.bind_relation_by_name(from_table_name.clone(), None, None)?;
self.bind_relation_by_name(from_table_name.clone(), None, None, true)?;
self.pop_context()?;
self.context.cte_to_relation.insert(
table_name,
Expand Down
69 changes: 64 additions & 5 deletions src/frontend/src/binder/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,52 @@ impl Binder {
Ok((schema_name, name))
}

/// Check whether `name` is a legal reference when the current database is
/// `db_name`.
///
/// A name qualified with three identifiers (`db.schema.object`) is accepted
/// only when its database part equals `db_name`; names with more than three
/// identifiers are always rejected.
///
/// # Errors
///
/// Returns [`ResolveQualifiedNameError`] with kind `QualifiedNameTooLong`
/// or `NotCurrentDatabase` respectively.
pub fn validate_cross_db_reference(
    db_name: &str,
    name: &ObjectName,
) -> std::result::Result<(), ResolveQualifiedNameError> {
    let identifiers = &name.0;
    // Format the name lazily inside the error branches: the common success
    // path should not pay for the string allocation.
    if identifiers.len() > 3 {
        return Err(ResolveQualifiedNameError::new(
            name.to_string(),
            ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
        ));
    }

    if identifiers.len() == 3 && identifiers[0].real_value() != db_name {
        return Err(ResolveQualifiedNameError::new(
            name.to_string(),
            ResolveQualifiedNameErrorKind::NotCurrentDatabase,
        ));
    }

    Ok(())
}

/// Resolve an up-to-three-part qualified name into
/// (`database_name`, `schema_name`, `name`).
///
/// Missing qualifiers resolve to `None`: `schema.t` yields
/// `(None, Some("schema"), "t")` and a bare `t` yields `(None, None, "t")`.
///
/// # Errors
///
/// Returns [`ResolveQualifiedNameError`] with kind `QualifiedNameTooLong`
/// when the name has more than three identifiers.
pub fn resolve_db_schema_qualified_name(
    name: ObjectName,
) -> std::result::Result<(Option<String>, Option<String>, String), ResolveQualifiedNameError>
{
    if name.0.len() > 3 {
        // Only the error path needs the formatted name, so build it lazily
        // instead of allocating a `String` on every call.
        return Err(ResolveQualifiedNameError::new(
            name.to_string(),
            ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
        ));
    }

    // Pop from the back: the last identifier is always the object name,
    // optionally preceded by the schema and then the database.
    let mut identifiers = name.0;
    let object_name = identifiers
        .pop()
        .expect("ObjectName must contain at least one identifier")
        .real_value();
    let schema_name = identifiers.pop().map(|ident| ident.real_value());
    let database_name = identifiers.pop().map(|ident| ident.real_value());

    Ok((database_name, schema_name, object_name))
}

/// return first name in identifiers, must have only one name.
fn resolve_single_name(mut identifiers: Vec<Ident>, ident_desc: &str) -> Result<String> {
if identifiers.len() > 1 {
Expand Down Expand Up @@ -351,8 +397,15 @@ impl Binder {
name: ObjectName,
alias: Option<TableAlias>,
as_of: Option<AsOf>,
allow_cross_db: bool,
) -> Result<Relation> {
let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?;
let (db_name, schema_name, table_name) = if allow_cross_db {
Self::resolve_db_schema_qualified_name(name)?
} else {
let (schema_name, table_name) =
Self::resolve_schema_qualified_name(&self.db_name, name)?;
(None, schema_name, table_name)
};

if schema_name.is_none()
// the `table_name` here is the name of the currently binding cte.
Expand Down Expand Up @@ -422,7 +475,13 @@ impl Binder {
},
}
} else {
self.bind_relation_by_name_inner(schema_name.as_deref(), &table_name, alias, as_of)
self.bind_relation_by_name_inner(
db_name.as_deref(),
schema_name.as_deref(),
&table_name,
alias,
as_of,
)
}
}

Expand All @@ -442,7 +501,7 @@ impl Binder {
}?;

Ok((
self.bind_relation_by_name(table_name.clone(), None, None)?,
self.bind_relation_by_name(table_name.clone(), None, None, true)?,
table_name,
))
}
Expand Down Expand Up @@ -489,13 +548,13 @@ impl Binder {
let schema = args.get(1).map(|arg| arg.to_string());

let table_name = self.catalog.get_table_name_by_id(table_id)?;
self.bind_relation_by_name_inner(schema.as_deref(), &table_name, alias, None)
self.bind_relation_by_name_inner(None, schema.as_deref(), &table_name, alias, None)
}

pub(super) fn bind_table_factor(&mut self, table_factor: TableFactor) -> Result<Relation> {
match table_factor {
TableFactor::Table { name, alias, as_of } => {
self.bind_relation_by_name(name, alias, as_of)
self.bind_relation_by_name(name, alias, as_of, true)
}
TableFactor::TableFunction {
name,
Expand Down
19 changes: 10 additions & 9 deletions src/frontend/src/binder/relation/table_or_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl Binder {
/// Binds table or source, or logical view according to what we get from the catalog.
pub fn bind_relation_by_name_inner(
&mut self,
db_name: Option<&str>,
schema_name: Option<&str>,
table_name: &str,
alias: Option<TableAlias>,
Expand All @@ -94,17 +95,17 @@ impl Binder {
let (ret, columns) = {
match schema_name {
Some(schema_name) => {
let db_name = db_name.unwrap_or(&self.db_name);
let schema_path = SchemaPath::Name(schema_name);
if is_system_schema(schema_name) {
if let Ok(sys_table_catalog) = self.catalog.get_sys_table_by_name(
&self.db_name,
schema_name,
table_name,
) {
if let Ok(sys_table_catalog) =
self.catalog
.get_sys_table_by_name(db_name, schema_name, table_name)
{
resolve_sys_table_relation(sys_table_catalog)
} else if let Ok((view_catalog, _)) =
self.catalog
.get_view_by_name(&self.db_name, schema_path, table_name)
.get_view_by_name(db_name, schema_path, table_name)
{
self.resolve_view_relation(&view_catalog.clone())?
} else {
Expand All @@ -127,17 +128,17 @@ impl Binder {
self.resolve_source_relation(&source_catalog.clone(), as_of)
} else if let Ok((table_catalog, schema_name)) = self
.catalog
.get_created_table_by_name(&self.db_name, schema_path, table_name)
.get_created_table_by_name(db_name, schema_path, table_name)
{
self.resolve_table_relation(table_catalog.clone(), schema_name, as_of)?
} else if let Ok((source_catalog, _)) =
self.catalog
.get_source_by_name(&self.db_name, schema_path, table_name)
.get_source_by_name(db_name, schema_path, table_name)
{
self.resolve_source_relation(&source_catalog.clone(), as_of)
} else if let Ok((view_catalog, _)) =
self.catalog
.get_view_by_name(&self.db_name, schema_path, table_name)
.get_view_by_name(db_name, schema_path, table_name)
{
self.resolve_view_relation(&view_catalog.clone())?
} else {
Expand Down
8 changes: 1 addition & 7 deletions src/frontend/src/catalog/root_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,26 +677,20 @@ impl Catalog {

// Used by test_utils only.
pub fn alter_table_name_by_id(&mut self, table_id: &TableId, table_name: &str) {
let (mut database_id, mut schema_id) = (0, 0);
let mut found = false;
for database in self.database_by_name.values() {
if !found {
for schema in database.iter_schemas() {
if schema.iter_user_table().any(|t| t.id() == *table_id) {
found = true;
database_id = database.id();
schema_id = schema.id();
break;
}
}
}
}

if found {
let mut table = self
.get_any_table_by_id(table_id)
.unwrap()
.to_prost(schema_id, database_id);
let mut table = self.get_any_table_by_id(table_id).unwrap().to_prost();
table.name = table_name.to_owned();
self.update_table(&table);
}
Expand Down
12 changes: 9 additions & 3 deletions src/frontend/src/catalog/source_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use crate::user::UserId;
pub struct SourceCatalog {
pub id: SourceId,
pub name: String,
pub schema_id: SchemaId,
pub database_id: DatabaseId,
pub columns: Vec<ColumnCatalog>,
pub pk_col_ids: Vec<ColumnId>,
pub append_only: bool,
Expand Down Expand Up @@ -72,12 +74,12 @@ impl SourceCatalog {
.context("expecting exactly one statement in definition")?)
}

pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> PbSource {
pub fn to_prost(&self) -> PbSource {
let (with_properties, secret_refs) = self.with_properties.clone().into_parts();
PbSource {
id: self.id,
schema_id,
database_id,
schema_id: self.schema_id,
database_id: self.database_id,
name: self.name.clone(),
row_id_index: self.row_id_index.map(|idx| idx as _),
columns: self.columns.iter().map(|c| c.to_protobuf()).collect(),
Expand Down Expand Up @@ -157,6 +159,8 @@ impl From<&PbSource> for SourceCatalog {
fn from(prost: &PbSource) -> Self {
let id = prost.id;
let name = prost.name.clone();
let database_id = prost.database_id;
let schema_id = prost.schema_id;
let prost_columns = prost.columns.clone();
let pk_col_ids = prost
.pk_column_ids
Expand Down Expand Up @@ -184,6 +188,8 @@ impl From<&PbSource> for SourceCatalog {
Self {
id,
name,
schema_id,
database_id,
columns,
pk_col_ids,
append_only,
Expand Down
Loading
Loading