diff --git a/e2e_test/source_inline/kafka/handling_mode.slt b/e2e_test/source_inline/kafka/handling_mode.slt index 40259e130dcb3..b5591e6852624 100644 --- a/e2e_test/source_inline/kafka/handling_mode.slt +++ b/e2e_test/source_inline/kafka/handling_mode.slt @@ -78,7 +78,7 @@ format debezium encode json (timestamptz.handling.mode = 'milli'); sleep 2s -query TT +query TT retry 3 backoff 5s select "case", (payload).after.at from plain_guess order by 1; ---- 0 number small 1970-01-01 00:01:40+00:00 @@ -86,7 +86,7 @@ select "case", (payload).after.at from plain_guess order by 1; 2 string utc 2024-04-11 02:00:00.654321+00:00 3 string naive NULL -query TT +query TT retry 3 backoff 5s select "case", (payload).after.at from plain_milli order by 1; ---- 0 number small 1970-01-01 00:00:00.100+00:00 @@ -94,7 +94,7 @@ select "case", (payload).after.at from plain_milli order by 1; 2 string utc 2024-04-11 02:00:00.654321+00:00 3 string naive NULL -query TT +query TT retry 3 backoff 5s select "case", (payload).after.at from plain_micro order by 1; ---- 0 number small 1970-01-01 00:00:00.000100+00:00 @@ -102,7 +102,7 @@ select "case", (payload).after.at from plain_micro order by 1; 2 string utc 2024-04-11 02:00:00.654321+00:00 3 string naive NULL -query TT +query TT retry 3 backoff 5s select "case", (payload).after.at from plain_utc order by 1; ---- 0 number small NULL @@ -110,7 +110,7 @@ select "case", (payload).after.at from plain_utc order by 1; 2 string utc 2024-04-11 02:00:00.654321+00:00 3 string naive NULL -query TT +query TT retry 3 backoff 5s select "case", (payload).after.at from plain_naive order by 1; ---- 0 number small NULL @@ -118,7 +118,7 @@ select "case", (payload).after.at from plain_naive order by 1; 2 string utc NULL 3 string naive 2024-04-11 02:00:00.234321+00:00 -query TT +query TT retry 3 backoff 5s select "case", at from debezium_milli order by 1; ---- 0 number small 1970-01-01 00:00:00.100+00:00 diff --git a/e2e_test/source_inline/tvf/mysql_query.slt b/e2e_test/source_inline/tvf/mysql_query.slt index 0c7e69da33d7c..7b2e9fff77bfd 100644 --- a/e2e_test/source_inline/tvf/mysql_query.slt +++ b/e2e_test/source_inline/tvf/mysql_query.slt @@ -62,12 +62,24 @@ INSERT INTO test SELECT null as v23; " +statement ok +create source mysql_cdc_source with ( + ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, + username = '$RISEDEV_MYSQL_USER', + password = '$MYSQL_PWD', + database.name = 'tvf', +); query select * from mysql_query('$MYSQL_HOST', '$MYSQL_TCP_PORT', '$RISEDEV_MYSQL_USER', '$MYSQL_PWD', 'tvf', 'select * from test;'); ---- 1 t 1 2 3 4 5 6 7 1.08 1.09 1.10 1.11 char varchar \x000a \x16 \x17 \x18 \x19 2021-01-01 12:34:56 2021-01-01 12:34:56+00:00 {"key1": 1, "key2": "abc"} NULL +query +select * from mysql_query('mysql_cdc_source', 'select * from test;'); +---- +1 t 1 2 3 4 5 6 7 1.08 1.09 1.10 1.11 char varchar \x000a \x16 \x17 \x18 \x19 2021-01-01 12:34:56 2021-01-01 12:34:56+00:00 {"key1": 1, "key2": "abc"} NULL + system ok mysql -e " @@ -161,4 +173,7 @@ system ok mysql -e " USE tvf; DROP DATABASE tvf; -" \ No newline at end of file +" + +statement ok +drop source mysql_cdc_source; \ No newline at end of file diff --git a/e2e_test/source_inline/tvf/postgres_query.slt b/e2e_test/source_inline/tvf/postgres_query.slt index 88e508e442474..e332e860928d6 100644 --- a/e2e_test/source_inline/tvf/postgres_query.slt +++ b/e2e_test/source_inline/tvf/postgres_query.slt @@ -30,6 +30,16 @@ PGDATABASE=source_test psql -c " INSERT INTO test SELECT generate_series(1, 100), true, 1, 1, 1, 1.0, 1.0, 1.0, '2021-01-01', '00:00:00', '2021-01-01 00:00:00', '2021-01-01 00:00:00 pst', 'text', 'varchar', '1 day', '{}', '\x01'; " +statement ok +create source postgres_cdc_source with ( + ${RISEDEV_POSTGRES_WITH_OPTIONS_COMMON}, + username = '$PGUSER', + password = '$PGPASSWORD', + database.name = 'source_test' +); + +sleep 1s + query II select * from postgres_query('$PGHOST', '$PGPORT', '$PGUSER', '$PGPASSWORD', 'source_test', 'select * from test where id > 90;'); ---- @@ -43,3 +53,20 @@ select * from postgres_query('$PGHOST', '$PGPORT', '$PGUSER', '$PGPASSWORD', 'so 98 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 99 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 100 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 + +query II +select * from postgres_query('postgres_cdc_source', 'select * from test where id > 90;'); +---- +91 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +92 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +93 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +94 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +95 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +96 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +97 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +98 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +99 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +100 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 + +statement ok +drop source postgres_cdc_source; diff --git a/src/frontend/src/binder/expr/function/mod.rs b/src/frontend/src/binder/expr/function/mod.rs index b45151c7aeada..52c2fb3b81312 100644 --- a/src/frontend/src/binder/expr/function/mod.rs +++ b/src/frontend/src/binder/expr/function/mod.rs @@ -317,9 +317,14 @@ impl Binder { "`VARIADIC` is not allowed in table function call" ); self.ensure_table_function_allowed()?; - return Ok(TableFunction::new_postgres_query(args) - .context("postgres_query error")? - .into()); + return Ok(TableFunction::new_postgres_query( + &self.catalog, + &self.db_name, + self.bind_schema_path(schema_name.as_deref()), + args, + ) + .context("postgres_query error")? + .into()); } // `mysql_query` table function if func_name.eq("mysql_query") { @@ -328,9 +333,14 @@ impl Binder { "`VARIADIC` is not allowed in table function call" ); self.ensure_table_function_allowed()?; - return Ok(TableFunction::new_mysql_query(args) - .context("mysql_query error")? - .into()); + return Ok(TableFunction::new_mysql_query( + &self.catalog, + &self.db_name, + self.bind_schema_path(schema_name.as_deref()), + args, + ) + .context("mysql_query error")? + .into()); } // UDTF if let Some(ref udf) = udf diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index 11b2198b724d2..e022e37c64787 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -19,6 +19,7 @@ use itertools::Itertools; use mysql_async::consts::ColumnType as MySqlColumnType; use mysql_async::prelude::*; use risingwave_common::array::arrow::IcebergArrowConvert; +use risingwave_common::secret::LocalSecretManager; use risingwave_common::types::{DataType, ScalarImpl, StructType}; use risingwave_connector::source::iceberg::{ extract_bucket_and_file_name, get_parquet_fields, list_data_directory, new_azblob_operator, @@ -30,10 +31,15 @@ use thiserror_ext::AsReport; use tokio_postgres::types::Type as TokioPgType; use super::{infer_type, Expr, ExprImpl, ExprRewriter, Literal, RwResult}; +use crate::catalog::catalog_service::CatalogReadGuard; use crate::catalog::function_catalog::{FunctionCatalog, FunctionKind}; +use crate::catalog::root_catalog::SchemaPath; use crate::error::ErrorCode::BindError; use crate::utils::FRONTEND_RUNTIME; +const INLINE_ARG_LEN: usize = 6; +const CDC_SOURCE_ARG_LEN: usize = 2; + /// A table function takes a row as input and returns a table. It is also known as Set-Returning /// Function. /// @@ -291,45 +297,76 @@ impl TableFunction { }) } - pub fn new_postgres_query(args: Vec) -> RwResult { - let args = { - if args.len() != 6 { - return Err(BindError("postgres_query function only accepts 6 arguments: postgres_query(hostname varchar, port varchar, username varchar, password varchar, database_name varchar, postgres_query varchar)".to_owned()).into()); - } - let mut cast_args = Vec::with_capacity(6); - for arg in args { - let arg = arg.cast_implicit(DataType::Varchar)?; - cast_args.push(arg); + fn handle_postgres_or_mysql_query_args( + catalog_reader: &CatalogReadGuard, + db_name: &str, + schema_path: SchemaPath<'_>, + args: Vec, + expect_connector_name: &str, + ) -> RwResult> { + let cast_args = match args.len() { + INLINE_ARG_LEN => { + let mut cast_args = Vec::with_capacity(INLINE_ARG_LEN); + for arg in args { + let arg = arg.cast_implicit(DataType::Varchar)?; + cast_args.push(arg); + } + cast_args } - cast_args - }; - let evaled_args = { - let mut evaled_args: Vec = Vec::with_capacity(6); - for arg in &args { - match arg.try_fold_const() { - Some(Ok(value)) => { - let Some(scalar) = value else { - return Err(BindError( - "postgres_query function does not accept null arguments".to_owned(), - ) - .into()); - }; - evaled_args.push(scalar.into_utf8().into()); - } - Some(Err(err)) => { - return Err(err); - } - None => { - return Err(BindError( - "postgres_query function only accepts constant arguments".to_owned(), - ) - .into()); - } + CDC_SOURCE_ARG_LEN => { + let source_name = expr_impl_to_string_fn(&args[0])?; + let source_catalog = catalog_reader + .get_source_by_name(db_name, schema_path, &source_name)? + .0; + if !source_catalog + .connector_name() + .eq_ignore_ascii_case(expect_connector_name) + { + return Err(BindError(format!("TVF function only accepts `mysql-cdc` and `postgres-cdc` source. Expected: {}, but got: {}", expect_connector_name, source_catalog.connector_name())).into()); } + + let (props, secret_refs) = source_catalog.with_properties.clone().into_parts(); + let secret_resolved = + LocalSecretManager::global().fill_secrets(props, secret_refs)?; + + vec![ + ExprImpl::literal_varchar(secret_resolved["hostname"].clone()), + ExprImpl::literal_varchar(secret_resolved["port"].clone()), + ExprImpl::literal_varchar(secret_resolved["username"].clone()), + ExprImpl::literal_varchar(secret_resolved["password"].clone()), + ExprImpl::literal_varchar(secret_resolved["database.name"].clone()), + args.get(1) + .unwrap() + .clone() + .cast_implicit(DataType::Varchar)?, + ] + } + _ => { + return Err(BindError("postgres_query function and mysql_query function accept either 2 arguments: (cdc_source_name varchar, query varchar) or 6 arguments: (hostname varchar, port varchar, username varchar, password varchar, database_name varchar, query varchar)".to_owned()).into()); } - evaled_args }; + Ok(cast_args) + } + + pub fn new_postgres_query( + catalog_reader: &CatalogReadGuard, + db_name: &str, + schema_path: SchemaPath<'_>, + args: Vec, + ) -> RwResult { + let args = Self::handle_postgres_or_mysql_query_args( + catalog_reader, + db_name, + schema_path, + args, + "postgres-cdc", + )?; + let evaled_args = args + .iter() + .map(expr_impl_to_string_fn) + .collect::>>()?; + #[cfg(madsim)] { return Err(crate::error::ErrorCode::BindError( @@ -411,45 +448,23 @@ impl TableFunction { } } - pub fn new_mysql_query(args: Vec) -> RwResult { - static MYSQL_ARGS_LEN: usize = 6; - let args = { - if args.len() != MYSQL_ARGS_LEN { - return Err(BindError("mysql_query function only accepts 6 arguments: mysql_query(hostname varchar, port varchar, username varchar, password varchar, database_name varchar, mysql_query varchar)".to_owned()).into()); - } - let mut cast_args = Vec::with_capacity(MYSQL_ARGS_LEN); - for arg in args { - let arg = arg.cast_implicit(DataType::Varchar)?; - cast_args.push(arg); - } - cast_args - }; - let evaled_args = { - let mut evaled_args: Vec = Vec::with_capacity(MYSQL_ARGS_LEN); - for arg in &args { - match arg.try_fold_const() { - Some(Ok(value)) => { - let Some(scalar) = value else { - return Err(BindError( - "mysql_query function does not accept null arguments".to_owned(), - ) - .into()); - }; - evaled_args.push(scalar.into_utf8().into()); - } - Some(Err(err)) => { - return Err(err); - } - None => { - return Err(BindError( - "mysql_query function only accepts constant arguments".to_owned(), - ) - .into()); - } - } - } - evaled_args - }; + pub fn new_mysql_query( + catalog_reader: &CatalogReadGuard, + db_name: &str, + schema_path: SchemaPath<'_>, + args: Vec, + ) -> RwResult { + let args = Self::handle_postgres_or_mysql_query_args( + catalog_reader, + db_name, + schema_path, + args, + "mysql-cdc", + )?; + let evaled_args = args + .iter() + .map(expr_impl_to_string_fn) + .collect::>>()?; #[cfg(madsim)] { @@ -624,3 +639,24 @@ impl Expr for TableFunction { unreachable!("Table function should not be converted to ExprNode") } } + +fn expr_impl_to_string_fn(arg: &ExprImpl) -> RwResult { + match arg.try_fold_const() { + Some(Ok(value)) => { + let Some(scalar) = value else { + return Err(BindError( + "postgres_query function and mysql_query function do not accept null arguments" + .to_owned(), + ) + .into()); + }; + Ok(scalar.into_utf8().to_string()) + } + Some(Err(err)) => Err(err), + None => Err(BindError( + "postgres_query function and mysql_query function only accept constant arguments" + .to_owned(), + ) + .into()), + } +} diff --git a/src/risedevtool/src/risedev_env.rs b/src/risedevtool/src/risedev_env.rs index a52754c757983..82e5a1ea8726d 100644 --- a/src/risedevtool/src/risedev_env.rs +++ b/src/risedevtool/src/risedev_env.rs @@ -117,6 +117,11 @@ pub fn generate_risedev_env(services: &Vec) -> String { writeln!(env, r#"PGUSER="{user}""#,).unwrap(); writeln!(env, r#"PGPASSWORD="{password}""#,).unwrap(); writeln!(env, r#"PGDATABASE="{database}""#,).unwrap(); + writeln!( + env, + r#"RISEDEV_POSTGRES_WITH_OPTIONS_COMMON="connector='postgres-cdc',hostname='{host}',port='{port}'""#, + ) + .unwrap(); } ServiceConfig::SqlServer(c) => { let host = &c.address;