diff --git a/.gitignore b/.gitignore
index 72e3ed1487fe0..5f54a467b21b7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -71,12 +71,12 @@ e2e_test/generated/*
 scale-test.tar.zst
 simulation-it-test.tar.zst
-
 # hummock-trace
 .trace
 
 # spark binary
 e2e_test/iceberg/spark-*-bin*
+e2e_test/iceberg/metastore_db
 
 **/poetry.lock
diff --git a/Cargo.lock b/Cargo.lock
index 32c5fe29fc5aa..98d01dff60589 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6086,7 +6086,7 @@ dependencies = [
 [[package]]
 name = "icelake"
 version = "0.3.141592654"
-source = "git+https://github.com/risingwavelabs/icelake.git?rev=1860eb315183a5f3f72b4097c1e40d49407f8373#1860eb315183a5f3f72b4097c1e40d49407f8373"
+source = "git+https://github.com/risingwavelabs/icelake.git?rev=3f4724158acee37a4785f56670a1427993a58739#3f4724158acee37a4785f56670a1427993a58739"
 dependencies = [
  "anyhow",
  "apache-avro 0.17.0 (git+https://github.com/apache/avro.git)",
diff --git a/Cargo.toml b/Cargo.toml
index a5da9b82b658c..46ab2695a4ebb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -135,7 +135,8 @@ tonic-build = { package = "madsim-tonic-build", version = "0.5" }
 otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "e6cd165b9bc85783b42c106e99186b86b73e3507" }
 prost = { version = "0.13" }
 prost-build = { version = "0.13" }
-icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "1860eb315183a5f3f72b4097c1e40d49407f8373", features = [
+# branch dylan/fix_parquet_nested_type_field_id
+icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "3f4724158acee37a4785f56670a1427993a58739", features = [
   "prometheus",
 ] }
 arrow-array-iceberg = { package = "arrow-array", version = "52" }
diff --git a/e2e_test/iceberg/main.py b/e2e_test/iceberg/main.py
index 01017f3db783d..4279b899c5c1d 100644
--- a/e2e_test/iceberg/main.py
+++ b/e2e_test/iceberg/main.py
@@ -55,16 +55,23 @@ def execute_slt(args, slt):
 
 def verify_result(args, verify_sql, verify_schema, verify_data):
     tc = unittest.TestCase()
-    print(f"Executing sql: {verify_sql}")
+
+    time.sleep(3)
+    print(f"verify_result:\nExecuting sql: {verify_sql}")
     spark = get_spark(args)
     df = spark.sql(verify_sql).collect()
+    print(f"Result:")
+    print(f"================")
     for row in df:
         print(row)
+    print(f"================")
     rows = verify_data.splitlines()
-    tc.assertEqual(len(df), len(rows))
+    tc.assertEqual(len(df), len(rows), "row length mismatch")
+    tc.assertEqual(len(verify_schema), len(df[0]), "column length mismatch")
     for row1, row2 in zip(df, rows):
         print(f"Row1: {row1}, Row 2: {row2}")
-        row2 = row2.split(",")
+        # New parsing logic for row2
+        row2 = parse_row(row2)
         for idx, ty in enumerate(verify_schema):
             if ty == "int" or ty == "long":
                 tc.assertEqual(row1[idx], int(row2[idx]))
@@ -89,7 +96,7 @@ def verify_result(args, verify_sql, verify_schema, verify_data):
                 else:
                     tc.assertEqual(row1[idx], decimal.Decimal(row2[idx]))
             else:
-                tc.fail(f"Unsupported type {ty}")
+                tc.assertEqual(str(row1[idx]), str(row2[idx]))
 
 
 def compare_sql(args, cmp_sqls):
     assert len(cmp_sqls) == 2
@@ -113,6 +120,32 @@ def drop_table(args, drop_sqls):
         spark.sql(sql)
 
 
+def parse_row(row):
+    result = []
+    current = ""
+    parenthesis_count = {"{": 0, "[": 0, "(": 0}
+    for char in row:
+        if char in parenthesis_count:
+            parenthesis_count[char] += 1
+        elif char == "}":
+            parenthesis_count["{"] -= 1
+        elif char == "]":
+            parenthesis_count["["] -= 1
+        elif char == ")":
+            parenthesis_count["("] -= 1
+
+        if char == "," and all(value == 0 for value in parenthesis_count.values()):
+            result.append(current.strip())
+            current = ""
+        else:
+            current += char
+
+    if current:
+        result.append(current.strip())
+
+    return result
+
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Test script for iceberg")
     parser.add_argument("-t", dest="test_case", type=str, help="Test case file")
@@ -151,4 +184,3 @@ def drop_table(args, drop_sqls):
         execute_slt(config, verify_slt)
     if drop_sqls is not None and drop_sqls != "":
         drop_table(config, drop_sqls)
-
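The new `parse_row` helper replaces the naive `row2.split(",")`: it tracks `{}`/`[]`/`()` nesting depth and splits only on commas at depth zero, so map/array/struct literals in `verify_data` stay intact. A quick usage sketch (assumes `parse_row` from the diff above is in scope; the input string is a made-up example):

```python
# Only commas at bracket depth 0 separate fields; commas inside nested
# map/list/struct literals are kept as part of the field.
print(parse_row("1,{1: 100, 2: 200},[1, 2, 3],Row(a=1, b=2)"))
# -> ['1', '{1: 100, 2: 200}', '[1, 2, 3]', 'Row(a=1, b=2)']
```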
diff --git a/e2e_test/iceberg/start_spark_connect_server.sh b/e2e_test/iceberg/start_spark_connect_server.sh
index 345653778b14c..f0f3f19a1fab7 100755
--- a/e2e_test/iceberg/start_spark_connect_server.sh
+++ b/e2e_test/iceberg/start_spark_connect_server.sh
@@ -1,3 +1,5 @@
+#!/usr/bin/env bash
+
 set -ex
 
 ICEBERG_VERSION=1.4.3
diff --git a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt
index a83173fc48ab6..66eb11da1f438 100644
--- a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt
+++ b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt
@@ -16,7 +16,10 @@ v_bool boolean,
 v_date date,
 v_timestamp timestamptz,
 v_ts_ntz timestamp,
-v_decimal decimal
+v_decimal decimal,
+v_map map(int, int),
+v_array int[],
+v_struct struct<a int, b int>
 );
 
 statement ok
@@ -41,10 +44,10 @@ CREATE SINK s6 AS select * from mv6 WITH (
 
 statement ok
 INSERT INTO t6 VALUES
-(1, 1, 1000, 1.1, 1.11, '1-1', true, '2022-03-11', '2022-03-11 01:00:00Z'::timestamptz, '2022-03-11 01:00:00',1.11),
-(2, 2, 2000, 2.2, 2.22, '2-2', false, '2022-03-12', '2022-03-12 02:00:00Z'::timestamptz, '2022-03-12 02:00:00',2.22),
-(3, 3, 3000, 3.3, 3.33, '3-3', true, '2022-03-13', '2022-03-13 03:00:00Z'::timestamptz, '2022-03-13 03:00:00','inf'),
-(4, 4, 4000, 4.4, 4.44, '4-4', false, '2022-03-14', '2022-03-14 04:00:00Z'::timestamptz, '2022-03-14 04:00:00','-inf');
+(1, 1, 1000, 1.1, 1.11, '1-1', true, '2022-03-11', '2022-03-11 01:00:00Z'::timestamptz, '2022-03-11 01:00:00',1.11, map {1:100,2:200}, array[1,2,3], row(1,2)),
+(2, 2, 2000, 2.2, 2.22, '2-2', false, '2022-03-12', '2022-03-12 02:00:00Z'::timestamptz, '2022-03-12 02:00:00',2.22, map {3:300}, array[1,null,3], row(3,null)),
+(3, 3, 3000, 3.3, 3.33, '3-3', true, '2022-03-13', '2022-03-13 03:00:00Z'::timestamptz, '2022-03-13 03:00:00','inf', null, null, null),
+(4, 4, 4000, 4.4, 4.44, '4-4', false, '2022-03-14', '2022-03-14 04:00:00Z'::timestamptz, '2022-03-14 04:00:00','-inf', null, null, null);
 
 statement ok
 FLUSH;
@@ -53,7 +56,7 @@ sleep 5s
 
 statement ok
 INSERT INTO t6 VALUES
-(5, 5, 5000, 5.5, 5.55, '5-5', true, '2022-03-15', '2022-03-15 05:00:00Z'::timestamptz, '2022-03-15 05:00:00','nan');
+(5, 5, 5000, 5.5, 5.55, '5-5', true, '2022-03-15', '2022-03-15 05:00:00Z'::timestamptz, '2022-03-15 05:00:00','nan', null, null, null);
 
 statement ok
 FLUSH;
diff --git a/e2e_test/iceberg/test_case/no_partition_append_only.toml b/e2e_test/iceberg/test_case/no_partition_append_only.toml
index 7d2952c508756..9d49b7a29d17f 100644
--- a/e2e_test/iceberg/test_case/no_partition_append_only.toml
+++ b/e2e_test/iceberg/test_case/no_partition_append_only.toml
@@ -13,24 +13,27 @@ init_sqls = [
     v_date date,
     v_timestamp timestamp,
     v_ts_ntz timestamp_ntz,
-    v_decimal decimal(10,5)
+    v_decimal decimal(10,5),
+    v_map map<int, int>,
+    v_array array<int>,
+    v_struct struct<a:int, b:int>
     ) USING iceberg
     TBLPROPERTIES ('format-version'='2');
     '''
 ]
 
 slt = 'test_case/iceberg_sink_no_partition_append_only_table.slt'
 
-verify_schema = ['long', 'int', 'long', 'float', 'double', 'string', 'boolean', 'date', 'timestamp', 'timestamp_ntz', 'decimal']
+verify_schema = ['long', 'int', 'long', 'float', 'double', 'string', 'boolean', 'date', 'timestamp', 'timestamp_ntz', 'decimal', 'map', 'array', 'struct']
 
 verify_sql = 'SELECT * FROM demo_db.no_partition_append_only_table ORDER BY id ASC'
 
 verify_data = """
-1,1,1000,1.1,1.11,1-1,true,2022-03-11,2022-03-11 01:00:00+00:00,2022-03-11 01:00:00,1.11
-2,2,2000,2.2,2.22,2-2,false,2022-03-12,2022-03-12 02:00:00+00:00,2022-03-12 02:00:00,2.22
-3,3,3000,3.3,3.33,3-3,true,2022-03-13,2022-03-13 03:00:00+00:00,2022-03-13 03:00:00,99999.99999
-4,4,4000,4.4,4.44,4-4,false,2022-03-14,2022-03-14 04:00:00+00:00,2022-03-14 04:00:00,-99999.99999
-5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00,none
+1,1,1000,1.1,1.11,1-1,true,2022-03-11,2022-03-11 01:00:00+00:00,2022-03-11 01:00:00,1.11,{1: 100, 2: 200},[1, 2, 3],Row(a=1, b=2)
+2,2,2000,2.2,2.22,2-2,false,2022-03-12,2022-03-12 02:00:00+00:00,2022-03-12 02:00:00,2.22,{3: 300},[1, None, 3],Row(a=3, b=None)
+3,3,3000,3.3,3.33,3-3,true,2022-03-13,2022-03-13 03:00:00+00:00,2022-03-13 03:00:00,99999.99999,None,None,None
+4,4,4000,4.4,4.44,4-4,false,2022-03-14,2022-03-14 04:00:00+00:00,2022-03-14 04:00:00,-99999.99999,None,None,None
+5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00,none,None,None,None
 """
 
 verify_slt = 'test_case/iceberg_sink_no_partition_append_only_table_verify.slt'
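The expected cells for the new map/array/struct columns are written as the Python `str()` of whatever Spark's `collect()` returns, because the catch-all branch in `verify_result` now compares `str(row1[idx])` against `str(row2[idx])`. A minimal sketch of the string forms being matched (assumes `pyspark` is installed, as it is in the test environment):

```python
from pyspark.sql import Row

# These reprs are exactly what the new verify_data lines encode.
print(str({1: 100, 2: 200}))   # -> {1: 100, 2: 200}
print(str([1, None, 3]))       # -> [1, None, 3]
print(str(Row(a=3, b=None)))   # -> Row(a=3, b=None)
```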
diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs
index 13797a0b9118b..8fa3e2abb6b5f 100644
--- a/src/common/src/array/arrow/arrow_impl.rs
+++ b/src/common/src/array/arrow/arrow_impl.rs
@@ -448,11 +448,16 @@ pub trait ToArrow {
     #[inline]
     fn map_type_to_arrow(&self, map_type: &MapType) -> Result<arrow_schema::DataType> {
         let sorted = false;
-        let struct_type = map_type.clone().into_struct();
+        // "key" is always non-null
+        let key = self
+            .to_arrow_field("key", map_type.key())?
+            .with_nullable(false);
+        let value = self.to_arrow_field("value", map_type.value())?;
         Ok(arrow_schema::DataType::Map(
             Arc::new(arrow_schema::Field::new(
                 "entries",
-                self.struct_type_to_arrow(struct_type.as_struct())?,
+                arrow_schema::DataType::Struct([Arc::new(key), Arc::new(value)].into()),
+                // "entries" is always non-null
                 false,
             )),
             sorted,
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index b68e74b1f5d95..1b135cd4d3b40 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -1375,15 +1375,21 @@ pub fn try_matches_arrow_schema(
             (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
             (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
             (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
-            (left, right) => left == right,
+            // cases where left != right (metadata, field name mismatch)
+            //
+            // all nested types: in iceberg, `field_id` is always present as
+            // {"PARQUET:field_id": ".."} metadata, but RW doesn't have it
+            //
+            // map: the standard names in arrow are "entries", "key", "value";
+            // in iceberg-rs, the entries field is called "key_value"
+            (left, right) => left.equals_datatype(right),
         };
         if !compatible {
-            bail!("Field {}'s type not compatible, risingwave converted data type {}, iceberg's data type: {}",
+            bail!("field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
                     arrow_field.name(), converted_arrow_data_type, arrow_field.data_type()
             );
         }
     }
-
     Ok(())
 }
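For intuition on the `equals_datatype` change: per the comments above, Iceberg-generated Arrow schemas attach `PARQUET:field_id` metadata to nested fields and iceberg-rs names the map entries field `key_value` rather than Arrow's conventional `entries`, so strict `==` on the two `DataType`s rejected structurally identical schemas; arrow-rs's `DataType::equals_datatype` instead compares structure, ignoring field names and metadata. A rough pyarrow analogue of the mismatch (illustrative sketch only; the field id value here is made up):

```python
import pyarrow as pa

entries = pa.struct([("key", pa.int32()), ("value", pa.int32())])

# RisingWave side: conventional "entries" name, no metadata.
rw = pa.field("entries", entries, nullable=False)
# Iceberg side: same structure, renamed child and a Parquet field id attached.
ice = pa.field("key_value", entries, nullable=False,
               metadata={"PARQUET:field_id": "1"})

print(rw.equals(ice))            # False: the field names differ
print(rw.type.equals(ice.type))  # True: the underlying types are identical
```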