From 52328422aad608705e68e118e911362c9adf761d Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 29 Oct 2024 13:18:12 +0000 Subject: [PATCH] feat(storage): pass changesets to unwind methods (#7879) --- .../stages/src/stages/hashing_account.rs | 2 +- .../stages/src/stages/hashing_storage.rs | 2 +- .../src/stages/index_account_history.rs | 2 +- .../src/stages/index_storage_history.rs | 2 +- .../src/providers/database/provider.rs | 149 ++++++++++++------ crates/storage/provider/src/traits/hashing.rs | 30 +++- crates/storage/provider/src/traits/history.rs | 27 +++- 7 files changed, 155 insertions(+), 59 deletions(-) diff --git a/crates/stages/stages/src/stages/hashing_account.rs b/crates/stages/stages/src/stages/hashing_account.rs index 14afb37d81db..5b4f720972f1 100644 --- a/crates/stages/stages/src/stages/hashing_account.rs +++ b/crates/stages/stages/src/stages/hashing_account.rs @@ -234,7 +234,7 @@ where input.unwind_block_range_with_threshold(self.commit_threshold); // Aggregate all transition changesets and make a list of accounts that have been changed. - provider.unwind_account_hashing(range)?; + provider.unwind_account_hashing_range(range)?; let mut stage_checkpoint = input.checkpoint.account_hashing_stage_checkpoint().unwrap_or_default(); diff --git a/crates/stages/stages/src/stages/hashing_storage.rs b/crates/stages/stages/src/stages/hashing_storage.rs index ef070d30c6d6..dcabbe83ee64 100644 --- a/crates/stages/stages/src/stages/hashing_storage.rs +++ b/crates/stages/stages/src/stages/hashing_storage.rs @@ -169,7 +169,7 @@ where let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); - provider.unwind_storage_hashing(BlockNumberAddress::range(range))?; + provider.unwind_storage_hashing_range(BlockNumberAddress::range(range))?; let mut stage_checkpoint = input.checkpoint.storage_hashing_stage_checkpoint().unwrap_or_default(); diff --git a/crates/stages/stages/src/stages/index_account_history.rs b/crates/stages/stages/src/stages/index_account_history.rs index 8b10283fb4b7..38c238e5d988 100644 --- a/crates/stages/stages/src/stages/index_account_history.rs +++ b/crates/stages/stages/src/stages/index_account_history.rs @@ -134,7 +134,7 @@ where let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); - provider.unwind_account_history_indices(range)?; + provider.unwind_account_history_indices_range(range)?; // from HistoryIndex higher than that number. Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) }) diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs index ac645b8dd754..ba61e6312302 100644 --- a/crates/stages/stages/src/stages/index_storage_history.rs +++ b/crates/stages/stages/src/stages/index_storage_history.rs @@ -140,7 +140,7 @@ where let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); - provider.unwind_storage_history_indices(BlockNumberAddress::range(range))?; + provider.unwind_storage_history_indices_range(BlockNumberAddress::range(range))?; Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) }) } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index e59a4f5635f4..2dcc3f92d705 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -2639,19 +2639,17 @@ impl StorageTrieWriter for DatabaseProvid } impl HashingWriter for DatabaseProvider { - fn unwind_account_hashing( + fn unwind_account_hashing<'a>( &self, - range: RangeInclusive, + changesets: impl Iterator, ) -> ProviderResult>> { // Aggregate all block changesets and make a list of accounts that have been changed. // Note that collecting and then reversing the order is necessary to ensure that the // changes are applied in the correct order. - let hashed_accounts = self - .tx - .cursor_read::()? - .walk_range(range)? - .map(|entry| entry.map(|(_, e)| (keccak256(e.address), e.info))) - .collect::, _>>()? + let hashed_accounts = changesets + .into_iter() + .map(|(_, e)| (keccak256(e.address), e.info)) + .collect::>() .into_iter() .rev() .collect::>(); @@ -2669,13 +2667,25 @@ impl HashingWriter for DatabaseProvider, + ) -> ProviderResult>> { + let changesets = self + .tx + .cursor_read::()? + .walk_range(range)? + .collect::, _>>()?; + self.unwind_account_hashing(changesets.iter()) + } + fn insert_account_for_hashing( &self, - accounts: impl IntoIterator)>, + changesets: impl IntoIterator)>, ) -> ProviderResult>> { let mut hashed_accounts_cursor = self.tx.cursor_write::()?; let hashed_accounts = - accounts.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::>(); + changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::>(); for (hashed_address, account) in &hashed_accounts { if let Some(account) = account { hashed_accounts_cursor.upsert(*hashed_address, *account)?; @@ -2688,18 +2698,15 @@ impl HashingWriter for DatabaseProvider, + changesets: impl Iterator, ) -> ProviderResult>> { // Aggregate all block changesets and make list of accounts that have been changed. - let mut changesets = self.tx.cursor_read::()?; let mut hashed_storages = changesets - .walk_range(range)? - .map(|entry| { - entry.map(|(BlockNumberAddress((_, address)), storage_entry)| { - (keccak256(address), keccak256(storage_entry.key), storage_entry.value) - }) + .into_iter() + .map(|(BlockNumberAddress((_, address)), storage_entry)| { + (keccak256(address), keccak256(storage_entry.key), storage_entry.value) }) - .collect::, _>>()?; + .collect::>(); hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk)); // Apply values to HashedState, and remove the account if it's None. @@ -2724,6 +2731,18 @@ impl HashingWriter for DatabaseProvider, + ) -> ProviderResult>> { + let changesets = self + .tx + .cursor_read::()? + .walk_range(range)? + .collect::, _>>()?; + self.unwind_storage_hashing(changesets.into_iter()) + } + fn insert_storage_for_hashing( &self, storages: impl IntoIterator)>, @@ -2845,16 +2864,14 @@ impl HashingWriter for DatabaseProvider HistoryWriter for DatabaseProvider { - fn unwind_account_history_indices( + fn unwind_account_history_indices<'a>( &self, - range: RangeInclusive, + changesets: impl Iterator, ) -> ProviderResult { - let mut last_indices = self - .tx - .cursor_read::()? - .walk_range(range)? - .map(|entry| entry.map(|(index, account)| (account.address, index))) - .collect::, _>>()?; + let mut last_indices = changesets + .into_iter() + .map(|(index, account)| (account.address, *index)) + .collect::>(); last_indices.sort_by_key(|(a, _)| *a); // Unwind the account history index. @@ -2881,6 +2898,18 @@ impl HistoryWriter for DatabaseProvider, + ) -> ProviderResult { + let changesets = self + .tx + .cursor_read::()? + .walk_range(range)? + .collect::, _>>()?; + self.unwind_account_history_indices(changesets.iter()) + } + fn insert_account_history_index( &self, account_transitions: impl IntoIterator)>, @@ -2893,16 +2922,12 @@ impl HistoryWriter for DatabaseProvider, + changesets: impl Iterator, ) -> ProviderResult { - let mut storage_changesets = self - .tx - .cursor_read::()? - .walk_range(range)? - .map(|entry| { - entry.map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn)) - }) - .collect::, _>>()?; + let mut storage_changesets = changesets + .into_iter() + .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn)) + .collect::>(); storage_changesets.sort_by_key(|(address, key, _)| (*address, *key)); let mut cursor = self.tx.cursor_write::()?; @@ -2931,6 +2956,18 @@ impl HistoryWriter for DatabaseProvider, + ) -> ProviderResult { + let changesets = self + .tx + .cursor_read::()? + .walk_range(range)? + .collect::, _>>()?; + self.unwind_storage_history_indices(changesets.into_iter()) + } + fn insert_storage_history_index( &self, storage_transitions: impl IntoIterator)>, @@ -2973,10 +3010,14 @@ impl, ) -> ProviderResult { - let storage_range = BlockNumberAddress::range(range.clone()); + let changed_accounts = self + .tx + .cursor_read::()? + .walk_range(range.clone())? + .collect::, _>>()?; // Unwind account hashes. Add changed accounts to account prefix set. - let hashed_addresses = self.unwind_account_hashing(range.clone())?; + let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?; let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len()); let mut destroyed_accounts = HashSet::default(); for (hashed_address, account) in hashed_addresses { @@ -2987,12 +3028,19 @@ impl()? + .walk_range(storage_range)? + .collect::, _>>()?; // Unwind storage hashes. Add changed account and storage keys to corresponding prefix // sets. let mut storage_prefix_sets = HashMap::::default(); - let storage_entries = self.unwind_storage_hashing(storage_range.clone())?; + let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?; for (hashed_address, hashed_slots) in storage_entries { account_prefix_set.insert(Nibbles::unpack(hashed_address)); let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len()); @@ -3003,7 +3051,7 @@ impl, ) -> ProviderResult<()> { - let storage_range = BlockNumberAddress::range(range.clone()); + let changed_accounts = self + .tx + .cursor_read::()? + .walk_range(range.clone())? + .collect::, _>>()?; // Unwind account hashes. Add changed accounts to account prefix set. - let hashed_addresses = self.unwind_account_hashing(range.clone())?; + let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?; let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len()); let mut destroyed_accounts = HashSet::default(); for (hashed_address, account) in hashed_addresses { @@ -3075,12 +3127,19 @@ impl()? + .walk_range(storage_range)? + .collect::, _>>()?; // Unwind storage hashes. Add changed account and storage keys to corresponding prefix // sets. let mut storage_prefix_sets = HashMap::::default(); - let storage_entries = self.unwind_storage_hashing(storage_range.clone())?; + let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?; for (hashed_address, hashed_slots) in storage_entries { account_prefix_set.insert(Nibbles::unpack(hashed_address)); let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len()); @@ -3091,7 +3150,7 @@ impl( &self, - range: RangeInclusive, + changesets: impl Iterator, + ) -> ProviderResult>>; + + /// Unwind and clear account hashing in a given block range. + /// + /// # Returns + /// + /// Set of hashed keys of updated accounts. + fn unwind_account_hashing_range( + &self, + range: impl RangeBounds, ) -> ProviderResult>>; /// Inserts all accounts into [reth_db::tables::AccountsHistory] table. @@ -38,7 +48,17 @@ pub trait HashingWriter: Send + Sync { /// Mapping of hashed keys of updated accounts to their respective updated hashed slots. fn unwind_storage_hashing( &self, - range: Range, + changesets: impl Iterator, + ) -> ProviderResult>>; + + /// Unwind and clear storage hashing in a given block range. + /// + /// # Returns + /// + /// Mapping of hashed keys of updated accounts to their respective updated hashed slots. + fn unwind_storage_hashing_range( + &self, + range: impl RangeBounds, ) -> ProviderResult>>; /// Iterates over storages and inserts them to hashing table. diff --git a/crates/storage/provider/src/traits/history.rs b/crates/storage/provider/src/traits/history.rs index cbf9bece4b94..4eadd6031c35 100644 --- a/crates/storage/provider/src/traits/history.rs +++ b/crates/storage/provider/src/traits/history.rs @@ -1,8 +1,9 @@ use alloy_primitives::{Address, BlockNumber, B256}; use auto_impl::auto_impl; -use reth_db_api::models::BlockNumberAddress; +use reth_db::models::{AccountBeforeTx, BlockNumberAddress}; +use reth_primitives::StorageEntry; use reth_storage_errors::provider::ProviderResult; -use std::ops::{Range, RangeInclusive}; +use std::ops::{RangeBounds, RangeInclusive}; /// History Writer #[auto_impl(&, Arc, Box)] @@ -10,9 +11,17 @@ pub trait HistoryWriter: Send + Sync { /// Unwind and clear account history indices. /// /// Returns number of changesets walked. - fn unwind_account_history_indices( + fn unwind_account_history_indices<'a>( &self, - range: RangeInclusive, + changesets: impl Iterator, + ) -> ProviderResult; + + /// Unwind and clear account history indices in a given block range. + /// + /// Returns number of changesets walked. + fn unwind_account_history_indices_range( + &self, + range: impl RangeBounds, ) -> ProviderResult; /// Insert account change index to database. Used inside AccountHistoryIndex stage @@ -26,7 +35,15 @@ pub trait HistoryWriter: Send + Sync { /// Returns number of changesets walked. fn unwind_storage_history_indices( &self, - range: Range, + changesets: impl Iterator, + ) -> ProviderResult; + + /// Unwind and clear storage history indices in a given block range. + /// + /// Returns number of changesets walked. + fn unwind_storage_history_indices_range( + &self, + range: impl RangeBounds, ) -> ProviderResult; /// Insert storage change index to database. Used inside StorageHistoryIndex stage