Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: tvf with cdc source #20439

Merged
merged 8 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions e2e_test/source_inline/kafka/handling_mode.slt
Original file line number Diff line number Diff line change
Expand Up @@ -78,47 +78,47 @@ format debezium encode json (timestamptz.handling.mode = 'milli');

sleep 2s

query TT
query TT retry 3 backoff 5s
select "case", (payload).after.at from plain_guess order by 1;
----
0 number small 1970-01-01 00:01:40+00:00
1 number recent 2024-04-11 02:00:00.123456+00:00
2 string utc 2024-04-11 02:00:00.654321+00:00
3 string naive NULL

query TT
query TT retry 3 backoff 5s
select "case", (payload).after.at from plain_milli order by 1;
----
0 number small 1970-01-01 00:00:00.100+00:00
1 number recent 56246-07-01 08:02:03.456+00:00
2 string utc 2024-04-11 02:00:00.654321+00:00
3 string naive NULL

query TT
query TT retry 3 backoff 5s
select "case", (payload).after.at from plain_micro order by 1;
----
0 number small 1970-01-01 00:00:00.000100+00:00
1 number recent 2024-04-11 02:00:00.123456+00:00
2 string utc 2024-04-11 02:00:00.654321+00:00
3 string naive NULL

query TT
query TT retry 3 backoff 5s
select "case", (payload).after.at from plain_utc order by 1;
----
0 number small NULL
1 number recent NULL
2 string utc 2024-04-11 02:00:00.654321+00:00
3 string naive NULL

query TT
query TT retry 3 backoff 5s
select "case", (payload).after.at from plain_naive order by 1;
----
0 number small NULL
1 number recent NULL
2 string utc NULL
3 string naive 2024-04-11 02:00:00.234321+00:00

query TT
query TT retry 3 backoff 5s
select "case", at from debezium_milli order by 1;
----
0 number small 1970-01-01 00:00:00.100+00:00
Expand Down
17 changes: 16 additions & 1 deletion e2e_test/source_inline/tvf/mysql_query.slt
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,24 @@ INSERT INTO test SELECT
null as v23;
"

statement ok
create source mysql_cdc_source with (
${RISEDEV_MYSQL_WITH_OPTIONS_COMMON},
username = '$RISEDEV_MYSQL_USER',
password = '$MYSQL_PWD',
database.name = 'tvf',
);

query
select * from mysql_query('$MYSQL_HOST', '$MYSQL_TCP_PORT', '$RISEDEV_MYSQL_USER', '$MYSQL_PWD', 'tvf', 'select * from test;');
----
1 t 1 2 3 4 5 6 7 1.08 1.09 1.10 1.11 char varchar \x000a \x16 \x17 \x18 \x19 2021-01-01 12:34:56 2021-01-01 12:34:56+00:00 {"key1": 1, "key2": "abc"} NULL

query
select * from mysql_query('mysql_cdc_source', 'select * from test;');
----
1 t 1 2 3 4 5 6 7 1.08 1.09 1.10 1.11 char varchar \x000a \x16 \x17 \x18 \x19 2021-01-01 12:34:56 2021-01-01 12:34:56+00:00 {"key1": 1, "key2": "abc"} NULL


system ok
mysql -e "
Expand Down Expand Up @@ -161,4 +173,7 @@ system ok
mysql -e "
USE tvf;
DROP DATABASE tvf;
"
"

statement ok
drop source mysql_cdc_source;
27 changes: 27 additions & 0 deletions e2e_test/source_inline/tvf/postgres_query.slt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ PGDATABASE=source_test psql -c "
INSERT INTO test SELECT generate_series(1, 100), true, 1, 1, 1, 1.0, 1.0, 1.0, '2021-01-01', '00:00:00', '2021-01-01 00:00:00', '2021-01-01 00:00:00 pst', 'text', 'varchar', '1 day', '{}', '\x01';
"

statement ok
create source postgres_cdc_source with (
${RISEDEV_POSTGRES_WITH_OPTIONS_COMMON},
username = '$PGUSER',
password = '$PGPASSWORD',
database.name = 'source_test'
);

sleep 1s

query II
select * from postgres_query('$PGHOST', '$PGPORT', '$PGUSER', '$PGPASSWORD', 'source_test', 'select * from test where id > 90;');
----
Expand All @@ -43,3 +53,20 @@ select * from postgres_query('$PGHOST', '$PGPORT', '$PGUSER', '$PGPASSWORD', 'so
98 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
99 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
100 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01

query II
select * from postgres_query('postgres_cdc_source', 'select * from test where id > 90;');
----
91 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
92 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
93 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
94 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
95 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
96 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
97 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
98 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
99 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
100 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01

statement ok
drop source postgres_cdc_source;
22 changes: 16 additions & 6 deletions src/frontend/src/binder/expr/function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,14 @@ impl Binder {
"`VARIADIC` is not allowed in table function call"
);
self.ensure_table_function_allowed()?;
return Ok(TableFunction::new_postgres_query(args)
.context("postgres_query error")?
.into());
return Ok(TableFunction::new_postgres_query(
&self.catalog,
&self.db_name,
self.bind_schema_path(schema_name.as_deref()),
args,
)
.context("postgres_query error")?
.into());
}
// `mysql_query` table function
if func_name.eq("mysql_query") {
Expand All @@ -328,9 +333,14 @@ impl Binder {
"`VARIADIC` is not allowed in table function call"
);
self.ensure_table_function_allowed()?;
return Ok(TableFunction::new_mysql_query(args)
.context("mysql_query error")?
.into());
return Ok(TableFunction::new_mysql_query(
&self.catalog,
&self.db_name,
self.bind_schema_path(schema_name.as_deref()),
args,
)
.context("mysql_query error")?
.into());
}
// UDTF
if let Some(ref udf) = udf
Expand Down
182 changes: 109 additions & 73 deletions src/frontend/src/expr/table_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use itertools::Itertools;
use mysql_async::consts::ColumnType as MySqlColumnType;
use mysql_async::prelude::*;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::{DataType, ScalarImpl, StructType};
use risingwave_connector::source::iceberg::{
extract_bucket_and_file_name, get_parquet_fields, list_data_directory, new_azblob_operator,
Expand All @@ -30,10 +31,15 @@ use thiserror_ext::AsReport;
use tokio_postgres::types::Type as TokioPgType;

use super::{infer_type, Expr, ExprImpl, ExprRewriter, Literal, RwResult};
use crate::catalog::catalog_service::CatalogReadGuard;
use crate::catalog::function_catalog::{FunctionCatalog, FunctionKind};
use crate::catalog::root_catalog::SchemaPath;
use crate::error::ErrorCode::BindError;
use crate::utils::FRONTEND_RUNTIME;

const INLINE_ARG_LEN: usize = 6;
const CDC_SOURCE_ARG_LEN: usize = 2;

/// A table function takes a row as input and returns a table. It is also known as Set-Returning
/// Function.
///
Expand Down Expand Up @@ -291,45 +297,76 @@ impl TableFunction {
})
}

pub fn new_postgres_query(args: Vec<ExprImpl>) -> RwResult<Self> {
let args = {
if args.len() != 6 {
return Err(BindError("postgres_query function only accepts 6 arguments: postgres_query(hostname varchar, port varchar, username varchar, password varchar, database_name varchar, postgres_query varchar)".to_owned()).into());
}
let mut cast_args = Vec::with_capacity(6);
for arg in args {
let arg = arg.cast_implicit(DataType::Varchar)?;
cast_args.push(arg);
fn handle_postgres_or_mysql_query_args(
catalog_reader: &CatalogReadGuard,
db_name: &str,
schema_path: SchemaPath<'_>,
args: Vec<ExprImpl>,
expect_connector_name: &str,
) -> RwResult<Vec<ExprImpl>> {
let cast_args = match args.len() {
INLINE_ARG_LEN => {
let mut cast_args = Vec::with_capacity(INLINE_ARG_LEN);
for arg in args {
let arg = arg.cast_implicit(DataType::Varchar)?;
cast_args.push(arg);
}
cast_args
}
cast_args
};
let evaled_args = {
let mut evaled_args: Vec<String> = Vec::with_capacity(6);
for arg in &args {
match arg.try_fold_const() {
Some(Ok(value)) => {
let Some(scalar) = value else {
return Err(BindError(
"postgres_query function does not accept null arguments".to_owned(),
)
.into());
};
evaled_args.push(scalar.into_utf8().into());
}
Some(Err(err)) => {
return Err(err);
}
None => {
return Err(BindError(
"postgres_query function only accepts constant arguments".to_owned(),
)
.into());
}
CDC_SOURCE_ARG_LEN => {
let source_name = expr_impl_to_string_fn(&args[0])?;
let source_catalog = catalog_reader
.get_source_by_name(db_name, schema_path, &source_name)?
.0;
if !source_catalog
.connector_name()
.eq_ignore_ascii_case(expect_connector_name)
{
return Err(BindError(format!("TVF function only accepts `mysql-cdc` and `postgres-cdc` source. Expected: {}, but got: {}", expect_connector_name, source_catalog.connector_name())).into());
}

let (props, secret_refs) = source_catalog.with_properties.clone().into_parts();
let secret_resolved =
LocalSecretManager::global().fill_secrets(props, secret_refs)?;

vec![
ExprImpl::literal_varchar(secret_resolved["hostname"].clone()),
ExprImpl::literal_varchar(secret_resolved["port"].clone()),
ExprImpl::literal_varchar(secret_resolved["username"].clone()),
ExprImpl::literal_varchar(secret_resolved["password"].clone()),
ExprImpl::literal_varchar(secret_resolved["database.name"].clone()),
args.get(1)
.unwrap()
.clone()
.cast_implicit(DataType::Varchar)?,
]
}
_ => {
return Err(BindError("postgres_query function and mysql_query function accept either 2 arguments: (cdc_source_name varchar, query varchar) or 6 arguments: (hostname varchar, port varchar, username varchar, password varchar, database_name varchar, query varchar)".to_owned()).into());
}
evaled_args
};

Ok(cast_args)
}

pub fn new_postgres_query(
catalog_reader: &CatalogReadGuard,
db_name: &str,
schema_path: SchemaPath<'_>,
args: Vec<ExprImpl>,
) -> RwResult<Self> {
let args = Self::handle_postgres_or_mysql_query_args(
catalog_reader,
db_name,
schema_path,
args,
"postgres-cdc",
)?;
let evaled_args = args
.iter()
.map(expr_impl_to_string_fn)
.collect::<RwResult<Vec<_>>>()?;

#[cfg(madsim)]
{
return Err(crate::error::ErrorCode::BindError(
Expand Down Expand Up @@ -411,45 +448,23 @@ impl TableFunction {
}
}

pub fn new_mysql_query(args: Vec<ExprImpl>) -> RwResult<Self> {
static MYSQL_ARGS_LEN: usize = 6;
let args = {
if args.len() != MYSQL_ARGS_LEN {
return Err(BindError("mysql_query function only accepts 6 arguments: mysql_query(hostname varchar, port varchar, username varchar, password varchar, database_name varchar, mysql_query varchar)".to_owned()).into());
}
let mut cast_args = Vec::with_capacity(MYSQL_ARGS_LEN);
for arg in args {
let arg = arg.cast_implicit(DataType::Varchar)?;
cast_args.push(arg);
}
cast_args
};
let evaled_args = {
let mut evaled_args: Vec<String> = Vec::with_capacity(MYSQL_ARGS_LEN);
for arg in &args {
match arg.try_fold_const() {
Some(Ok(value)) => {
let Some(scalar) = value else {
return Err(BindError(
"mysql_query function does not accept null arguments".to_owned(),
)
.into());
};
evaled_args.push(scalar.into_utf8().into());
}
Some(Err(err)) => {
return Err(err);
}
None => {
return Err(BindError(
"mysql_query function only accepts constant arguments".to_owned(),
)
.into());
}
}
}
evaled_args
};
pub fn new_mysql_query(
catalog_reader: &CatalogReadGuard,
db_name: &str,
schema_path: SchemaPath<'_>,
args: Vec<ExprImpl>,
) -> RwResult<Self> {
let args = Self::handle_postgres_or_mysql_query_args(
catalog_reader,
db_name,
schema_path,
args,
"mysql-cdc",
)?;
let evaled_args = args
.iter()
.map(expr_impl_to_string_fn)
.collect::<RwResult<Vec<_>>>()?;

#[cfg(madsim)]
{
Expand Down Expand Up @@ -624,3 +639,24 @@ impl Expr for TableFunction {
unreachable!("Table function should not be converted to ExprNode")
}
}

/// Evaluates a constant argument of `postgres_query` / `mysql_query` and returns it as a `String`.
///
/// # Errors
///
/// * A bind error if the expression cannot be folded to a constant.
/// * The folding error itself, unchanged, if constant evaluation fails.
/// * A bind error if the expression folds to NULL.
fn expr_impl_to_string_fn(arg: &ExprImpl) -> RwResult<String> {
    // `None` from `try_fold_const` means the expression is not a constant.
    let Some(folded) = arg.try_fold_const() else {
        return Err(BindError(
            "postgres_query function and mysql_query function only accept constant arguments"
                .to_owned(),
        )
        .into());
    };

    // Propagate evaluation failures as-is, then reject NULL.
    match folded? {
        // NOTE(review): `into_utf8` presumably expects a varchar scalar — confirm callers cast
        // the argument to varchar before calling this helper.
        Some(scalar) => Ok(scalar.into_utf8().to_string()),
        None => Err(BindError(
            "postgres_query function and mysql_query function do not accept null arguments"
                .to_owned(),
        )
        .into()),
    }
}
5 changes: 5 additions & 0 deletions src/risedevtool/src/risedev_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ pub fn generate_risedev_env(services: &Vec<ServiceConfig>) -> String {
writeln!(env, r#"PGUSER="{user}""#,).unwrap();
writeln!(env, r#"PGPASSWORD="{password}""#,).unwrap();
writeln!(env, r#"PGDATABASE="{database}""#,).unwrap();
writeln!(
env,
r#"RISEDEV_POSTGRES_WITH_OPTIONS_COMMON="connector='postgres-cdc',hostname='{host}',port='{port}'""#,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think test-specific env vars should be set in `risedev_env`. How about setting them inside the `e2e-source-test` script?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be acceptable, as we already have it for the MySQL tests.

writeln!(env, r#"RISEDEV_MYSQL_WITH_OPTIONS_COMMON="connector='mysql-cdc',hostname='{host}',port='{port}'""#,).unwrap();

)
.unwrap();
}
ServiceConfig::SqlServer(c) => {
let host = &c.address;
Expand Down
Loading