From 4fe0419ae20eff72f114e6e509a97e196006cd85 Mon Sep 17 00:00:00 2001
From: franz
Date: Sat, 9 Mar 2024 02:51:44 +0100
Subject: [PATCH 1/5] add more non-GIL-blocking code

---
 python/deltalake/table.py                 |   6 +-
 python/deltalake/writer.py                |  21 +-
 python/docs/source/_ext/edit_on_github.py |   6 +-
 python/src/lib.rs                         | 733 ++++++++++++----------
 4 files changed, 406 insertions(+), 360 deletions(-)

diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index 064ee3a83c..e6cf9578dc 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -1299,9 +1299,9 @@ def __init__(
         self.not_matched_insert_updates: Optional[List[Dict[str, str]]] = None
         self.not_matched_insert_predicate: Optional[List[Optional[str]]] = None
         self.not_matched_by_source_update_updates: Optional[List[Dict[str, str]]] = None
-        self.not_matched_by_source_update_predicate: Optional[List[Optional[str]]] = (
-            None
-        )
+        self.not_matched_by_source_update_predicate: Optional[
+            List[Optional[str]]
+        ] = None
         self.not_matched_by_source_delete_predicate: Optional[List[str]] = None
         self.not_matched_by_source_delete_all: Optional[bool] = None
 
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 4a61254a23..afbb7fc70f 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -109,7 +109,8 @@ def write_deltalake(
     large_dtypes: bool = ...,
     engine: Literal["pyarrow"] = ...,
     custom_metadata: Optional[Dict[str, str]] = ...,
-) -> None: ...
+) -> None:
+    ...
 
 
 @overload
@@ -137,7 +138,8 @@ def write_deltalake(
     engine: Literal["rust"],
     writer_properties: WriterProperties = ...,
     custom_metadata: Optional[Dict[str, str]] = ...,
-) -> None: ...
+) -> None:
+    ...
 
 
 @overload
@@ -166,7 +168,8 @@ def write_deltalake(
     engine: Literal["rust"],
     writer_properties: WriterProperties = ...,
     custom_metadata: Optional[Dict[str, str]] = ...,
-) -> None: ...
+) -> None:
+    ...
 
 
 def write_deltalake(
@@ -459,12 +462,12 @@ def check_data_is_aligned_with_partition_filtering(
 ) -> None:
     if table is None:
         return
-    existed_partitions: FrozenSet[FrozenSet[Tuple[str, Optional[str]]]] = (
-        table._table.get_active_partitions()
-    )
-    allowed_partitions: FrozenSet[FrozenSet[Tuple[str, Optional[str]]]] = (
-        table._table.get_active_partitions(partition_filters)
-    )
+    existed_partitions: FrozenSet[
+        FrozenSet[Tuple[str, Optional[str]]]
+    ] = table._table.get_active_partitions()
+    allowed_partitions: FrozenSet[
+        FrozenSet[Tuple[str, Optional[str]]]
+    ] = table._table.get_active_partitions(partition_filters)
     partition_values = pa.RecordBatch.from_arrays(
         [
             batch.column(column_name)
diff --git a/python/docs/source/_ext/edit_on_github.py b/python/docs/source/_ext/edit_on_github.py
index 241560877c..f7188f189a 100644
--- a/python/docs/source/_ext/edit_on_github.py
+++ b/python/docs/source/_ext/edit_on_github.py
@@ -38,9 +38,9 @@ def html_page_context(app, pagename, templatename, context, doctree):
     context["display_github"] = True
     context["github_user"] = app.config.edit_on_github_project.split("/")[0]
     context["github_repo"] = app.config.edit_on_github_project.split("/")[1]
-    context["github_version"] = (
-        f"{app.config.edit_on_github_branch}/{app.config.page_source_prefix}/"
-    )
+    context[
+        "github_version"
+    ] = f"{app.config.edit_on_github_branch}/{app.config.page_source_prefix}/"
 
 
 def setup(app):
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 0d98d6906d..0a0a4f3839 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -282,75 +282,81 @@ impl RawDeltaTable {
     #[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true, custom_metadata=None))]
     pub fn vacuum(
         &mut self,
+        py: Python,
         dry_run: bool,
         retention_hours: Option<u64>,
         enforce_retention_duration: bool,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<Vec<String>> {
-        let mut cmd = VacuumBuilder::new(
-            self._table.log_store(),
-            self._table.snapshot().map_err(PythonError::from)?.clone(),
-        )
-        .with_enforce_retention_duration(enforce_retention_duration)
-        .with_dry_run(dry_run);
-        if let Some(retention_period) = retention_hours {
-            cmd = cmd.with_retention_period(Duration::hours(retention_period as i64));
-        }
+        py.allow_threads(|| {
+            let mut cmd = VacuumBuilder::new(
+                self._table.log_store(),
+                self._table.snapshot().map_err(PythonError::from)?.clone(),
+            )
+            .with_enforce_retention_duration(enforce_retention_duration)
+            .with_dry_run(dry_run);
+            if let Some(retention_period) = retention_hours {
+                cmd = cmd.with_retention_period(Duration::hours(retention_period as i64));
+            }
 
-        if let Some(metadata) = custom_metadata {
-            let json_metadata: Map<String, Value> =
-                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-            cmd = cmd.with_metadata(json_metadata);
-        };
+            if let Some(metadata) = custom_metadata {
+                let json_metadata: Map<String, Value> =
+                    metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+                cmd = cmd.with_metadata(json_metadata);
+            };
 
-        let (table, metrics) = rt()?
-            .block_on(cmd.into_future())
-            .map_err(PythonError::from)?;
-        self._table.state = table.state;
-        Ok(metrics.files_deleted)
+            let (table, metrics) = rt()?
+                .block_on(cmd.into_future())
+                .map_err(PythonError::from)?;
+            self._table.state = table.state;
+            Ok(metrics.files_deleted)
+        })
     }
 
     /// Run the UPDATE command on the Delta Table
     #[pyo3(signature = (updates, predicate=None, writer_properties=None, safe_cast = false, custom_metadata = None))]
     pub fn update(
         &mut self,
+        py: Python,
         updates: HashMap<String, String>,
         predicate: Option<String>,
         writer_properties: Option<HashMap<String, Option<String>>>,
         safe_cast: bool,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<String> {
-        let mut cmd = UpdateBuilder::new(
-            self._table.log_store(),
-            self._table.snapshot().map_err(PythonError::from)?.clone(),
-        )
-        .with_safe_cast(safe_cast);
+        py.allow_threads(|| {
+            let mut cmd = UpdateBuilder::new(
+                self._table.log_store(),
+                self._table.snapshot().map_err(PythonError::from)?.clone(),
+            )
+            .with_safe_cast(safe_cast);
 
-        if let Some(writer_props) = writer_properties {
-            cmd = cmd.with_writer_properties(
-                set_writer_properties(writer_props).map_err(PythonError::from)?,
-            );
-        }
+            if let Some(writer_props) = writer_properties {
+                cmd = cmd.with_writer_properties(
+                    set_writer_properties(writer_props).map_err(PythonError::from)?,
+                );
+            }
 
-        for (col_name, expression) in updates {
-            cmd = cmd.with_update(col_name.clone(), expression.clone());
-        }
+            for (col_name, expression) in updates {
+                cmd = cmd.with_update(col_name.clone(), expression.clone());
+            }
 
-        if let Some(update_predicate) = predicate {
-            cmd = cmd.with_predicate(update_predicate);
-        }
+            if let Some(update_predicate) = predicate {
+                cmd = cmd.with_predicate(update_predicate);
+            }
 
-        if let Some(metadata) = custom_metadata {
-            let json_metadata: Map<String, Value> =
-                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-            cmd = cmd.with_metadata(json_metadata);
-        };
+            if let Some(metadata) = custom_metadata {
+                let json_metadata: Map<String, Value> =
+                    metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+                cmd = cmd.with_metadata(json_metadata);
+            };
 
-        let (table, metrics) = rt()?
-            .block_on(cmd.into_future())
-            .map_err(PythonError::from)?;
-        self._table.state = table.state;
-        Ok(serde_json::to_string(&metrics).unwrap())
+            let (table, metrics) = rt()?
+                .block_on(cmd.into_future())
+                .map_err(PythonError::from)?;
+            self._table.state = table.state;
+            Ok(serde_json::to_string(&metrics).unwrap())
+        })
     }
 
     /// Run the optimize command on the Delta Table: merge small files into a large file by bin-packing.
@@ -364,6 +370,7 @@ impl RawDeltaTable {
     ))]
     pub fn compact_optimize(
         &mut self,
+        py: Python,
         partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
         target_size: Option<i64>,
         max_concurrent_tasks: Option<usize>,
@@ -371,39 +378,42 @@ impl RawDeltaTable {
         writer_properties: Option<HashMap<String, Option<String>>>,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<String> {
-        let mut cmd = OptimizeBuilder::new(
-            self._table.log_store(),
-            self._table.snapshot().map_err(PythonError::from)?.clone(),
-        )
-        .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
-        if let Some(size) = target_size {
-            cmd = cmd.with_target_size(size);
-        }
-        if let Some(commit_interval) = min_commit_interval {
-            cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
-        }
+        py.allow_threads(|| {
+            let mut cmd = OptimizeBuilder::new(
+                self._table.log_store(),
+                self._table.snapshot().map_err(PythonError::from)?.clone(),
+            )
+            .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
+            if let Some(size) = target_size {
+                cmd = cmd.with_target_size(size);
+            }
+            if let Some(commit_interval) = min_commit_interval {
+                cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
+            }
 
-        if let Some(writer_props) = writer_properties {
-            cmd = cmd.with_writer_properties(
-                set_writer_properties(writer_props).map_err(PythonError::from)?,
-            );
-        }
+            if let Some(writer_props) = writer_properties {
+                cmd = cmd.with_writer_properties(
+                    set_writer_properties(writer_props).map_err(PythonError::from)?,
+                );
+            }
 
-        if let Some(metadata) = custom_metadata {
-            let json_metadata: Map<String, Value> =
-                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-            cmd = cmd.with_metadata(json_metadata);
-        };
+            if let Some(metadata) = custom_metadata {
+                let json_metadata: Map<String, Value> =
+                    metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+                cmd = cmd.with_metadata(json_metadata);
+            };
 
-        let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
-            .map_err(PythonError::from)?;
-        cmd = cmd.with_filters(&converted_filters);
+            let converted_filters =
+                convert_partition_filters(partition_filters.unwrap_or_default())
+                    .map_err(PythonError::from)?;
+            cmd = cmd.with_filters(&converted_filters);
 
-        let (table, metrics) = rt()?
-            .block_on(cmd.into_future())
-            .map_err(PythonError::from)?;
-        self._table.state = table.state;
-        Ok(serde_json::to_string(&metrics).unwrap())
+            let (table, metrics) = rt()?
+                .block_on(cmd.into_future())
+                .map_err(PythonError::from)?;
+            self._table.state = table.state;
+            Ok(serde_json::to_string(&metrics).unwrap())
+        })
     }
 
     /// Run z-order variation of optimize
@@ -418,6 +428,7 @@
         custom_metadata=None,))]
     pub fn z_order_optimize(
         &mut self,
+        py: Python,
         z_order_columns: Vec<String>,
         partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
         target_size: Option<i64>,
@@ -427,96 +438,105 @@ impl RawDeltaTable {
         writer_properties: Option<HashMap<String, Option<String>>>,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<String> {
-        let mut cmd = OptimizeBuilder::new(
-            self._table.log_store(),
-            self._table.snapshot().map_err(PythonError::from)?.clone(),
-        )
-        .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
-        .with_max_spill_size(max_spill_size)
-        .with_type(OptimizeType::ZOrder(z_order_columns));
-        if let Some(size) = target_size {
-            cmd = cmd.with_target_size(size);
-        }
-        if let Some(commit_interval) = min_commit_interval {
-            cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
-        }
+        py.allow_threads(|| {
+            let mut cmd = OptimizeBuilder::new(
+                self._table.log_store(),
+                self._table.snapshot().map_err(PythonError::from)?.clone(),
+            )
+            .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
+            .with_max_spill_size(max_spill_size)
+            .with_type(OptimizeType::ZOrder(z_order_columns));
+            if let Some(size) = target_size {
+                cmd = cmd.with_target_size(size);
+            }
+            if let Some(commit_interval) = min_commit_interval {
+                cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
+            }
 
-        if let Some(writer_props) = writer_properties {
-            cmd = cmd.with_writer_properties(
-                set_writer_properties(writer_props).map_err(PythonError::from)?,
-            );
-        }
+            if let Some(writer_props) = writer_properties {
+                cmd = cmd.with_writer_properties(
+                    set_writer_properties(writer_props).map_err(PythonError::from)?,
+                );
+            }
 
-        if let Some(metadata) = custom_metadata {
-            let json_metadata: Map<String, Value> =
-                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-            cmd = cmd.with_metadata(json_metadata);
-        };
+            if let Some(metadata) = custom_metadata {
+                let json_metadata: Map<String, Value> =
+                    metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+                cmd = cmd.with_metadata(json_metadata);
+            };
 
-        let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
-            .map_err(PythonError::from)?;
-        cmd = cmd.with_filters(&converted_filters);
+            let converted_filters =
+                convert_partition_filters(partition_filters.unwrap_or_default())
+                    .map_err(PythonError::from)?;
+            cmd = cmd.with_filters(&converted_filters);
 
-        let (table, metrics) = rt()?
-            .block_on(cmd.into_future())
-            .map_err(PythonError::from)?;
-        self._table.state = table.state;
-        Ok(serde_json::to_string(&metrics).unwrap())
+            let (table, metrics) = rt()?
+                .block_on(cmd.into_future())
+                .map_err(PythonError::from)?;
+            self._table.state = table.state;
+            Ok(serde_json::to_string(&metrics).unwrap())
+        })
     }
 
     #[pyo3(signature = (constraints, custom_metadata=None))]
     pub fn add_constraints(
         &mut self,
+        py: Python,
         constraints: HashMap<String, String>,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<()> {
-        let mut cmd = ConstraintBuilder::new(
-            self._table.log_store(),
-            self._table.snapshot().map_err(PythonError::from)?.clone(),
-        );
+        py.allow_threads(|| {
+            let mut cmd = ConstraintBuilder::new(
+                self._table.log_store(),
+                self._table.snapshot().map_err(PythonError::from)?.clone(),
+            );
 
-        for (col_name, expression) in constraints {
-            cmd = cmd.with_constraint(col_name.clone(), expression.clone());
-        }
+            for (col_name, expression) in constraints {
+                cmd = cmd.with_constraint(col_name.clone(), expression.clone());
+            }
 
-        if let Some(metadata) = custom_metadata {
-            let json_metadata: Map<String, Value> =
-                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-            cmd = cmd.with_metadata(json_metadata);
-        };
+            if let Some(metadata) = custom_metadata {
+                let json_metadata: Map<String, Value> =
+                    metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+                cmd = cmd.with_metadata(json_metadata);
+            };
 
-        let table = rt()?
-            .block_on(cmd.into_future())
-            .map_err(PythonError::from)?;
-        self._table.state = table.state;
-        Ok(())
+            let table = rt()?
+                .block_on(cmd.into_future())
+                .map_err(PythonError::from)?;
+            self._table.state = table.state;
+            Ok(())
+        })
     }
 
     #[pyo3(signature = (name, raise_if_not_exists, custom_metadata=None))]
     pub fn drop_constraints(
         &mut self,
+        py: Python,
         name: String,
         raise_if_not_exists: bool,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<()> {
-        let mut cmd = DropConstraintBuilder::new(
-            self._table.log_store(),
-            self._table.snapshot().map_err(PythonError::from)?.clone(),
-        )
-        .with_constraint(name)
-        .with_raise_if_not_exists(raise_if_not_exists);
+        py.allow_threads(|| {
+            let mut cmd = DropConstraintBuilder::new(
+                self._table.log_store(),
+                self._table.snapshot().map_err(PythonError::from)?.clone(),
+            )
+            .with_constraint(name)
+            .with_raise_if_not_exists(raise_if_not_exists);
 
-        if let Some(metadata) = custom_metadata {
-            let json_metadata: Map<String, Value> =
-                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-            cmd = cmd.with_metadata(json_metadata);
-        };
+            if let Some(metadata) = custom_metadata {
+                let json_metadata: Map<String, Value> =
+                    metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+                cmd = cmd.with_metadata(json_metadata);
+            };
 
-        let table = rt()?
-            .block_on(cmd.into_future())
-            .map_err(PythonError::from)?;
-        self._table.state = table.state;
-        Ok(())
+            let table = rt()?
+                .block_on(cmd.into_future())
+                .map_err(PythonError::from)?;
+            self._table.state = table.state;
+            Ok(())
+        })
     }
 
     #[allow(clippy::too_many_arguments)]
@@ -766,11 +786,13 @@ impl RawDeltaTable {
             .collect())
     }
 
-    pub fn update_incremental(&mut self) -> PyResult<()> {
+    pub fn update_incremental(&mut self, py: Python) -> PyResult<()> {
         #[allow(deprecated)]
-        Ok(rt()?
-            .block_on(self._table.update_incremental(None))
-            .map_err(PythonError::from)?)
+        py.allow_threads(|| {
+            Ok(rt()?
+                .block_on(self._table.update_incremental(None))
+                .map_err(PythonError::from)?)
+        })
     }
 
     pub fn dataset_partitions<'py>(
@@ -893,6 +915,7 @@ impl RawDeltaTable {
 
     fn create_write_transaction(
         &mut self,
+        py: Python,
         add_actions: Vec<PyAddAction>,
         mode: &str,
         partition_by: Vec<String>,
@@ -900,102 +923,105 @@ impl RawDeltaTable {
         partitions_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<()> {
-        let mode = mode.parse().map_err(PythonError::from)?;
+        py.allow_threads(|| {
+            let mode = mode.parse().map_err(PythonError::from)?;
 
-        let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;
+            let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;
 
-        let existing_schema = self._table.get_schema().map_err(PythonError::from)?;
+            let existing_schema = self._table.get_schema().map_err(PythonError::from)?;
 
-        let mut actions: Vec<Action> = add_actions
-            .iter()
-            .map(|add| Action::Add(add.into()))
-            .collect();
+            let mut actions: Vec<Action> = add_actions
+                .iter()
+                .map(|add| Action::Add(add.into()))
+                .collect();
 
-        match mode {
-            SaveMode::Overwrite => {
-                let converted_filters =
-                    convert_partition_filters(partitions_filters.unwrap_or_default())
+            match mode {
+                SaveMode::Overwrite => {
+                    let converted_filters =
+                        convert_partition_filters(partitions_filters.unwrap_or_default())
                             .map_err(PythonError::from)?;
+                    let add_actions = self
+                        ._table
+                        .snapshot()
+                        .map_err(PythonError::from)?
+                        .get_active_add_actions_by_partitions(&converted_filters)
+                        .map_err(PythonError::from)?;
 
-                let add_actions = self
-                    ._table
-                    .snapshot()
-                    .map_err(PythonError::from)?
-                    .get_active_add_actions_by_partitions(&converted_filters)
-                    .map_err(PythonError::from)?;
-
-                for old_add in add_actions {
-                    let old_add = old_add.map_err(PythonError::from)?;
-                    let remove_action = Action::Remove(Remove {
-                        path: old_add.path().to_string(),
-                        deletion_timestamp: Some(current_timestamp()),
-                        data_change: true,
-                        extended_file_metadata: Some(true),
-                        partition_values: Some(
-                            old_add
-                                .partition_values()
-                                .map_err(PythonError::from)?
+                                .iter()
+                                .map(|(k, v)| {
+                                    (
+                                        k.to_string(),
+                                        if v.is_null() {
+                                            None
+                                        } else {
+                                            Some(v.serialize())
+                                        },
+                                    )
+                                })
+                                .collect(),
+                            ),
+                            size: Some(old_add.size()),
+                            deletion_vector: None,
+                            tags: None,
+                            base_row_id: None,
+                            default_row_commit_version: None,
+                        });
+                        actions.push(remove_action);
+                    }
 
-                // Update metadata with new schema
-                if &schema != existing_schema {
-                    let mut metadata = self._table.metadata().map_err(PythonError::from)?.clone();
-                    metadata.schema_string = serde_json::to_string(&schema)
-                        .map_err(DeltaTableError::from)
-                        .map_err(PythonError::from)?;
-                    actions.push(Action::Metadata(metadata));
+                    // Update metadata with new schema
+                    if &schema != existing_schema {
+                        let mut metadata =
+                            self._table.metadata().map_err(PythonError::from)?.clone();
+                        metadata.schema_string = serde_json::to_string(&schema)
+                            .map_err(DeltaTableError::from)
+                            .map_err(PythonError::from)?;
+                        actions.push(Action::Metadata(metadata));
+                    }
                 }
-            }
-            _ => {
-                // This should be unreachable from Python
-                if &schema != existing_schema {
-                    DeltaProtocolError::new_err("Cannot change schema except in overwrite.");
+                _ => {
+                    // This should be unreachable from Python
+                    if &schema != existing_schema {
+                        DeltaProtocolError::new_err("Cannot change schema except in overwrite.");
+                    }
                 }
             }
-        }
 
-        let operation = DeltaOperation::Write {
-            mode,
-            partition_by: Some(partition_by),
-            predicate: None,
-        };
+            let operation = DeltaOperation::Write {
+                mode,
+                partition_by: Some(partition_by),
+                predicate: None,
+            };
 
-        let app_metadata =
-            custom_metadata.map(|md| md.into_iter().map(|(k, v)| (k, v.into())).collect());
+            let app_metadata =
+                custom_metadata.map(|md| md.into_iter().map(|(k, v)| (k, v.into())).collect());
 
-        let store = self._table.log_store();
+            let store = self._table.log_store();
 
-        rt()?
-            .block_on(commit(
-                &*store,
-                &actions,
-                operation,
-                Some(self._table.snapshot().map_err(PythonError::from)?),
-                app_metadata,
-            ))
-            .map_err(PythonError::from)?;
+            rt()?
+                .block_on(commit(
+                    &*store,
+                    &actions,
+                    operation,
+                    Some(self._table.snapshot().map_err(PythonError::from)?),
+                    app_metadata,
+                ))
+                .map_err(PythonError::from)?;
 
-        Ok(())
+            Ok(())
+        })
     }
 
     pub fn get_py_storage_backend(&self) -> PyResult<filesystem::DeltaFileSystemHandler> {
@@ -1007,20 +1033,24 @@ impl RawDeltaTable {
         })
     }
 
-    pub fn create_checkpoint(&self) -> PyResult<()> {
-        rt()?
-            .block_on(create_checkpoint(&self._table))
-            .map_err(PythonError::from)?;
+    pub fn create_checkpoint(&self, py: Python) -> PyResult<()> {
+        py.allow_threads(|| {
+            rt()?
+                .block_on(create_checkpoint(&self._table))
+                .map_err(PythonError::from)?;
 
-        Ok(())
+            Ok(())
+        })
     }
 
-    pub fn cleanup_metadata(&self) -> PyResult<()> {
-        rt()?
-            .block_on(cleanup_metadata(&self._table))
-            .map_err(PythonError::from)?;
+    pub fn cleanup_metadata(&self, py: Python) -> PyResult<()> {
+        py.allow_threads(|| {
+            rt()?
+                .block_on(cleanup_metadata(&self._table))
+                .map_err(PythonError::from)?;
 
-        Ok(())
+            Ok(())
+        })
     }
 
     pub fn get_add_actions(&self, flatten: bool) -> PyResult<PyArrowType<RecordBatch>> {
@@ -1037,35 +1067,38 @@ impl RawDeltaTable {
     #[pyo3(signature = (predicate = None, writer_properties=None, custom_metadata=None))]
     pub fn delete(
         &mut self,
+        py: Python,
         predicate: Option<String>,
         writer_properties: Option<HashMap<String, Option<String>>>,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<String> {
-        let mut cmd = DeleteBuilder::new(
-            self._table.log_store(),
-            self._table.snapshot().map_err(PythonError::from)?.clone(),
-        );
-        if let Some(predicate) = predicate {
-            cmd = cmd.with_predicate(predicate);
-        }
-
-        if let Some(writer_props) = writer_properties {
-            cmd = cmd.with_writer_properties(
-                set_writer_properties(writer_props).map_err(PythonError::from)?,
+        py.allow_threads(|| {
+            let mut cmd = DeleteBuilder::new(
+                self._table.log_store(),
+                self._table.snapshot().map_err(PythonError::from)?.clone(),
             );
-        }
+            if let Some(predicate) = predicate {
+                cmd = cmd.with_predicate(predicate);
+            }
 
-        if let Some(metadata) = custom_metadata {
-            let json_metadata: Map<String, Value> =
-                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-            cmd = cmd.with_metadata(json_metadata);
-        };
+            if let Some(writer_props) = writer_properties {
+                cmd = cmd.with_writer_properties(
+                    set_writer_properties(writer_props).map_err(PythonError::from)?,
+                );
+            }
 
-        let (table, metrics) = rt()?
-            .block_on(cmd.into_future())
-            .map_err(PythonError::from)?;
-        self._table.state = table.state;
-        Ok(serde_json::to_string(&metrics).unwrap())
+            if let Some(metadata) = custom_metadata {
+                let json_metadata: Map<String, Value> =
+                    metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+                cmd = cmd.with_metadata(json_metadata);
+            };
+
+            let (table, metrics) = rt()?
+                .block_on(cmd.into_future())
+                .map_err(PythonError::from)?;
+            self._table.state = table.state;
+            Ok(serde_json::to_string(&metrics).unwrap())
+        })
     }
 
     /// Execute the File System Check command (FSCK) on the delta table: removes old reference to files that
@@ -1463,6 +1496,7 @@ fn write_to_deltalake(
 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
 fn create_deltalake(
+    py: Python,
     table_uri: String,
     schema: PyArrowType<ArrowSchema>,
     partition_by: Vec<String>,
@@ -1473,48 +1507,51 @@ fn create_deltalake(
     storage_options: Option<HashMap<String, String>>,
     custom_metadata: Option<HashMap<String, String>>,
 ) -> PyResult<()> {
-    let table = DeltaTableBuilder::from_uri(table_uri)
-        .with_storage_options(storage_options.unwrap_or_default())
-        .build()
-        .map_err(PythonError::from)?;
-
-    let mode = mode.parse().map_err(PythonError::from)?;
-    let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;
-
-    let mut builder = DeltaOps(table)
-        .create()
-        .with_columns(schema.fields().clone())
-        .with_save_mode(mode)
-        .with_partition_columns(partition_by);
-
-    if let Some(name) = &name {
-        builder = builder.with_table_name(name);
-    };
+    py.allow_threads(|| {
+        let table = DeltaTableBuilder::from_uri(table_uri)
+            .with_storage_options(storage_options.unwrap_or_default())
+            .build()
+            .map_err(PythonError::from)?;
 
-    if let Some(description) = &description {
-        builder = builder.with_comment(description);
-    };
+        let mode = mode.parse().map_err(PythonError::from)?;
+        let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;
 
-    if let Some(config) = configuration {
-        builder = builder.with_configuration(config);
-    };
+        let mut builder = DeltaOps(table)
+            .create()
+            .with_columns(schema.fields().clone())
+            .with_save_mode(mode)
+            .with_partition_columns(partition_by);
 
-    if let Some(metadata) = custom_metadata {
-        let json_metadata: Map<String, Value> =
-            metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-        builder = builder.with_metadata(json_metadata);
-    };
+        if let Some(name) = &name {
+            builder = builder.with_table_name(name);
+        };
 
-    rt()?
-        .block_on(builder.into_future())
-        .map_err(PythonError::from)?;
+        if let Some(description) = &description {
+            builder = builder.with_comment(description);
+        };
 
-    Ok(())
+        if let Some(config) = configuration {
+            builder = builder.with_configuration(config);
+        };
+
+        if let Some(metadata) = custom_metadata {
+            let json_metadata: Map<String, Value> =
+                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+            builder = builder.with_metadata(json_metadata);
+        };
+
+        rt()?
+            .block_on(builder.into_future())
+            .map_err(PythonError::from)?;
+
+        Ok(())
+    })
 }
 
 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
 fn write_new_deltalake(
+    py: Python,
     table_uri: String,
     schema: PyArrowType<ArrowSchema>,
     add_actions: Vec<PyAddAction>,
@@ -1526,47 +1563,50 @@ fn write_new_deltalake(
     storage_options: Option<HashMap<String, String>>,
     custom_metadata: Option<HashMap<String, String>>,
 ) -> PyResult<()> {
-    let table = DeltaTableBuilder::from_uri(table_uri)
-        .with_storage_options(storage_options.unwrap_or_default())
-        .build()
-        .map_err(PythonError::from)?;
+    py.allow_threads(|| {
+        let table = DeltaTableBuilder::from_uri(table_uri)
+            .with_storage_options(storage_options.unwrap_or_default())
+            .build()
+            .map_err(PythonError::from)?;
 
-    let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;
+        let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;
 
-    let mut builder = DeltaOps(table)
-        .create()
-        .with_columns(schema.fields().clone())
-        .with_partition_columns(partition_by)
-        .with_actions(add_actions.iter().map(|add| Action::Add(add.into())));
+        let mut builder = DeltaOps(table)
+            .create()
+            .with_columns(schema.fields().clone())
+            .with_partition_columns(partition_by)
+            .with_actions(add_actions.iter().map(|add| Action::Add(add.into())));
 
-    if let Some(name) = &name {
-        builder = builder.with_table_name(name);
-    };
+        if let Some(name) = &name {
+            builder = builder.with_table_name(name);
+        };
 
-    if let Some(description) = &description {
-        builder = builder.with_comment(description);
-    };
+        if let Some(description) = &description {
+            builder = builder.with_comment(description);
+        };
 
-    if let Some(config) = configuration {
-        builder = builder.with_configuration(config);
-    };
+        if let Some(config) = configuration {
+            builder = builder.with_configuration(config);
+        };
 
-    if let Some(metadata) = custom_metadata {
-        let json_metadata: Map<String, Value> =
-            metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-        builder = builder.with_metadata(json_metadata);
-    };
+        if let Some(metadata) = custom_metadata {
+            let json_metadata: Map<String, Value> =
+                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+            builder = builder.with_metadata(json_metadata);
+        };
 
-    rt()?
-        .block_on(builder.into_future())
-        .map_err(PythonError::from)?;
+        rt()?
+            .block_on(builder.into_future())
+            .map_err(PythonError::from)?;
 
-    Ok(())
+        Ok(())
+    })
 }
 
 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
 fn convert_to_deltalake(
+    py: Python,
     uri: String,
     partition_schema: Option<PyArrowType<ArrowSchema>>,
     partition_strategy: Option<String>,
@@ -1576,44 +1616,47 @@ fn convert_to_deltalake(
     storage_options: Option<HashMap<String, String>>,
     custom_metadata: Option<HashMap<String, String>>,
 ) -> PyResult<()> {
-    let mut builder = ConvertToDeltaBuilder::new().with_location(uri);
+    py.allow_threads(|| {
+        let mut builder = ConvertToDeltaBuilder::new().with_location(uri);
 
-    if let Some(part_schema) = partition_schema {
-        let schema: StructType = (&part_schema.0).try_into().map_err(PythonError::from)?;
-        builder = builder.with_partition_schema(schema.fields().clone());
-    }
+        if let Some(part_schema) = partition_schema {
+            let schema: StructType = (&part_schema.0).try_into().map_err(PythonError::from)?;
+            builder = builder.with_partition_schema(schema.fields().clone());
+        }
 
-    if let Some(partition_strategy) = &partition_strategy {
-        let strategy: PartitionStrategy = partition_strategy.parse().map_err(PythonError::from)?;
-        builder = builder.with_partition_strategy(strategy);
-    }
+        if let Some(partition_strategy) = &partition_strategy {
+            let strategy: PartitionStrategy =
+                partition_strategy.parse().map_err(PythonError::from)?;
+            builder = builder.with_partition_strategy(strategy);
+        }
 
-    if let Some(name) = &name {
-        builder = builder.with_table_name(name);
-    }
+        if let Some(name) = &name {
+            builder = builder.with_table_name(name);
+        }
 
-    if let Some(description) = &description {
-        builder = builder.with_comment(description);
-    }
+        if let Some(description) = &description {
+            builder = builder.with_comment(description);
+        }
 
-    if let Some(config) = configuration {
-        builder = builder.with_configuration(config);
-    };
+        if let Some(config) = configuration {
+            builder = builder.with_configuration(config);
+        };
 
-    if let Some(strg_options) = storage_options {
-        builder = builder.with_storage_options(strg_options);
-    };
+        if let Some(strg_options) = storage_options {
+            builder = builder.with_storage_options(strg_options);
+        };
 
-    if let Some(metadata) = custom_metadata {
-        let json_metadata: Map<String, Value> =
-            metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-        builder = builder.with_metadata(json_metadata);
-    };
+        if let Some(metadata) = custom_metadata {
+            let json_metadata: Map<String, Value> =
+                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+            builder = builder.with_metadata(json_metadata);
+        };
 
-    rt()?
-        .block_on(builder.into_future())
-        .map_err(PythonError::from)?;
-    Ok(())
+        rt()?
+            .block_on(builder.into_future())
+            .map_err(PythonError::from)?;
+        Ok(())
+    })
 }
 
 #[pyclass(name = "DeltaDataChecker", module = "deltalake._internal")]

From 409d28dc8738f3c5133cca8c1db9e55f3517063b Mon Sep 17 00:00:00 2001
From: franz
Date: Sat, 9 Mar 2024 02:58:48 +0100
Subject: [PATCH 2/5] formatting

---
 python/deltalake/table.py                 | 10 +++++-----
 python/deltalake/writer.py                | 23 ++++++++++-------------
 python/docs/source/_ext/edit_on_github.py |  6 +++---
 3 files changed, 18 insertions(+), 21 deletions(-)

diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index e6cf9578dc..a11a60ee09 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -162,7 +162,7 @@ def __init__(
 
         if compression_level is not None and compression is None:
             raise ValueError(
-                """Providing a compression level without the compression type is not possible, 
+                """Providing a compression level without the compression type is not possible,
                 please provide the compression as well."""
             )
         if isinstance(compression, str):
@@ -1299,9 +1299,9 @@ def __init__(
         self.not_matched_insert_updates: Optional[List[Dict[str, str]]] = None
         self.not_matched_insert_predicate: Optional[List[Optional[str]]] = None
         self.not_matched_by_source_update_updates: Optional[List[Dict[str, str]]] = None
-        self.not_matched_by_source_update_predicate: Optional[
-            List[Optional[str]]
-        ] = None
+        self.not_matched_by_source_update_predicate: Optional[List[Optional[str]]] = (
+            None
+        )
         self.not_matched_by_source_delete_predicate: Optional[List[str]] = None
         self.not_matched_by_source_delete_all: Optional[bool] = None
 
@@ -1819,7 +1819,7 @@ def add_constraint(
         """
         if len(constraints.keys()) > 1:
             raise ValueError(
-                """add_constraints is limited to a single constraint addition at once for now. 
+                """add_constraints is limited to a single constraint addition at once for now.
                 Please execute add_constraints multiple times with each time a different constraint."""
             )
 
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index afbb7fc70f..6f844e3abb 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -109,8 +109,7 @@ def write_deltalake(
     large_dtypes: bool = ...,
     engine: Literal["pyarrow"] = ...,
     custom_metadata: Optional[Dict[str, str]] = ...,
-) -> None:
-    ...
+) -> None: ...
 
 
 @overload
@@ -138,8 +137,7 @@ def write_deltalake(
     engine: Literal["rust"],
     writer_properties: WriterProperties = ...,
     custom_metadata: Optional[Dict[str, str]] = ...,
-) -> None:
-    ...
+) -> None: ...
 
 
 @overload
@@ -168,8 +166,7 @@ def write_deltalake(
     engine: Literal["rust"],
     writer_properties: WriterProperties = ...,
     custom_metadata: Optional[Dict[str, str]] = ...,
-) -> None:
-    ...
+) -> None: ...
 
 
 def write_deltalake(
@@ -436,7 +433,7 @@ def visitor(written_file: Any) -> None:
             raise DeltaProtocolError(
                 "This table's min_writer_version is "
                 f"{table_protocol.min_writer_version}, "
-                f"""but this method only supports version 2 or 7 with at max these features {SUPPORTED_WRITER_FEATURES} enabled. 
+                f"""but this method only supports version 2 or 7 with at max these features {SUPPORTED_WRITER_FEATURES} enabled.
                 Try engine='rust' instead which supports more features and writer versions."""
             )
         if (
@@ -462,12 +459,12 @@ def check_data_is_aligned_with_partition_filtering(
 ) -> None:
     if table is None:
         return
-    existed_partitions: FrozenSet[
-        FrozenSet[Tuple[str, Optional[str]]]
-    ] = table._table.get_active_partitions()
-    allowed_partitions: FrozenSet[
-        FrozenSet[Tuple[str, Optional[str]]]
-    ] = table._table.get_active_partitions(partition_filters)
+    existed_partitions: FrozenSet[FrozenSet[Tuple[str, Optional[str]]]] = (
+        table._table.get_active_partitions()
+    )
+    allowed_partitions: FrozenSet[FrozenSet[Tuple[str, Optional[str]]]] = (
+        table._table.get_active_partitions(partition_filters)
+    )
     partition_values = pa.RecordBatch.from_arrays(
         [
             batch.column(column_name)
diff --git a/python/docs/source/_ext/edit_on_github.py b/python/docs/source/_ext/edit_on_github.py
index f7188f189a..a8bdc34268 100644
--- a/python/docs/source/_ext/edit_on_github.py
+++ b/python/docs/source/_ext/edit_on_github.py
@@ -38,9 +38,9 @@ def html_page_context(app, pagename, templatename, context, doctree):
     context["display_github"] = True
     context["github_user"] = app.config.edit_on_github_project.split("/")[0]
     context["github_repo"] = app.config.edit_on_github_project.split("/")[1]
-    context[
-        "github_version"
-    ] = f"{app.config.edit_on_github_branch}/{app.config.page_source_prefix}/"
+    context["github_version"] = (
+        f"{app.config.edit_on_github_branch}/{app.config.page_source_prefix}/"
+    )
 
 
 def setup(app):

From a1b919538adb80368d9269a4911a4f6125f3e1d3 Mon Sep 17 00:00:00 2001
From: franz
Date: Sat, 9 Mar 2024 23:56:39 +0100
Subject: [PATCH 3/5] fix formatting

---
 python/deltalake/table.py                 |  6 +++---
 python/deltalake/writer.py                | 21 ++++++++++++---------
 python/docs/source/_ext/edit_on_github.py |  6 +++---
 3 files changed, 18 insertions(+), 15 deletions(-)

diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index a11a60ee09..1c14692229 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -1299,9 +1299,9 @@ def __init__(
         self.not_matched_insert_updates: Optional[List[Dict[str, str]]] = None
         self.not_matched_insert_predicate: Optional[List[Optional[str]]] = None
         self.not_matched_by_source_update_updates: Optional[List[Dict[str, str]]] = None
-        self.not_matched_by_source_update_predicate: Optional[List[Optional[str]]] = (
-            None
-        )
+        self.not_matched_by_source_update_predicate: Optional[
+            List[Optional[str]]
+        ] = None
         self.not_matched_by_source_delete_predicate: Optional[List[str]] = None
         self.not_matched_by_source_delete_all: Optional[bool] = None
 
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 6f844e3abb..9eea6f1dfb 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -109,7 +109,8 @@ def write_deltalake(
     large_dtypes: bool = ...,
     engine: Literal["pyarrow"] = ...,
     custom_metadata: Optional[Dict[str, str]] = ...,
-) -> None: ...
+) -> None:
+    ...
 
 
 @overload
@@ -137,7 +138,8 @@ def write_deltalake(
     engine: Literal["rust"],
     writer_properties: WriterProperties = ...,
     custom_metadata: Optional[Dict[str, str]] = ...,
-) -> None: ...
+) -> None:
+    ...
 
 
 @overload
@@ -166,7 +168,8 @@ def write_deltalake(
     engine: Literal["rust"],
     writer_properties: WriterProperties = ...,
     custom_metadata: Optional[Dict[str, str]] = ...,
-) -> None: ...
+) -> None:
+    ...
 
 
 def write_deltalake(
@@ -459,12 +462,12 @@ def check_data_is_aligned_with_partition_filtering(
 ) -> None:
     if table is None:
         return
-    existed_partitions: FrozenSet[FrozenSet[Tuple[str, Optional[str]]]] = (
-        table._table.get_active_partitions()
-    )
-    allowed_partitions: FrozenSet[FrozenSet[Tuple[str, Optional[str]]]] = (
-        table._table.get_active_partitions(partition_filters)
-    )
+    existed_partitions: FrozenSet[
+        FrozenSet[Tuple[str, Optional[str]]]
+    ] = table._table.get_active_partitions()
+    allowed_partitions: FrozenSet[
+        FrozenSet[Tuple[str, Optional[str]]]
+    ] = table._table.get_active_partitions(partition_filters)
     partition_values = pa.RecordBatch.from_arrays(
         [
             batch.column(column_name)
diff --git a/python/docs/source/_ext/edit_on_github.py b/python/docs/source/_ext/edit_on_github.py
index a8bdc34268..f7188f189a 100644
--- a/python/docs/source/_ext/edit_on_github.py
+++ b/python/docs/source/_ext/edit_on_github.py
@@ -38,9 +38,9 @@ def html_page_context(app, pagename, templatename, context, doctree):
     context["display_github"] = True
     context["github_user"] = app.config.edit_on_github_project.split("/")[0]
     context["github_repo"] = app.config.edit_on_github_project.split("/")[1]
-    context["github_version"] = (
-        f"{app.config.edit_on_github_branch}/{app.config.page_source_prefix}/"
-    )
+    context[
+        "github_version"
+    ] = f"{app.config.edit_on_github_branch}/{app.config.page_source_prefix}/"
 
 
 def setup(app):

From 70a8e9c59a950a4510b6daf6794fc6cf981672f5 Mon Sep 17 00:00:00 2001
From: franz
Date: Mon, 11 Mar 2024 15:36:34 +0100
Subject: [PATCH 4/5] fix mut py obj

---
 python/src/lib.rs | 228 ++++++++++++++++++++++------------------------
 1 file changed, 110 insertions(+), 118 deletions(-)

diff --git a/python/src/lib.rs b/python/src/lib.rs
index 0a0a4f3839..62e83f5cff 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -370,7 +370,6 @@ impl RawDeltaTable {
     ))]
     pub fn compact_optimize(
         &mut self,
-        py: Python,
         partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
         target_size: Option<i64>,
         max_concurrent_tasks: Option<usize>,
@@ -378,42 +377,39 @@ impl RawDeltaTable {
         writer_properties: Option<HashMap<String, Option<String>>>,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<String> {
-        py.allow_threads(|| {
-            let mut cmd = OptimizeBuilder::new(
-                self._table.log_store(),
-                self._table.snapshot().map_err(PythonError::from)?.clone(),
-            )
-            .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
-            if let Some(size) = target_size {
-                cmd = cmd.with_target_size(size);
-            }
-            if let Some(commit_interval) = min_commit_interval {
-                cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
-            }
+        let mut cmd = OptimizeBuilder::new(
+            self._table.log_store(),
+            self._table.snapshot().map_err(PythonError::from)?.clone(),
+        )
+        .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
+        if let Some(size) = target_size {
+            cmd = cmd.with_target_size(size);
+        }
+        if let Some(commit_interval) = min_commit_interval {
+            cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
+        }
 
-            if let Some(writer_props) = writer_properties {
-                cmd = cmd.with_writer_properties(
-                    set_writer_properties(writer_props).map_err(PythonError::from)?,
-                );
-            }
+        if let Some(writer_props) = writer_properties {
+            cmd = cmd.with_writer_properties(
+                set_writer_properties(writer_props).map_err(PythonError::from)?,
+            );
+        }
-            if let Some(metadata) = custom_metadata {
-                let json_metadata: Map<String, Value> =
-                    metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
-                cmd = cmd.with_metadata(json_metadata);
-            };
+        if let Some(metadata) = custom_metadata {
+            let json_metadata: Map<String, Value> =
+                metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
+            cmd = cmd.with_metadata(json_metadata);
+        };
 
-            let converted_filters =
-                convert_partition_filters(partition_filters.unwrap_or_default())
-                    .map_err(PythonError::from)?;
-            cmd = cmd.with_filters(&converted_filters);
+        let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
+            .map_err(PythonError::from)?;
+        cmd = cmd.with_filters(&converted_filters);
 
-            let (table, metrics) = rt()?
-                .block_on(cmd.into_future())
-                .map_err(PythonError::from)?;
-            self._table.state = table.state;
-            Ok(serde_json::to_string(&metrics).unwrap())
-        })
+        let (table, metrics) = rt()?
+            .block_on(cmd.into_future())
+            .map_err(PythonError::from)?;
+        self._table.state = table.state;
+        Ok(serde_json::to_string(&metrics).unwrap())
     }
 
     /// Run z-order variation of optimize
@@ -915,7 +911,6 @@ impl RawDeltaTable {
 
     fn create_write_transaction(
         &mut self,
-        py: Python,
         add_actions: Vec<PyAddAction>,
         mode: &str,
         partition_by: Vec<String>,
@@ -923,102 +918,105 @@ impl RawDeltaTable {
         partitions_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<()> {
-        py.allow_threads(|| {
-            let mode = mode.parse().map_err(PythonError::from)?;
+        let mode = mode.parse().map_err(PythonError::from)?;
 
-            let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;
+        let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;
 
-            let existing_schema = self._table.get_schema().map_err(PythonError::from)?;
+        let existing_schema = self._table.get_schema().map_err(PythonError::from)?;
 
-            let mut actions: Vec<Action> = add_actions
-                .iter()
-                .map(|add| Action::Add(add.into()))
-                .collect();
+        let mut actions: Vec<Action> = add_actions
+            .iter()
+            .map(|add| Action::Add(add.into()))
+            .collect();
 
-            match mode {
-                SaveMode::Overwrite => {
-                    let converted_filters =
-                        convert_partition_filters(partitions_filters.unwrap_or_default())
-                            .map_err(PythonError::from)?;
+        match mode {
+            SaveMode::Overwrite => {
+                let converted_filters =
+                    convert_partition_filters(partitions_filters.unwrap_or_default())
                         .map_err(PythonError::from)?;
 
-                    let add_actions = self
-                        ._table
-                        .snapshot()
-                        .map_err(PythonError::from)?
+                    .get_active_add_actions_by_partitions(&converted_filters)
+                    .map_err(PythonError::from)?;
 
                 for old_add in add_actions {
                     let old_add = old_add.map_err(PythonError::from)?;
                     let remove_action = Action::Remove(Remove {
                         path: old_add.path().to_string(),
                         deletion_timestamp: Some(current_timestamp()),
                         data_change: true,
                         extended_file_metadata: Some(true),
                         partition_values: Some(
                             old_add
                                 .partition_values()
                                 .map_err(PythonError::from)?
                                 .iter()
                                 .map(|(k, v)| {
                                     (
                                         k.to_string(),
                                         if v.is_null() {
                                             None
                                         } else {
                                             Some(v.serialize())
                                         },
                                     )
                                 })
                                 .collect(),
                         ),
                         size: Some(old_add.size()),
                         deletion_vector: None,
                         tags: None,
                         base_row_id: None,
                         default_row_commit_version: None,
                     });
                     actions.push(remove_action);
                 }
 
-                    // Update metadata with new schema
-                    if &schema != existing_schema {
-                        let mut metadata =
-                            self._table.metadata().map_err(PythonError::from)?.clone();
-                        metadata.schema_string = serde_json::to_string(&schema)
-                            .map_err(DeltaTableError::from)
-                            .map_err(PythonError::from)?;
-                        actions.push(Action::Metadata(metadata));
-                    }
+                // Update metadata with new schema
+                if &schema != existing_schema {
+                    let mut metadata = self._table.metadata().map_err(PythonError::from)?.clone();
+                    metadata.schema_string = serde_json::to_string(&schema)
+                        .map_err(DeltaTableError::from)
+                        .map_err(PythonError::from)?;
+                    actions.push(Action::Metadata(metadata));
                 }
             }
-                _ => {
-                    // This should be unreachable from Python
-                    if &schema != existing_schema {
-                        DeltaProtocolError::new_err("Cannot change schema except in overwrite.");
-                    }
+            _ => {
+                // This should be unreachable from Python
+                if &schema != existing_schema {
+                    DeltaProtocolError::new_err("Cannot change schema except in overwrite.");
                 }
             }
+        }
 
-            let operation = DeltaOperation::Write {
-                mode,
-                partition_by: Some(partition_by),
-                predicate: None,
-            };
+        let operation = DeltaOperation::Write {
+            mode,
+            partition_by: Some(partition_by),
+            predicate: None,
+        };
 
-            let app_metadata =
-                custom_metadata.map(|md| md.into_iter().map(|(k, v)| (k, v.into())).collect());
+        let app_metadata =
+            custom_metadata.map(|md| md.into_iter().map(|(k, v)| (k, v.into())).collect());
 
-            let store = self._table.log_store();
+        let store = self._table.log_store();
 
-            rt()?
-                .block_on(commit(
-                    &*store,
-                    &actions,
-                    operation,
-                    Some(self._table.snapshot().map_err(PythonError::from)?),
-                    app_metadata,
-                ))
-                .map_err(PythonError::from)?;
+        rt()?
+            .block_on(commit(
+                &*store,
+                &actions,
+                operation,
+                Some(self._table.snapshot().map_err(PythonError::from)?),
+                app_metadata,
+            ))
+            .map_err(PythonError::from)?;
 
-            Ok(())
-        })
+        Ok(())
     }
 
     pub fn get_py_storage_backend(&self) -> PyResult<filesystem::DeltaFileSystemHandler> {

From 97967cb2b1745cf4b62395f8d5914b53f2a6cb4e Mon Sep 17 00:00:00 2001
From: franz
Date: Wed, 13 Mar 2024 09:51:56 +0100
Subject: [PATCH 5/5] don't modify self inside the allow_threads

---
 python/src/lib.rs | 26 +++++++++++++++-----------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/python/src/lib.rs b/python/src/lib.rs
index 62e83f5cff..5322d6986f 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -288,7 +288,7 @@ impl RawDeltaTable {
         enforce_retention_duration: bool,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<Vec<String>> {
-        py.allow_threads(|| {
+        let cmd = py.allow_threads(|| {
             let mut cmd = VacuumBuilder::new(
                 self._table.log_store(),
                 self._table.snapshot().map_err(PythonError::from)?.clone(),
@@ -305,12 +305,15 @@ impl RawDeltaTable {
                 cmd = cmd.with_metadata(json_metadata);
             };
 
-            let (table, metrics) = rt()?
-                .block_on(cmd.into_future())
-                .map_err(PythonError::from)?;
-            self._table.state = table.state;
-            Ok(metrics.files_deleted)
-        })
+            cmd
+        });
+
+        let (table, metrics) = rt()?
+            .block_on(cmd.into_future())
+            .map_err(PythonError::from)?;
+
+        self._table.state = table.state;
+        Ok(metrics.files_deleted)
     }
 
     /// Run the UPDATE command on the Delta Table
@@ -1064,7 +1067,7 @@ impl RawDeltaTable {
         writer_properties: Option<HashMap<String, Option<String>>>,
         custom_metadata: Option<HashMap<String, String>>,
     ) -> PyResult<String> {
-        py.allow_threads(|| {
+        let (table_state, metrics) = py.allow_threads(|| {
             let mut cmd = DeleteBuilder::new(
                 self._table.log_store(),
                 self._table.snapshot().map_err(PythonError::from)?.clone(),
@@ -1088,9 +1091,10 @@ impl RawDeltaTable {
             let (table, metrics) = rt()?
                .block_on(cmd.into_future())
                .map_err(PythonError::from)?;
-            self._table.state = table.state;
-            Ok(serde_json::to_string(&metrics).unwrap())
-        })
+            (table.state, metrics)
+        });
+        self._table.state = table_state;
+        Ok(serde_json::to_string(&metrics).unwrap())
    }
 
     /// Execute the File System Check command (FSCK) on the delta table: removes old reference to files that
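Taken together, the series converges on one pattern: run the blocking native work inside `py.allow_threads`, and only touch Python-visible state such as `self` after the closure returns and the GIL is reacquired. Below is a minimal standalone sketch of that pattern, assuming only pyo3; `Counter`, `bump`, and `slow_native_work` are illustrative names rather than part of the deltalake codebase, with `slow_native_work` standing in for a call like `rt()?.block_on(cmd.into_future())`.

use pyo3::prelude::*;

#[pyclass]
struct Counter {
    state: u64,
}

#[pymethods]
impl Counter {
    #[new]
    fn new() -> Self {
        Counter { state: 0 }
    }

    fn bump(&mut self, py: Python) -> PyResult<u64> {
        // Copy what the closure needs up front, so no borrow of `self`
        // crosses into the GIL-free section (the issue the last two
        // patches appear to be working around).
        let state = self.state;
        // The GIL is released for the duration of this closure, so other
        // Python threads keep running while the native work blocks.
        let result = py.allow_threads(|| slow_native_work(state));
        // Back under the GIL: safe to mutate Python-visible state again.
        self.state = result;
        Ok(result)
    }
}

// Stand-in for the blocking command execution in the real patches.
fn slow_native_work(input: u64) -> u64 {
    std::thread::sleep(std::time::Duration::from_millis(50));
    input + 1
}

Returning plain data out of the closure and assigning to `self` afterwards, as the vacuum and delete changes in PATCH 5 do, keeps the mutable borrow out of the GIL-free region entirely.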