Skip to content

Commit

Permalink
feat: swap Dashmap for StaticFileWriters on StaticFileProvider (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo authored Aug 7, 2024
1 parent e7214af commit 7486d5b
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 38 deletions.
29 changes: 9 additions & 20 deletions crates/storage/provider/src/providers/static_file/manager.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use super::{
metrics::StaticFileProviderMetrics, LoadedJar, StaticFileJarProvider, StaticFileProviderRW,
StaticFileProviderRWRefMut, BLOCKS_PER_STATIC_FILE,
metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
BLOCKS_PER_STATIC_FILE,
};
use crate::{
to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, DatabaseProvider,
HeaderProvider, ReceiptProvider, RequestsProvider, StageCheckpointReader, StatsReader,
TransactionVariant, TransactionsProvider, TransactionsProviderExt, WithdrawalsProvider,
};
use dashmap::{mapref::entry::Entry as DashMapEntry, DashMap};
use dashmap::DashMap;
use parking_lot::RwLock;
use reth_chainspec::ChainInfo;
use reth_db::{
Expand Down Expand Up @@ -114,8 +115,8 @@ pub struct StaticFileProviderInner {
/// Whether [`StaticFileJarProvider`] loads filters into memory. If not, `by_hash` queries
/// won't be able to be queried directly.
load_filters: bool,
/// Maintains a map of `StaticFile` writers for each [`StaticFileSegment`]
writers: DashMap<StaticFileSegment, StaticFileProviderRW>,
/// Maintains a writer set of [`StaticFileSegment`].
writers: StaticFileWriters,
metrics: Option<Arc<StaticFileProviderMetrics>>,
/// Access rights of the provider.
access: StaticFileAccess,
Expand Down Expand Up @@ -1055,17 +1056,8 @@ impl StaticFileWriter for StaticFileProvider {
}

trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
Ok(match self.writers.entry(segment) {
DashMapEntry::Occupied(entry) => entry.into_ref(),
DashMapEntry::Vacant(entry) => {
let writer = StaticFileProviderRW::new(
segment,
block,
Arc::downgrade(&self.0),
self.metrics.clone(),
)?;
entry.insert(writer)
}
self.writers.get_or_create(segment, || {
StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
})
}

Expand All @@ -1077,10 +1069,7 @@ impl StaticFileWriter for StaticFileProvider {
}

fn commit(&self) -> ProviderResult<()> {
for mut writer in self.writers.iter_mut() {
writer.commit()?;
}
Ok(())
self.writers.commit()
}

fn ensure_file_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
Expand Down
66 changes: 63 additions & 3 deletions crates/storage/provider/src/providers/static_file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{
manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
};
use crate::providers::static_file::metrics::StaticFileProviderOperation;
use dashmap::mapref::one::RefMut;
use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
use reth_codecs::Compact;
use reth_db_api::models::CompactU256;
use reth_nippy_jar::{ConsistencyFailStrategy, NippyJar, NippyJarError, NippyJarWriter};
Expand All @@ -20,8 +20,68 @@ use std::{
};
use tracing::debug;

/// Mutable reference to a dashmap element of [`StaticFileProviderRW`].
pub type StaticFileProviderRWRefMut<'a> = RefMut<'a, StaticFileSegment, StaticFileProviderRW>;
/// Static file writers for every known [`StaticFileSegment`].
///
/// WARNING: Trying to use more than one writer for the same segment type **will result in a
/// deadlock**.
#[derive(Debug, Default)]
pub(crate) struct StaticFileWriters {
headers: RwLock<Option<StaticFileProviderRW>>,
transactions: RwLock<Option<StaticFileProviderRW>>,
receipts: RwLock<Option<StaticFileProviderRW>>,
}

impl StaticFileWriters {
pub(crate) fn get_or_create(
&self,
segment: StaticFileSegment,
create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW>,
) -> ProviderResult<StaticFileProviderRWRefMut<'_>> {
let mut write_guard = match segment {
StaticFileSegment::Headers => self.headers.write(),
StaticFileSegment::Transactions => self.transactions.write(),
StaticFileSegment::Receipts => self.receipts.write(),
};

if write_guard.is_none() {
*write_guard = Some(create_fn()?);
}

Ok(StaticFileProviderRWRefMut(write_guard))
}

pub(crate) fn commit(&self) -> ProviderResult<()> {
for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
let mut writer = writer_lock.write();
if let Some(writer) = writer.as_mut() {
writer.commit()?;
}
}
Ok(())
}
}

/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a>(
pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW>>,
);

impl<'a> std::ops::DerefMut for StaticFileProviderRWRefMut<'a> {
fn deref_mut(&mut self) -> &mut Self::Target {
// This is always created by [`StaticFileWriters::get_or_create`]
self.0.as_mut().expect("static file writer provider should be init")
}
}

impl<'a> std::ops::Deref for StaticFileProviderRWRefMut<'a> {
type Target = StaticFileProviderRW;

fn deref(&self) -> &Self::Target {
// This is always created by [`StaticFileWriters::get_or_create`]
self.0.as_ref().expect("static file writer provider should be init")
}
}

#[derive(Debug)]
/// Extends `StaticFileProvider` with writing capabilities
Expand Down
25 changes: 10 additions & 15 deletions crates/storage/provider/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ where

debug!(target: "provider::storage_writer", block_count = %blocks.len(), "Writing blocks and execution data to storage");

// Only write receipts to static files if there is no receipt pruning configured.
let mut state_writer = if self.database().prune_modes_ref().has_receipts_pruning() {
UnifiedStorageWriter::from_database(self.database())
} else {
UnifiedStorageWriter::from(
self.database(),
self.static_file().get_writer(first_block.number, StaticFileSegment::Receipts)?,
)
};

// TODO: remove all the clones and do performant / batched writes for each type of object
// instead of a loop over all blocks,
// meaning:
Expand All @@ -175,21 +185,6 @@ where
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
let execution_outcome = block.execution_outcome().clone();

// Only write receipts to static files if there is no receipt pruning configured.
let mut state_writer = if self.database().prune_modes_ref().has_receipts_pruning() {
UnifiedStorageWriter::from_database(self.database())
} else {
// This should be inside the hotloop, because preferably there should only be one
// mutable reference to a static file writer, since there's a 3 in 100 chance that
// another segment shares the same shard as the `Receipts` one. Which would result
// in a deadlock.
UnifiedStorageWriter::from(
self.database(),
self.static_file()
.get_writer(first_block.number, StaticFileSegment::Receipts)?,
)
};
state_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;

// insert hashes and intermediate merkle nodes
Expand Down

0 comments on commit 7486d5b

Please sign in to comment.