fix(iceberg): fix map - arrow convert & fix sink nested types to iceberg #18463

Merged 5 commits on Sep 11, 2024
2 changes: 1 addition & 1 deletion .gitignore
@@ -71,12 +71,12 @@ e2e_test/generated/*
scale-test.tar.zst
simulation-it-test.tar.zst


# hummock-trace
.trace

# spark binary
e2e_test/iceberg/spark-*-bin*
e2e_test/iceberg/metastore_db

**/poetry.lock

2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -135,7 +135,8 @@ tonic-build = { package = "madsim-tonic-build", version = "0.5" }
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "e6cd165b9bc85783b42c106e99186b86b73e3507" }
prost = { version = "0.13" }
prost-build = { version = "0.13" }
icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "1860eb315183a5f3f72b4097c1e40d49407f8373", features = [
# branch dylan/fix_parquet_nested_type_field_id
icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "3f4724158acee37a4785f56670a1427993a58739", features = [
"prometheus",
] }
arrow-array-iceberg = { package = "arrow-array", version = "52" }
42 changes: 37 additions & 5 deletions e2e_test/iceberg/main.py
@@ -55,16 +55,23 @@ def execute_slt(args, slt):

def verify_result(args, verify_sql, verify_schema, verify_data):
    tc = unittest.TestCase()
    print(f"Executing sql: {verify_sql}")

    time.sleep(3)
    print(f"verify_result:\nExecuting sql: {verify_sql}")
    spark = get_spark(args)
    df = spark.sql(verify_sql).collect()
    print(f"Result:")
    print(f"================")
    for row in df:
        print(row)
    print(f"================")
    rows = verify_data.splitlines()
    tc.assertEqual(len(df), len(rows))
    tc.assertEqual(len(df), len(rows), "row length mismatch")
    tc.assertEqual(len(verify_schema), len(df[0]), "column length mismatch")
    for row1, row2 in zip(df, rows):
        print(f"Row1: {row1}, Row 2: {row2}")
        row2 = row2.split(",")
        # New parsing logic for row2
        row2 = parse_row(row2)
        for idx, ty in enumerate(verify_schema):
            if ty == "int" or ty == "long":
                tc.assertEqual(row1[idx], int(row2[idx]))
@@ -89,7 +96,7 @@ def verify_result(args, verify_sql, verify_schema, verify_data):
                else:
                    tc.assertEqual(row1[idx], decimal.Decimal(row2[idx]))
            else:
                tc.fail(f"Unsupported type {ty}")
                tc.assertEqual(str(row1[idx]), str(row2[idx]))

def compare_sql(args, cmp_sqls):
    assert len(cmp_sqls) == 2
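
The new fallback branch above works because nested values collected from Spark arrive in Python as dicts, lists, and Row objects, whose str() output matches the plain strings in verify_data. A minimal sketch, assuming PySpark is installed (the values are hypothetical, not taken from the test data):

# Sketch: Spark's Python values stringify to the same form used in verify_data.
from pyspark.sql import Row

assert str({1: 100, 2: 200}) == "{1: 100, 2: 200}"  # map column -> dict
assert str([1, None, 3]) == "[1, None, 3]"          # array column -> list
assert str(Row(a=1, b=2)) == "Row(a=1, b=2)"        # struct column -> Row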
@@ -113,6 +120,32 @@ def drop_table(args, drop_sqls):
        spark.sql(sql)


def parse_row(row):
    result = []
    current = ""
    parenthesis_count = {"{": 0, "[": 0, "(": 0}
    for char in row:
        if char in parenthesis_count:
            parenthesis_count[char] += 1
        elif char == "}":
            parenthesis_count["{"] -= 1
        elif char == "]":
            parenthesis_count["["] -= 1
        elif char == ")":
            parenthesis_count["("] -= 1

        if char == "," and all(value == 0 for value in parenthesis_count.values()):
            result.append(current.strip())
            current = ""
        else:
            current += char

    if current:
        result.append(current.strip())

    return result
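
A quick usage sketch for parse_row (the input row is hypothetical): commas inside {}, [], or () no longer split the cell, so nested map/array/struct values survive intact.

row = "1,1.11,{1: 100, 2: 200},[1, None, 3],Row(a=1, b=2)"
print(parse_row(row))
# ['1', '1.11', '{1: 100, 2: 200}', '[1, None, 3]', 'Row(a=1, b=2)']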


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Test script for iceberg")
    parser.add_argument("-t", dest="test_case", type=str, help="Test case file")
@@ -151,4 +184,3 @@ def drop_table(args, drop_sqls):
        execute_slt(config, verify_slt)
    if drop_sqls is not None and drop_sqls != "":
        drop_table(config, drop_sqls)

2 changes: 2 additions & 0 deletions e2e_test/iceberg/start_spark_connect_server.sh
@@ -1,3 +1,5 @@
#!/usr/bin/env bash

set -ex

ICEBERG_VERSION=1.4.3
@@ -16,7 +16,10 @@ v_bool boolean,
v_date date,
v_timestamp timestamptz,
v_ts_ntz timestamp,
v_decimal decimal
v_decimal decimal,
v_map map(int, int),
v_array int[],
v_struct struct<a int,b int>
);

statement ok
@@ -41,10 +44,10 @@ CREATE SINK s6 AS select * from mv6 WITH (

statement ok
INSERT INTO t6 VALUES
(1, 1, 1000, 1.1, 1.11, '1-1', true, '2022-03-11', '2022-03-11 01:00:00Z'::timestamptz, '2022-03-11 01:00:00',1.11),
(2, 2, 2000, 2.2, 2.22, '2-2', false, '2022-03-12', '2022-03-12 02:00:00Z'::timestamptz, '2022-03-12 02:00:00',2.22),
(3, 3, 3000, 3.3, 3.33, '3-3', true, '2022-03-13', '2022-03-13 03:00:00Z'::timestamptz, '2022-03-13 03:00:00','inf'),
(4, 4, 4000, 4.4, 4.44, '4-4', false, '2022-03-14', '2022-03-14 04:00:00Z'::timestamptz, '2022-03-14 04:00:00','-inf');
(1, 1, 1000, 1.1, 1.11, '1-1', true, '2022-03-11', '2022-03-11 01:00:00Z'::timestamptz, '2022-03-11 01:00:00',1.11, map {1:100,2:200}, array[1,2,3], row(1,2)),
(2, 2, 2000, 2.2, 2.22, '2-2', false, '2022-03-12', '2022-03-12 02:00:00Z'::timestamptz, '2022-03-12 02:00:00',2.22, map {3:300}, array[1,null,3], row(3,null)),
(3, 3, 3000, 3.3, 3.33, '3-3', true, '2022-03-13', '2022-03-13 03:00:00Z'::timestamptz, '2022-03-13 03:00:00','inf', null, null, null),
(4, 4, 4000, 4.4, 4.44, '4-4', false, '2022-03-14', '2022-03-14 04:00:00Z'::timestamptz, '2022-03-14 04:00:00','-inf', null, null, null);

statement ok
FLUSH;
@@ -53,7 +56,7 @@ sleep 5s

statement ok
INSERT INTO t6 VALUES
(5, 5, 5000, 5.5, 5.55, '5-5', true, '2022-03-15', '2022-03-15 05:00:00Z'::timestamptz, '2022-03-15 05:00:00','nan');
(5, 5, 5000, 5.5, 5.55, '5-5', true, '2022-03-15', '2022-03-15 05:00:00Z'::timestamptz, '2022-03-15 05:00:00','nan', null, null, null);

statement ok
FLUSH;
17 changes: 10 additions & 7 deletions e2e_test/iceberg/test_case/no_partition_append_only.toml
@@ -13,24 +13,27 @@ init_sqls = [
v_date date,
v_timestamp timestamp,
v_ts_ntz timestamp_ntz,
v_decimal decimal(10,5)
v_decimal decimal(10,5),
v_map map<int, int>,
v_array array<int>,
v_struct struct<a:int, b:int>
Comment on lines +16 to +19

Contributor: Have you ever tried nesting a nested type inside another nested type?

Member (author): Just tried, and found a new icelake bug. It will fail to parse table metadata 🥵

Member (author): Let's fix this later
) USING iceberg TBLPROPERTIES ('format-version'='2');
'''
]

slt = 'test_case/iceberg_sink_no_partition_append_only_table.slt'

verify_schema = ['long', 'int', 'long', 'float', 'double', 'string', 'boolean', 'date', 'timestamp', 'timestamp_ntz','decimal']
verify_schema = ['long', 'int', 'long', 'float', 'double', 'string', 'boolean', 'date', 'timestamp', 'timestamp_ntz','decimal', 'map', 'array', 'struct']

verify_sql = 'SELECT * FROM demo_db.no_partition_append_only_table ORDER BY id ASC'


verify_data = """
1,1,1000,1.1,1.11,1-1,true,2022-03-11,2022-03-11 01:00:00+00:00,2022-03-11 01:00:00,1.11
2,2,2000,2.2,2.22,2-2,false,2022-03-12,2022-03-12 02:00:00+00:00,2022-03-12 02:00:00,2.22
3,3,3000,3.3,3.33,3-3,true,2022-03-13,2022-03-13 03:00:00+00:00,2022-03-13 03:00:00,99999.99999
4,4,4000,4.4,4.44,4-4,false,2022-03-14,2022-03-14 04:00:00+00:00,2022-03-14 04:00:00,-99999.99999
5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00,none
1,1,1000,1.1,1.11,1-1,true,2022-03-11,2022-03-11 01:00:00+00:00,2022-03-11 01:00:00,1.11,{1: 100, 2: 200},[1, 2, 3],Row(a=1, b=2)
2,2,2000,2.2,2.22,2-2,false,2022-03-12,2022-03-12 02:00:00+00:00,2022-03-12 02:00:00,2.22,{3: 300},[1, None, 3],Row(a=3, b=None)
3,3,3000,3.3,3.33,3-3,true,2022-03-13,2022-03-13 03:00:00+00:00,2022-03-13 03:00:00,99999.99999,None,None,None
4,4,4000,4.4,4.44,4-4,false,2022-03-14,2022-03-14 04:00:00+00:00,2022-03-14 04:00:00,-99999.99999,None,None,None
5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00,none,None,None,None
"""

verify_slt = 'test_case/iceberg_sink_no_partition_append_only_table_verify.slt'
2 changes: 1 addition & 1 deletion e2e_test/iceberg/test_case/no_partition_upsert.toml
@@ -15,7 +15,7 @@ init_sqls = [

slt = 'test_case/iceberg_sink_no_partition_upsert_table.slt'

verify_schema = ['int','int','long','string']
verify_schema = ['int','int','long','string','date']

verify_sql = 'SELECT * FROM demo_db.no_partition_upsert_table ORDER BY id, v1 ASC'

2 changes: 1 addition & 1 deletion e2e_test/iceberg/test_case/partition_upsert.toml
@@ -16,7 +16,7 @@ init_sqls = [

slt = 'test_case/iceberg_sink_partition_upsert_table.slt'

verify_schema = ['int','int','long','string']
verify_schema = ['int','int','long','string', 'date']

verify_sql = 'SELECT * FROM demo_db.partition_upsert_table ORDER BY id, v1 ASC'

2 changes: 1 addition & 1 deletion e2e_test/iceberg/test_case/range_partition_upsert.toml
@@ -16,7 +16,7 @@ init_sqls = [

slt = 'test_case/iceberg_sink_range_partition_upsert_table.slt'

verify_schema = ['int','int','long','string']
verify_schema = ['int','int','long','string','date']

verify_sql = 'SELECT * FROM demo_db.range_partition_upsert_table ORDER BY id, v1 ASC'

11 changes: 8 additions & 3 deletions src/common/src/array/arrow/arrow_impl.rs
@@ -448,12 +448,17 @@ pub trait ToArrow {
    #[inline]
    fn map_type_to_arrow(&self, map_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
        let sorted = false;
        let list_type = map_type.clone().into_list();
        // "key" is always non-null
        let key = self
            .to_arrow_field("key", map_type.key())?
            .with_nullable(false);
        let value = self.to_arrow_field("value", map_type.value())?;
        Ok(arrow_schema::DataType::Map(
            Arc::new(arrow_schema::Field::new(
                "entries",
                self.list_type_to_arrow(&list_type)?,
                true,
                arrow_schema::DataType::Struct([Arc::new(key), Arc::new(value)].into()),
                // "entries" is always non-null
                false,
            )),
            sorted,
        ))
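
For reference, a sketch (not part of the diff) of the Arrow type the fixed map_type_to_arrow produces for a RisingWave map(int, int). Per the Arrow spec, the "key" field and the "entries" field are non-nullable while "value" stays nullable; the helper name below is made up for illustration.

use std::sync::Arc;
use arrow_schema::{DataType, Field};

// Map(entries: Struct<key: Int32 not null, value: Int32>, entries not null)
fn expected_arrow_map_type() -> DataType {
    DataType::Map(
        Arc::new(Field::new(
            "entries",
            DataType::Struct(
                vec![
                    Arc::new(Field::new("key", DataType::Int32, false)),
                    Arc::new(Field::new("value", DataType::Int32, true)),
                ]
                .into(),
            ),
            false, // "entries" itself is non-null
        )),
        false, // keys are not sorted
    )
}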
117 changes: 117 additions & 0 deletions src/common/src/array/arrow/arrow_udf.rs
@@ -125,6 +125,7 @@ impl FromArrow for UdfArrowConvert {

#[cfg(test)]
mod tests {

    use super::*;
    use crate::array::*;

@@ -205,4 +206,120 @@ mod tests {
            .unwrap();
        assert_eq!(rw_array.as_list(), &array);
    }

    #[test]
    fn map() {
        let map_type = MapType::from_kv(DataType::Varchar, DataType::Int32);
        let rw_map_type = DataType::Map(map_type.clone());
        let mut builder = MapArrayBuilder::with_type(3, rw_map_type.clone());
        builder.append_owned(Some(
            MapValue::try_from_kv(
                ListValue::from_str("{a,b,c}", &DataType::List(Box::new(DataType::Varchar)))
                    .unwrap(),
                ListValue::from_str("{1,2,3}", &DataType::List(Box::new(DataType::Int32))).unwrap(),
            )
            .unwrap(),
        ));
        builder.append_owned(None);
        builder.append_owned(Some(
            MapValue::try_from_kv(
                ListValue::from_str("{a,c}", &DataType::List(Box::new(DataType::Varchar))).unwrap(),
                ListValue::from_str("{1,3}", &DataType::List(Box::new(DataType::Int32))).unwrap(),
            )
            .unwrap(),
        ));
        let rw_array = builder.finish();

        let arrow_map_type = UdfArrowConvert::default()
            .map_type_to_arrow(&map_type)
            .unwrap();
        expect_test::expect![[r#"
            Map(
                Field {
                    name: "entries",
                    data_type: Struct(
                        [
                            Field {
                                name: "key",
                                data_type: Utf8,
                                nullable: false,
                                dict_id: 0,
                                dict_is_ordered: false,
                                metadata: {},
                            },
                            Field {
                                name: "value",
                                data_type: Int32,
                                nullable: true,
                                dict_id: 0,
                                dict_is_ordered: false,
                                metadata: {},
                            },
                        ],
                    ),
                    nullable: false,
                    dict_id: 0,
                    dict_is_ordered: false,
                    metadata: {},
                },
                false,
            )
        "#]]
        .assert_debug_eq(&arrow_map_type);
        let rw_map_type_new = UdfArrowConvert::default()
            .from_field(&arrow_schema::Field::new(
                "map",
                arrow_map_type.clone(),
                true,
            ))
            .unwrap();
        assert_eq!(rw_map_type, rw_map_type_new);
        let arrow = UdfArrowConvert::default()
            .map_to_arrow(&arrow_map_type, &rw_array)
            .unwrap();
        expect_test::expect![[r#"
            MapArray
            [
              StructArray
              [
                -- child 0: "key" (Utf8)
                StringArray
                [
                  "a",
                  "b",
                  "c",
                ]
                -- child 1: "value" (Int32)
                PrimitiveArray<Int32>
                [
                  1,
                  2,
                  3,
                ]
              ],
              null,
              StructArray
              [
                -- child 0: "key" (Utf8)
                StringArray
                [
                  "a",
                  "c",
                ]
                -- child 1: "value" (Int32)
                PrimitiveArray<Int32>
                [
                  1,
                  3,
                ]
              ],
            ]
        "#]]
        .assert_debug_eq(&arrow);

        let rw_array_new = UdfArrowConvert::default()
            .from_map_array(arrow.as_any().downcast_ref().unwrap())
            .unwrap();
        assert_eq!(&rw_array, rw_array_new.as_map());
    }
}
12 changes: 9 additions & 3 deletions src/connector/src/sink/iceberg/mod.rs
@@ -1375,15 +1375,21 @@ pub fn try_matches_arrow_schema(
            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
            (left, right) => left == right,
            // cases where left != right (metadata, field name mismatch)
            //
            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
            // {"PARQUET:field_id": ".."}
            //
            // map: The standard name in arrow is "entries", "key", "value".
            // in iceberg-rs, it's called "key_value"
            (left, right) => left.equals_datatype(right),
Contributor: Comparing only the datatype. That's exactly what we need!
        };
        if !compatible {
            bail!("Field {}'s type not compatible, risingwave converted data type {}, iceberg's data type: {}",
            bail!("field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
                arrow_field.name(), converted_arrow_data_type, arrow_field.data_type()
            );
        }
    }

    Ok(())
}
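
A minimal sketch (hypothetical, not from the PR) of why equals_datatype is the right comparison here: it ignores nested field names and metadata, so RisingWave's "entries"-named map layout matches iceberg-rs's "key_value"-named one even though strict equality fails.

use std::sync::Arc;
use arrow_schema::{DataType, Field};

fn main() {
    let kv = DataType::Struct(
        vec![
            Arc::new(Field::new("key", DataType::Int32, false)),
            Arc::new(Field::new("value", DataType::Int32, true)),
        ]
        .into(),
    );
    // RisingWave-converted map: inner field named "entries".
    let rw = DataType::Map(Arc::new(Field::new("entries", kv.clone(), false)), false);
    // iceberg-rs map: same shape, inner field named "key_value".
    let iceberg = DataType::Map(Arc::new(Field::new("key_value", kv, false)), false);

    assert_ne!(rw, iceberg);               // strict equality sees the field name
    assert!(rw.equals_datatype(&iceberg)); // datatype-only comparison passes
}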
