From f9895a9910da63d12fd92dbaf769b0ed3d9cc6ef Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Wed, 1 May 2024 03:01:03 -0500 Subject: [PATCH] feat: upgrade dataframe write_parquet and write_json The options to write_parquet changed. write_json has a new argument that I defaulted to None. We can expose that config later. Ref: https://github.com/apache/datafusion/pull/9382 --- src/dataframe.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/dataframe.rs b/src/dataframe.rs index a319b3d73..03cdf5cab 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -20,10 +20,10 @@ use std::sync::Arc; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow}; use datafusion::arrow::util::pretty; +use datafusion::config::{ParquetOptions, TableParquetOptions}; use datafusion::dataframe::{DataFrame, DataFrameWriteOptions}; use datafusion::execution::SendableRecordBatchStream; use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel}; -use datafusion::parquet::file::properties::WriterProperties; use datafusion::prelude::*; use datafusion_common::UnnestOptions; use pyo3::exceptions::{PyTypeError, PyValueError}; @@ -350,7 +350,7 @@ impl PyDataFrame { cl.ok_or(PyValueError::new_err("compression_level is not defined")) } - let compression_type = match compression.to_lowercase().as_str() { + let _validated = match compression.to_lowercase().as_str() { "snappy" => Compression::SNAPPY, "gzip" => Compression::GZIP( GzipLevel::try_new(compression_level.unwrap_or(6)) @@ -375,16 +375,20 @@ impl PyDataFrame { } }; - let writer_properties = WriterProperties::builder() - .set_compression(compression_type) - .build(); + let mut compression_string = compression.to_string(); + if let Some(level) = compression_level { + compression_string.push_str(&format!("({level})")); + } + + let mut options = TableParquetOptions::default(); + options.global.compression = Some(compression_string); wait_for_future( py, self.df.as_ref().clone().write_parquet( path, DataFrameWriteOptions::new(), - Option::from(writer_properties), + Option::from(options), ), )?; Ok(()) @@ -397,7 +401,7 @@ impl PyDataFrame { self.df .as_ref() .clone() - .write_json(path, DataFrameWriteOptions::new()), + .write_json(path, DataFrameWriteOptions::new(), None), )?; Ok(()) }