Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: DatabaseProvider delegates to SnapshotProvider on HeadersProvider #5593

Merged
merged 13 commits into from
Dec 15, 2023
16 changes: 16 additions & 0 deletions crates/storage/provider/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,19 @@ pub use chain::{Chain, DisplayBlocksChain};

pub mod bundle_state;
pub use bundle_state::{BundleStateWithReceipts, OriginalValuesKnown, StateChanges, StateReverts};

pub(crate) fn to_range<R: std::ops::RangeBounds<u64>>(bounds: R) -> std::ops::Range<u64> {
let start = match bounds.start_bound() {
std::ops::Bound::Included(&v) => v,
std::ops::Bound::Excluded(&v) => v + 1,
std::ops::Bound::Unbounded => 0,
};

let end = match bounds.end_bound() {
std::ops::Bound::Included(&v) => v + 1,
std::ops::Bound::Excluded(&v) => v,
std::ops::Bound::Unbounded => u64::MAX,
};

start..end
}
201 changes: 162 additions & 39 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
bundle_state::{BundleStateInit, BundleStateWithReceipts, RevertsInit},
providers::{database::metrics, SnapshotProvider},
to_range,
traits::{
AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
},
Expand Down Expand Up @@ -39,9 +40,9 @@ use reth_primitives::{
trie::Nibbles,
Account, Address, Block, BlockHash, BlockHashOrNumber, BlockNumber, BlockWithSenders,
ChainInfo, ChainSpec, GotExpected, Hardfork, Head, Header, PruneCheckpoint, PruneModes,
PruneSegment, Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, StorageEntry,
TransactionMeta, TransactionSigned, TransactionSignedEcRecovered, TransactionSignedNoHash,
TxHash, TxNumber, Withdrawal, B256, U256,
PruneSegment, Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, SnapshotSegment,
StorageEntry, TransactionMeta, TransactionSigned, TransactionSignedEcRecovered,
TransactionSignedNoHash, TxHash, TxNumber, Withdrawal, B256, U256,
};
use reth_trie::{prefix_set::PrefixSetMut, StateRoot};
use revm::primitives::{BlockEnv, CfgEnv, SpecId};
Expand Down Expand Up @@ -195,6 +196,83 @@ impl<TX: DbTx> DatabaseProvider<TX> {
.walk(Some(T::Key::default()))?
.collect::<Result<Vec<_>, DatabaseError>>()
}

/// Gets data within a specified range, potentially spanning different snapshots and database.
///
/// # Arguments
/// * `segment` - The segment of the snapshot to query.
/// * `block_range` - The range of data to fetch.
/// * `fetch_from_snapshot` - A function to fetch data from the snapshot.
/// * `fetch_from_database` - A function to fetch data from the database.
/// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is
/// terminated when this function returns false, thereby filtering the data based on the
/// provided condition.
fn get_range_with_snapshot<T, P, FS, FD>(
&self,
segment: SnapshotSegment,
block_range: Range<u64>,
fetch_from_snapshot: FS,
fetch_from_database: FD,
mut predicate: P,
) -> ProviderResult<Vec<T>>
where
FS: Fn(&SnapshotProvider, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
FD: Fn(Range<u64>, P) -> ProviderResult<Vec<T>>,
P: FnMut(&T) -> bool,
{
let mut adjusted_range = to_range(block_range);
let mut data = Vec::new();

if let Some(snapshot_provider) = &self.snapshot_provider {
if let Some(snapshot_upper_bound) = snapshot_provider.get_highest_snapshot(segment) {
if adjusted_range.start <= snapshot_upper_bound {
data.extend(fetch_from_snapshot(
snapshot_provider,
adjusted_range.start..adjusted_range.end.min(snapshot_upper_bound + 1),
&mut predicate,
)?);
}
adjusted_range.start = adjusted_range.start.max(snapshot_upper_bound + 1);
}
}

if adjusted_range.end > adjusted_range.start {
data.extend(fetch_from_database(adjusted_range, predicate)?)
}

Ok(data)
}

/// Retrieves data from the database or snapshot, wherever it's available.
///
/// # Arguments
/// * `segment` - The segment of the snapshot to check against.
/// * `block_number` - Requested block number
/// * `fetch_from_snapshot` - A closure that defines how to fetch the data from the snapshot
/// provider.
/// * `fetch_from_database` - A closure that defines how to fetch the data from the database
/// when the snapshot doesn't contain the required data or is not available.
fn get_with_snapshot<T, FS, FD>(
&self,
segment: SnapshotSegment,
block_number: u64,
fetch_from_snapshot: FS,
fetch_from_database: FD,
) -> ProviderResult<Option<T>>
where
FS: Fn(&SnapshotProvider) -> ProviderResult<Option<T>>,
FD: Fn() -> ProviderResult<Option<T>>,
{
if let Some(provider) = &self.snapshot_provider {
if provider
.get_highest_snapshot(segment)
.map_or(false, |snapshot_upper_bound| snapshot_upper_bound >= block_number)
{
return fetch_from_snapshot(provider);
}
}
fetch_from_database()
}
}

impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
Expand Down Expand Up @@ -931,7 +1009,12 @@ impl<TX: DbTx> HeaderProvider for DatabaseProvider<TX> {
}

fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Header>> {
Ok(self.tx.get::<tables::Headers>(num)?)
self.get_with_snapshot(
SnapshotSegment::Headers,
num,
|snapshot| snapshot.header_by_number(num),
|| Ok(self.tx.get::<tables::Headers>(num)?),
)
}

fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
Expand All @@ -949,65 +1032,105 @@ impl<TX: DbTx> HeaderProvider for DatabaseProvider<TX> {
return Ok(Some(td))
}

Ok(self.tx.get::<tables::HeaderTD>(number)?.map(|td| td.0))
self.get_with_snapshot(
SnapshotSegment::Headers,
number,
|snapshot| snapshot.header_td_by_number(number),
|| Ok(self.tx.get::<tables::HeaderTD>(number)?.map(|td| td.0)),
)
}

fn headers_range(&self, range: impl RangeBounds<BlockNumber>) -> ProviderResult<Vec<Header>> {
let mut cursor = self.tx.cursor_read::<tables::Headers>()?;
cursor
.walk_range(range)?
.map(|result| result.map(|(_, header)| header).map_err(Into::into))
.collect::<ProviderResult<Vec<_>>>()
self.get_range_with_snapshot(
SnapshotSegment::Headers,
to_range(range),
|snapshot, range, _| snapshot.headers_range(range),
|range, _| {
self.tx
.cursor_read::<tables::Headers>()?
.walk_range(range)?
.map(|result| result.map(|(_, header)| header).map_err(Into::into))
.collect::<ProviderResult<Vec<_>>>()
},
|_| true,
)
}

fn sealed_header(&self, number: BlockNumber) -> ProviderResult<Option<SealedHeader>> {
if let Some(header) = self.header_by_number(number)? {
let hash = self
.block_hash(number)?
.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
Ok(Some(header.seal(hash)))
} else {
Ok(None)
}
self.get_with_snapshot(
SnapshotSegment::Headers,
number,
|snapshot| snapshot.sealed_header(number),
|| {
if let Some(header) = self.header_by_number(number)? {
let hash = self
.block_hash(number)?
.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
Ok(Some(header.seal(hash)))
} else {
Ok(None)
}
},
)
}

fn sealed_headers_while(
&self,
range: impl RangeBounds<BlockNumber>,
mut predicate: impl FnMut(&SealedHeader) -> bool,
predicate: impl FnMut(&SealedHeader) -> bool,
) -> ProviderResult<Vec<SealedHeader>> {
let mut headers = vec![];
for entry in self.tx.cursor_read::<tables::Headers>()?.walk_range(range)? {
let (number, header) = entry?;
let hash = self
.block_hash(number)?
.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
let sealed = header.seal(hash);
if !predicate(&sealed) {
break
}
headers.push(sealed);
}
Ok(headers)
self.get_range_with_snapshot(
SnapshotSegment::Headers,
to_range(range),
|snapshot, range, predicate| snapshot.sealed_headers_while(range, predicate),
|range, mut predicate| {
let mut headers = vec![];
for entry in self.tx.cursor_read::<tables::Headers>()?.walk_range(range)? {
let (number, header) = entry?;
let hash = self
.block_hash(number)?
.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
let sealed = header.seal(hash);
if !predicate(&sealed) {
break
}
headers.push(sealed);
}
Ok(headers)
},
predicate,
)
}
}

impl<TX: DbTx> BlockHashReader for DatabaseProvider<TX> {
fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
Ok(self.tx.get::<tables::CanonicalHeaders>(number)?)
self.get_with_snapshot(
SnapshotSegment::Headers,
number,
|snapshot| snapshot.block_hash(number),
|| Ok(self.tx.get::<tables::CanonicalHeaders>(number)?),
)
}

fn canonical_hashes_range(
&self,
start: BlockNumber,
end: BlockNumber,
) -> ProviderResult<Vec<B256>> {
let range = start..end;
let mut cursor = self.tx.cursor_read::<tables::CanonicalHeaders>()?;
cursor
.walk_range(range)?
.map(|result| result.map(|(_, hash)| hash).map_err(Into::into))
.collect::<ProviderResult<Vec<_>>>()
self.get_range_with_snapshot(
SnapshotSegment::Headers,
start..end,
|snapshot, range, _| snapshot.canonical_hashes_range(range.start, range.end),
|range, _| {
self.tx
.cursor_read::<tables::CanonicalHeaders>()?
.walk_range(range)?
.map(|result| result.map(|(_, hash)| hash).map_err(Into::into))
.collect::<ProviderResult<Vec<_>>>()
},
|_| true,
)
}
}

Expand Down
21 changes: 3 additions & 18 deletions crates/storage/provider/src/providers/snapshot/jar.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::LoadedJarRef;
use crate::{
BlockHashReader, BlockNumReader, HeaderProvider, ReceiptProvider, TransactionsProvider,
to_range, BlockHashReader, BlockNumReader, HeaderProvider, ReceiptProvider,
TransactionsProvider,
};
use reth_db::{
codecs::CompactU256,
Expand All @@ -11,7 +12,7 @@ use reth_primitives::{
Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, Receipt, SealedHeader,
TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, B256, U256,
};
use std::ops::{Deref, Range, RangeBounds};
use std::ops::{Deref, RangeBounds};

/// Provider over a specific `NippyJar` and range.
#[derive(Debug)]
Expand Down Expand Up @@ -286,19 +287,3 @@ impl<'a> ReceiptProvider for SnapshotJarProvider<'a> {
Err(ProviderError::UnsupportedProvider)
}
}

fn to_range<R: RangeBounds<u64>>(bounds: R) -> Range<u64> {
let start = match bounds.start_bound() {
std::ops::Bound::Included(&v) => v,
std::ops::Bound::Excluded(&v) => v + 1,
std::ops::Bound::Unbounded => 0,
};

let end = match bounds.end_bound() {
std::ops::Bound::Included(&v) => v + 1,
std::ops::Bound::Excluded(&v) => v,
std::ops::Bound::Unbounded => u64::MAX,
};

start..end
}
Loading