From be2fb54d76170e9c187183308496fa88ad9172a1 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 11 Jun 2024 14:53:50 +0100 Subject: [PATCH 1/6] feat(provider): `BlockReader::sealed_block_with_senders_range` --- crates/primitives/src/header.rs | 6 + crates/stages/stages/src/stages/execution.rs | 2 +- .../provider/src/providers/database/mod.rs | 7 + .../src/providers/database/provider.rs | 172 ++++++++++++------ crates/storage/provider/src/providers/mod.rs | 7 + .../src/providers/static_file/manager.rs | 7 + .../storage/provider/src/test_utils/mock.rs | 7 + .../storage/provider/src/test_utils/noop.rs | 7 + crates/storage/storage-api/src/block.rs | 11 +- 9 files changed, 169 insertions(+), 57 deletions(-) diff --git a/crates/primitives/src/header.rs b/crates/primitives/src/header.rs index 25c846972c8f..abb5e1f34144 100644 --- a/crates/primitives/src/header.rs +++ b/crates/primitives/src/header.rs @@ -109,6 +109,12 @@ pub struct Header { pub extra_data: Bytes, } +impl AsRef for Header { + fn as_ref(&self) -> &Self { + self + } +} + impl Default for Header { fn default() -> Self { Self { diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index 373c7993dbc2..63b4cace2518 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -395,7 +395,7 @@ where // Prepare the input for post unwind commit hook, where an `ExExNotification` will be sent. if self.exex_manager_handle.has_exexs() { // Get the blocks for the unwound range. - let blocks = provider.get_take_block_range::(range.clone())?; + let blocks = provider.sealed_block_with_senders_range(range.clone())?; let previous_input = self.post_unwind_commit_input.replace(Chain::new( blocks, bundle_state_with_receipts, diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 0fe0eeb6df3b..d869e66979db 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -339,6 +339,13 @@ impl BlockReader for ProviderFactory { ) -> ProviderResult> { self.provider()?.block_with_senders_range(range) } + + fn sealed_block_with_senders_range( + &self, + range: RangeInclusive, + ) -> ProviderResult> { + self.provider()?.sealed_block_with_senders_range(range) + } } impl TransactionsProvider for ProviderFactory { diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index b8a9beda853b..0da0c742c8c9 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -1317,15 +1317,18 @@ impl BlockNumReader for DatabaseProvider { } impl DatabaseProvider { - fn process_block_range( + fn process_block_range( &self, range: RangeInclusive, + headers_range: HF, mut assemble_block: F, ) -> ProviderResult> where + H: AsRef
, + HF: FnOnce(RangeInclusive) -> ProviderResult>, F: FnMut( Range, - Header, + H, Vec
, Option, Option, @@ -1338,53 +1341,59 @@ impl DatabaseProvider { let len = range.end().saturating_sub(*range.start()) as usize; let mut blocks = Vec::with_capacity(len); - let headers = self.headers_range(range)?; + let headers = headers_range(range)?; let mut ommers_cursor = self.tx.cursor_read::()?; let mut withdrawals_cursor = self.tx.cursor_read::()?; let mut requests_cursor = self.tx.cursor_read::()?; let mut block_body_cursor = self.tx.cursor_read::()?; for header in headers { + let header_ref = header.as_ref(); // If the body indices are not found, this means that the transactions either do // not exist in the database yet, or they do exit but are // not indexed. If they exist but are not indexed, we don't // have enough information to return the block anyways, so // we skip the block. - if let Some((_, block_body_indices)) = block_body_cursor.seek_exact(header.number)? { + if let Some((_, block_body_indices)) = + block_body_cursor.seek_exact(header_ref.number)? + { let tx_range = block_body_indices.tx_num_range(); // If we are past shanghai, then all blocks should have a withdrawal list, // even if empty let withdrawals = - if self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp) { + if self.chain_spec.is_shanghai_active_at_timestamp(header_ref.timestamp) { Some( withdrawals_cursor - .seek_exact(header.number)? + .seek_exact(header_ref.number)? .map(|(_, w)| w.withdrawals) .unwrap_or_default(), ) } else { None }; - let requests = if self.chain_spec.is_prague_active_at_timestamp(header.timestamp) { - Some(requests_cursor.seek_exact(header.number)?.unwrap_or_default().1) - } else { - None - }; + let requests = + if self.chain_spec.is_prague_active_at_timestamp(header_ref.timestamp) { + Some(requests_cursor.seek_exact(header_ref.number)?.unwrap_or_default().1) + } else { + None + }; let ommers = - if self.chain_spec.final_paris_total_difficulty(header.number).is_some() { + if self.chain_spec.final_paris_total_difficulty(header_ref.number).is_some() { Vec::new() } else { ommers_cursor - .seek_exact(header.number)? + .seek_exact(header_ref.number)? .map(|(_, o)| o.ommers) .unwrap_or_default() }; + if let Ok(b) = assemble_block(tx_range, header, ommers, withdrawals, requests) { blocks.push(b); } } } + Ok(blocks) } } @@ -1515,17 +1524,21 @@ impl BlockReader for DatabaseProvider { fn block_range(&self, range: RangeInclusive) -> ProviderResult> { let mut tx_cursor = self.tx.cursor_read::()?; - self.process_block_range(range, |tx_range, header, ommers, withdrawals, requests| { - let body = if tx_range.is_empty() { - Vec::new() - } else { - self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)? - .into_iter() - .map(Into::into) - .collect() - }; - Ok(Block { header, body, ommers, withdrawals, requests }) - }) + self.process_block_range( + range, + |range| self.headers_range(range), + |tx_range, header, ommers, withdrawals, requests| { + let body = if tx_range.is_empty() { + Vec::new() + } else { + self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)? + .into_iter() + .map(Into::into) + .collect() + }; + Ok(Block { header, body, ommers, withdrawals, requests }) + }, + ) } fn block_with_senders_range( @@ -1535,42 +1548,95 @@ impl BlockReader for DatabaseProvider { let mut tx_cursor = self.tx.cursor_read::()?; let mut senders_cursor = self.tx.cursor_read::()?; - self.process_block_range(range, |tx_range, header, ommers, withdrawals, requests| { - let (body, senders) = if tx_range.is_empty() { - (Vec::new(), Vec::new()) - } else { - let body = self - .transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)? - .into_iter() - .map(Into::into) - .collect::>(); - // fetch senders from the senders table - let known_senders = - senders_cursor + self.process_block_range( + range, + |range| self.headers_range(range), + |tx_range, header, ommers, withdrawals, requests| { + let (body, senders) = if tx_range.is_empty() { + (Vec::new(), Vec::new()) + } else { + let body = self + .transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)? + .into_iter() + .map(Into::into) + .collect::>(); + // fetch senders from the senders table + let known_senders = senders_cursor .walk_range(tx_range.clone())? .collect::, _>>()?; - let mut senders = Vec::with_capacity(body.len()); - for (tx_num, tx) in tx_range.zip(body.iter()) { - match known_senders.get(&tx_num) { - None => { - // recover the sender from the transaction if not found - let sender = tx - .recover_signer_unchecked() - .ok_or_else(|| ProviderError::SenderRecoveryError)?; - senders.push(sender); + let mut senders = Vec::with_capacity(body.len()); + for (tx_num, tx) in tx_range.zip(body.iter()) { + match known_senders.get(&tx_num) { + None => { + // recover the sender from the transaction if not found + let sender = tx + .recover_signer_unchecked() + .ok_or_else(|| ProviderError::SenderRecoveryError)?; + senders.push(sender); + } + Some(sender) => senders.push(*sender), } - Some(sender) => senders.push(*sender), } - } - (body, senders) - }; + (body, senders) + }; - Block { header, body, ommers, withdrawals, requests } - .try_with_senders_unchecked(senders) - .map_err(|_| ProviderError::SenderRecoveryError) - }) + Block { header, body, ommers, withdrawals, requests } + .try_with_senders_unchecked(senders) + .map_err(|_| ProviderError::SenderRecoveryError) + }, + ) + } + + fn sealed_block_with_senders_range( + &self, + range: RangeInclusive, + ) -> ProviderResult> { + let mut tx_cursor = self.tx.cursor_read::()?; + let mut senders_cursor = self.tx.cursor_read::()?; + + self.process_block_range( + range, + |range| self.sealed_headers_range(range), + |tx_range, header, ommers, withdrawals, requests| { + let (body, senders) = if tx_range.is_empty() { + (Vec::new(), Vec::new()) + } else { + let body = self + .transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)? + .into_iter() + .map(Into::into) + .collect::>(); + // fetch senders from the senders table + let known_senders = senders_cursor + .walk_range(tx_range.clone())? + .collect::, _>>()?; + + let mut senders = Vec::with_capacity(body.len()); + for (tx_num, tx) in tx_range.zip(body.iter()) { + match known_senders.get(&tx_num) { + None => { + // recover the sender from the transaction if not found + let sender = tx + .recover_signer_unchecked() + .ok_or_else(|| ProviderError::SenderRecoveryError)?; + senders.push(sender); + } + Some(sender) => senders.push(*sender), + } + } + + (body, senders) + }; + + SealedBlockWithSenders::new( + SealedBlock { header, body, ommers, withdrawals, requests }, + senders, + ) + .ok_or(ProviderError::SenderRecoveryError) + }, + ) } } diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index ede69cdf6eee..c9e478070f7e 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -337,6 +337,13 @@ where ) -> ProviderResult> { self.database.block_with_senders_range(range) } + + fn sealed_block_with_senders_range( + &self, + range: RangeInclusive, + ) -> ProviderResult> { + self.database.sealed_block_with_senders_range(range) + } } impl TransactionsProvider for BlockchainProvider diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 9b41bac0924b..25831935ef5e 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -1467,6 +1467,13 @@ impl BlockReader for StaticFileProvider { ) -> ProviderResult> { Err(ProviderError::UnsupportedProvider) } + + fn sealed_block_with_senders_range( + &self, + _range: RangeInclusive, + ) -> ProviderResult> { + Err(ProviderError::UnsupportedProvider) + } } impl WithdrawalsProvider for StaticFileProvider { diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index 744888013e4e..3e72ad4cd8eb 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -491,6 +491,13 @@ impl BlockReader for MockEthProvider { ) -> ProviderResult> { Ok(vec![]) } + + fn sealed_block_with_senders_range( + &self, + _range: RangeInclusive, + ) -> ProviderResult> { + Ok(vec![]) + } } impl BlockReaderIdExt for MockEthProvider { diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index 078ef5d3dbec..719437b71739 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -123,6 +123,13 @@ impl BlockReader for NoopProvider { ) -> ProviderResult> { Ok(vec![]) } + + fn sealed_block_with_senders_range( + &self, + _range: RangeInclusive, + ) -> ProviderResult> { + Ok(vec![]) + } } impl BlockReaderIdExt for NoopProvider { diff --git a/crates/storage/storage-api/src/block.rs b/crates/storage/storage-api/src/block.rs index 539930f5a5c6..42ab05f22503 100644 --- a/crates/storage/storage-api/src/block.rs +++ b/crates/storage/storage-api/src/block.rs @@ -123,14 +123,19 @@ pub trait BlockReader: /// Note: returns only available blocks fn block_range(&self, range: RangeInclusive) -> ProviderResult>; - /// retrieves a range of blocks from the database, along with the senders of each + /// Returns a range of blocks from the database, along with the senders of each /// transaction in the blocks. - /// - /// The `transaction_kind` parameter determines whether to return its hash fn block_with_senders_range( &self, range: RangeInclusive, ) -> ProviderResult>; + + /// Returns a range of sealed blocks from the database, along with the senders of each + /// transaction in the blocks. + fn sealed_block_with_senders_range( + &self, + range: RangeInclusive, + ) -> ProviderResult>; } /// Trait extension for `BlockReader`, for types that implement `BlockId` conversion. From f7a6a1c67e23497334fdb529ca699502c2677621 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 11 Jun 2024 15:01:40 +0100 Subject: [PATCH 2/6] introduce helper function --- .../src/providers/database/provider.rs | 139 ++++++++---------- 1 file changed, 64 insertions(+), 75 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 0da0c742c8c9..070788cf45eb 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -314,6 +314,11 @@ impl DatabaseProvider { &self.tx } + /// Returns a reference to the [`ChainSpec`]. + pub fn chain_spec(&self) -> &ChainSpec { + &self.chain_spec + } + /// Return full table as Vec pub fn table(&self) -> Result>, DatabaseError> where @@ -353,9 +358,59 @@ impl DatabaseProvider { ) } - /// Returns a reference to the [`ChainSpec`]. - pub fn chain_spec(&self) -> &ChainSpec { - &self.chain_spec + fn block_with_senders_range, B>( + &self, + headers_range: impl FnOnce(RangeInclusive) -> ProviderResult>, + mut assemble_block: impl FnMut( + H, + Vec, + Vec
, + Option, + Option, + Vec
, + ) -> ProviderResult, + range: RangeInclusive, + ) -> ProviderResult> { + let mut tx_cursor = self.tx.cursor_read::()?; + let mut senders_cursor = self.tx.cursor_read::()?; + + self.process_block_range( + range, + headers_range, + |tx_range, header, ommers, withdrawals, requests| { + let (body, senders) = if tx_range.is_empty() { + (Vec::new(), Vec::new()) + } else { + let body = self + .transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)? + .into_iter() + .map(Into::into) + .collect::>(); + // fetch senders from the senders table + let known_senders = senders_cursor + .walk_range(tx_range.clone())? + .collect::, _>>()?; + + let mut senders = Vec::with_capacity(body.len()); + for (tx_num, tx) in tx_range.zip(body.iter()) { + match known_senders.get(&tx_num) { + None => { + // recover the sender from the transaction if not found + let sender = tx + .recover_signer_unchecked() + .ok_or_else(|| ProviderError::SenderRecoveryError)?; + senders.push(sender); + } + Some(sender) => senders.push(*sender), + } + } + + (body, senders) + }; + + assemble_block(header, body, ommers, withdrawals, requests, senders) + }, + ) } } @@ -1545,47 +1600,14 @@ impl BlockReader for DatabaseProvider { &self, range: RangeInclusive, ) -> ProviderResult> { - let mut tx_cursor = self.tx.cursor_read::()?; - let mut senders_cursor = self.tx.cursor_read::()?; - - self.process_block_range( - range, + self.block_with_senders_range( |range| self.headers_range(range), - |tx_range, header, ommers, withdrawals, requests| { - let (body, senders) = if tx_range.is_empty() { - (Vec::new(), Vec::new()) - } else { - let body = self - .transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)? - .into_iter() - .map(Into::into) - .collect::>(); - // fetch senders from the senders table - let known_senders = senders_cursor - .walk_range(tx_range.clone())? - .collect::, _>>()?; - - let mut senders = Vec::with_capacity(body.len()); - for (tx_num, tx) in tx_range.zip(body.iter()) { - match known_senders.get(&tx_num) { - None => { - // recover the sender from the transaction if not found - let sender = tx - .recover_signer_unchecked() - .ok_or_else(|| ProviderError::SenderRecoveryError)?; - senders.push(sender); - } - Some(sender) => senders.push(*sender), - } - } - - (body, senders) - }; - + |header, body, ommers, withdrawals, requests, senders| { Block { header, body, ommers, withdrawals, requests } .try_with_senders_unchecked(senders) .map_err(|_| ProviderError::SenderRecoveryError) }, + range, ) } @@ -1593,49 +1615,16 @@ impl BlockReader for DatabaseProvider { &self, range: RangeInclusive, ) -> ProviderResult> { - let mut tx_cursor = self.tx.cursor_read::()?; - let mut senders_cursor = self.tx.cursor_read::()?; - - self.process_block_range( - range, + self.block_with_senders_range( |range| self.sealed_headers_range(range), - |tx_range, header, ommers, withdrawals, requests| { - let (body, senders) = if tx_range.is_empty() { - (Vec::new(), Vec::new()) - } else { - let body = self - .transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)? - .into_iter() - .map(Into::into) - .collect::>(); - // fetch senders from the senders table - let known_senders = senders_cursor - .walk_range(tx_range.clone())? - .collect::, _>>()?; - - let mut senders = Vec::with_capacity(body.len()); - for (tx_num, tx) in tx_range.zip(body.iter()) { - match known_senders.get(&tx_num) { - None => { - // recover the sender from the transaction if not found - let sender = tx - .recover_signer_unchecked() - .ok_or_else(|| ProviderError::SenderRecoveryError)?; - senders.push(sender); - } - Some(sender) => senders.push(*sender), - } - } - - (body, senders) - }; - + |header, body, ommers, withdrawals, requests, senders| { SealedBlockWithSenders::new( SealedBlock { header, body, ommers, withdrawals, requests }, senders, ) .ok_or(ProviderError::SenderRecoveryError) }, + range, ) } } From ef0b6df997048a24248f31d33dde9a88a5b7c47d Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 11 Jun 2024 16:27:16 +0100 Subject: [PATCH 3/6] fixes after review --- .../src/providers/database/provider.rs | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 070788cf45eb..dc60b38846d4 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -358,10 +358,21 @@ impl DatabaseProvider { ) } - fn block_with_senders_range, B>( + /// Returns a range of blocks from the database, along with the senders of each + /// transaction in the blocks. + /// + /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to + /// construct a block for the provided inputs + fn block_with_senders_range( &self, - headers_range: impl FnOnce(RangeInclusive) -> ProviderResult>, - mut assemble_block: impl FnMut( + headers_range: HF, + mut assemble_block: BF, + range: RangeInclusive, + ) -> ProviderResult> + where + H: AsRef
, + HF: FnOnce(RangeInclusive) -> ProviderResult>, + BF: FnMut( H, Vec, Vec
, @@ -369,8 +380,7 @@ impl DatabaseProvider { Option, Vec
, ) -> ProviderResult, - range: RangeInclusive, - ) -> ProviderResult> { + { let mut tx_cursor = self.tx.cursor_read::()?; let mut senders_cursor = self.tx.cursor_read::()?; From be14cfe1eb65860bb100e3ac3929e189d72b4df5 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 11 Jun 2024 16:31:07 +0100 Subject: [PATCH 4/6] fnmut, fnonce -> fn --- crates/storage/provider/src/providers/database/provider.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index dc60b38846d4..56bd2b571e1d 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -366,13 +366,13 @@ impl DatabaseProvider { fn block_with_senders_range( &self, headers_range: HF, - mut assemble_block: BF, + assemble_block: BF, range: RangeInclusive, ) -> ProviderResult> where H: AsRef
, - HF: FnOnce(RangeInclusive) -> ProviderResult>, - BF: FnMut( + HF: Fn(RangeInclusive) -> ProviderResult>, + BF: Fn( H, Vec, Vec
, From b91fe87fff02c733cdcc3c7f22fe5aef49c550d6 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 11 Jun 2024 17:09:24 +0100 Subject: [PATCH 5/6] add comments and reorganize database provider impls --- .../src/providers/database/provider.rs | 341 +++++++++--------- 1 file changed, 175 insertions(+), 166 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 56bd2b571e1d..10249c36d91b 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -123,46 +123,6 @@ impl DatabaseProvider { } } -impl DatabaseProvider { - /// Iterates over read only values in the given table and collects them into a vector. - /// - /// Early-returns if the range is empty, without opening a cursor transaction. - fn cursor_read_collect>( - &self, - range: impl RangeBounds, - ) -> ProviderResult> { - let capacity = match range_size_hint(&range) { - Some(0) | None => return Ok(Vec::new()), - Some(capacity) => capacity, - }; - let mut cursor = self.tx.cursor_read::()?; - self.cursor_collect_with_capacity(&mut cursor, range, capacity) - } - - /// Iterates over read only values in the given table and collects them into a vector. - fn cursor_collect>( - &self, - cursor: &mut impl DbCursorRO, - range: impl RangeBounds, - ) -> ProviderResult> { - let capacity = range_size_hint(&range).unwrap_or(0); - self.cursor_collect_with_capacity(cursor, range, capacity) - } - - fn cursor_collect_with_capacity>( - &self, - cursor: &mut impl DbCursorRO, - range: impl RangeBounds, - capacity: usize, - ) -> ProviderResult> { - let mut items = Vec::with_capacity(capacity); - for entry in cursor.walk_range(range)? { - items.push(entry?.1); - } - Ok(items) - } -} - impl DatabaseProvider { /// Storage provider for state at that given block pub fn state_provider_by_block_number( @@ -319,6 +279,17 @@ impl DatabaseProvider { &self.chain_spec } + /// Disables long-lived read transaction safety guarantees for leaks prevention and + /// observability improvements. + /// + /// CAUTION: In most of the cases, you want the safety guarantees for long read transactions + /// enabled. Use this only if you're sure that no write transaction is open in parallel, meaning + /// that Reth as a node is offline and not progressing. + pub fn disable_long_read_transaction_safety(mut self) -> Self { + self.tx.disable_long_read_transaction_safety(); + self + } + /// Return full table as Vec pub fn table(&self) -> Result>, DatabaseError> where @@ -330,15 +301,42 @@ impl DatabaseProvider { .collect::, DatabaseError>>() } - /// Disables long-lived read transaction safety guarantees for leaks prevention and - /// observability improvements. + /// Iterates over read only values in the given table and collects them into a vector. /// - /// CAUTION: In most of the cases, you want the safety guarantees for long read transactions - /// enabled. Use this only if you're sure that no write transaction is open in parallel, meaning - /// that Reth as a node is offline and not progressing. - pub fn disable_long_read_transaction_safety(mut self) -> Self { - self.tx.disable_long_read_transaction_safety(); - self + /// Early-returns if the range is empty, without opening a cursor transaction. + fn cursor_read_collect>( + &self, + range: impl RangeBounds, + ) -> ProviderResult> { + let capacity = match range_size_hint(&range) { + Some(0) | None => return Ok(Vec::new()), + Some(capacity) => capacity, + }; + let mut cursor = self.tx.cursor_read::()?; + self.cursor_collect_with_capacity(&mut cursor, range, capacity) + } + + /// Iterates over read only values in the given table and collects them into a vector. + fn cursor_collect>( + &self, + cursor: &mut impl DbCursorRO, + range: impl RangeBounds, + ) -> ProviderResult> { + let capacity = range_size_hint(&range).unwrap_or(0); + self.cursor_collect_with_capacity(cursor, range, capacity) + } + + fn cursor_collect_with_capacity>( + &self, + cursor: &mut impl DbCursorRO, + range: impl RangeBounds, + capacity: usize, + ) -> ProviderResult> { + let mut items = Vec::with_capacity(capacity); + for entry in cursor.walk_range(range)? { + items.push(entry?.1); + } + Ok(items) } fn transactions_by_tx_range_with_cursor( @@ -358,16 +356,112 @@ impl DatabaseProvider { ) } + /// Returns a range of blocks from the database. + /// + /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to + /// construct blocks from the following inputs: + /// – Header + /// - Range of transaction numbers + /// – Ommers + /// – Withdrawals + /// – Requests + /// – Senders + fn block_range( + &self, + range: RangeInclusive, + headers_range: HF, + mut assemble_block: F, + ) -> ProviderResult> + where + H: AsRef
, + HF: FnOnce(RangeInclusive) -> ProviderResult>, + F: FnMut( + H, + Range, + Vec
, + Option, + Option, + ) -> ProviderResult, + { + if range.is_empty() { + return Ok(Vec::new()) + } + + let len = range.end().saturating_sub(*range.start()) as usize; + let mut blocks = Vec::with_capacity(len); + + let headers = headers_range(range)?; + let mut ommers_cursor = self.tx.cursor_read::()?; + let mut withdrawals_cursor = self.tx.cursor_read::()?; + let mut requests_cursor = self.tx.cursor_read::()?; + let mut block_body_cursor = self.tx.cursor_read::()?; + + for header in headers { + let header_ref = header.as_ref(); + // If the body indices are not found, this means that the transactions either do + // not exist in the database yet, or they do exit but are + // not indexed. If they exist but are not indexed, we don't + // have enough information to return the block anyways, so + // we skip the block. + if let Some((_, block_body_indices)) = + block_body_cursor.seek_exact(header_ref.number)? + { + let tx_range = block_body_indices.tx_num_range(); + + // If we are past shanghai, then all blocks should have a withdrawal list, + // even if empty + let withdrawals = + if self.chain_spec.is_shanghai_active_at_timestamp(header_ref.timestamp) { + Some( + withdrawals_cursor + .seek_exact(header_ref.number)? + .map(|(_, w)| w.withdrawals) + .unwrap_or_default(), + ) + } else { + None + }; + let requests = + if self.chain_spec.is_prague_active_at_timestamp(header_ref.timestamp) { + Some(requests_cursor.seek_exact(header_ref.number)?.unwrap_or_default().1) + } else { + None + }; + let ommers = + if self.chain_spec.final_paris_total_difficulty(header_ref.number).is_some() { + Vec::new() + } else { + ommers_cursor + .seek_exact(header_ref.number)? + .map(|(_, o)| o.ommers) + .unwrap_or_default() + }; + + if let Ok(b) = assemble_block(header, tx_range, ommers, withdrawals, requests) { + blocks.push(b); + } + } + } + + Ok(blocks) + } + /// Returns a range of blocks from the database, along with the senders of each /// transaction in the blocks. /// /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to - /// construct a block for the provided inputs + /// construct blocks from the following inputs: + /// – Header + /// - Transactions + /// – Ommers + /// – Withdrawals + /// – Requests + /// – Senders fn block_with_senders_range( &self, + range: RangeInclusive, headers_range: HF, assemble_block: BF, - range: RangeInclusive, ) -> ProviderResult> where H: AsRef
, @@ -384,43 +478,40 @@ impl DatabaseProvider { let mut tx_cursor = self.tx.cursor_read::()?; let mut senders_cursor = self.tx.cursor_read::()?; - self.process_block_range( - range, - headers_range, - |tx_range, header, ommers, withdrawals, requests| { - let (body, senders) = if tx_range.is_empty() { - (Vec::new(), Vec::new()) - } else { - let body = self - .transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)? - .into_iter() - .map(Into::into) - .collect::>(); - // fetch senders from the senders table - let known_senders = senders_cursor + self.block_range(range, headers_range, |header, tx_range, ommers, withdrawals, requests| { + let (body, senders) = if tx_range.is_empty() { + (Vec::new(), Vec::new()) + } else { + let body = self + .transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)? + .into_iter() + .map(Into::into) + .collect::>(); + // fetch senders from the senders table + let known_senders = + senders_cursor .walk_range(tx_range.clone())? .collect::, _>>()?; - let mut senders = Vec::with_capacity(body.len()); - for (tx_num, tx) in tx_range.zip(body.iter()) { - match known_senders.get(&tx_num) { - None => { - // recover the sender from the transaction if not found - let sender = tx - .recover_signer_unchecked() - .ok_or_else(|| ProviderError::SenderRecoveryError)?; - senders.push(sender); - } - Some(sender) => senders.push(*sender), + let mut senders = Vec::with_capacity(body.len()); + for (tx_num, tx) in tx_range.zip(body.iter()) { + match known_senders.get(&tx_num) { + None => { + // recover the sender from the transaction if not found + let sender = tx + .recover_signer_unchecked() + .ok_or_else(|| ProviderError::SenderRecoveryError)?; + senders.push(sender); } + Some(sender) => senders.push(*sender), } + } - (body, senders) - }; + (body, senders) + }; - assemble_block(header, body, ommers, withdrawals, requests, senders) - }, - ) + assemble_block(header, body, ommers, withdrawals, requests, senders) + }) } } @@ -1381,88 +1472,6 @@ impl BlockNumReader for DatabaseProvider { } } -impl DatabaseProvider { - fn process_block_range( - &self, - range: RangeInclusive, - headers_range: HF, - mut assemble_block: F, - ) -> ProviderResult> - where - H: AsRef
, - HF: FnOnce(RangeInclusive) -> ProviderResult>, - F: FnMut( - Range, - H, - Vec
, - Option, - Option, - ) -> ProviderResult, - { - if range.is_empty() { - return Ok(Vec::new()) - } - - let len = range.end().saturating_sub(*range.start()) as usize; - let mut blocks = Vec::with_capacity(len); - - let headers = headers_range(range)?; - let mut ommers_cursor = self.tx.cursor_read::()?; - let mut withdrawals_cursor = self.tx.cursor_read::()?; - let mut requests_cursor = self.tx.cursor_read::()?; - let mut block_body_cursor = self.tx.cursor_read::()?; - - for header in headers { - let header_ref = header.as_ref(); - // If the body indices are not found, this means that the transactions either do - // not exist in the database yet, or they do exit but are - // not indexed. If they exist but are not indexed, we don't - // have enough information to return the block anyways, so - // we skip the block. - if let Some((_, block_body_indices)) = - block_body_cursor.seek_exact(header_ref.number)? - { - let tx_range = block_body_indices.tx_num_range(); - - // If we are past shanghai, then all blocks should have a withdrawal list, - // even if empty - let withdrawals = - if self.chain_spec.is_shanghai_active_at_timestamp(header_ref.timestamp) { - Some( - withdrawals_cursor - .seek_exact(header_ref.number)? - .map(|(_, w)| w.withdrawals) - .unwrap_or_default(), - ) - } else { - None - }; - let requests = - if self.chain_spec.is_prague_active_at_timestamp(header_ref.timestamp) { - Some(requests_cursor.seek_exact(header_ref.number)?.unwrap_or_default().1) - } else { - None - }; - let ommers = - if self.chain_spec.final_paris_total_difficulty(header_ref.number).is_some() { - Vec::new() - } else { - ommers_cursor - .seek_exact(header_ref.number)? - .map(|(_, o)| o.ommers) - .unwrap_or_default() - }; - - if let Ok(b) = assemble_block(tx_range, header, ommers, withdrawals, requests) { - blocks.push(b); - } - } - } - - Ok(blocks) - } -} - impl BlockReader for DatabaseProvider { fn find_block_by_hash(&self, hash: B256, source: BlockSource) -> ProviderResult> { if source.is_database() { @@ -1589,10 +1598,10 @@ impl BlockReader for DatabaseProvider { fn block_range(&self, range: RangeInclusive) -> ProviderResult> { let mut tx_cursor = self.tx.cursor_read::()?; - self.process_block_range( + self.block_range( range, |range| self.headers_range(range), - |tx_range, header, ommers, withdrawals, requests| { + |header, tx_range, ommers, withdrawals, requests| { let body = if tx_range.is_empty() { Vec::new() } else { @@ -1611,13 +1620,13 @@ impl BlockReader for DatabaseProvider { range: RangeInclusive, ) -> ProviderResult> { self.block_with_senders_range( + range, |range| self.headers_range(range), |header, body, ommers, withdrawals, requests, senders| { Block { header, body, ommers, withdrawals, requests } .try_with_senders_unchecked(senders) .map_err(|_| ProviderError::SenderRecoveryError) }, - range, ) } @@ -1626,6 +1635,7 @@ impl BlockReader for DatabaseProvider { range: RangeInclusive, ) -> ProviderResult> { self.block_with_senders_range( + range, |range| self.sealed_headers_range(range), |header, body, ommers, withdrawals, requests, senders| { SealedBlockWithSenders::new( @@ -1634,7 +1644,6 @@ impl BlockReader for DatabaseProvider { ) .ok_or(ProviderError::SenderRecoveryError) }, - range, ) } } From 8f94c334e4cb2692da1cb87bcd4a45777053d465 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 11 Jun 2024 17:17:44 +0100 Subject: [PATCH 6/6] fix clippy --- .../src/providers/database/provider.rs | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 10249c36d91b..f1aeafbbc09e 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -360,12 +360,12 @@ impl DatabaseProvider { /// /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to /// construct blocks from the following inputs: - /// – Header - /// - Range of transaction numbers - /// – Ommers - /// – Withdrawals - /// – Requests - /// – Senders + /// – Header + /// - Range of transaction numbers + /// – Ommers + /// – Withdrawals + /// – Requests + /// – Senders fn block_range( &self, range: RangeInclusive, @@ -451,12 +451,12 @@ impl DatabaseProvider { /// /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to /// construct blocks from the following inputs: - /// – Header - /// - Transactions - /// – Ommers - /// – Withdrawals - /// – Requests - /// – Senders + /// – Header + /// - Transactions + /// – Ommers + /// – Withdrawals + /// – Requests + /// – Senders fn block_with_senders_range( &self, range: RangeInclusive,