feat(storage): pass changesets to unwind methods (#7879)
shekhirin authored Oct 29, 2024
1 parent 6f3600d commit 5232842
Showing 7 changed files with 155 additions and 59 deletions.
2 changes: 1 addition & 1 deletion crates/stages/stages/src/stages/hashing_account.rs
@@ -234,7 +234,7 @@ where
             input.unwind_block_range_with_threshold(self.commit_threshold);

         // Aggregate all transition changesets and make a list of accounts that have been changed.
-        provider.unwind_account_hashing(range)?;
+        provider.unwind_account_hashing_range(range)?;

         let mut stage_checkpoint =
             input.checkpoint.account_hashing_stage_checkpoint().unwrap_or_default();
2 changes: 1 addition & 1 deletion crates/stages/stages/src/stages/hashing_storage.rs
@@ -169,7 +169,7 @@ where
         let (range, unwind_progress, _) =
             input.unwind_block_range_with_threshold(self.commit_threshold);

-        provider.unwind_storage_hashing(BlockNumberAddress::range(range))?;
+        provider.unwind_storage_hashing_range(BlockNumberAddress::range(range))?;

         let mut stage_checkpoint =
             input.checkpoint.storage_hashing_stage_checkpoint().unwrap_or_default();
2 changes: 1 addition & 1 deletion crates/stages/stages/src/stages/index_account_history.rs
@@ -134,7 +134,7 @@ where
         let (range, unwind_progress, _) =
             input.unwind_block_range_with_threshold(self.commit_threshold);

-        provider.unwind_account_history_indices(range)?;
+        provider.unwind_account_history_indices_range(range)?;

         // from HistoryIndex higher than that number.
         Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
2 changes: 1 addition & 1 deletion crates/stages/stages/src/stages/index_storage_history.rs
@@ -140,7 +140,7 @@ where
         let (range, unwind_progress, _) =
             input.unwind_block_range_with_threshold(self.commit_threshold);

-        provider.unwind_storage_history_indices(BlockNumberAddress::range(range))?;
+        provider.unwind_storage_history_indices_range(BlockNumberAddress::range(range))?;

         Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
     }
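The four stage diffs above are the caller-facing half of this commit: each stage still unwinds by block range, but it now calls the new `*_range` convenience method, because the original method names now take the changesets themselves (see the provider and trait diffs below). As a rough illustration of the call shape only, here is a minimal sketch using hypothetical stand-ins (`AccountHashingUnwind`, `MockProvider`, `unwind_stage`), not the real reth stage or provider API:

```rust
use std::ops::RangeInclusive;

type BlockNumber = u64;

// Hypothetical, trimmed-down trait standing in for the provider-side API.
trait AccountHashingUnwind {
    /// Range-based entry point the stages call; the provider is expected to
    /// read the changesets from its own tables internally.
    fn unwind_account_hashing_range(&self, range: RangeInclusive<BlockNumber>) -> usize;
}

// In-memory stand-in "provider" that just counts matching changeset rows.
struct MockProvider {
    changeset_blocks: Vec<BlockNumber>,
}

impl AccountHashingUnwind for MockProvider {
    fn unwind_account_hashing_range(&self, range: RangeInclusive<BlockNumber>) -> usize {
        self.changeset_blocks.iter().filter(|b| range.contains(*b)).count()
    }
}

/// Stage-side unwind after this commit: still driven by a block range,
/// just routed through the `_range` method.
fn unwind_stage<P: AccountHashingUnwind>(provider: &P, range: RangeInclusive<BlockNumber>) -> usize {
    provider.unwind_account_hashing_range(range)
}

fn main() {
    let provider = MockProvider { changeset_blocks: vec![8, 9, 10] };
    assert_eq!(unwind_stage(&provider, 9..=10), 2);
}
```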
149 changes: 104 additions & 45 deletions crates/storage/provider/src/providers/database/provider.rs
@@ -2639,19 +2639,17 @@ impl<TX: DbTxMut + DbTx, Spec: Send + Sync> StorageTrieWriter for DatabaseProvid
 }
 
 impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HashingWriter for DatabaseProvider<TX, Spec> {
-    fn unwind_account_hashing(
+    fn unwind_account_hashing<'a>(
         &self,
-        range: RangeInclusive<BlockNumber>,
+        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
     ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
         // Aggregate all block changesets and make a list of accounts that have been changed.
         // Note that collecting and then reversing the order is necessary to ensure that the
         // changes are applied in the correct order.
-        let hashed_accounts = self
-            .tx
-            .cursor_read::<tables::AccountChangeSets>()?
-            .walk_range(range)?
-            .map(|entry| entry.map(|(_, e)| (keccak256(e.address), e.info)))
-            .collect::<Result<Vec<_>, _>>()?
+        let hashed_accounts = changesets
+            .into_iter()
+            .map(|(_, e)| (keccak256(e.address), e.info))
+            .collect::<Vec<_>>()
             .into_iter()
             .rev()
             .collect::<BTreeMap<_, _>>();
@@ -2669,13 +2667,25 @@ impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HashingWriter for DatabaseProvider<T
         Ok(hashed_accounts)
     }
 
+    fn unwind_account_hashing_range(
+        &self,
+        range: impl RangeBounds<BlockNumber>,
+    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
+        let changesets = self
+            .tx
+            .cursor_read::<tables::AccountChangeSets>()?
+            .walk_range(range)?
+            .collect::<Result<Vec<_>, _>>()?;
+        self.unwind_account_hashing(changesets.iter())
+    }
+
     fn insert_account_for_hashing(
         &self,
-        accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
+        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
     ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
         let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
         let hashed_accounts =
-            accounts.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
+            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
         for (hashed_address, account) in &hashed_accounts {
             if let Some(account) = account {
                 hashed_accounts_cursor.upsert(*hashed_address, *account)?;
@@ -2688,18 +2698,15 @@ impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HashingWriter for DatabaseProvider<T
 
     fn unwind_storage_hashing(
         &self,
-        range: Range<BlockNumberAddress>,
+        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
     ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
         // Aggregate all block changesets and make list of accounts that have been changed.
-        let mut changesets = self.tx.cursor_read::<tables::StorageChangeSets>()?;
         let mut hashed_storages = changesets
-            .walk_range(range)?
-            .map(|entry| {
-                entry.map(|(BlockNumberAddress((_, address)), storage_entry)| {
-                    (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
-                })
-            })
-            .collect::<Result<Vec<_>, _>>()?;
+            .into_iter()
+            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
+                (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
+            })
+            .collect::<Vec<_>>();
         hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
 
         // Apply values to HashedState, and remove the account if it's None.
@@ -2724,6 +2731,18 @@ impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HashingWriter for DatabaseProvider<T
         Ok(hashed_storage_keys)
     }
 
+    fn unwind_storage_hashing_range(
+        &self,
+        range: impl RangeBounds<BlockNumberAddress>,
+    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
+        let changesets = self
+            .tx
+            .cursor_read::<tables::StorageChangeSets>()?
+            .walk_range(range)?
+            .collect::<Result<Vec<_>, _>>()?;
+        self.unwind_storage_hashing(changesets.into_iter())
+    }
+
     fn insert_storage_for_hashing(
         &self,
         storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
@@ -2845,16 +2864,14 @@ impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HashingWriter for DatabaseProvider<T
 }
 
 impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HistoryWriter for DatabaseProvider<TX, Spec> {
-    fn unwind_account_history_indices(
+    fn unwind_account_history_indices<'a>(
         &self,
-        range: RangeInclusive<BlockNumber>,
+        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
     ) -> ProviderResult<usize> {
-        let mut last_indices = self
-            .tx
-            .cursor_read::<tables::AccountChangeSets>()?
-            .walk_range(range)?
-            .map(|entry| entry.map(|(index, account)| (account.address, index)))
-            .collect::<Result<Vec<_>, _>>()?;
+        let mut last_indices = changesets
+            .into_iter()
+            .map(|(index, account)| (account.address, *index))
+            .collect::<Vec<_>>();
         last_indices.sort_by_key(|(a, _)| *a);
 
         // Unwind the account history index.
@@ -2881,6 +2898,18 @@ impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HistoryWriter for DatabaseProvider<T
         Ok(changesets)
     }
 
+    fn unwind_account_history_indices_range(
+        &self,
+        range: impl RangeBounds<BlockNumber>,
+    ) -> ProviderResult<usize> {
+        let changesets = self
+            .tx
+            .cursor_read::<tables::AccountChangeSets>()?
+            .walk_range(range)?
+            .collect::<Result<Vec<_>, _>>()?;
+        self.unwind_account_history_indices(changesets.iter())
+    }
+
     fn insert_account_history_index(
         &self,
         account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
@@ -2893,16 +2922,12 @@ impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HistoryWriter for DatabaseProvider<T
 
     fn unwind_storage_history_indices(
         &self,
-        range: Range<BlockNumberAddress>,
+        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
     ) -> ProviderResult<usize> {
-        let mut storage_changesets = self
-            .tx
-            .cursor_read::<tables::StorageChangeSets>()?
-            .walk_range(range)?
-            .map(|entry| {
-                entry.map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
-            })
-            .collect::<Result<Vec<_>, _>>()?;
+        let mut storage_changesets = changesets
+            .into_iter()
+            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
+            .collect::<Vec<_>>();
         storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
 
         let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
@@ -2931,6 +2956,18 @@ impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HistoryWriter for DatabaseProvider<T
         Ok(changesets)
     }
 
+    fn unwind_storage_history_indices_range(
+        &self,
+        range: impl RangeBounds<BlockNumberAddress>,
+    ) -> ProviderResult<usize> {
+        let changesets = self
+            .tx
+            .cursor_read::<tables::StorageChangeSets>()?
+            .walk_range(range)?
+            .collect::<Result<Vec<_>, _>>()?;
+        self.unwind_storage_history_indices(changesets.into_iter())
+    }
+
     fn insert_storage_history_index(
         &self,
         storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
@@ -2973,10 +3010,14 @@ impl<TX: DbTxMut + DbTx + 'static, Spec: Send + Sync + EthereumHardforks + 'stat
         &self,
         range: RangeInclusive<BlockNumber>,
     ) -> ProviderResult<Chain> {
-        let storage_range = BlockNumberAddress::range(range.clone());
+        let changed_accounts = self
+            .tx
+            .cursor_read::<tables::AccountChangeSets>()?
+            .walk_range(range.clone())?
+            .collect::<Result<Vec<_>, _>>()?;
 
         // Unwind account hashes. Add changed accounts to account prefix set.
-        let hashed_addresses = self.unwind_account_hashing(range.clone())?;
+        let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
         let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
         let mut destroyed_accounts = HashSet::default();
         for (hashed_address, account) in hashed_addresses {
@@ -2987,12 +3028,19 @@ impl<TX: DbTxMut + DbTx + 'static, Spec: Send + Sync + EthereumHardforks + 'stat
         }
 
         // Unwind account history indices.
-        self.unwind_account_history_indices(range.clone())?;
+        self.unwind_account_history_indices(changed_accounts.iter())?;
+        let storage_range = BlockNumberAddress::range(range.clone());
 
+        let changed_storages = self
+            .tx
+            .cursor_read::<tables::StorageChangeSets>()?
+            .walk_range(storage_range)?
+            .collect::<Result<Vec<_>, _>>()?;
+
         // Unwind storage hashes. Add changed account and storage keys to corresponding prefix
         // sets.
         let mut storage_prefix_sets = HashMap::<B256, PrefixSet>::default();
-        let storage_entries = self.unwind_storage_hashing(storage_range.clone())?;
+        let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
         for (hashed_address, hashed_slots) in storage_entries {
             account_prefix_set.insert(Nibbles::unpack(hashed_address));
             let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
@@ -3003,7 +3051,7 @@ impl<TX: DbTxMut + DbTx + 'static, Spec: Send + Sync + EthereumHardforks + 'stat
         }
 
         // Unwind storage history indices.
-        self.unwind_storage_history_indices(storage_range)?;
+        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
 
         // Calculate the reverted merkle root.
         // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
@@ -3061,10 +3109,14 @@ impl<TX: DbTxMut + DbTx + 'static, Spec: Send + Sync + EthereumHardforks + 'stat
         &self,
         range: RangeInclusive<BlockNumber>,
     ) -> ProviderResult<()> {
-        let storage_range = BlockNumberAddress::range(range.clone());
+        let changed_accounts = self
+            .tx
+            .cursor_read::<tables::AccountChangeSets>()?
+            .walk_range(range.clone())?
+            .collect::<Result<Vec<_>, _>>()?;
 
         // Unwind account hashes. Add changed accounts to account prefix set.
-        let hashed_addresses = self.unwind_account_hashing(range.clone())?;
+        let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
         let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
         let mut destroyed_accounts = HashSet::default();
         for (hashed_address, account) in hashed_addresses {
@@ -3075,12 +3127,19 @@ impl<TX: DbTxMut + DbTx + 'static, Spec: Send + Sync + EthereumHardforks + 'stat
         }
 
         // Unwind account history indices.
-        self.unwind_account_history_indices(range.clone())?;
+        self.unwind_account_history_indices(changed_accounts.iter())?;
 
+        let storage_range = BlockNumberAddress::range(range.clone());
+        let changed_storages = self
+            .tx
+            .cursor_read::<tables::StorageChangeSets>()?
+            .walk_range(storage_range)?
+            .collect::<Result<Vec<_>, _>>()?;
+
         // Unwind storage hashes. Add changed account and storage keys to corresponding prefix
         // sets.
         let mut storage_prefix_sets = HashMap::<B256, PrefixSet>::default();
-        let storage_entries = self.unwind_storage_hashing(storage_range.clone())?;
+        let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
         for (hashed_address, hashed_slots) in storage_entries {
             account_prefix_set.insert(Nibbles::unpack(hashed_address));
             let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
@@ -3091,7 +3150,7 @@ impl<TX: DbTxMut + DbTx + 'static, Spec: Send + Sync + EthereumHardforks + 'stat
         }
 
         // Unwind storage history indices.
-        self.unwind_storage_history_indices(storage_range)?;
+        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
 
         // Calculate the reverted merkle root.
         // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
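The last few hunks show the point of the provider-side change: the two unwind call sites now walk `AccountChangeSets` and `StorageChangeSets` once, keep the rows in a `Vec`, and feed that same buffer to both the hashing unwind and the history-index unwind, instead of each method re-walking its changeset table. The following is a rough sketch of that read-once/reuse pattern with in-memory stand-ins (`Provider`, `AccountChange`, plain `BTreeMap`s in place of the reth tables and cursors):

```rust
use std::collections::BTreeMap;
use std::ops::RangeInclusive;

type BlockNumber = u64;
type Address = [u8; 20];

#[derive(Clone, Copy)]
struct AccountChange {
    address: Address,
}

struct Provider {
    account_changesets: Vec<(BlockNumber, AccountChange)>, // stand-in for "AccountChangeSets"
    hashed_accounts: BTreeMap<Address, u64>,               // stand-in for "HashedAccounts"
    history_index: BTreeMap<Address, Vec<BlockNumber>>,    // stand-in for the history index
}

impl Provider {
    fn unwind_account_hashing<'a>(
        &mut self,
        changesets: impl Iterator<Item = &'a (BlockNumber, AccountChange)>,
    ) {
        // Drop the hashed entry for every touched account.
        for (_, change) in changesets {
            self.hashed_accounts.remove(&change.address);
        }
    }

    fn unwind_account_history_indices<'a>(
        &mut self,
        changesets: impl Iterator<Item = &'a (BlockNumber, AccountChange)>,
    ) {
        // Trim history entries at or above the unwound block.
        for (block, change) in changesets {
            if let Some(blocks) = self.history_index.get_mut(&change.address) {
                blocks.retain(|b| b < block);
            }
        }
    }

    /// Mirrors the unwind call sites in provider.rs: one read of the changeset
    /// table, two consumers of the same collected buffer.
    fn unwind_range(&mut self, range: RangeInclusive<BlockNumber>) {
        let changed_accounts: Vec<(BlockNumber, AccountChange)> = self
            .account_changesets
            .iter()
            .filter(|(block, _)| range.contains(block))
            .copied()
            .collect();

        self.unwind_account_hashing(changed_accounts.iter());
        self.unwind_account_history_indices(changed_accounts.iter());
    }
}

fn main() {
    let mut provider = Provider {
        account_changesets: vec![(10, AccountChange { address: [1; 20] })],
        hashed_accounts: BTreeMap::from([([1; 20], 1)]),
        history_index: BTreeMap::from([([1; 20], vec![8, 10])]),
    };
    provider.unwind_range(10..=10);
    assert!(provider.hashed_accounts.is_empty());
    assert_eq!(provider.history_index[&[1; 20]], vec![8]);
}
```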
30 changes: 25 additions & 5 deletions crates/storage/provider/src/traits/hashing.rs
@@ -1,11 +1,11 @@
 use alloy_primitives::{Address, BlockNumber, B256};
 use auto_impl::auto_impl;
-use reth_db_api::models::BlockNumberAddress;
+use reth_db::models::{AccountBeforeTx, BlockNumberAddress};
 use reth_primitives::{Account, StorageEntry};
 use reth_storage_errors::provider::ProviderResult;
 use std::{
     collections::{BTreeMap, BTreeSet, HashMap},
-    ops::{Range, RangeInclusive},
+    ops::{RangeBounds, RangeInclusive},
 };
 
 /// Hashing Writer
@@ -16,9 +16,19 @@ pub trait HashingWriter: Send + Sync {
     /// # Returns
     ///
     /// Set of hashed keys of updated accounts.
-    fn unwind_account_hashing(
+    fn unwind_account_hashing<'a>(
         &self,
-        range: RangeInclusive<BlockNumber>,
+        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
     ) -> ProviderResult<BTreeMap<B256, Option<Account>>>;
 
+    /// Unwind and clear account hashing in a given block range.
+    ///
+    /// # Returns
+    ///
+    /// Set of hashed keys of updated accounts.
+    fn unwind_account_hashing_range(
+        &self,
+        range: impl RangeBounds<BlockNumber>,
+    ) -> ProviderResult<BTreeMap<B256, Option<Account>>>;
+
     /// Inserts all accounts into [reth_db::tables::AccountsHistory] table.
@@ -38,7 +48,17 @@ pub trait HashingWriter: Send + Sync {
     /// Mapping of hashed keys of updated accounts to their respective updated hashed slots.
     fn unwind_storage_hashing(
         &self,
-        range: Range<BlockNumberAddress>,
+        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
     ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>>;
 
+    /// Unwind and clear storage hashing in a given block range.
+    ///
+    /// # Returns
+    ///
+    /// Mapping of hashed keys of updated accounts to their respective updated hashed slots.
+    fn unwind_storage_hashing_range(
+        &self,
+        range: impl RangeBounds<BlockNumberAddress>,
+    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>>;
+
     /// Iterates over storages and inserts them to hashing table.
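After this change the trait exposes two entry points per unwind: the original name takes an iterator of changesets (note the `impl Iterator` argument and the `'a` lifetime on the borrowed account changesets), while the `_range` companion accepts `impl RangeBounds<_>` for callers that only know the block range. Below is a compact, hypothetical sketch of that shape with stand-in types (`AccountBefore`, `InMemoryProvider`), not the actual reth trait:

```rust
use std::collections::BTreeMap;
use std::ops::RangeBounds;

type BlockNumber = u64;
type Address = [u8; 20];

// Stand-in for the "state of the account before the change" record.
#[derive(Clone, Copy)]
struct AccountBefore {
    address: Address,
    nonce: u64,
}

// Hypothetical, trimmed-down version of the trait: only the account-unwind pair.
trait HashingWriter {
    /// Unwind from changesets the caller has already read (or built).
    fn unwind_account_hashing<'a>(
        &self,
        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBefore)>,
    ) -> BTreeMap<Address, u64>;

    /// Unwind a block range; the implementation reads the changesets itself.
    fn unwind_account_hashing_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> BTreeMap<Address, u64>;
}

struct InMemoryProvider {
    // (block number, pre-state) rows standing in for the account changeset table.
    account_changesets: Vec<(BlockNumber, AccountBefore)>,
}

impl HashingWriter for InMemoryProvider {
    fn unwind_account_hashing<'a>(
        &self,
        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBefore)>,
    ) -> BTreeMap<Address, u64> {
        changesets.map(|(_, a)| (a.address, a.nonce)).collect()
    }

    fn unwind_account_hashing_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> BTreeMap<Address, u64> {
        // Thin wrapper: collect the matching changesets, then delegate.
        let changesets: Vec<_> = self
            .account_changesets
            .iter()
            .filter(|(block, _)| range.contains(block))
            .copied()
            .collect();
        self.unwind_account_hashing(changesets.iter())
    }
}

fn main() {
    let provider = InMemoryProvider {
        account_changesets: vec![(5, AccountBefore { address: [9; 20], nonce: 3 })],
    };
    let unwound = provider.unwind_account_hashing_range(4..=6);
    assert_eq!(unwound.get(&[9; 20]), Some(&3));
}
```

As in the trait diff above, both methods are declared without default bodies, so each provider supplies its own `_range` implementation (here a thin collect-then-delegate wrapper).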