diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index cb0ae0f551f2..6b4e2aae2921 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -58,6 +58,7 @@ message LogicalPlanNode { DropViewNode drop_view = 27; DistinctOnNode distinct_on = 28; CopyToNode copy_to = 29; + UnnestNode unnest = 30; } } @@ -260,6 +261,20 @@ message CopyToNode { repeated string partition_by = 7; } +message UnnestNode { + LogicalPlanNode input = 1; + repeated datafusion_common.Column exec_columns = 2; + repeated uint64 list_type_columns = 3; + repeated uint64 struct_type_columns = 4; + repeated uint64 dependency_indices = 5; + datafusion_common.DfSchema schema = 6; + UnnestOptions options = 7; +} + +message UnnestOptions { + bool preserve_nulls = 1; +} + message UnionNode { repeated LogicalPlanNode inputs = 1; } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 2edbae24294b..bbee3311b7d3 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -10360,6 +10360,9 @@ impl serde::Serialize for LogicalPlanNode { logical_plan_node::LogicalPlanType::CopyTo(v) => { struct_ser.serialize_field("copyTo", v)?; } + logical_plan_node::LogicalPlanType::Unnest(v) => { + struct_ser.serialize_field("unnest", v)?; + } } } struct_ser.end() @@ -10413,6 +10416,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode { "distinctOn", "copy_to", "copyTo", + "unnest", ]; #[allow(clippy::enum_variant_names)] @@ -10445,6 +10449,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode { DropView, DistinctOn, CopyTo, + Unnest, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -10494,6 +10499,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode { "dropView" | "drop_view" => Ok(GeneratedField::DropView), "distinctOn" | "distinct_on" => Ok(GeneratedField::DistinctOn), "copyTo" | "copy_to" => Ok(GeneratedField::CopyTo), + "unnest" => Ok(GeneratedField::Unnest), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -10710,6 +10716,13 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode { return Err(serde::de::Error::duplicate_field("copyTo")); } logical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(logical_plan_node::LogicalPlanType::CopyTo) +; + } + GeneratedField::Unnest => { + if logical_plan_type__.is_some() { + return Err(serde::de::Error::duplicate_field("unnest")); + } + logical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(logical_plan_node::LogicalPlanType::Unnest) ; } } @@ -19229,6 +19242,304 @@ impl<'de> serde::Deserialize<'de> for Unnest { deserializer.deserialize_struct("datafusion.Unnest", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for UnnestNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.input.is_some() { + len += 1; + } + if !self.exec_columns.is_empty() { + len += 1; + } + if !self.list_type_columns.is_empty() { + len += 1; + } + if !self.struct_type_columns.is_empty() { + len += 1; + } + if !self.dependency_indices.is_empty() { + len += 1; + } + if self.schema.is_some() { + len += 1; + } + if self.options.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.UnnestNode", len)?; + if let Some(v) = self.input.as_ref() { + struct_ser.serialize_field("input", v)?; + } + if !self.exec_columns.is_empty() { + struct_ser.serialize_field("execColumns", &self.exec_columns)?; + } + if !self.list_type_columns.is_empty() { + struct_ser.serialize_field("listTypeColumns", &self.list_type_columns.iter().map(ToString::to_string).collect::>())?; + } + if !self.struct_type_columns.is_empty() { + struct_ser.serialize_field("structTypeColumns", &self.struct_type_columns.iter().map(ToString::to_string).collect::>())?; + } + if !self.dependency_indices.is_empty() { + struct_ser.serialize_field("dependencyIndices", &self.dependency_indices.iter().map(ToString::to_string).collect::>())?; + } + if let Some(v) = self.schema.as_ref() { + struct_ser.serialize_field("schema", v)?; + } + if let Some(v) = self.options.as_ref() { + struct_ser.serialize_field("options", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for UnnestNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "input", + "exec_columns", + "execColumns", + "list_type_columns", + "listTypeColumns", + "struct_type_columns", + "structTypeColumns", + "dependency_indices", + "dependencyIndices", + "schema", + "options", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Input, + ExecColumns, + ListTypeColumns, + StructTypeColumns, + DependencyIndices, + Schema, + Options, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "input" => Ok(GeneratedField::Input), + "execColumns" | "exec_columns" => Ok(GeneratedField::ExecColumns), + "listTypeColumns" | "list_type_columns" => Ok(GeneratedField::ListTypeColumns), + "structTypeColumns" | "struct_type_columns" => Ok(GeneratedField::StructTypeColumns), + "dependencyIndices" | "dependency_indices" => Ok(GeneratedField::DependencyIndices), + "schema" => Ok(GeneratedField::Schema), + "options" => Ok(GeneratedField::Options), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = UnnestNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.UnnestNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut input__ = None; + let mut exec_columns__ = None; + let mut list_type_columns__ = None; + let mut struct_type_columns__ = None; + let mut dependency_indices__ = None; + let mut schema__ = None; + let mut options__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Input => { + if input__.is_some() { + return Err(serde::de::Error::duplicate_field("input")); + } + input__ = map_.next_value()?; + } + GeneratedField::ExecColumns => { + if exec_columns__.is_some() { + return Err(serde::de::Error::duplicate_field("execColumns")); + } + exec_columns__ = Some(map_.next_value()?); + } + GeneratedField::ListTypeColumns => { + if list_type_columns__.is_some() { + return Err(serde::de::Error::duplicate_field("listTypeColumns")); + } + list_type_columns__ = + Some(map_.next_value::>>()? + .into_iter().map(|x| x.0).collect()) + ; + } + GeneratedField::StructTypeColumns => { + if struct_type_columns__.is_some() { + return Err(serde::de::Error::duplicate_field("structTypeColumns")); + } + struct_type_columns__ = + Some(map_.next_value::>>()? + .into_iter().map(|x| x.0).collect()) + ; + } + GeneratedField::DependencyIndices => { + if dependency_indices__.is_some() { + return Err(serde::de::Error::duplicate_field("dependencyIndices")); + } + dependency_indices__ = + Some(map_.next_value::>>()? + .into_iter().map(|x| x.0).collect()) + ; + } + GeneratedField::Schema => { + if schema__.is_some() { + return Err(serde::de::Error::duplicate_field("schema")); + } + schema__ = map_.next_value()?; + } + GeneratedField::Options => { + if options__.is_some() { + return Err(serde::de::Error::duplicate_field("options")); + } + options__ = map_.next_value()?; + } + } + } + Ok(UnnestNode { + input: input__, + exec_columns: exec_columns__.unwrap_or_default(), + list_type_columns: list_type_columns__.unwrap_or_default(), + struct_type_columns: struct_type_columns__.unwrap_or_default(), + dependency_indices: dependency_indices__.unwrap_or_default(), + schema: schema__, + options: options__, + }) + } + } + deserializer.deserialize_struct("datafusion.UnnestNode", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for UnnestOptions { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.preserve_nulls { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.UnnestOptions", len)?; + if self.preserve_nulls { + struct_ser.serialize_field("preserveNulls", &self.preserve_nulls)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for UnnestOptions { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "preserve_nulls", + "preserveNulls", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + PreserveNulls, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "preserveNulls" | "preserve_nulls" => Ok(GeneratedField::PreserveNulls), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = UnnestOptions; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.UnnestOptions") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut preserve_nulls__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::PreserveNulls => { + if preserve_nulls__.is_some() { + return Err(serde::de::Error::duplicate_field("preserveNulls")); + } + preserve_nulls__ = Some(map_.next_value()?); + } + } + } + Ok(UnnestOptions { + preserve_nulls: preserve_nulls__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.UnnestOptions", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for ValuesNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index e9407cc65bb1..0354ead9e777 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -6,7 +6,7 @@ pub struct LogicalPlanNode { #[prost( oneof = "logical_plan_node::LogicalPlanType", - tags = "1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29" + tags = "1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30" )] pub logical_plan_type: ::core::option::Option, } @@ -71,6 +71,8 @@ pub mod logical_plan_node { DistinctOn(::prost::alloc::boxed::Box), #[prost(message, tag = "29")] CopyTo(::prost::alloc::boxed::Box), + #[prost(message, tag = "30")] + Unnest(::prost::alloc::boxed::Box), } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -433,6 +435,30 @@ pub mod copy_to_node { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct UnnestNode { + #[prost(message, optional, boxed, tag = "1")] + pub input: ::core::option::Option<::prost::alloc::boxed::Box>, + #[prost(message, repeated, tag = "2")] + pub exec_columns: ::prost::alloc::vec::Vec, + #[prost(uint64, repeated, tag = "3")] + pub list_type_columns: ::prost::alloc::vec::Vec, + #[prost(uint64, repeated, tag = "4")] + pub struct_type_columns: ::prost::alloc::vec::Vec, + #[prost(uint64, repeated, tag = "5")] + pub dependency_indices: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "6")] + pub schema: ::core::option::Option, + #[prost(message, optional, tag = "7")] + pub options: ::core::option::Option, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct UnnestOptions { + #[prost(bool, tag = "1")] + pub preserve_nulls: bool, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct UnionNode { #[prost(message, repeated, tag = "1")] pub inputs: ::prost::alloc::vec::Vec, diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 905c6654cfe9..e2a2f875ea0c 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use datafusion::execution::registry::FunctionRegistry; use datafusion_common::{ internal_err, plan_datafusion_err, DataFusionError, Result, ScalarValue, - TableReference, + TableReference, UnnestOptions, }; use datafusion_expr::expr::Unnest; use datafusion_expr::expr::{Alias, Placeholder}; @@ -50,6 +50,14 @@ use crate::protobuf::{ use super::LogicalExtensionCodec; +impl From<&protobuf::UnnestOptions> for UnnestOptions { + fn from(opts: &protobuf::UnnestOptions) -> Self { + Self { + preserve_nulls: opts.preserve_nulls, + } + } +} + impl From for WindowFrameUnits { fn from(units: protobuf::WindowFrameUnits) -> Self { match units { diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 939bda9f247b..ef37150a35db 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan; use crate::protobuf::{CustomTableScanNode, LogicalExprNodeCollection}; use crate::{ - convert_required, + convert_required, into_required, protobuf::{ self, listing_table_scan_node::FileFormatType, logical_plan_node::LogicalPlanType, LogicalExtensionNode, LogicalPlanNode, @@ -47,6 +47,7 @@ use datafusion_common::{ context, internal_datafusion_err, internal_err, not_impl_err, DataFusionError, Result, TableReference, }; +use datafusion_expr::Unnest; use datafusion_expr::{ dml, logical_plan::{ @@ -838,6 +839,31 @@ impl AsLogicalPlan for LogicalPlanNode { }, )) } + LogicalPlanType::Unnest(unnest) => { + let input: LogicalPlan = + into_logical_plan!(unnest.input, ctx, extension_codec)?; + Ok(datafusion_expr::LogicalPlan::Unnest(Unnest { + input: Arc::new(input), + exec_columns: unnest.exec_columns.iter().map(|c| c.into()).collect(), + list_type_columns: unnest + .list_type_columns + .iter() + .map(|c| *c as usize) + .collect(), + struct_type_columns: unnest + .struct_type_columns + .iter() + .map(|c| *c as usize) + .collect(), + dependency_indices: unnest + .dependency_indices + .iter() + .map(|c| *c as usize) + .collect(), + schema: Arc::new(convert_required!(unnest.schema)?), + options: into_required!(unnest.options)?, + })) + } } } @@ -1510,9 +1536,42 @@ impl AsLogicalPlan for LogicalPlanNode { ))), }) } - LogicalPlan::Unnest(_) => Err(proto_error( - "LogicalPlan serde is not yet implemented for Unnest", - )), + LogicalPlan::Unnest(Unnest { + input, + exec_columns, + list_type_columns, + struct_type_columns, + dependency_indices, + schema, + options, + }) => { + let input = protobuf::LogicalPlanNode::try_from_logical_plan( + input, + extension_codec, + )?; + Ok(protobuf::LogicalPlanNode { + logical_plan_type: Some(LogicalPlanType::Unnest(Box::new( + protobuf::UnnestNode { + input: Some(Box::new(input)), + exec_columns: exec_columns.iter().map(|c| c.into()).collect(), + list_type_columns: list_type_columns + .iter() + .map(|c| *c as u64) + .collect(), + struct_type_columns: struct_type_columns + .iter() + .map(|c| *c as u64) + .collect(), + dependency_indices: dependency_indices + .iter() + .map(|c| *c as u64) + .collect(), + schema: Some(schema.try_into()?), + options: Some(options.into()), + }, + ))), + }) + } LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error( "LogicalPlan serde is not yet implemented for CreateMemoryTable", )), diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index b0059aff615b..d2783305f638 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -19,7 +19,7 @@ //! DataFusion logical plans to be serialized and transmitted between //! processes. -use datafusion_common::TableReference; +use datafusion_common::{TableReference, UnnestOptions}; use datafusion_expr::expr::{ self, AggregateFunctionDefinition, Alias, Between, BinaryExpr, Cast, GroupingSet, InList, Like, Placeholder, ScalarFunction, Sort, Unnest, @@ -45,6 +45,14 @@ use crate::protobuf::{ use super::LogicalExtensionCodec; +impl From<&UnnestOptions> for protobuf::UnnestOptions { + fn from(opts: &UnnestOptions) -> Self { + Self { + preserve_nulls: opts.preserve_nulls, + } + } +} + impl From<&StringifiedPlan> for protobuf::StringifiedPlan { fn from(stringified_plan: &StringifiedPlan) -> Self { Self { diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index d2721dfafd14..b756d4688dc0 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -522,6 +522,31 @@ async fn roundtrip_logical_plan_with_extension() -> Result<()> { Ok(()) } +#[tokio::test] +async fn roundtrip_logical_plan_unnest() -> Result<()> { + let ctx = SessionContext::new(); + let schema = Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new( + "b", + DataType::List(Arc::new(Field::new("item", DataType::Int32, false))), + true, + ), + ]); + ctx.register_csv( + "t1", + "tests/testdata/test.csv", + CsvReadOptions::default().schema(&schema), + ) + .await?; + let query = "SELECT unnest(b) FROM t1"; + let plan = ctx.sql(query).await?.into_optimized_plan()?; + let bytes = logical_plan_to_bytes(&plan)?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; + assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}")); + Ok(()) +} + #[tokio::test] async fn roundtrip_expr_api() -> Result<()> { let ctx = SessionContext::new();