diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index 20d878399f5b..1d874ee578ae 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -1809,7 +1809,7 @@ mod tests { .with_pending_blocks((block2.number + 1, HashSet::new())) .assert(&tree); - assert!(tree.make_canonical(&block1a_hash).is_ok()); + assert_matches!(tree.make_canonical(&block1a_hash), Ok(_)); // Trie state: // b2a b2 (side chain) // | / diff --git a/crates/primitives/src/receipt.rs b/crates/primitives/src/receipt.rs index 37c6fd37c13a..5bf07391e7ec 100644 --- a/crates/primitives/src/receipt.rs +++ b/crates/primitives/src/receipt.rs @@ -120,21 +120,18 @@ impl Receipts { /// Retrieves gas spent by transactions as a vector of tuples (transaction index, gas used). pub fn gas_spent_by_tx(&self) -> Result, PruneSegmentError> { - self.last() - .map(|block_r| { - block_r - .iter() - .enumerate() - .map(|(id, tx_r)| { - if let Some(receipt) = tx_r.as_ref() { - Ok((id as u64, receipt.cumulative_gas_used)) - } else { - Err(PruneSegmentError::ReceiptsPruned) - } - }) - .collect::, PruneSegmentError>>() - }) - .unwrap_or(Ok(vec![])) + let Some(block_r) = self.last() else { + return Ok(vec![]); + }; + let mut out = Vec::with_capacity(block_r.len()); + for (id, tx_r) in block_r.iter().enumerate() { + if let Some(receipt) = tx_r.as_ref() { + out.push((id as u64, receipt.cumulative_gas_used)); + } else { + return Err(PruneSegmentError::ReceiptsPruned); + } + } + Ok(out) } } diff --git a/crates/storage/db/src/abstraction/cursor.rs b/crates/storage/db/src/abstraction/cursor.rs index ef2707c301df..306b010f53bf 100644 --- a/crates/storage/db/src/abstraction/cursor.rs +++ b/crates/storage/db/src/abstraction/cursor.rs @@ -153,7 +153,7 @@ where } } -impl<'cursor, T: Table, CURSOR: DbCursorRO> std::iter::Iterator for Walker<'cursor, T, CURSOR> { +impl<'cursor, T: Table, CURSOR: DbCursorRO> Iterator for Walker<'cursor, T, CURSOR> { type Item = Result, DatabaseError>; fn next(&mut self) -> Option { let start = self.start.take(); @@ -227,9 +227,7 @@ impl<'cursor, T: Table, CURSOR: DbCursorRW + DbCursorRO> ReverseWalker<'cu } } -impl<'cursor, T: Table, CURSOR: DbCursorRO> std::iter::Iterator - for ReverseWalker<'cursor, T, CURSOR> -{ +impl<'cursor, T: Table, CURSOR: DbCursorRO> Iterator for ReverseWalker<'cursor, T, CURSOR> { type Item = Result, DatabaseError>; fn next(&mut self) -> Option { @@ -270,10 +268,9 @@ where } } -impl<'cursor, T: Table, CURSOR: DbCursorRO> std::iter::Iterator - for RangeWalker<'cursor, T, CURSOR> -{ +impl<'cursor, T: Table, CURSOR: DbCursorRO> Iterator for RangeWalker<'cursor, T, CURSOR> { type Item = Result, DatabaseError>; + fn next(&mut self) -> Option { if self.is_done { return None @@ -292,11 +289,10 @@ impl<'cursor, T: Table, CURSOR: DbCursorRO> std::iter::Iterator } }, Some(res @ Err(_)) => Some(res), - None if matches!(self.end_key, Bound::Unbounded) => { - self.is_done = true; + None => { + self.is_done = matches!(self.end_key, Bound::Unbounded); None } - _ => None, } } } @@ -361,9 +357,7 @@ impl<'cursor, T: DupSort, CURSOR: DbCursorRW + DbDupCursorRO> DupWalker<'c } } -impl<'cursor, T: DupSort, CURSOR: DbDupCursorRO> std::iter::Iterator - for DupWalker<'cursor, T, CURSOR> -{ +impl<'cursor, T: DupSort, CURSOR: DbDupCursorRO> Iterator for DupWalker<'cursor, T, CURSOR> { type Item = Result, DatabaseError>; fn next(&mut self) -> Option { let start = self.start.take(); diff --git a/crates/storage/db/src/implementation/mdbx/cursor.rs b/crates/storage/db/src/implementation/mdbx/cursor.rs index 63017be2d524..1c0cfa0e2bc2 100644 --- a/crates/storage/db/src/implementation/mdbx/cursor.rs +++ b/crates/storage/db/src/implementation/mdbx/cursor.rs @@ -1,8 +1,5 @@ //! Cursor wrapper for libmdbx-sys. -use reth_interfaces::db::{DatabaseWriteError, DatabaseWriteOperation}; -use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds}; - use crate::{ common::{PairResult, ValueOnlyResult}, cursor::{ @@ -10,11 +7,13 @@ use crate::{ ReverseWalker, Walker, }, metrics::{Operation, OperationMetrics}, - table::{Compress, DupSort, Encode, Table}, + table::{Compress, Decode, Decompress, DupSort, Encode, Table}, tables::utils::*, DatabaseError, }; +use reth_interfaces::db::{DatabaseWriteError, DatabaseWriteOperation}; use reth_libmdbx::{self, Error as MDBXError, TransactionKind, WriteFlags, RO, RW}; +use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds}; /// Read only Cursor. pub type CursorRO = Cursor; @@ -56,12 +55,17 @@ impl Cursor { } } -/// Takes `(key, value)` from the database and decodes it appropriately. -#[macro_export] -macro_rules! decode { - ($v:expr) => { - $v.map_err(|e| $crate::DatabaseError::Read(e.into()))?.map(decoder::).transpose() - }; +/// Decodes a `(key, value)` pair from the database. +#[allow(clippy::type_complexity)] +pub fn decode( + res: Result, Cow<'_, [u8]>)>, impl Into>, +) -> PairResult +where + T: Table, + T::Key: Decode, + T::Value: Decompress, +{ + res.map_err(|e| DatabaseError::Read(e.into()))?.map(decoder::).transpose() } /// Some types don't support compression (eg. B256), and we don't want to be copying them to the @@ -80,39 +84,36 @@ macro_rules! compress_to_buf_or_ref { impl DbCursorRO for Cursor { fn first(&mut self) -> PairResult { - decode!(self.inner.first()) + decode::(self.inner.first()) } fn seek_exact(&mut self, key: ::Key) -> PairResult { - decode!(self.inner.set_key(key.encode().as_ref())) + decode::(self.inner.set_key(key.encode().as_ref())) } fn seek(&mut self, key: ::Key) -> PairResult { - decode!(self.inner.set_range(key.encode().as_ref())) + decode::(self.inner.set_range(key.encode().as_ref())) } fn next(&mut self) -> PairResult { - decode!(self.inner.next()) + decode::(self.inner.next()) } fn prev(&mut self) -> PairResult { - decode!(self.inner.prev()) + decode::(self.inner.prev()) } fn last(&mut self) -> PairResult { - decode!(self.inner.last()) + decode::(self.inner.last()) } fn current(&mut self) -> PairResult { - decode!(self.inner.get_current()) + decode::(self.inner.get_current()) } fn walk(&mut self, start_key: Option) -> Result, DatabaseError> { let start = if let Some(start_key) = start_key { - self.inner - .set_range(start_key.encode().as_ref()) - .map_err(|e| DatabaseError::Read(e.into()))? - .map(decoder::) + decode::(self.inner.set_range(start_key.encode().as_ref())).transpose() } else { self.first().transpose() }; @@ -130,10 +131,8 @@ impl DbCursorRO for Cursor { unreachable!("Rust doesn't allow for Bound::Excluded in starting bounds"); } Bound::Unbounded => self.inner.first(), - } - .map_err(|e| DatabaseError::Read(e.into()))? - .map(decoder::); - + }; + let start = decode::(start).transpose(); Ok(RangeWalker::new(self, start, range.end_bound().cloned())) } @@ -142,7 +141,7 @@ impl DbCursorRO for Cursor { start_key: Option, ) -> Result, DatabaseError> { let start = if let Some(start_key) = start_key { - decode!(self.inner.set_range(start_key.encode().as_ref())) + decode::(self.inner.set_range(start_key.encode().as_ref())) } else { self.last() } @@ -155,12 +154,12 @@ impl DbCursorRO for Cursor { impl DbDupCursorRO for Cursor { /// Returns the next `(key, value)` pair of a DUPSORT table. fn next_dup(&mut self) -> PairResult { - decode!(self.inner.next_dup()) + decode::(self.inner.next_dup()) } /// Returns the next `(key, value)` pair skipping the duplicates. fn next_no_dup(&mut self) -> PairResult { - decode!(self.inner.next_nodup()) + decode::(self.inner.next_nodup()) } /// Returns the next `value` of a duplicate `key`. diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 6c85dc12c807..56ac8f3f69d3 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -50,7 +50,7 @@ use revm::primitives::{BlockEnv, CfgEnv, SpecId}; use std::{ collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet}, fmt::Debug, - ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive}, + ops::{Bound, Deref, DerefMut, Range, RangeBounds, RangeInclusive}, sync::{mpsc, Arc}, time::{Duration, Instant}, }; @@ -112,6 +112,54 @@ impl DatabaseProvider { } } +impl DatabaseProvider { + fn cursor_read_collect, R>( + &self, + range: impl RangeBounds, + mut f: impl FnMut(T::Value) -> Result, + ) -> Result, DatabaseError> { + self.cursor_read_collect_with_key::(range, |_, v| f(v)) + } + + fn cursor_read_collect_with_key, R>( + &self, + range: impl RangeBounds, + f: impl FnMut(T::Key, T::Value) -> Result, + ) -> Result, DatabaseError> { + let capacity = match range_size_hint(&range) { + Some(0) | None => return Ok(Vec::new()), + Some(capacity) => capacity, + }; + let mut cursor = self.tx.cursor_read::()?; + self.cursor_collect_with_capacity(&mut cursor, range, capacity, f) + } + + fn cursor_collect, R>( + &self, + cursor: &mut impl DbCursorRO, + range: impl RangeBounds, + mut f: impl FnMut(T::Value) -> Result, + ) -> Result, DatabaseError> { + let capacity = range_size_hint(&range).unwrap_or(0); + self.cursor_collect_with_capacity(cursor, range, capacity, |_, v| f(v)) + } + + fn cursor_collect_with_capacity, R>( + &self, + cursor: &mut impl DbCursorRO, + range: impl RangeBounds, + capacity: usize, + mut f: impl FnMut(T::Key, T::Value) -> Result, + ) -> Result, DatabaseError> { + let mut items = Vec::with_capacity(capacity); + for entry in cursor.walk_range(range)? { + let (key, value) = entry?; + items.push(f(key, value)?); + } + Ok(items) + } +} + /// For a given key, unwind all history shards that are below the given block number. /// /// S - Sharded key subtype. @@ -1179,14 +1227,7 @@ impl BlockReader for DatabaseProvider { // we skip the block. if let Some((_, block_body_indices)) = block_body_cursor.seek_exact(num)? { let tx_range = block_body_indices.tx_num_range(); - let body = if tx_range.is_empty() { - Vec::new() - } else { - tx_cursor - .walk_range(tx_range)? - .map(|result| result.map(|(_, tx)| tx.into())) - .collect::, _>>()? - }; + let body = self.cursor_collect(&mut tx_cursor, tx_range, |tx| Ok(tx.into()))?; // If we are past shanghai, then all blocks should have a withdrawal list, // even if empty @@ -1355,19 +1396,14 @@ impl TransactionsProvider for DatabaseProvider { &self, id: BlockHashOrNumber, ) -> ProviderResult>> { - let mut tx_cursor = self.tx.cursor_read::()?; if let Some(block_number) = self.convert_hash_or_number(id)? { if let Some(body) = self.block_body_indices(block_number)? { - let tx_range = body.tx_num_range(); - return if tx_range.is_empty() { - Ok(Some(Vec::new())) - } else { - let transactions = tx_cursor - .walk_range(tx_range)? - .map(|result| result.map(|(_, tx)| tx.into())) - .collect::, _>>()?; - Ok(Some(transactions)) - }; + return self + .cursor_read_collect::(body.tx_num_range(), |tx| { + Ok(tx.into()) + }) + .map(Some) + .map_err(Into::into); } } Ok(None) @@ -1377,48 +1413,25 @@ impl TransactionsProvider for DatabaseProvider { &self, range: impl RangeBounds, ) -> ProviderResult>> { - let mut results = Vec::new(); - let mut body_cursor = self.tx.cursor_read::()?; let mut tx_cursor = self.tx.cursor_read::()?; - for entry in body_cursor.walk_range(range)? { - let (_, body) = entry?; - let tx_num_range = body.tx_num_range(); - if tx_num_range.is_empty() { - results.push(Vec::new()); - } else { - results.push( - tx_cursor - .walk_range(tx_num_range)? - .map(|result| result.map(|(_, tx)| tx.into())) - .collect::, _>>()?, - ); - } - } - Ok(results) + self.cursor_read_collect::(range, |body| { + self.cursor_collect(&mut tx_cursor, body.tx_num_range(), |tx| Ok(tx.into())) + }) + .map_err(Into::into) } fn transactions_by_tx_range( &self, range: impl RangeBounds, ) -> ProviderResult> { - Ok(self - .tx - .cursor_read::()? - .walk_range(range)? - .map(|entry| entry.map(|tx| tx.1)) - .collect::, _>>()?) + self.cursor_read_collect::(range, Ok).map_err(Into::into) } fn senders_by_tx_range( &self, range: impl RangeBounds, ) -> ProviderResult> { - Ok(self - .tx - .cursor_read::()? - .walk_range(range)? - .map(|entry| entry.map(|sender| sender.1)) - .collect::, _>>()?) + self.cursor_read_collect::(range, Ok).map_err(Into::into) } fn transaction_sender(&self, id: TxNumber) -> ProviderResult> { @@ -1442,17 +1455,10 @@ impl ReceiptProvider for DatabaseProvider { fn receipts_by_block(&self, block: BlockHashOrNumber) -> ProviderResult>> { if let Some(number) = self.convert_hash_or_number(block)? { if let Some(body) = self.block_body_indices(number)? { - let tx_range = body.tx_num_range(); - return if tx_range.is_empty() { - Ok(Some(Vec::new())) - } else { - let mut receipts_cursor = self.tx.cursor_read::()?; - let receipts = receipts_cursor - .walk_range(tx_range)? - .map(|result| result.map(|(_, receipt)| receipt)) - .collect::, _>>()?; - Ok(Some(receipts)) - }; + return self + .cursor_read_collect::(body.tx_num_range(), Ok) + .map(Some) + .map_err(Into::into); } } Ok(None) @@ -1682,41 +1688,28 @@ impl HashingWriter for DatabaseProvider { &self, range: RangeInclusive, ) -> ProviderResult>> { - let mut hashed_accounts_cursor = self.tx.cursor_write::()?; - // Aggregate all block changesets and make a list of accounts that have been changed. + // Note that collecting and then reversing the order is necessary to ensure that the + // changes are applied in the correct order. let hashed_accounts = self .tx .cursor_read::()? .walk_range(range)? + .map(|entry| entry.map(|(_, e)| (keccak256(e.address), e.info))) .collect::, _>>()? .into_iter() .rev() - // fold all account to get the old balance/nonces and account that needs to be removed - .fold( - BTreeMap::new(), - |mut accounts: BTreeMap>, (_, account_before)| { - accounts.insert(account_before.address, account_before.info); - accounts - }, - ) - .into_iter() - // hash addresses and collect it inside sorted BTreeMap. - // We are doing keccak only once per address. - .map(|(address, account)| (keccak256(address), account)) .collect::>(); - hashed_accounts - .iter() - // Apply values to HashedState (if Account is None remove it); - .try_for_each(|(hashed_address, account)| -> ProviderResult<()> { - if let Some(account) = account { - hashed_accounts_cursor.upsert(*hashed_address, *account)?; - } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() { - hashed_accounts_cursor.delete_current()?; - } - Ok(()) - })?; + // Apply values to HashedState, and remove the account if it's None. + let mut hashed_accounts_cursor = self.tx.cursor_write::()?; + for (hashed_address, account) in &hashed_accounts { + if let Some(account) = account { + hashed_accounts_cursor.upsert(*hashed_address, *account)?; + } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() { + hashed_accounts_cursor.delete_current()?; + } + } Ok(hashed_accounts) } @@ -1726,24 +1719,15 @@ impl HashingWriter for DatabaseProvider { accounts: impl IntoIterator)>, ) -> ProviderResult>> { let mut hashed_accounts_cursor = self.tx.cursor_write::()?; - - let hashed_accounts = accounts.into_iter().fold( - BTreeMap::new(), - |mut map: BTreeMap>, (address, account)| { - map.insert(keccak256(address), account); - map - }, - ); - - hashed_accounts.iter().try_for_each(|(hashed_address, account)| -> ProviderResult<()> { + let hashed_accounts = + accounts.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::>(); + for (hashed_address, account) in &hashed_accounts { if let Some(account) = account { - hashed_accounts_cursor.upsert(*hashed_address, *account)? + hashed_accounts_cursor.upsert(*hashed_address, *account)?; } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() { hashed_accounts_cursor.delete_current()?; } - Ok(()) - })?; - + } Ok(hashed_accounts) } @@ -1751,54 +1735,36 @@ impl HashingWriter for DatabaseProvider { &self, range: Range, ) -> ProviderResult>> { - let mut hashed_storage = self.tx.cursor_dup_write::()?; - // Aggregate all block changesets and make list of accounts that have been changed. - let hashed_storages = self - .tx - .cursor_read::()? + let mut changesets = self.tx.cursor_read::()?; + let mut hashed_storages = changesets .walk_range(range)? - .collect::, _>>()? - .into_iter() - .rev() - // fold all account to get the old balance/nonces and account that needs to be removed - .fold( - BTreeMap::new(), - |mut accounts: BTreeMap<(Address, B256), U256>, - (BlockNumberAddress((_, address)), storage_entry)| { - accounts.insert((address, storage_entry.key), storage_entry.value); - accounts - }, - ) - .into_iter() - // hash addresses and collect it inside sorted BTreeMap. - // We are doing keccak only once per address. - .map(|((address, key), value)| ((keccak256(address), keccak256(key)), value)) - .collect::>(); - - let mut hashed_storage_keys: HashMap> = HashMap::default(); - for (hashed_address, hashed_slot) in hashed_storages.keys() { - hashed_storage_keys.entry(*hashed_address).or_default().insert(*hashed_slot); - } + .map(|entry| { + entry.map(|(BlockNumberAddress((_, address)), storage_entry)| { + (keccak256(address), keccak256(storage_entry.key), storage_entry.value) + }) + }) + .collect::, _>>()?; + hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk)); - hashed_storages - .into_iter() - // Apply values to HashedStorage (if Value is zero just remove it); - .try_for_each(|((hashed_address, key), value)| -> ProviderResult<()> { - if hashed_storage - .seek_by_key_subkey(hashed_address, key)? - .filter(|entry| entry.key == key) - .is_some() - { - hashed_storage.delete_current()?; - } + // Apply values to HashedState, and remove the account if it's None. + let mut hashed_storage_keys: HashMap> = HashMap::new(); + let mut hashed_storage = self.tx.cursor_dup_write::()?; + for (hashed_address, key, value) in hashed_storages.into_iter().rev() { + hashed_storage_keys.entry(hashed_address).or_default().insert(key); - if value != U256::ZERO { - hashed_storage.upsert(hashed_address, StorageEntry { key, value })?; - } - Ok(()) - })?; + if hashed_storage + .seek_by_key_subkey(hashed_address, key)? + .filter(|entry| entry.key == key) + .is_some() + { + hashed_storage.delete_current()?; + } + if value != U256::ZERO { + hashed_storage.upsert(hashed_address, StorageEntry { key, value })?; + } + } Ok(hashed_storage_keys) } @@ -1958,29 +1924,18 @@ impl HistoryWriter for DatabaseProvider { &self, range: Range, ) -> ProviderResult { - let storage_changesets = self + let mut storage_changesets = self .tx .cursor_read::()? .walk_range(range)? + .map(|entry| { + entry.map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn)) + }) .collect::, _>>()?; - let changesets = storage_changesets.len(); - - let last_indices = storage_changesets - .into_iter() - // reverse so we can get lowest block number where we need to unwind account. - .rev() - // fold all storages and get last block number - .fold( - BTreeMap::new(), - |mut accounts: BTreeMap<(Address, B256), u64>, (index, storage)| { - // we just need address and lowest block number. - accounts.insert((index.address(), storage.key), index.block_number()); - accounts - }, - ); + storage_changesets.sort_by_key(|(address, key, _)| (*address, *key)); let mut cursor = self.tx.cursor_write::()?; - for ((address, storage_key), rem_index) in last_indices { + for &(address, storage_key, rem_index) in &storage_changesets { let partial_shard = unwind_history_shards::<_, tables::StorageHistory, _>( &mut cursor, StorageShardedKey::last(address, storage_key), @@ -2001,6 +1956,7 @@ impl HistoryWriter for DatabaseProvider { } } + let changesets = storage_changesets.len(); Ok(changesets) } @@ -2008,27 +1964,17 @@ impl HistoryWriter for DatabaseProvider { &self, range: RangeInclusive, ) -> ProviderResult { - let account_changeset = self + let mut last_indices = self .tx .cursor_read::()? .walk_range(range)? + .map(|entry| entry.map(|(index, account)| (account.address, index))) .collect::, _>>()?; - let changesets = account_changeset.len(); - - let last_indices = account_changeset - .into_iter() - // reverse so we can get lowest block number where we need to unwind account. - .rev() - // fold all account and get last block number - .fold(BTreeMap::new(), |mut accounts: BTreeMap, (index, account)| { - // we just need address and lowest block number. - accounts.insert(account.address, index); - accounts - }); + last_indices.sort_by_key(|(a, _)| *a); // Unwind the account history index. let mut cursor = self.tx.cursor_write::()?; - for (address, rem_index) in last_indices { + for &(address, rem_index) in &last_indices { let partial_shard = unwind_history_shards::<_, tables::AccountHistory, _>( &mut cursor, ShardedKey::last(address), @@ -2046,6 +1992,7 @@ impl HistoryWriter for DatabaseProvider { } } + let changesets = last_indices.len(); Ok(changesets) } } @@ -2125,6 +2072,7 @@ impl BlockExecutionWriter for DatabaseProvider { } trie_updates.flush(&self.tx)?; } + // get blocks let blocks = self.get_take_block_range::(chain_spec, range.clone())?; let unwind_to = blocks.first().map(|b| b.number.saturating_sub(1)); @@ -2348,3 +2296,17 @@ impl PruneCheckpointWriter for DatabaseProvider { Ok(self.tx.put::(segment, checkpoint)?) } } + +fn range_size_hint(range: &impl RangeBounds) -> Option { + let start = match range.start_bound().cloned() { + Bound::Included(start) => start, + Bound::Excluded(start) => start.checked_add(1)?, + Bound::Unbounded => 0, + }; + let end = match range.end_bound().cloned() { + Bound::Included(end) => end.saturating_add(1), + Bound::Excluded(end) => end, + Bound::Unbounded => return None, + }; + end.checked_sub(start).map(|x| x as _) +}