feat(storage): pass changesets to unwind methods #7879

Merged 4 commits on Oct 29, 2024
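In short, this PR changes the `HashingWriter` and `HistoryWriter` unwind methods to take an iterator of changeset entries instead of a block range, and adds `*_range` variants that keep the old behavior by walking the changeset tables themselves. Callers that already hold the changesets in memory (such as the unwind paths in `provider.rs` below) can then reuse a single read of each changeset table instead of walking it once per unwind method. The snippet below is a minimal, self-contained sketch of that pattern using made-up types (`Changeset`, `MockProvider`); it is an illustration of the idea, not reth code.

```rust
use std::collections::BTreeMap;
use std::ops::RangeInclusive;

type BlockNumber = u64;

/// Simplified stand-in for an account changeset entry (not reth's type).
#[derive(Clone)]
struct Changeset {
    block: BlockNumber,
    address: [u8; 20],
}

/// Simplified stand-in for a provider backed by a changeset table.
struct MockProvider {
    table: Vec<Changeset>,
}

impl MockProvider {
    /// Core unwind: consumes whatever changesets the caller already has.
    fn unwind_account_hashing<'a>(
        &self,
        changesets: impl Iterator<Item = &'a Changeset>,
    ) -> BTreeMap<[u8; 20], BlockNumber> {
        changesets.map(|c| (c.address, c.block)).collect()
    }

    /// Convenience variant: loads the changesets for `range`, then delegates.
    fn unwind_account_hashing_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> BTreeMap<[u8; 20], BlockNumber> {
        let changesets: Vec<Changeset> =
            self.table.iter().filter(|c| range.contains(&c.block)).cloned().collect();
        self.unwind_account_hashing(changesets.iter())
    }
}

fn main() {
    let provider = MockProvider {
        table: vec![
            Changeset { block: 1, address: [0xaa; 20] },
            Changeset { block: 2, address: [0xbb; 20] },
        ],
    };

    // A caller that already read the changesets passes them directly ...
    let in_memory: Vec<Changeset> = provider.table.clone();
    let unwound = provider.unwind_account_hashing(in_memory.iter());

    // ... while range-only callers (like the stages) use the `_range` variant.
    let unwound_range = provider.unwind_account_hashing_range(1..=2);
    assert_eq!(unwound, unwound_range);
}
```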
2 changes: 1 addition & 1 deletion crates/stages/stages/src/stages/hashing_account.rs
@@ -234,7 +234,7 @@ where
input.unwind_block_range_with_threshold(self.commit_threshold);

// Aggregate all transition changesets and make a list of accounts that have been changed.
provider.unwind_account_hashing(range)?;
provider.unwind_account_hashing_range(range)?;

let mut stage_checkpoint =
input.checkpoint.account_hashing_stage_checkpoint().unwrap_or_default();
2 changes: 1 addition & 1 deletion crates/stages/stages/src/stages/hashing_storage.rs
@@ -169,7 +169,7 @@ where
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);

provider.unwind_storage_hashing(BlockNumberAddress::range(range))?;
provider.unwind_storage_hashing_range(BlockNumberAddress::range(range))?;

let mut stage_checkpoint =
input.checkpoint.storage_hashing_stage_checkpoint().unwrap_or_default();
2 changes: 1 addition & 1 deletion crates/stages/stages/src/stages/index_account_history.rs
@@ -134,7 +134,7 @@ where
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);

provider.unwind_account_history_indices(range)?;
provider.unwind_account_history_indices_range(range)?;

// from HistoryIndex higher than that number.
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
2 changes: 1 addition & 1 deletion crates/stages/stages/src/stages/index_storage_history.rs
@@ -140,7 +140,7 @@ where
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);

provider.unwind_storage_history_indices(BlockNumberAddress::range(range))?;
provider.unwind_storage_history_indices_range(BlockNumberAddress::range(range))?;

Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
}
149 changes: 104 additions & 45 deletions crates/storage/provider/src/providers/database/provider.rs
@@ -2639,19 +2639,17 @@ impl<TX: DbTxMut + DbTx, Spec: Send + Sync> StorageTrieWriter for DatabaseProvid
}

impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HashingWriter for DatabaseProvider<TX, Spec> {
fn unwind_account_hashing(
fn unwind_account_hashing<'a>(
&self,
range: RangeInclusive<BlockNumber>,
changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
// Aggregate all block changesets and make a list of accounts that have been changed.
// Note that collecting and then reversing the order is necessary to ensure that the
// changes are applied in the correct order.
let hashed_accounts = self
.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range)?
.map(|entry| entry.map(|(_, e)| (keccak256(e.address), e.info)))
.collect::<Result<Vec<_>, _>>()?
let hashed_accounts = changesets
.into_iter()
.map(|(_, e)| (keccak256(e.address), e.info))
.collect::<Vec<_>>()
.into_iter()
.rev()
.collect::<BTreeMap<_, _>>();
@@ -2669,13 +2667,25 @@ impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HashingWriter for DatabaseProvider<T
Ok(hashed_accounts)
}

fn unwind_account_hashing_range(
&self,
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
let changesets = self
.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range)?
.collect::<Result<Vec<_>, _>>()?;
self.unwind_account_hashing(changesets.iter())
}

fn insert_account_for_hashing(
&self,
accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
let hashed_accounts =
accounts.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
for (hashed_address, account) in &hashed_accounts {
if let Some(account) = account {
hashed_accounts_cursor.upsert(*hashed_address, *account)?;
Expand All @@ -2688,18 +2698,15 @@ impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HashingWriter for DatabaseProvider<T

fn unwind_storage_hashing(
&self,
range: Range<BlockNumberAddress>,
changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
// Aggregate all block changesets and make list of accounts that have been changed.
let mut changesets = self.tx.cursor_read::<tables::StorageChangeSets>()?;
let mut hashed_storages = changesets
.walk_range(range)?
.map(|entry| {
entry.map(|(BlockNumberAddress((_, address)), storage_entry)| {
(keccak256(address), keccak256(storage_entry.key), storage_entry.value)
})
.into_iter()
.map(|(BlockNumberAddress((_, address)), storage_entry)| {
(keccak256(address), keccak256(storage_entry.key), storage_entry.value)
})
.collect::<Result<Vec<_>, _>>()?;
.collect::<Vec<_>>();
hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));

// Apply values to HashedState, and remove the account if it's None.
@@ -2724,6 +2731,18 @@ impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HashingWriter for DatabaseProvider<T
Ok(hashed_storage_keys)
}

fn unwind_storage_hashing_range(
&self,
range: impl RangeBounds<BlockNumberAddress>,
) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
let changesets = self
.tx
.cursor_read::<tables::StorageChangeSets>()?
.walk_range(range)?
.collect::<Result<Vec<_>, _>>()?;
self.unwind_storage_hashing(changesets.into_iter())
}

fn insert_storage_for_hashing(
&self,
storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
@@ -2845,16 +2864,14 @@ impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HashingWriter for DatabaseProvider<T
}

impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HistoryWriter for DatabaseProvider<TX, Spec> {
fn unwind_account_history_indices(
fn unwind_account_history_indices<'a>(
&self,
range: RangeInclusive<BlockNumber>,
changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
) -> ProviderResult<usize> {
let mut last_indices = self
.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range)?
.map(|entry| entry.map(|(index, account)| (account.address, index)))
.collect::<Result<Vec<_>, _>>()?;
let mut last_indices = changesets
.into_iter()
.map(|(index, account)| (account.address, *index))
.collect::<Vec<_>>();
last_indices.sort_by_key(|(a, _)| *a);

// Unwind the account history index.
@@ -2881,6 +2898,18 @@ impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HistoryWriter for DatabaseProvider<T
Ok(changesets)
}

fn unwind_account_history_indices_range(
&self,
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<usize> {
let changesets = self
.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range)?
.collect::<Result<Vec<_>, _>>()?;
self.unwind_account_history_indices(changesets.iter())
}

fn insert_account_history_index(
&self,
account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
@@ -2893,16 +2922,12 @@ impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HistoryWriter for DatabaseProvider<T

fn unwind_storage_history_indices(
&self,
range: Range<BlockNumberAddress>,
changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
) -> ProviderResult<usize> {
let mut storage_changesets = self
.tx
.cursor_read::<tables::StorageChangeSets>()?
.walk_range(range)?
.map(|entry| {
entry.map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
})
.collect::<Result<Vec<_>, _>>()?;
let mut storage_changesets = changesets
.into_iter()
.map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
.collect::<Vec<_>>();
storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));

let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
@@ -2931,6 +2956,18 @@ impl<TX: DbTxMut + DbTx, Spec: Send + Sync> HistoryWriter for DatabaseProvider<T
Ok(changesets)
}

fn unwind_storage_history_indices_range(
&self,
range: impl RangeBounds<BlockNumberAddress>,
) -> ProviderResult<usize> {
let changesets = self
.tx
.cursor_read::<tables::StorageChangeSets>()?
.walk_range(range)?
.collect::<Result<Vec<_>, _>>()?;
self.unwind_storage_history_indices(changesets.into_iter())
}

fn insert_storage_history_index(
&self,
storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
@@ -2973,10 +3010,14 @@ impl<TX: DbTxMut + DbTx + 'static, Spec: Send + Sync + EthereumHardforks + 'stat
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Chain> {
let storage_range = BlockNumberAddress::range(range.clone());
let changed_accounts = self
.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range.clone())?
.collect::<Result<Vec<_>, _>>()?;

// Unwind account hashes. Add changed accounts to account prefix set.
let hashed_addresses = self.unwind_account_hashing(range.clone())?;
let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
let mut destroyed_accounts = HashSet::default();
for (hashed_address, account) in hashed_addresses {
@@ -2987,12 +3028,19 @@ impl<TX: DbTxMut + DbTx + 'static, Spec: Send + Sync + EthereumHardforks + 'stat
}

// Unwind account history indices.
self.unwind_account_history_indices(range.clone())?;
self.unwind_account_history_indices(changed_accounts.iter())?;
let storage_range = BlockNumberAddress::range(range.clone());

let changed_storages = self
.tx
.cursor_read::<tables::StorageChangeSets>()?
.walk_range(storage_range)?
.collect::<Result<Vec<_>, _>>()?;

// Unwind storage hashes. Add changed account and storage keys to corresponding prefix
// sets.
let mut storage_prefix_sets = HashMap::<B256, PrefixSet>::default();
let storage_entries = self.unwind_storage_hashing(storage_range.clone())?;
let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
for (hashed_address, hashed_slots) in storage_entries {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
@@ -3003,7 +3051,7 @@ impl<TX: DbTxMut + DbTx + 'static, Spec: Send + Sync + EthereumHardforks + 'stat
}

// Unwind storage history indices.
self.unwind_storage_history_indices(storage_range)?;
self.unwind_storage_history_indices(changed_storages.iter().copied())?;

// Calculate the reverted merkle root.
// This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
@@ -3061,10 +3109,14 @@ impl<TX: DbTxMut + DbTx + 'static, Spec: Send + Sync + EthereumHardforks + 'stat
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let storage_range = BlockNumberAddress::range(range.clone());
let changed_accounts = self
.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range.clone())?
.collect::<Result<Vec<_>, _>>()?;

// Unwind account hashes. Add changed accounts to account prefix set.
let hashed_addresses = self.unwind_account_hashing(range.clone())?;
let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
let mut destroyed_accounts = HashSet::default();
for (hashed_address, account) in hashed_addresses {
@@ -3075,12 +3127,19 @@ impl<TX: DbTxMut + DbTx + 'static, Spec: Send + Sync + EthereumHardforks + 'stat
}

// Unwind account history indices.
self.unwind_account_history_indices(range.clone())?;
self.unwind_account_history_indices(changed_accounts.iter())?;

let storage_range = BlockNumberAddress::range(range.clone());
let changed_storages = self
.tx
.cursor_read::<tables::StorageChangeSets>()?
.walk_range(storage_range)?
.collect::<Result<Vec<_>, _>>()?;

// Unwind storage hashes. Add changed account and storage keys to corresponding prefix
// sets.
let mut storage_prefix_sets = HashMap::<B256, PrefixSet>::default();
let storage_entries = self.unwind_storage_hashing(storage_range.clone())?;
let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
for (hashed_address, hashed_slots) in storage_entries {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
@@ -3091,7 +3150,7 @@ impl<TX: DbTxMut + DbTx + 'static, Spec: Send + Sync + EthereumHardforks + 'stat
}

// Unwind storage history indices.
self.unwind_storage_history_indices(storage_range)?;
self.unwind_storage_history_indices(changed_storages.iter().copied())?;

// Calculate the reverted merkle root.
// This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
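For readability, here is an abridged sketch of how the unwind flow in `provider.rs` reads after this change, condensed from the hunks above (the prefix-set bookkeeping, trie handling, and error plumbing are omitted, so it is not a standalone excerpt): each changeset table is walked once, and the same in-memory `Vec` feeds both the hashing unwind and the history-index unwind.

```rust
// Abridged from the post-change provider.rs hunks above (not compilable on its own).
let changed_accounts = self
    .tx
    .cursor_read::<tables::AccountChangeSets>()?
    .walk_range(range.clone())?
    .collect::<Result<Vec<_>, _>>()?;

// One read of AccountChangeSets serves both account unwinds.
let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
self.unwind_account_history_indices(changed_accounts.iter())?;

let storage_range = BlockNumberAddress::range(range.clone());
let changed_storages = self
    .tx
    .cursor_read::<tables::StorageChangeSets>()?
    .walk_range(storage_range)?
    .collect::<Result<Vec<_>, _>>()?;

// Likewise, one read of StorageChangeSets serves both storage unwinds.
let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
self.unwind_storage_history_indices(changed_storages.iter().copied())?;
```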
30 changes: 25 additions & 5 deletions crates/storage/provider/src/traits/hashing.rs
@@ -1,11 +1,11 @@
use alloy_primitives::{Address, BlockNumber, B256};
use auto_impl::auto_impl;
use reth_db_api::models::BlockNumberAddress;
use reth_db::models::{AccountBeforeTx, BlockNumberAddress};
use reth_primitives::{Account, StorageEntry};
use reth_storage_errors::provider::ProviderResult;
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
ops::{Range, RangeInclusive},
ops::{RangeBounds, RangeInclusive},
};

/// Hashing Writer
@@ -16,9 +16,19 @@ pub trait HashingWriter: Send + Sync {
/// # Returns
///
/// Set of hashed keys of updated accounts.
fn unwind_account_hashing(
fn unwind_account_hashing<'a>(
&self,
range: RangeInclusive<BlockNumber>,
changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
) -> ProviderResult<BTreeMap<B256, Option<Account>>>;

/// Unwind and clear account hashing in a given block range.
///
/// # Returns
///
/// Set of hashed keys of updated accounts.
fn unwind_account_hashing_range(
&self,
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<BTreeMap<B256, Option<Account>>>;

/// Inserts all accounts into [reth_db::tables::AccountsHistory] table.
@@ -38,7 +48,17 @@ pub trait HashingWriter: Send + Sync {
/// Mapping of hashed keys of updated accounts to their respective updated hashed slots.
fn unwind_storage_hashing(
&self,
range: Range<BlockNumberAddress>,
changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
) -> ProviderResult<HashMap<B256, BTreeSet<B256>>>;

/// Unwind and clear storage hashing in a given block range.
///
/// # Returns
///
/// Mapping of hashed keys of updated accounts to their respective updated hashed slots.
fn unwind_storage_hashing_range(
&self,
range: impl RangeBounds<BlockNumberAddress>,
) -> ProviderResult<HashMap<B256, BTreeSet<B256>>>;

/// Iterates over storages and inserts them to hashing table.
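Finally, a hedged sketch of calling the updated trait from downstream code, based only on the signatures in this file: a caller that already holds the account changesets passes an iterator, while a range-only caller uses the `_range` variant and lets the provider read the table itself. The helper name `unwind_accounts`, the `Option` argument, and the `reth_provider::HashingWriter` import path are assumptions for illustration; only the two trait methods and their argument types come from the diff.

```rust
use std::ops::RangeInclusive;

use alloy_primitives::BlockNumber;
use reth_db::models::AccountBeforeTx;
// Assumed re-export path for the trait; the in-tree path may differ.
use reth_provider::HashingWriter;
use reth_storage_errors::provider::ProviderResult;

/// Hypothetical helper: prefer changesets the caller already holds, and fall
/// back to the range variant (which walks AccountChangeSets itself) otherwise.
fn unwind_accounts<P: HashingWriter>(
    provider: &P,
    in_memory: Option<&[(BlockNumber, AccountBeforeTx)]>,
    range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
    match in_memory {
        // Reuse the in-memory entries: no second walk of the changeset table.
        Some(changesets) => provider.unwind_account_hashing(changesets.iter())?,
        // Range-only callers (like the hashing stages) use the `_range` variant.
        None => provider.unwind_account_hashing_range(range)?,
    };
    Ok(())
}
```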