diff --git a/Cargo.lock b/Cargo.lock index 1f5c30c88c54..db87e927479e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7433,6 +7433,7 @@ dependencies = [ "futures", "metrics", "mini-moka", + "parking_lot", "proptest", "rand 0.8.5", "rayon", diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 7f2e84408bda..956095e5f7ee 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -46,7 +46,7 @@ revm-primitives.workspace = true # common futures.workspace = true thiserror.workspace = true -tokio = { workspace = true, features = ["macros", "sync"] } +tokio = { workspace = true, features = ["rt", "rt-multi-thread", "sync", "macros"] } mini-moka = { workspace = true, features = ["sync"] } # metrics @@ -58,6 +58,7 @@ schnellru.workspace = true rayon.workspace = true tracing.workspace = true derive_more.workspace = true +parking_lot.workspace = true # optional deps for test-utils reth-prune-types = { workspace = true, optional = true } diff --git a/crates/engine/tree/src/tree/cached_state.rs b/crates/engine/tree/src/tree/cached_state.rs index f9c545014dcf..33467fa7dbe5 100644 --- a/crates/engine/tree/src/tree/cached_state.rs +++ b/crates/engine/tree/src/tree/cached_state.rs @@ -483,7 +483,7 @@ impl Default for ProviderCacheBuilder { /// A saved cache that has been used for executing a specific block, which has been updated for its /// execution. -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct SavedCache { /// The hash of the block these caches were used to execute. hash: B256, @@ -496,6 +496,15 @@ pub(crate) struct SavedCache { } impl SavedCache { + /// Creates a new instance with the internals + pub(super) const fn new( + hash: B256, + caches: ProviderCaches, + metrics: CachedStateMetrics, + ) -> Self { + Self { hash, caches, metrics } + } + /// Returns the hash for this cache pub(crate) const fn executed_block_hash(&self) -> B256 { self.hash @@ -505,6 +514,11 @@ impl SavedCache { pub(crate) fn split(self) -> (ProviderCaches, CachedStateMetrics) { (self.caches, self.metrics) } + + /// Returns the [`ProviderCaches`] belonging to the tracked hash. + pub(crate) fn cache(&self) -> &ProviderCaches { + &self.caches + } } /// Cache for an account's storage slots diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 8c026d063c21..b02a3e22b2f4 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -85,11 +85,16 @@ pub mod error; mod invalid_block_hook; mod invalid_headers; mod metrics; +mod payload_processor; mod persistence_state; pub mod root; mod trie_updates; -use crate::tree::{config::MIN_BLOCKS_FOR_PIPELINE_RUN, error::AdvancePersistenceError}; +use crate::tree::{ + config::MIN_BLOCKS_FOR_PIPELINE_RUN, + error::AdvancePersistenceError, + payload_processor::{executor::WorkloadExecutor, PayloadProcessor}, +}; pub use block_buffer::BlockBuffer; pub use config::TreeConfig; pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook}; @@ -601,6 +606,9 @@ where invalid_block_hook: Box>, /// The engine API variant of this handler engine_kind: EngineApiKind, + /// The type responsible for processing new payloads + payload_processor: PayloadProcessor, + /// The most recent cache used for execution. 
most_recent_cache: Option, /// Thread pool used for the state root task and prewarming @@ -681,6 +689,9 @@ where .expect("Failed to create proof worker thread pool"), ); + let payload_processor = + PayloadProcessor::new(WorkloadExecutor::new(), evm_config.clone(), &config); + Self { provider, executor_provider, @@ -700,6 +711,7 @@ where incoming_tx, invalid_block_hook: Box::new(NoopInvalidBlockHook), engine_kind, + payload_processor, most_recent_cache: None, thread_pool, } @@ -2650,6 +2662,317 @@ where Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) } + fn insert_block_inner2( + &mut self, + block: RecoveredBlock, + ) -> Result { + let block_num_hash = block.num_hash(); + debug!(target: "engine::tree", block=?block_num_hash, parent = ?block.parent_hash(), state_root = ?block.state_root(), "Inserting new block into tree"); + + if self.block_by_hash(block.hash())?.is_some() { + return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid)) + } + + let start = Instant::now(); + + trace!(target: "engine::tree", block=?block_num_hash, "Validating block consensus"); + + // validate block consensus rules + self.validate_block(&block)?; + + trace!(target: "engine::tree", block=?block_num_hash, parent=?block.parent_hash(), "Fetching block state provider"); + let Some(state_provider) = self.state_provider(block.parent_hash())? else { + // we don't have the state required to execute this block, buffering it and find the + // missing parent block + let missing_ancestor = self + .state + .buffer + .lowest_ancestor(&block.parent_hash()) + .map(|block| block.parent_num_hash()) + .unwrap_or_else(|| block.parent_num_hash()); + + self.state.buffer.insert_block(block); + + return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { + head: self.state.tree_state.current_canonical_head, + missing_ancestor, + })) + }; + + // now validate against the parent + let parent_block = self.sealed_header_by_hash(block.parent_hash())?.ok_or_else(|| { + InsertBlockErrorKind::Provider(ProviderError::HeaderNotFound( + block.parent_hash().into(), + )) + })?; + if let Err(e) = + self.consensus.validate_header_against_parent(block.sealed_header(), &parent_block) + { + warn!(target: "engine::tree", ?block, "Failed to validate header {} against parent: {e}", block.hash()); + return Err(e.into()) + } + + // We only run the parallel state root if we are currently persisting blocks that are all + // ancestors of the one we are executing. If we're committing ancestor blocks, then: any + // trie updates being committed are a subset of the in-memory trie updates collected before + // fetching reverts. So any diff in reverts (pre vs post commit) is already covered by the + // in-memory trie updates we collect in `compute_state_root_parallel`. 
+ // + // See https://github.com/paradigmxyz/reth/issues/12688 for more details + let is_descendant_of_persisting_blocks = + self.is_descendant_of_persisting_blocks(block.header()); + + todo!() + // // Atomic bool for letting the prewarm tasks know when to stop + // let cancel_execution = ManualCancel::default(); + // + // let (state_root_handle, state_root_task_config, state_root_sender, state_hook) = + // if is_descendant_of_persisting_blocks && self.config.use_state_root_task() { + // let consistent_view = + // ConsistentDbView::new_with_latest_tip(self.provider.clone())?; + // + // // Compute trie input + // let trie_input_start = Instant::now(); + // let trie_input = self + // .compute_trie_input(consistent_view.clone(), block.header().parent_hash()) + // .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?; + // + // // Create state root config + // let config_start = Instant::now(); + // let state_root_config = + // StateRootConfig::new_from_input(consistent_view, trie_input); + // + // let trie_input_elapsed = config_start - trie_input_start; + // self.metrics + // .block_validation + // .trie_input_duration + // .set(trie_input_elapsed.as_secs_f64()); + // + // let config_elapsed = config_start.elapsed(); + // self.metrics + // .block_validation + // .state_root_config_duration + // .set(config_elapsed.as_secs_f64()); + // + // let state_root_task = + // StateRootTask::new(state_root_config.clone(), self.thread_pool.clone()); + // let state_root_sender = state_root_task.state_root_message_sender(); + // let state_hook = Box::new(state_root_task.state_hook()) as Box; + // ( + // Some(state_root_task.spawn()), + // Some(state_root_config), + // Some(state_root_sender), + // state_hook, + // ) + // } else { + // (None, None, None, Box::new(NoopHook::default()) as Box) + // }; + // + // let (caches, cache_metrics) = if let Some(cache) = + // self.take_latest_cache(block.parent_hash()) + // { + // cache.split() + // } else { + // ( + // ProviderCacheBuilder::default().build_caches(self.config. 
+ // cross_block_cache_size()), CachedStateMetrics::zeroed(), + // ) + // }; + // + // // Use cached state provider before executing, used in execution after prewarming threads + // // complete + // let state_provider = CachedStateProvider::new_with_caches( + // state_provider, + // caches.clone(), + // cache_metrics.clone(), + // ); + // + // // This prevents caches from being saved without all prewarm execution tasks being + // completed let prewarm_task_lock = Arc::new(RwLock::new(())); + // + // if self.config.use_caching_and_prewarming() { + // debug!(target: "engine::tree", "Spawning prewarm threads"); + // let prewarm_start = Instant::now(); + // let prewarm_metrics = self.metrics.prewarm.clone(); + // + // // Prewarm transactions + // for (tx_idx, tx) in block.transactions_recovered().enumerate() { + // let state_root_sender = state_root_sender.clone(); + // + // let start = Instant::now(); + // self.prewarm_transaction( + // block.header().clone(), + // tx.cloned(), + // caches.clone(), + // cache_metrics.clone(), + // state_root_sender, + // cancel_execution.clone(), + // prewarm_task_lock.clone(), + // prewarm_metrics.clone(), + // )?; + // let elapsed = start.elapsed(); + // debug!(target: "engine::tree", ?tx_idx, elapsed = ?elapsed, "Spawned transaction + // prewarm"); } + // + // prewarm_metrics.transactions.set(block.transaction_count() as f64); + // prewarm_metrics.transactions_histogram.record(block.transaction_count() as f64); + // + // drop(state_root_sender); + // let elapsed = prewarm_start.elapsed(); + // debug!(target: "engine::tree", ?elapsed, "Done spawning prewarm threads"); + // + // self.metrics.prewarm.spawn_duration.set(elapsed); + // self.metrics.prewarm.spawn_duration_histogram.record(elapsed); + // } + // trace!(target: "engine::tree", block=?block_num_hash, "Executing block"); + // + // let executor = + // self.executor_provider.executor(StateProviderDatabase::new(&state_provider)); let + // execution_start = Instant::now(); let output = + // self.metrics.executor.execute_metered(executor, &block, state_hook)?; + // let execution_time = execution_start.elapsed(); + // trace!(target: "engine::tree", elapsed = ?execution_time, number=?block_num_hash.number, + // "Executed block"); + // + // // Ensure that prewarm tasks don't send proof messages after state root sender is dropped + // cancel_execution.cancel(); + // + // if let Err(err) = self.consensus.validate_block_post_execution(&block, &output) { + // // call post-block hook + // self.invalid_block_hook.on_invalid_block(&parent_block, &block, &output, None); + // return Err(err.into()) + // } + // + // let hashed_state = self.provider.hashed_post_state(&output.state); + // + // if let Err(err) = self + // .payload_validator + // .validate_block_post_execution_with_hashed_state(&hashed_state, &block) + // { + // // call post-block hook + // self.invalid_block_hook.on_invalid_block(&parent_block, &block, &output, None); + // return Err(err.into()) + // } + // + // trace!(target: "engine::tree", block=?block_num_hash, "Calculating block state root"); + // let root_time = Instant::now(); + // + // // We attempt to compute state root in parallel if we are currently not persisting + // // anything to database. This is safe, because the database state cannot + // // change until we finish parallel computation. 
It is important that nothing + // // is being persisted as we are computing in parallel, because we initialize + // // a different database transaction per thread and it might end up with a + // // different view of the database. + // let (state_root, trie_output, root_elapsed) = if is_descendant_of_persisting_blocks { + // if self.config.use_state_root_task() { + // let state_root_handle = state_root_handle + // .expect("state root handle must exist if legacy_state_root is false"); + // let state_root_config = state_root_task_config.expect("task config is present"); + // + // // Handle state root result from task using handle + // self.handle_state_root_result( + // state_root_handle, + // state_root_config, + // block.sealed_block(), + // &hashed_state, + // &state_provider, + // root_time, + // )? + // } else { + // match self.compute_state_root_parallel(block.header().parent_hash(), + // &hashed_state) { + // Ok(result) => { + // info!( + // target: "engine::tree", + // block = ?block_num_hash, + // regular_state_root = ?result.0, + // "Regular root task finished" + // ); + // (result.0, result.1, root_time.elapsed()) + // } + // Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) + // => { debug!(target: "engine", %error, "Parallel state root + // computation failed consistency check, falling back"); let (root, + // updates) = + // state_provider.state_root_with_updates(hashed_state.clone())?; + // (root, updates, root_time.elapsed()) } + // Err(error) => return Err(InsertBlockErrorKind::Other(Box::new(error))), + // } + // } + // } else { + // debug!(target: "engine::tree", block=?block_num_hash, + // ?is_descendant_of_persisting_blocks, "Failed to compute state root in parallel"); + // let (root, updates) = state_provider.state_root_with_updates(hashed_state.clone())?; + // (root, updates, root_time.elapsed()) + // }; + // + // if state_root != block.header().state_root() { + // // call post-block hook + // self.invalid_block_hook.on_invalid_block( + // &parent_block, + // &block, + // &output, + // Some((&trie_output, state_root)), + // ); + // return Err(ConsensusError::BodyStateRootDiff( + // GotExpected { got: state_root, expected: block.header().state_root() }.into(), + // ) + // .into()) + // } + // + // self.metrics.block_validation.record_state_root(&trie_output, + // root_elapsed.as_secs_f64()); debug!(target: "engine::tree", ?root_elapsed, + // block=?block_num_hash, "Calculated state root"); + // + // if self.config.use_caching_and_prewarming() { + // let save_cache_start = Instant::now(); + // // this is the only place / thread a writer is acquired, so we would have already + // // crashed if we had a poisoned rwlock + // // + // // we use a lock here and in prewarming, so we do not save the cache if a prewarm + // task // is still running, since it would update the cache with stale data. 
+ // It's unlikely that // prewarm tasks are still running at this point however + // drop(prewarm_task_lock.write().unwrap()); + // // apply state updates to cache and save it (if saving was successful) + // self.most_recent_cache = state_provider.save_cache(block.hash(), &output.state).ok(); + // let elapsed = save_cache_start.elapsed(); + // + // // record how long it took to save caches + // self.metrics.block_validation.cache_saving_duration.set(elapsed.as_secs_f64()); + // } + // + // let executed: ExecutedBlockWithTrieUpdates = ExecutedBlockWithTrieUpdates { + // block: ExecutedBlock { + // recovered_block: Arc::new(block), + // execution_output: Arc::new(ExecutionOutcome::from((output, + // block_num_hash.number))), hashed_state: Arc::new(hashed_state), + // }, + // trie: Arc::new(trie_output), + // }; + // + // // if the parent is the canonical head, we can insert the block as the pending block + // if self.state.tree_state.canonical_block_hash() == + // executed.recovered_block().parent_hash() { + // debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block"); + // self.canonical_in_memory_state.set_pending_block(executed.clone()); + // } + // + // self.state.tree_state.insert_executed(executed.clone()); + // self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64); + // + // // emit insert event + // let elapsed = start.elapsed(); + // let engine_event = if self.is_fork(block_num_hash.hash)? { + // BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed) + // } else { + // BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed) + // }; + // self.emit_event(EngineApiEvent::BeaconConsensus(engine_event)); + // + // debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block"); + // Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) + } + /// Compute state root for the given hashed post state in parallel. /// /// # Returns diff --git a/crates/engine/tree/src/tree/payload_processor/executor.rs b/crates/engine/tree/src/tree/payload_processor/executor.rs new file mode 100644 index 000000000000..33c1c33e507f --- /dev/null +++ b/crates/engine/tree/src/tree/payload_processor/executor.rs @@ -0,0 +1,60 @@ +use rayon::ThreadPool as RayonPool; +use std::sync::Arc; +use tokio::{runtime::Runtime, task::JoinHandle}; + +/// An executor for mixed I/O and CPU workloads. +#[derive(Debug, Clone)] +pub(crate) struct WorkloadExecutor { + inner: WorkloadExecutorInner, +} + +impl WorkloadExecutor { + /// Creates a new instance with default settings. + pub(crate) fn new() -> Self { + // Create runtime for I/O operations + let runtime = Arc::new(Runtime::new().unwrap()); + // Create Rayon thread pool for CPU work + let rayon_pool = Arc::new(rayon::ThreadPoolBuilder::new().build().unwrap()); + + Self { inner: WorkloadExecutorInner { runtime, rayon_pool } } + } + + /// Creates a new executor with the given number of threads for cpu bound work (rayon). 
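    // Illustrative usage sketch for this executor, assuming only the methods defined in
    // this file (`with_num_cpu_threads`, `spawn_blocking`, `rayon_pool`); `read_blob` and
    // `hash_chunk` are hypothetical stand-ins for I/O-bound and CPU-bound work:
    //
    //     let executor = WorkloadExecutor::with_num_cpu_threads(2);
    //
    //     // blocking / I/O-flavoured work goes through the tokio runtime's blocking pool
    //     let handle = executor.spawn_blocking(|| read_blob());
    //
    //     // CPU-bound work is pushed onto the shared rayon pool
    //     executor.rayon_pool().spawn(|| {
    //         let _ = hash_chunk();
    //     });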
+ pub(super) fn with_num_cpu_threads(cpu_threads: usize) -> Self { + // Create runtime for I/O operations + let runtime = Arc::new(Runtime::new().unwrap()); + + // Create Rayon thread pool for CPU work + let rayon_pool = + Arc::new(rayon::ThreadPoolBuilder::new().num_threads(cpu_threads).build().unwrap()); + + Self { inner: WorkloadExecutorInner { runtime, rayon_pool } } + } + + /// Returns access to the tokio runtime + pub(super) fn runtime(&self) -> &Arc { + &self.inner.runtime + } + + /// Shorthand for [`Runtime::spawn_blocking`] + #[track_caller] + pub(super) fn spawn_blocking(&self, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.runtime().spawn_blocking(func) + } + + /// Returns access to the rayon pool + pub(super) fn rayon_pool(&self) -> &Arc { + &self.inner.rayon_pool + } +} + +#[derive(Debug, Clone)] +struct WorkloadExecutorInner { + // TODO: replace with main tokio handle instead or even task executor? + runtime: Arc, + rayon_pool: Arc, +} diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs new file mode 100644 index 000000000000..62176346f881 --- /dev/null +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -0,0 +1,335 @@ +//! Entrypoint for payload processing. +#![allow(dead_code)] // TODO remove + +use crate::tree::{ + cached_state::{CachedStateMetrics, ProviderCacheBuilder, ProviderCaches, SavedCache}, + payload_processor::{ + executor::WorkloadExecutor, + prewarm::{PrewarmContext, PrewarmTask, PrewarmTaskEvent}, + sparse_trie::SparseTrieTask, + }, + StateProviderBuilder, TreeConfig, +}; +use alloy_consensus::{transaction::Recovered, BlockHeader}; +use alloy_primitives::B256; +use multiproof::*; +use parking_lot::RwLock; +use reth_evm::{ConfigureEvm, ConfigureEvmEnvFor}; +use reth_primitives_traits::{NodePrimitives, RecoveredBlock}; +use reth_provider::{ + providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateCommitmentProvider, + StateProviderFactory, StateReader, +}; +use reth_revm::db::BundleState; +use reth_trie::TrieInput; +use std::sync::{ + mpsc, + mpsc::{channel, Sender}, + Arc, +}; + +pub(crate) mod executor; + +mod multiproof; +mod prewarm; +mod sparse_trie; + +/// Entrypoint for executing the payload. +pub(super) struct PayloadProcessor { + /// The executor used by to spawn tasks. + executor: WorkloadExecutor, + /// The most recent cache used for execution. + execution_cache: ExecutionCache, + /// Metrics for trie operations + trie_metrics: StateRootTaskMetrics, + /// Cross-block cache size in bytes. + cross_block_cache_size: u64, + /// Determines how to configure the evm for execution. + evm_config: Evm, + + _m: std::marker::PhantomData, +} + +impl PayloadProcessor { + pub(super) fn new(executor: WorkloadExecutor, evm_config: Evm, config: &TreeConfig) -> Self { + Self { + executor, + execution_cache: Default::default(), + trie_metrics: Default::default(), + cross_block_cache_size: config.cross_block_cache_size(), + evm_config, + _m: Default::default(), + } + } +} + +impl PayloadProcessor +where + N: NodePrimitives, + Evm: ConfigureEvmEnvFor + + 'static + + ConfigureEvm
+ + 'static, +{ + /// Executes the payload based on the configured settings. + pub(super) fn execute(&self) { + // TODO helpers for executing in sync? + } + + /// Spawns all background tasks and returns a handle connected to the tasks. + /// + /// - Transaction prewarming task + /// - State root task + /// - Sparse trie task + /// + /// # Transaction prewarming task + /// + /// Responsible for feeding state updates to the multi proof task. + /// + /// This task runs until: + /// - externally cancelled (e.g. sequential block execution is complete) + /// + /// ## Multi proof task + /// + /// Responsible for preparing sparse trie messages for the sparse trie task. + /// A state update (e.g. tx output) is converted into a multiproof calculation that returns an + /// output back to this task. + /// + /// Receives updates from sequential execution. + /// This task runs until it receives a shutdown signal, which should be after after the block + /// was fully executed. + /// + /// ## Sparse trie task + /// + /// Responsible for calculating the state root based on the received [`SparseTrieUpdate`]. + /// + /// This task runs until there are no further updates to process. + /// + /// + /// This returns a handle to await the final state root and to interact with the tasks (e.g. + /// canceling) + pub(super) fn spawn
<P>(
+        &self,
+        block: RecoveredBlock,
+        consistent_view: ConsistentDbView<P>
, + trie_input: TrieInput, + provider_builder: StateProviderBuilder, + ) -> PayloadHandle + where + P: DatabaseProviderFactory + + BlockReader + + StateProviderFactory + + StateReader + + StateCommitmentProvider + + Clone + + 'static, + { + let (to_sparse_trie, sparse_trie_rx) = channel(); + // spawn multiproof task + let state_root_config = StateRootConfig::new_from_input(consistent_view, trie_input); + let multi_proof_task = + StateRootTask2::new(state_root_config.clone(), self.executor.clone(), to_sparse_trie); + + // wire the multiproof task to the prewarm task + let to_multi_proof = Some(multi_proof_task.state_root_message_sender()); + + let prewarm_handle = + self.spawn_prewarming_with(block, provider_builder, to_multi_proof.clone()); + + // spawn multi-proof task + self.executor.spawn_blocking(move || { + multi_proof_task.run(); + }); + + let sparse_trie_task = SparseTrieTask { + executor: self.executor.clone(), + updates: sparse_trie_rx, + config: state_root_config, + metrics: self.trie_metrics.clone(), + + // TODO settings + max_concurrency: 4, + }; + + // wire the sparse trie to the state root response receiver + let (state_root_tx, state_root_rx) = channel(); + self.executor.spawn_blocking(move || { + let res = sparse_trie_task.run(); + let _ = state_root_tx.send(res); + }); + + PayloadHandle { to_multi_proof, prewarm_handle, state_root: Some(state_root_rx) } + } + + /// Spawn prewarming exclusively + pub(super) fn spawn_prewarming
<P>(
+        &self,
+        block: RecoveredBlock,
+        provider_builder: StateProviderBuilder,
+    ) -> PrewarmTaskHandle
+    where
+        P: BlockReader
+            + StateProviderFactory
+            + StateReader
+            + StateCommitmentProvider
+            + Clone
+            + 'static,
+    {
+        self.spawn_prewarming_with(block, provider_builder, None)
+    }
+
+    /// Spawn prewarming optionally wired to the multiproof task for target updates.
+    fn spawn_prewarming_with<P>
( + &self, + block: RecoveredBlock, + provider_builder: StateProviderBuilder, + to_multi_proof: Option>, + ) -> PrewarmTaskHandle + where + P: BlockReader + + StateProviderFactory + + StateReader + + StateCommitmentProvider + + Clone + + 'static, + { + let (cache, cache_metrics) = self.cache_for(block.header().parent_hash()).split(); + // configure prewarming + let prewarm_ctx = PrewarmContext { + header: block.clone_sealed_header(), + evm_config: self.evm_config.clone(), + cache: cache.clone(), + cache_metrics, + provider: provider_builder, + }; + + let txs = block.transactions_recovered().map(Recovered::cloned).collect(); + let prewarm_task = PrewarmTask::new( + self.executor.clone(), + self.execution_cache.clone(), + prewarm_ctx, + to_multi_proof, + txs, + ); + let to_prewarm_task = prewarm_task.actions_tx(); + + // spawn pre-warm task + self.executor.spawn_blocking(move || { + prewarm_task.run(); + }); + PrewarmTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task) } + } + + /// Returns the cache for the given parent hash. + /// + /// If the given hash is different then what is recently cached, then this will create a new + /// instance. + fn cache_for(&self, parent_hash: B256) -> SavedCache { + self.execution_cache.get_cache_for(parent_hash).unwrap_or_else(|| { + let cache = ProviderCacheBuilder::default().build_caches(self.cross_block_cache_size); + SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed()) + }) + } +} + +/// Handle to all the spawned tasks. +pub(super) struct PayloadHandle { + /// Channel for evm state updates + to_multi_proof: Option>, + // must include the receiver of the state root wired to the sparse trie + prewarm_handle: PrewarmTaskHandle, + /// Receiver for the state root + state_root: Option>, +} + +impl PayloadHandle { + /// Awaits the state root + pub(super) fn state_root(&self) -> StateRootResult { + todo!() + } + + // TODO add state hook + // pub fn state_hook(&self) -> impl OnStateHook { + + /// Terminates the pre-warming transaction processing. + /// + /// Note: This does not terminate the task yet. + pub(super) fn stop_prewarming_execution(&self) { + self.prewarm_handle.stop_prewarming_execution() + } + + /// Terminates the entire pre-warming task. + /// + /// If the [`BundleState`] is provided it will update the shared cache. + pub(super) fn terminate_prewarming_execution(&mut self, block_output: Option) { + self.prewarm_handle.terminate_prewarming_execution(block_output) + } +} + +/// Access to the spawned [`PrewarmTask`]. +pub(crate) struct PrewarmTaskHandle { + /// The shared cache the task operates with. + cache: ProviderCaches, + /// Channel to the spawned prewarm task if any + to_prewarm_task: Option>, +} + +impl PrewarmTaskHandle { + /// Terminates the pre-warming transaction processing. + /// + /// Note: This does not terminate the task yet. + pub(super) fn stop_prewarming_execution(&self) { + self.to_prewarm_task + .as_ref() + .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok()); + } + + /// Terminates the entire pre-warming task. + /// + /// If the [`BundleState`] is provided it will update the shared cache. + pub(super) fn terminate_prewarming_execution(&mut self, block_output: Option) { + self.to_prewarm_task + .take() + .map(|tx| tx.send(PrewarmTaskEvent::Terminate { block_output }).ok()); + } +} + +impl Drop for PrewarmTaskHandle { + fn drop(&mut self) { + // Ensure we always terminate on drop + self.terminate_prewarming_execution(None); + } +} + +/// Shared access to most recently used cache. 
+/// +/// This cache is intended to used for processing the payload in the following manner: +/// - Get Cache if the payload's parent block matches the parent block +/// - Update cache upon successful payload execution +/// +/// This process assumes that payloads are received sequentially. +#[derive(Clone, Debug, Default)] +struct ExecutionCache { + /// Guarded cloneable cache identified by a block hash. + inner: Arc>>, +} + +impl ExecutionCache { + /// Returns the cache if the currently store cache is for the given `parent_hash` + pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option { + let cache = self.inner.read(); + cache + .as_ref() + .and_then(|cache| (cache.executed_block_hash() == parent_hash).then(|| cache.clone())) + } + + /// Clears the tracked cashe + pub(crate) fn clear(&self) { + self.inner.write().take(); + } + + /// Stores the provider cache + pub(crate) fn save_cache(&self, cache: SavedCache) { + self.inner.write().replace(cache); + } +} diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs new file mode 100644 index 000000000000..6e83c2f121d1 --- /dev/null +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -0,0 +1,1426 @@ +//! State root task related functionality. + +use crate::tree::payload_processor::{executor::WorkloadExecutor, sparse_trie::SparseTrieEvent}; +use alloy_primitives::map::HashSet; +use derive_more::derive::Deref; +use metrics::Histogram; +use reth_errors::{ProviderError, ProviderResult}; +use reth_evm::system_calls::{OnStateHook, StateChangeSource}; +use reth_metrics::Metrics; +use reth_provider::{ + providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateCommitmentProvider, +}; +use reth_revm::state::EvmState; +use reth_trie::{ + prefix_set::TriePrefixSetsMut, + updates::{TrieUpdates, TrieUpdatesSorted}, + HashedPostState, HashedPostStateSorted, HashedStorage, MultiProof, MultiProofTargets, + TrieInput, +}; +use reth_trie_parallel::{proof::ParallelProof, root::ParallelStateRootError}; +use revm_primitives::{keccak256, B256}; +use std::{ + collections::{BTreeMap, VecDeque}, + sync::{ + mpsc::{channel, Receiver, Sender}, + Arc, + }, + time::{Duration, Instant}, +}; +use tracing::{debug, error, trace}; + +/// Outcome of the state root computation, including the state root itself with +/// the trie updates and the total time spent. +#[derive(Debug)] +pub(crate) struct StateRootComputeOutcome { + /// The computed state root and trie updates + pub state_root: (B256, TrieUpdates), + /// The total time spent calculating the state root + pub total_time: Duration, + /// The time spent calculating the state root since the last state update + pub time_from_last_update: Duration, +} + +/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the +/// state. +#[derive(Default, Debug)] +pub(super) struct SparseTrieUpdate { + /// The state update that was used to calculate the proof + pub(crate) state: HashedPostState, + /// The calculated multiproof + pub(crate) multiproof: MultiProof, +} + +impl SparseTrieUpdate { + /// Returns true if the update is empty. + pub(super) fn is_empty(&self) -> bool { + self.state.is_empty() && self.multiproof.is_empty() + } + + /// Construct update from multiproof. + pub(super) fn from_multiproof(multiproof: MultiProof) -> Self { + Self { multiproof, ..Default::default() } + } + + /// Extend update with contents of the other. 
+ pub(super) fn extend(&mut self, other: Self) { + self.state.extend(other.state); + self.multiproof.extend(other.multiproof); + } +} + +/// Result of the state root calculation +pub(crate) type StateRootResult = Result; + +/// Common configuration for state root tasks +#[derive(Debug, Clone)] +pub(super) struct StateRootConfig { + /// View over the state in the database. + pub consistent_view: ConsistentDbView, + /// The sorted collection of cached in-memory intermediate trie nodes that + /// can be reused for computation. + pub nodes_sorted: Arc, + /// The sorted in-memory overlay hashed state. + pub state_sorted: Arc, + /// The collection of prefix sets for the computation. Since the prefix sets _always_ + /// invalidate the in-memory nodes, not all keys from `state_sorted` might be present here, + /// if we have cached nodes for them. + pub prefix_sets: Arc, +} + +impl StateRootConfig { + /// Creates a new state root config from the consistent view and the trie input. + pub(super) fn new_from_input( + consistent_view: ConsistentDbView, + input: TrieInput, + ) -> Self { + Self { + consistent_view, + nodes_sorted: Arc::new(input.nodes.into_sorted()), + state_sorted: Arc::new(input.state.into_sorted()), + prefix_sets: Arc::new(input.prefix_sets), + } + } +} + +/// Messages used internally by the state root task +#[derive(Debug)] +pub(super) enum StateRootMessage { + /// Prefetch proof targets + PrefetchProofs(MultiProofTargets), + /// New state update from transaction execution with its source + StateUpdate(StateChangeSource, EvmState), + /// Empty proof for a specific state update + EmptyProof { + /// The index of this proof in the sequence of state updates + sequence_number: u64, + /// The state update that was used to calculate the proof + state: HashedPostState, + }, + /// Proof calculation completed for a specific state update + ProofCalculated(Box), + /// Error during proof calculation + ProofCalculationError(ProviderError), + /// Signals state update stream end. + FinishedStateUpdates, +} + +/// Message about completion of proof calculation for a specific state update +#[derive(Debug)] +pub(super) struct ProofCalculated { + /// The index of this proof in the sequence of state updates + sequence_number: u64, + /// Sparse trie update + update: SparseTrieUpdate, + /// Total number of account targets + account_targets: usize, + /// Total number of storage slot targets + storage_targets: usize, + /// The time taken to calculate the proof. + elapsed: Duration, +} + +/// Whether or not a proof was fetched due to a state update, or due to a prefetch command. +#[derive(Debug)] +pub(super) enum ProofFetchSource { + /// The proof was fetched due to a prefetch command. + Prefetch, + /// The proof was fetched due to a state update. + StateUpdate, +} + +/// Handle to track proof calculation ordering +#[derive(Debug, Default)] +pub(crate) struct ProofSequencer { + /// The next proof sequence number to be produced. + next_sequence: u64, + /// The next sequence number expected to be delivered. 
+ next_to_deliver: u64, + /// Buffer for out-of-order proofs and corresponding state updates + pending_proofs: BTreeMap, +} + +impl ProofSequencer { + /// Creates a new proof sequencer + pub(crate) fn new() -> Self { + Self::default() + } + + /// Gets the next sequence number and increments the counter + pub(crate) fn next_sequence(&mut self) -> u64 { + let seq = self.next_sequence; + self.next_sequence += 1; + seq + } + + /// Adds a proof with the corresponding state update and returns all sequential proofs and state + /// updates if we have a continuous sequence + pub(crate) fn add_proof( + &mut self, + sequence: u64, + update: SparseTrieUpdate, + ) -> Vec { + if sequence >= self.next_to_deliver { + self.pending_proofs.insert(sequence, update); + } + + // return early if we don't have the next expected proof + if !self.pending_proofs.contains_key(&self.next_to_deliver) { + return Vec::new() + } + + let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len()); + let mut current_sequence = self.next_to_deliver; + + // keep collecting proofs and state updates as long as we have consecutive sequence numbers + while let Some(pending) = self.pending_proofs.remove(¤t_sequence) { + consecutive_proofs.push(pending); + current_sequence += 1; + + // if we don't have the next number, stop collecting + if !self.pending_proofs.contains_key(¤t_sequence) { + break; + } + } + + self.next_to_deliver += consecutive_proofs.len() as u64; + + consecutive_proofs + } + + /// Returns true if we still have pending proofs + pub(crate) fn has_pending(&self) -> bool { + !self.pending_proofs.is_empty() + } +} + +/// A wrapper for the sender that signals completion when dropped +#[derive(Deref, Debug)] +pub(super) struct StateHookSender(Sender); + +impl StateHookSender { + pub(crate) const fn new(inner: Sender) -> Self { + Self(inner) + } +} + +impl Drop for StateHookSender { + fn drop(&mut self) { + // Send completion signal when the sender is dropped + let _ = self.0.send(StateRootMessage::FinishedStateUpdates); + } +} + +fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState { + let mut hashed_state = HashedPostState::with_capacity(update.len()); + + for (address, account) in update { + if account.is_touched() { + let hashed_address = keccak256(address); + trace!(target: "engine::root", ?address, ?hashed_address, "Adding account to state update"); + + let destroyed = account.is_selfdestructed(); + let info = if destroyed { None } else { Some(account.info.into()) }; + hashed_state.accounts.insert(hashed_address, info); + + let mut changed_storage_iter = account + .storage + .into_iter() + .filter(|(_slot, value)| value.is_changed()) + .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value)) + .peekable(); + + if destroyed { + hashed_state.storages.insert(hashed_address, HashedStorage::new(true)); + } else if changed_storage_iter.peek().is_some() { + hashed_state + .storages + .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter)); + } + } + } + + hashed_state +} + +/// Input parameters for spawning a multiproof calculation. +#[derive(Debug)] +struct MultiproofInput { + config: StateRootConfig, + source: Option, + hashed_state_update: HashedPostState, + proof_targets: MultiProofTargets, + proof_sequence_number: u64, + state_root_message_sender: Sender, +} + +/// Manages concurrent multiproof calculations. 
+/// Takes care of not having more calculations in flight than a given thread +/// pool size, further calculation requests are queued and spawn later, after +/// availability has been signaled. +#[derive(Debug)] +struct MultiproofManager { + /// Maximum number of concurrent calculations. + max_concurrent: usize, + /// Currently running calculations. + inflight: usize, + /// Queued calculations. + pending: VecDeque>, + /// Executor for tassks + executor: WorkloadExecutor, +} + +impl MultiproofManager +where + Factory: + DatabaseProviderFactory + StateCommitmentProvider + Clone + 'static, +{ + /// Creates a new [`MultiproofManager`]. + fn new(executor: WorkloadExecutor, max_concurrent: usize) -> Self { + debug_assert!(max_concurrent != 0); + Self { + executor, + max_concurrent, + inflight: 0, + pending: VecDeque::with_capacity(max_concurrent), + } + } + + /// Spawns a new multiproof calculation or enqueues it for later if + /// `max_concurrent` are already inflight. + fn spawn_or_queue(&mut self, input: MultiproofInput) { + // If there are no proof targets, we can just send an empty multiproof back immediately + if input.proof_targets.is_empty() { + debug!( + sequence_number = input.proof_sequence_number, + "No proof targets, sending empty multiproof back immediately" + ); + let _ = input.state_root_message_sender.send(StateRootMessage::EmptyProof { + sequence_number: input.proof_sequence_number, + state: input.hashed_state_update, + }); + return + } + + if self.inflight >= self.max_concurrent { + self.pending.push_back(input); + return; + } + + self.spawn_multiproof(input); + } + + /// Signals that a multiproof calculation has finished and there's room to + /// spawn a new calculation if needed. + fn on_calculation_complete(&mut self) { + self.inflight = self.inflight.saturating_sub(1); + + if let Some(input) = self.pending.pop_front() { + self.spawn_multiproof(input); + } + } + + /// Spawns a single multiproof calculation task. + fn spawn_multiproof( + &mut self, + MultiproofInput { + config, + source, + hashed_state_update, + proof_targets, + proof_sequence_number, + state_root_message_sender, + }: MultiproofInput, + ) { + let executor = self.executor.clone(); + + self.executor.spawn_blocking(move || { + let account_targets = proof_targets.len(); + let storage_targets = proof_targets.values().map(|slots| slots.len()).sum(); + + trace!( + target: "engine::root", + proof_sequence_number, + ?proof_targets, + account_targets, + storage_targets, + "Starting multiproof calculation", + ); + let start = Instant::now(); + let result = calculate_multiproof(executor, config, proof_targets); + let elapsed = start.elapsed(); + trace!( + target: "engine::root", + proof_sequence_number, + ?elapsed, + ?source, + account_targets, + storage_targets, + "Multiproof calculated", + ); + + match result { + Ok(proof) => { + let _ = state_root_message_sender.send(StateRootMessage::ProofCalculated( + Box::new(ProofCalculated { + sequence_number: proof_sequence_number, + update: SparseTrieUpdate { + state: hashed_state_update, + multiproof: proof, + }, + account_targets, + storage_targets, + elapsed, + }), + )); + } + Err(error) => { + let _ = state_root_message_sender + .send(StateRootMessage::ProofCalculationError(error)); + } + } + }); + + self.inflight += 1; + } +} + +#[derive(Metrics, Clone)] +#[metrics(scope = "tree.root")] +pub(crate) struct StateRootTaskMetrics { + /// Histogram of proof calculation durations. 
+ pub proof_calculation_duration_histogram: Histogram, + /// Histogram of proof calculation account targets. + pub proof_calculation_account_targets_histogram: Histogram, + /// Histogram of proof calculation storage targets. + pub proof_calculation_storage_targets_histogram: Histogram, + + /// Histogram of sparse trie update durations. + pub sparse_trie_update_duration_histogram: Histogram, + /// Histogram of sparse trie final update durations. + pub sparse_trie_final_update_duration_histogram: Histogram, + + /// Histogram of state updates received. + pub state_updates_received_histogram: Histogram, + /// Histogram of proofs processed. + pub proofs_processed_histogram: Histogram, + /// Histogram of state root update iterations. + pub state_root_iterations_histogram: Histogram, +} + +/// Standalone task that receives a transaction state stream and updates relevant +/// data structures to calculate state root. +/// +/// It is responsible of initializing a blinded sparse trie and subscribe to +/// transaction state stream. As it receives transaction execution results, it +/// fetches the proofs for relevant accounts from the database and reveal them +/// to the tree. +/// Then it updates relevant leaves according to the result of the transaction. +/// This feeds updates to the sparse trie task. +// TODO(mattsse): rename to MultiProofTask +#[derive(Debug)] +pub(super) struct StateRootTask2 { + /// Task configuration. + config: StateRootConfig, + /// Receiver for state root related messages. + rx: Receiver, + /// Sender for state root related messages. + tx: Sender, + /// Sender for state updates emitted by this type. + to_sparse_trie: Sender, + /// Proof targets that have been already fetched. + fetched_proof_targets: MultiProofTargets, + /// Proof sequencing handler. + proof_sequencer: ProofSequencer, + /// Reference to the executor used to spawn workloads. + executor: WorkloadExecutor, + /// Manages calculation of multiproofs. + multiproof_manager: MultiproofManager, + /// State root task metrics + metrics: StateRootTaskMetrics, +} + +impl StateRootTask2 +where + Factory: + DatabaseProviderFactory + StateCommitmentProvider + Clone + 'static, +{ + /// Creates a new state root task with the unified message channel + pub(super) fn new( + config: StateRootConfig, + executor: WorkloadExecutor, + to_sparse_trie: Sender, + ) -> Self { + let (tx, rx) = channel(); + Self { + config, + rx, + tx, + to_sparse_trie, + fetched_proof_targets: Default::default(), + proof_sequencer: ProofSequencer::new(), + executor: executor.clone(), + // TODO settings + multiproof_manager: MultiproofManager::new(executor, 4), + metrics: StateRootTaskMetrics::default(), + } + } + + /// Returns a [`Sender`] that can be used to send arbitrary [`StateRootMessage`]s to this task. + pub(super) fn state_root_message_sender(&self) -> Sender { + self.tx.clone() + } + + /// Returns a [`StateHookSender`] that can be used to send state updates to this task. + pub(super) fn state_hook_sender(&self) -> StateHookSender { + StateHookSender::new(self.tx.clone()) + } + + /// Returns a state hook to be used to send state updates to this task. + pub(super) fn state_hook(&self) -> impl OnStateHook { + let state_hook = self.state_hook_sender(); + + move |source: StateChangeSource, state: &EvmState| { + if let Err(error) = + state_hook.send(StateRootMessage::StateUpdate(source, state.clone())) + { + error!(target: "engine::root", ?error, "Failed to send state update"); + } + } + } + + /// Handles request for proof prefetch. 
+ fn on_prefetch_proof(&mut self, targets: MultiProofTargets) { + let proof_targets = self.get_prefetch_proof_targets(targets); + extend_multi_proof_targets_ref(&mut self.fetched_proof_targets, &proof_targets); + + self.multiproof_manager.spawn_or_queue(MultiproofInput { + config: self.config.clone(), + source: None, + hashed_state_update: Default::default(), + proof_targets, + proof_sequence_number: self.proof_sequencer.next_sequence(), + state_root_message_sender: self.tx.clone(), + }); + } + + /// Calls `get_proof_targets` with existing proof targets for prefetching. + fn get_prefetch_proof_targets(&self, mut targets: MultiProofTargets) -> MultiProofTargets { + // Here we want to filter out any targets that are already fetched + // + // This means we need to remove any storage slots that have already been fetched + let mut duplicates = 0; + + // First remove all storage targets that are subsets of already fetched storage slots + targets.retain(|hashed_address, target_storage| { + let keep = self + .fetched_proof_targets + .get(hashed_address) + // do NOT remove if None, because that means the account has not been fetched yet + .is_none_or(|fetched_storage| { + // remove if a subset + !target_storage.is_subset(fetched_storage) + }); + + if !keep { + duplicates += target_storage.len(); + } + + keep + }); + + // For all non-subset remaining targets, we have to calculate the difference + for (hashed_address, target_storage) in &mut targets { + let Some(fetched_storage) = self.fetched_proof_targets.get(hashed_address) else { + // this means the account has not been fetched yet, so we must fetch everything + // associated with this account + continue + }; + + let prev_target_storage_len = target_storage.len(); + + // keep only the storage slots that have not been fetched yet + // + // we already removed subsets, so this should only remove duplicates + target_storage.retain(|slot| !fetched_storage.contains(slot)); + + duplicates += prev_target_storage_len - target_storage.len(); + } + + if duplicates > 0 { + trace!(target: "engine::root", duplicates, "Removed duplicate prefetch proof targets"); + } + + targets + } + + /// Handles state updates. + /// + /// Returns proof targets derived from the state update. + fn on_state_update( + &mut self, + source: StateChangeSource, + update: EvmState, + proof_sequence_number: u64, + ) { + let hashed_state_update = evm_state_to_hashed_post_state(update); + let proof_targets = get_proof_targets(&hashed_state_update, &self.fetched_proof_targets); + extend_multi_proof_targets_ref(&mut self.fetched_proof_targets, &proof_targets); + + self.multiproof_manager.spawn_or_queue(MultiproofInput { + config: self.config.clone(), + source: Some(source), + hashed_state_update, + proof_targets, + proof_sequence_number, + state_root_message_sender: self.tx.clone(), + }); + } + + /// Handler for new proof calculated, aggregates all the existing sequential proofs. + fn on_proof( + &mut self, + sequence_number: u64, + update: SparseTrieUpdate, + ) -> Option { + let ready_proofs = self.proof_sequencer.add_proof(sequence_number, update); + + ready_proofs + .into_iter() + // Merge all ready proofs and state updates + .reduce(|mut acc_update, update| { + acc_update.extend(update); + acc_update + }) + // Return None if the resulting proof is empty + .filter(|proof| !proof.is_empty()) + } + + /// Starts the main loop that handles all incoming messages, fetches proofs, applies them to the + /// sparse trie, updates the sparse trie, and eventually returns the state root. 
+ /// + /// The lifecycle is the following: + /// 1. Either [`StateRootMessage::PrefetchProofs`] or [`StateRootMessage::StateUpdate`] is + /// received from the engine. + /// * For [`StateRootMessage::StateUpdate`], the state update is hashed with + /// [`evm_state_to_hashed_post_state`], and then (proof targets)[`MultiProofTargets`] are + /// extracted with [`get_proof_targets`]. + /// * For both messages, proof targets are deduplicated according to `fetched_proof_targets`, + /// so that the proofs for accounts and storage slots that were already fetched are not + /// requested again. + /// 2. Using the proof targets, a new multiproof is calculated using + /// [`MultiproofManager::spawn_or_queue`]. + /// * If the list of proof targets is empty, the [`StateRootMessage::EmptyProof`] message is + /// sent back to this task along with the original state update. + /// * Otherwise, the multiproof is calculated and the [`StateRootMessage::ProofCalculated`] + /// message is sent back to this task along with the resulting multiproof, proof targets + /// and original state update. + /// 3. Either [`StateRootMessage::EmptyProof`] or [`StateRootMessage::ProofCalculated`] is + /// received. + /// * The multiproof is added to the (proof sequencer)[`ProofSequencer`]. + /// * If the proof sequencer has a contiguous sequence of multiproofs in the same order as + /// state updates arrived (i.e. transaction order), such sequence is returned. + /// 4. Once there's a sequence of contiguous multiproofs along with the proof targets and state + /// updates associated with them, a [`SparseTrieUpdate`] is generated and sent to the sparse + /// trie task that's running in [`run_sparse_trie`]. + /// * Sparse trie task reveals the multiproof, updates the sparse trie, computes storage trie + /// roots, and calculates RLP nodes of the state trie below + /// [`SPARSE_TRIE_INCREMENTAL_LEVEL`]. + /// 5. Steps above are repeated until this task receives a + /// [`StateRootMessage::FinishedStateUpdates`]. + /// * Once this message is received, on every [`StateRootMessage::EmptyProof`] and + /// [`StateRootMessage::ProofCalculated`] message, we check if there are any proofs are + /// currently being calculated, or if there are any pending proofs in the proof sequencer + /// left to be revealed using [`check_end_condition`]. + /// * If there are none left, we drop the sparse trie task sender channel, and it signals + /// [`run_sparse_trie`] to calculate the state root of the full state trie, and send it + /// back to this task via [`StateRootMessage::RootCalculated`] message. + /// 6. On [`StateRootMessage::RootCalculated`] message, the loop exits and the the state root is + /// returned. 
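    // Illustrative sketch of the plumbing described above, using plain std mpsc channels
    // and placeholder payload types (`Update`, `Proof`) instead of the real
    // `StateRootMessage`/`SparseTrieUpdate`; dropping the sender to the sparse trie stage
    // is what signals it to finalize the root:
    //
    //     use std::sync::mpsc::channel;
    //     use std::thread;
    //
    //     struct Update(u64);
    //     struct Proof(u64);
    //
    //     let (to_multiproof, from_engine) = channel::<Update>();
    //     let (to_sparse_trie, from_multiproof) = channel::<Proof>();
    //
    //     // multiproof stage: turns state updates into proofs
    //     thread::spawn(move || {
    //         while let Ok(Update(seq)) = from_engine.recv() {
    //             let _ = to_sparse_trie.send(Proof(seq));
    //         }
    //         // `to_sparse_trie` is dropped here, closing the channel
    //     });
    //
    //     // sparse trie stage: applies proofs until the channel closes, then computes
    //     // the final state root
    //     let root_handle = thread::spawn(move || {
    //         while let Ok(_proof) = from_multiproof.recv() {}
    //     });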
+ pub(crate) fn run(mut self) { + let mut prefetch_proofs_received = 0; + let mut updates_received = 0; + let mut proofs_processed = 0; + + let mut updates_finished = false; + + // Timestamp when the first state update was received + let mut _first_update_time = None; + // Timestamp when the last state update was received + let mut _last_update_time = None; + + loop { + trace!(target: "engine::root", "entering main channel receiving loop"); + match self.rx.recv() { + Ok(message) => match message { + StateRootMessage::PrefetchProofs(targets) => { + trace!(target: "engine::root", "processing StateRootMessage::PrefetchProofs"); + prefetch_proofs_received += 1; + debug!( + target: "engine::root", + targets = targets.len(), + storage_targets = targets.values().map(|slots| + slots.len()).sum::(), + total_prefetches = prefetch_proofs_received, + "Prefetching proofs" + ); + self.on_prefetch_proof(targets); + } + StateRootMessage::StateUpdate(source, update) => { + trace!(target: "engine::root", "processing + StateRootMessage::StateUpdate"); + if updates_received == 0 { + _first_update_time = Some(Instant::now()); + debug!(target: "engine::root", "Started state root calculation"); + } + _last_update_time = Some(Instant::now()); + + updates_received += 1; + debug!( + target: "engine::root", + ?source, + len = update.len(), + total_updates = updates_received, + "Received new state update" + ); + let next_sequence = self.proof_sequencer.next_sequence(); + self.on_state_update(source, update, next_sequence); + } + StateRootMessage::FinishedStateUpdates => { + trace!(target: "engine::root", "processing StateRootMessage::FinishedStateUpdates"); + updates_finished = true; + + if check_end_condition(CheckEndConditionParams { + proofs_processed, + updates_received, + prefetch_proofs_received, + updates_finished, + proof_sequencer: &self.proof_sequencer, + }) { + debug!( + target: "engine::root", + "State updates finished and all proofs processed, ending calculation" + ); + }; + break + } + StateRootMessage::EmptyProof { sequence_number, state } => { + trace!(target: "engine::root", "processing StateRootMessage::EmptyProof"); + + proofs_processed += 1; + + if let Some(combined_update) = self.on_proof( + sequence_number, + SparseTrieUpdate { state, multiproof: MultiProof::default() }, + ) { + let _ = self.to_sparse_trie.send(combined_update); + } + + if check_end_condition(CheckEndConditionParams { + proofs_processed, + updates_received, + prefetch_proofs_received, + updates_finished, + proof_sequencer: &self.proof_sequencer, + }) { + debug!( + target: "engine::root", + "State updates finished and all proofs processed, ending calculation" + ); + + break + }; + } + StateRootMessage::ProofCalculated(proof_calculated) => { + trace!(target: "engine::root", "processing + StateRootMessage::ProofCalculated"); + + // we increment proofs_processed for both state updates and prefetches, + // because both are used for the root termination condition. 
+ proofs_processed += 1; + + self.metrics + .proof_calculation_duration_histogram + .record(proof_calculated.elapsed); + self.metrics + .proof_calculation_account_targets_histogram + .record(proof_calculated.account_targets as f64); + self.metrics + .proof_calculation_storage_targets_histogram + .record(proof_calculated.storage_targets as f64); + + debug!( + target: "engine::root", + sequence = proof_calculated.sequence_number, + total_proofs = proofs_processed, + "Processing calculated proof" + ); + + self.multiproof_manager.on_calculation_complete(); + + if let Some(combined_update) = + self.on_proof(proof_calculated.sequence_number, proof_calculated.update) + { + let _ = self.to_sparse_trie.send(combined_update); + } + + if check_end_condition(CheckEndConditionParams { + proofs_processed, + updates_received, + prefetch_proofs_received, + updates_finished, + proof_sequencer: &self.proof_sequencer, + }) { + debug!( + target: "engine::root", + "State updates finished and all proofs processed, ending calculation"); + break + }; + } + StateRootMessage::ProofCalculationError(_) => { + // TODO send error + // return Err(ParallelStateRootError::Other(format!( + // "could not calculate multiproof: {e:?}" + // ))) + return + } + }, + Err(_) => { + // this means our internal message channel is closed, which shouldn't happen + // in normal operation since we hold both ends + error!( + target: "engine::root", + "Internal message channel closed unexpectedly" + ); + // TODO send error + + // return Err(ParallelStateRootError::Other( + // "Internal message channel closed unexpectedly".into(), + // )); + } + } + } + } +} + +/// Convenience params struct to pass to [`check_end_condition`]. +struct CheckEndConditionParams<'a> { + proofs_processed: u64, + updates_received: u64, + prefetch_proofs_received: u64, + updates_finished: bool, + proof_sequencer: &'a ProofSequencer, +} + +// Returns true if all state updates finished and all profs processed. +fn check_end_condition( + CheckEndConditionParams { + proofs_processed, + updates_received, + prefetch_proofs_received, + updates_finished, + proof_sequencer, + }: CheckEndConditionParams<'_>, +) -> bool { + let all_proofs_received = proofs_processed >= updates_received + prefetch_proofs_received; + let no_pending = !proof_sequencer.has_pending(); + debug!( + target: "engine::root", + proofs_processed, + updates_received, + prefetch_proofs_received, + no_pending, + updates_finished, + "Checking end condition" + ); + all_proofs_received && no_pending && updates_finished +} + +/// Returns accounts only with those storages that were not already fetched, and +/// if there are no such storages and the account itself was already fetched, the +/// account shouldn't be included. 
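// Illustrative worked example of the rule above (`A`, `B`, `s1`, `s2` are placeholders
// for hashed addresses and hashed storage slots):
//
//     already fetched targets:  { A => {s1} }
//     incoming state update:    accounts {A, B}, storages { A => {s1, s2} }
//
//     resulting proof targets:  { A => {s2},  // s1 was fetched before, only s2 is requested
//                                 B => {} }   // new account, no storage slots touched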
+fn get_proof_targets( + state_update: &HashedPostState, + fetched_proof_targets: &MultiProofTargets, +) -> MultiProofTargets { + let mut targets = MultiProofTargets::default(); + + // first collect all new accounts (not previously fetched) + for &hashed_address in state_update.accounts.keys() { + if !fetched_proof_targets.contains_key(&hashed_address) { + targets.insert(hashed_address, HashSet::default()); + } + } + + // then process storage slots for all accounts in the state update + for (hashed_address, storage) in &state_update.storages { + let fetched = fetched_proof_targets.get(hashed_address); + let mut changed_slots = storage + .storage + .keys() + .filter(|slot| !fetched.is_some_and(|f| f.contains(*slot))) + .peekable(); + + if changed_slots.peek().is_some() { + targets.entry(*hashed_address).or_default().extend(changed_slots); + } + } + + targets +} + +/// Calculate multiproof for the targets. +#[inline] +fn calculate_multiproof( + executor: WorkloadExecutor, + config: StateRootConfig, + proof_targets: MultiProofTargets, +) -> ProviderResult +where + Factory: + DatabaseProviderFactory + StateCommitmentProvider + Clone + 'static, +{ + Ok(ParallelProof::new( + config.consistent_view, + config.nodes_sorted, + config.state_sorted, + config.prefix_sets, + executor.rayon_pool().clone(), + ) + .with_branch_node_masks(true) + .multiproof(proof_targets)?) +} + +fn extend_multi_proof_targets_ref(targets: &mut MultiProofTargets, other: &MultiProofTargets) { + for (address, slots) in other { + targets.entry(*address).or_default().extend(slots); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::map::B256Set; + use reth_evm::system_calls::StateChangeSource; + use reth_primitives_traits::{Account as RethAccount, StorageEntry}; + use reth_provider::{ + providers::ConsistentDbView, test_utils::create_test_provider_factory, HashingWriter, + }; + use reth_testing_utils::generators::{self, Rng}; + use reth_trie::{test_utils::state_root, TrieInput}; + use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256}; + use revm_state::{ + Account as RevmAccount, AccountInfo, AccountStatus, EvmState, EvmStorageSlot, + }; + use std::sync::Arc; + + fn convert_revm_to_reth_account(revm_account: &RevmAccount) -> RethAccount { + RethAccount { + balance: revm_account.info.balance, + nonce: revm_account.info.nonce, + bytecode_hash: if revm_account.info.code_hash == KECCAK_EMPTY { + None + } else { + Some(revm_account.info.code_hash) + }, + } + } + + fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec { + let mut rng = generators::rng(); + let all_addresses: Vec

= (0..num_accounts).map(|_| rng.gen()).collect(); + let mut updates = Vec::new(); + + for _ in 0..updates_per_account { + let num_accounts_in_update = rng.gen_range(1..=num_accounts); + let mut state_update = EvmState::default(); + + let selected_addresses = &all_addresses[0..num_accounts_in_update]; + + for &address in selected_addresses { + let mut storage = HashMap::default(); + if rng.gen_bool(0.7) { + for _ in 0..rng.gen_range(1..10) { + let slot = U256::from(rng.gen::()); + storage.insert( + slot, + EvmStorageSlot::new_changed(U256::ZERO, U256::from(rng.gen::())), + ); + } + } + + let account = RevmAccount { + info: AccountInfo { + balance: U256::from(rng.gen::()), + nonce: rng.gen::(), + code_hash: KECCAK_EMPTY, + code: Some(Default::default()), + }, + storage, + status: AccountStatus::Touched, + }; + + state_update.insert(address, account); + } + + updates.push(state_update); + } + + updates + } + + fn create_state_root_config(factory: F, input: TrieInput) -> StateRootConfig + where + F: DatabaseProviderFactory + + StateCommitmentProvider + + Clone + + 'static, + { + let consistent_view = ConsistentDbView::new(factory, None); + let nodes_sorted = Arc::new(input.nodes.clone().into_sorted()); + let state_sorted = Arc::new(input.state.clone().into_sorted()); + let prefix_sets = Arc::new(input.prefix_sets); + + StateRootConfig { consistent_view, nodes_sorted, state_sorted, prefix_sets } + } + + fn create_test_state_root_task(factory: F) -> StateRootTask2 + where + F: DatabaseProviderFactory + + StateCommitmentProvider + + Clone + + 'static, + { + let executor = WorkloadExecutor::with_num_cpu_threads(2); + let config = create_state_root_config(factory, TrieInput::default()); + let channel = channel(); + + StateRootTask2::new(config, executor, channel.0) + } + + #[test] + fn test_state_root_task() { + reth_tracing::init_test_tracing(); + + let factory = create_test_provider_factory(); + + let state_updates = create_mock_state_updates(10, 10); + let mut hashed_state = HashedPostState::default(); + let mut accumulated_state: HashMap)> = + HashMap::default(); + + { + let provider_rw = factory.provider_rw().expect("failed to get provider"); + + for update in &state_updates { + let account_updates = update.iter().map(|(address, account)| { + (*address, Some(convert_revm_to_reth_account(account))) + }); + provider_rw + .insert_account_for_hashing(account_updates) + .expect("failed to insert accounts"); + + let storage_updates = update.iter().map(|(address, account)| { + let storage_entries = account.storage.iter().map(|(slot, value)| { + StorageEntry { key: B256::from(*slot), value: value.present_value } + }); + (*address, storage_entries) + }); + provider_rw + .insert_storage_for_hashing(storage_updates) + .expect("failed to insert storage"); + } + provider_rw.commit().expect("failed to commit changes"); + } + + for update in &state_updates { + hashed_state.extend(evm_state_to_hashed_post_state(update.clone())); + + for (address, account) in update { + let storage: HashMap = account + .storage + .iter() + .map(|(k, v)| (B256::from(*k), v.present_value)) + .collect(); + + let entry = accumulated_state.entry(*address).or_default(); + entry.0 = convert_revm_to_reth_account(account); + entry.1.extend(storage); + } + } + + let input = TrieInput::from_state(hashed_state); + let nodes_sorted = Arc::new(input.nodes.clone().into_sorted()); + let state_sorted = Arc::new(input.state.clone().into_sorted()); + let config = StateRootConfig { + consistent_view: ConsistentDbView::new(factory, None), + 
nodes_sorted, + state_sorted, + prefix_sets: Arc::new(input.prefix_sets), + }; + + let executor = WorkloadExecutor::with_num_cpu_threads(2); + + let task = StateRootTask2::new(config, executor); + let mut state_hook = task.state_hook(); + let handle = task.spawn(); + + for (i, update) in state_updates.into_iter().enumerate() { + state_hook.on_state(StateChangeSource::Transaction(i), &update); + } + drop(state_hook); + + let (root_from_task, _) = handle.wait_for_result().expect("task failed").state_root; + let root_from_base = state_root(accumulated_state); + + assert_eq!( + root_from_task, root_from_base, + "State root mismatch: task={root_from_task:?}, base={root_from_base:?}" + ); + } + + #[test] + fn test_add_proof_in_sequence() { + let mut sequencer = ProofSequencer::new(); + let proof1 = MultiProof::default(); + let proof2 = MultiProof::default(); + sequencer.next_sequence = 2; + + let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1)); + assert_eq!(ready.len(), 1); + assert!(!sequencer.has_pending()); + + let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2)); + assert_eq!(ready.len(), 1); + assert!(!sequencer.has_pending()); + } + + #[test] + fn test_add_proof_out_of_order() { + let mut sequencer = ProofSequencer::new(); + let proof1 = MultiProof::default(); + let proof2 = MultiProof::default(); + let proof3 = MultiProof::default(); + sequencer.next_sequence = 3; + + let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3)); + assert_eq!(ready.len(), 0); + assert!(sequencer.has_pending()); + + let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1)); + assert_eq!(ready.len(), 1); + assert!(sequencer.has_pending()); + + let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2)); + assert_eq!(ready.len(), 2); + assert!(!sequencer.has_pending()); + } + + #[test] + fn test_add_proof_with_gaps() { + let mut sequencer = ProofSequencer::new(); + let proof1 = MultiProof::default(); + let proof3 = MultiProof::default(); + sequencer.next_sequence = 3; + + let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1)); + assert_eq!(ready.len(), 1); + + let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3)); + assert_eq!(ready.len(), 0); + assert!(sequencer.has_pending()); + } + + #[test] + fn test_add_proof_duplicate_sequence() { + let mut sequencer = ProofSequencer::new(); + let proof1 = MultiProof::default(); + let proof2 = MultiProof::default(); + + let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1)); + assert_eq!(ready.len(), 1); + + let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof2)); + assert_eq!(ready.len(), 0); + assert!(!sequencer.has_pending()); + } + + #[test] + fn test_add_proof_batch_processing() { + let mut sequencer = ProofSequencer::new(); + let proofs: Vec<_> = (0..5).map(|_| MultiProof::default()).collect(); + sequencer.next_sequence = 5; + + sequencer.add_proof(4, SparseTrieUpdate::from_multiproof(proofs[4].clone())); + sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proofs[2].clone())); + sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proofs[1].clone())); + sequencer.add_proof(3, SparseTrieUpdate::from_multiproof(proofs[3].clone())); + + let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proofs[0].clone())); + assert_eq!(ready.len(), 5); + assert!(!sequencer.has_pending()); + } + + fn create_get_proof_targets_state() -> HashedPostState { + let 
mut state = HashedPostState::default(); + + let addr1 = B256::random(); + let addr2 = B256::random(); + state.accounts.insert(addr1, Some(Default::default())); + state.accounts.insert(addr2, Some(Default::default())); + + let mut storage = HashedStorage::default(); + let slot1 = B256::random(); + let slot2 = B256::random(); + storage.storage.insert(slot1, U256::ZERO); + storage.storage.insert(slot2, U256::from(1)); + state.storages.insert(addr1, storage); + + state + } + + #[test] + fn test_get_proof_targets_new_account_targets() { + let state = create_get_proof_targets_state(); + let fetched = MultiProofTargets::default(); + + let targets = get_proof_targets(&state, &fetched); + + // should return all accounts as targets since nothing was fetched before + assert_eq!(targets.len(), state.accounts.len()); + for addr in state.accounts.keys() { + assert!(targets.contains_key(addr)); + } + } + + #[test] + fn test_get_proof_targets_new_storage_targets() { + let state = create_get_proof_targets_state(); + let fetched = MultiProofTargets::default(); + + let targets = get_proof_targets(&state, &fetched); + + // verify storage slots are included for accounts with storage + for (addr, storage) in &state.storages { + assert!(targets.contains_key(addr)); + let target_slots = &targets[addr]; + assert_eq!(target_slots.len(), storage.storage.len()); + for slot in storage.storage.keys() { + assert!(target_slots.contains(slot)); + } + } + } + + #[test] + fn test_get_proof_targets_filter_already_fetched_accounts() { + let state = create_get_proof_targets_state(); + let mut fetched = MultiProofTargets::default(); + + // select an account that has no storage updates + let fetched_addr = state + .accounts + .keys() + .find(|&&addr| !state.storages.contains_key(&addr)) + .expect("Should have an account without storage"); + + // mark the account as already fetched + fetched.insert(*fetched_addr, HashSet::default()); + + let targets = get_proof_targets(&state, &fetched); + + // should not include the already fetched account since it has no storage updates + assert!(!targets.contains_key(fetched_addr)); + // other accounts should still be included + assert_eq!(targets.len(), state.accounts.len() - 1); + } + + #[test] + fn test_get_proof_targets_filter_already_fetched_storage() { + let state = create_get_proof_targets_state(); + let mut fetched = MultiProofTargets::default(); + + // mark one storage slot as already fetched + let (addr, storage) = state.storages.iter().next().unwrap(); + let mut fetched_slots = HashSet::default(); + let fetched_slot = *storage.storage.keys().next().unwrap(); + fetched_slots.insert(fetched_slot); + fetched.insert(*addr, fetched_slots); + + let targets = get_proof_targets(&state, &fetched); + + // should not include the already fetched storage slot + let target_slots = &targets[addr]; + assert!(!target_slots.contains(&fetched_slot)); + assert_eq!(target_slots.len(), storage.storage.len() - 1); + } + + #[test] + fn test_get_proof_targets_empty_state() { + let state = HashedPostState::default(); + let fetched = MultiProofTargets::default(); + + let targets = get_proof_targets(&state, &fetched); + + assert!(targets.is_empty()); + } + + #[test] + fn test_get_proof_targets_mixed_fetched_state() { + let mut state = HashedPostState::default(); + let mut fetched = MultiProofTargets::default(); + + let addr1 = B256::random(); + let addr2 = B256::random(); + let slot1 = B256::random(); + let slot2 = B256::random(); + + state.accounts.insert(addr1, Some(Default::default())); + 
state.accounts.insert(addr2, Some(Default::default())); + + let mut storage = HashedStorage::default(); + storage.storage.insert(slot1, U256::ZERO); + storage.storage.insert(slot2, U256::from(1)); + state.storages.insert(addr1, storage); + + let mut fetched_slots = HashSet::default(); + fetched_slots.insert(slot1); + fetched.insert(addr1, fetched_slots); + + let targets = get_proof_targets(&state, &fetched); + + assert!(targets.contains_key(&addr2)); + assert!(!targets[&addr1].contains(&slot1)); + assert!(targets[&addr1].contains(&slot2)); + } + + #[test] + fn test_get_proof_targets_unmodified_account_with_storage() { + let mut state = HashedPostState::default(); + let fetched = MultiProofTargets::default(); + + let addr = B256::random(); + let slot1 = B256::random(); + let slot2 = B256::random(); + + // don't add the account to state.accounts (simulating unmodified account) + // but add storage updates for this account + let mut storage = HashedStorage::default(); + storage.storage.insert(slot1, U256::from(1)); + storage.storage.insert(slot2, U256::from(2)); + state.storages.insert(addr, storage); + + assert!(!state.accounts.contains_key(&addr)); + assert!(!fetched.contains_key(&addr)); + + let targets = get_proof_targets(&state, &fetched); + + // verify that we still get the storage slots for the unmodified account + assert!(targets.contains_key(&addr)); + + let target_slots = &targets[&addr]; + assert_eq!(target_slots.len(), 2); + assert!(target_slots.contains(&slot1)); + assert!(target_slots.contains(&slot2)); + } + + #[test] + fn test_get_prefetch_proof_targets_no_duplicates() { + let test_provider_factory = create_test_provider_factory(); + let mut test_state_root_task = create_test_state_root_task(test_provider_factory); + + // populate some targets + let mut targets = MultiProofTargets::default(); + let addr1 = B256::random(); + let addr2 = B256::random(); + let slot1 = B256::random(); + let slot2 = B256::random(); + targets.insert(addr1, vec![slot1].into_iter().collect()); + targets.insert(addr2, vec![slot2].into_iter().collect()); + + let prefetch_proof_targets = + test_state_root_task.get_prefetch_proof_targets(targets.clone()); + + // check that the prefetch proof targets are the same because there are no fetched proof + // targets yet + assert_eq!(prefetch_proof_targets, targets); + + // add a different addr and slot to fetched proof targets + let addr3 = B256::random(); + let slot3 = B256::random(); + test_state_root_task.fetched_proof_targets.insert(addr3, vec![slot3].into_iter().collect()); + + let prefetch_proof_targets = + test_state_root_task.get_prefetch_proof_targets(targets.clone()); + + // check that the prefetch proof targets are the same because the fetched proof targets + // don't overlap with the prefetch targets + assert_eq!(prefetch_proof_targets, targets); + } + + #[test] + fn test_get_prefetch_proof_targets_remove_subset() { + let test_provider_factory = create_test_provider_factory(); + let mut test_state_root_task = create_test_state_root_task(test_provider_factory); + + // populate some targe + let mut targets = MultiProofTargets::default(); + let addr1 = B256::random(); + let addr2 = B256::random(); + let slot1 = B256::random(); + let slot2 = B256::random(); + targets.insert(addr1, vec![slot1].into_iter().collect()); + targets.insert(addr2, vec![slot2].into_iter().collect()); + + // add a subset of the first target to fetched proof targets + test_state_root_task.fetched_proof_targets.insert(addr1, vec![slot1].into_iter().collect()); + + let 
prefetch_proof_targets = + test_state_root_task.get_prefetch_proof_targets(targets.clone()); + + // check that the prefetch proof targets do not include the subset + assert_eq!(prefetch_proof_targets.len(), 1); + assert!(!prefetch_proof_targets.contains_key(&addr1)); + assert!(prefetch_proof_targets.contains_key(&addr2)); + + // now add one more slot to the prefetch targets + let slot3 = B256::random(); + targets.get_mut(&addr1).unwrap().insert(slot3); + + let prefetch_proof_targets = + test_state_root_task.get_prefetch_proof_targets(targets.clone()); + + // check that the prefetch proof targets do not include the subset + // but include the new slot + assert_eq!(prefetch_proof_targets.len(), 2); + assert!(prefetch_proof_targets.contains_key(&addr1)); + assert_eq!( + *prefetch_proof_targets.get(&addr1).unwrap(), + vec![slot3].into_iter().collect::() + ); + assert!(prefetch_proof_targets.contains_key(&addr2)); + assert_eq!( + *prefetch_proof_targets.get(&addr2).unwrap(), + vec![slot2].into_iter().collect::() + ); + } +} diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs new file mode 100644 index 000000000000..5cfc7afdc6e8 --- /dev/null +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -0,0 +1,300 @@ +use crate::tree::{ + cached_state::{CachedStateMetrics, CachedStateProvider, ProviderCaches, SavedCache}, + payload_processor::{executor::WorkloadExecutor, multiproof::StateRootMessage, ExecutionCache}, + StateProviderBuilder, +}; +use alloy_consensus::transaction::Recovered; +use alloy_primitives::{keccak256, map::B256Set, B256}; +use reth_evm::{ConfigureEvm, ConfigureEvmEnvFor, Evm}; +use reth_primitives_traits::{header::SealedHeaderFor, NodePrimitives, SignedTransaction}; +use reth_provider::{BlockReader, StateCommitmentProvider, StateProviderFactory, StateReader}; +use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState}; +use reth_trie::MultiProofTargets; +use std::{ + collections::VecDeque, + sync::mpsc::{channel, Receiver, Sender}, +}; +use tracing::trace; + +/// A task that executes transactions individually in parallel. +/// +/// Note: This task runs until cancelled externally. +pub(super) struct PrewarmTask { + /// The executor used to spawn execution tasks. + executor: WorkloadExecutor, + /// Shared execution cache. + execution_cache: ExecutionCache, + /// Transactions pending execution. + pending: VecDeque>, + /// Context provided to execution tasks + ctx: PrewarmContext, + /// How many txs are currently in progress + in_progress: usize, + /// How many transactions should be executed in parallel + max_concurrency: usize, + /// Sender to emit evm state outcome messages, if any. + to_multi_proof: Option>, + /// Receiver for events produced by tx execution + actions_rx: Receiver, + /// Sender the transactions use to send their result back + actions_tx: Sender, +} + +impl PrewarmTask +where + N: NodePrimitives, + P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static, + Evm: ConfigureEvmEnvFor + + 'static + + ConfigureEvm
+ + 'static,
+{
+ /// Initializes the task with the given transactions pending execution
+ pub(super) fn new(
+ executor: WorkloadExecutor,
+ execution_cache: ExecutionCache,
+ ctx: PrewarmContext,
+ to_multi_proof: Option>,
+ pending: VecDeque>,
+ ) -> Self {
+ let (actions_tx, actions_rx) = channel();
+ Self {
+ executor,
+ execution_cache,
+ pending,
+ ctx,
+ in_progress: 0,
+ // TODO settings
+ max_concurrency: 4,
+ to_multi_proof,
+ actions_rx,
+ actions_tx,
+ }
+ }
+
+ pub(super) fn actions_tx(&self) -> Sender {
+ self.actions_tx.clone()
+ }
+
+ /// Spawns the next transactions, up to the configured concurrency limit
+ fn spawn_next(&mut self) {
+ while self.in_progress < self.max_concurrency {
+ if let Some(tx) = self.pending.pop_front() {
+ self.spawn_transaction(tx);
+ // track the spawned tx so the concurrency limit is actually enforced
+ self.in_progress += 1;
+ } else {
+ break
+ }
+ }
+ }
+
+ /// Spawns the given transaction as a blocking task.
+ fn spawn_transaction(&self, tx: Recovered) {
+ let ctx = self.ctx.clone();
+ let actions_tx = self.actions_tx.clone();
+ let prepare_proof_targets = self.should_prepare_multi_proof_targets();
+ self.executor.spawn_blocking(move || {
+ // depending on whether this task needs the proof targets, we either just transact or
+ // transact and prepare the targets
+ let proof_targets = if prepare_proof_targets {
+ ctx.prepare_multiproof_targets(tx)
+ } else {
+ ctx.transact(tx);
+ None
+ };
+ let _ = actions_tx.send(PrewarmTaskEvent::Outcome { proof_targets });
+ });
+ }
+
+ /// Returns true if the tx prewarming tasks should prepare multiproof targets.
+ fn should_prepare_multi_proof_targets(&self) -> bool {
+ self.to_multi_proof.is_some()
+ }
+
+ /// If configured and the tx returned proof targets, emit the targets the transaction produced
+ fn send_multi_proof_targets(&self, targets: Option) {
+ if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
+ let _ = to_multi_proof.send(StateRootMessage::PrefetchProofs(proof_targets));
+ }
+ }
+
+ /// Save the state to the shared cache for the given block.
+ fn save_cache(&self, state: BundleState) {
+ let cache = SavedCache::new(
+ self.ctx.header.hash(),
+ self.ctx.cache.clone(),
+ self.ctx.cache_metrics.clone(),
+ );
+ if cache.cache().insert_state(&state).is_err() {
+ return
+ }
+
+ // TODO: update metrics
+
+ // update the cache for the executed block
+ self.execution_cache.save_cache(cache);
+ }
+
+ /// Executes the task.
+ ///
+ /// This will execute the transactions until all transactions have been processed or the task
+ /// was cancelled.
+ pub(super) fn run(mut self) {
+ // spawn execution tasks.
+ self.spawn_next();
+
+ while let Ok(event) = self.actions_rx.recv() {
+ match event {
+ PrewarmTaskEvent::TerminateTransactionExecution => {
+ // stop tx processing
+ self.pending.clear();
+ }
+ PrewarmTaskEvent::Outcome { proof_targets } => {
+ // completed a transaction, frees up one slot
+ self.in_progress -= 1;
+ self.send_multi_proof_targets(proof_targets);
+ }
+ PrewarmTaskEvent::Terminate { block_output } => {
+ // terminate the task
+ if let Some(state) = block_output {
+ self.save_cache(state);
+ }
+
+ break
+ }
+ }
+
+ // schedule followup transactions
+ self.spawn_next();
+ }
+ }
+}
+
+/// Context required by tx execution tasks.
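Taken together, `spawn_next` and `run` implement a bounded fan-out over a completion channel. Stripped of the EVM specifics, the pattern reduces to the following self-contained sketch (std-only; `run_bounded` and `work` are illustrative names):

use std::{collections::VecDeque, sync::mpsc, thread};

fn run_bounded<T: Send + 'static>(mut pending: VecDeque<T>, max_concurrency: usize, work: fn(T)) {
    let (tx, rx) = mpsc::channel::<()>();
    let mut in_progress = 0usize;

    // top up to the concurrency limit, like `PrewarmTask::spawn_next`
    let spawn_next = |pending: &mut VecDeque<T>, in_progress: &mut usize| {
        while *in_progress < max_concurrency {
            let Some(job) = pending.pop_front() else { break };
            let done = tx.clone();
            thread::spawn(move || {
                work(job);
                // completion event, analogous to `PrewarmTaskEvent::Outcome`
                let _ = done.send(());
            });
            *in_progress += 1;
        }
    };

    spawn_next(&mut pending, &mut in_progress);
    while in_progress > 0 {
        // wait for one job to finish, then schedule more
        let _ = rx.recv();
        in_progress -= 1;
        spawn_next(&mut pending, &mut in_progress);
    }
}

A call such as `run_bounded((0..16u64).collect(), 4, |i| drop(i))` keeps at most four worker threads busy at any time, mirroring how the prewarm task keeps `max_concurrency` transactions in flight.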
+#[derive(Debug, Clone)] +pub(super) struct PrewarmContext { + pub(super) header: SealedHeaderFor, + pub(super) evm_config: Evm, + pub(super) cache: ProviderCaches, + pub(super) cache_metrics: CachedStateMetrics, + /// Provider to obtain the state + pub(super) provider: StateProviderBuilder, +} + +impl PrewarmContext +where + N: NodePrimitives, + P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static, + Evm: ConfigureEvmEnvFor + + 'static + + ConfigureEvm
+ + 'static,
+{
+ /// Transacts the transaction and transforms the resulting state into [`MultiProofTargets`].
+ fn prepare_multiproof_targets(self, tx: Recovered) -> Option {
+ let state = self.transact(tx)?;
+
+ let mut targets =
+ MultiProofTargets::with_capacity_and_hasher(state.len(), Default::default());
+
+ for (addr, account) in state {
+ // if the account was not touched, or if the account was selfdestructed, do not
+ // fetch proofs for it
+ //
+ // Since selfdestruct can only happen in the same transaction, we can skip
+ // prefetching proofs for selfdestructed accounts
+ //
+ // See: https://eips.ethereum.org/EIPS/eip-6780
+ if !account.is_touched() || account.is_selfdestructed() {
+ continue
+ }
+
+ let mut storage_set =
+ B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
+ for (key, slot) in account.storage {
+ // do nothing if unchanged
+ if !slot.is_changed() {
+ continue
+ }
+
+ storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
+ }
+
+ targets.insert(keccak256(addr), storage_set);
+ }
+
+ Some(targets)
+ }
+
+ /// Transacts the transaction and returns the state outcome.
+ ///
+ /// Returns `None` if executing the transaction failed with a non-revert error.
+ /// Returns the touched+modified state of the transaction.
+ ///
+ /// Note: Since there are no ordering guarantees, this won't be the state the tx produces when
+ /// executed sequentially.
+ fn transact(self, tx: Recovered) -> Option {
+ let Self { header, evm_config, cache: caches, cache_metrics, provider } = self;
+ // Create the state provider inside the thread
+ let state_provider = match provider.build() {
+ Ok(provider) => provider,
+ Err(err) => {
+ trace!(
+ target: "engine::tree",
+ %err,
+ "Failed to build state provider in prewarm thread"
+ );
+ return None
+ }
+ };
+
+ // Use the caches to create a new provider with caching
+ let state_provider =
+ CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
+
+ let state_provider = StateProviderDatabase::new(&state_provider);
+
+ let mut evm_env = evm_config.evm_env(&header);
+
+ // we must disable the nonce check so that we can execute the transaction even if the nonce
+ // doesn't match what's on chain.
+ evm_env.cfg_env.disable_nonce_check = true;
+
+ // create a new executor with the nonce check disabled in the env
+ let mut evm = evm_config.evm_with_env(state_provider, evm_env);
+
+ // create the tx env
+ let tx_env = evm_config.tx_env(&tx);
+ let res = match evm.transact(tx_env) {
+ Ok(res) => res,
+ Err(err) => {
+ trace!(
+ target: "engine::tree",
+ %err,
+ tx_hash=%tx.tx_hash(),
+ sender=%tx.signer(),
+ "Error when executing prewarm transaction",
+ );
+ return None
+ }
+ };
+
+ Some(res.state)
+ }
+}
+
+/// The events the pre-warm task can handle.
+pub(super) enum PrewarmTaskEvent {
+ /// Forcefully terminate all remaining transaction execution.
+ TerminateTransactionExecution,
+ /// Forcefully terminate the task on demand and update the shared cache with the given output
+ /// before exiting.
+ Terminate {
+ /// The final block state output.
+ block_output: Option,
+ },
+ /// The outcome of a pre-warm task
+ Outcome {
+ /// The prepared proof targets based on the evm state outcome
+ proof_targets: Option,
+ },
+}
diff --git a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs
new file mode 100644
index 000000000000..1c7d5815d2f7
--- /dev/null
+++ b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs
@@ -0,0 +1,199 @@
+//! Contains the implementation of the sparse trie logic responsible for creating the final state root.
+
+use crate::tree::payload_processor::{
+ executor::WorkloadExecutor,
+ multiproof::{
+ SparseTrieUpdate, StateRootComputeOutcome, StateRootConfig, StateRootResult,
+ StateRootTaskMetrics,
+ },
+};
+use rayon::iter::{ParallelBridge, ParallelIterator};
+use reth_provider::{BlockReader, DBProvider, DatabaseProviderFactory, StateCommitmentProvider};
+use reth_trie::{
+ hashed_cursor::HashedPostStateCursorFactory, proof::ProofBlindedProviderFactory,
+ trie_cursor::InMemoryTrieCursorFactory, MultiProofTargets, Nibbles,
+};
+use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
+use reth_trie_parallel::root::ParallelStateRootError;
+use reth_trie_sparse::{
+ blinded::{BlindedProvider, BlindedProviderFactory},
+ errors::{SparseStateTrieResult, SparseTrieErrorKind},
+ SparseStateTrie,
+};
+use std::{
+ sync::mpsc,
+ time::{Duration, Instant},
+};
+use tracing::{debug, trace, trace_span};
+
+/// The level below which the sparse trie hashes are calculated in
+/// [`crate::tree::payload_processor::multiproof::update_sparse_trie`].
+const SPARSE_TRIE_INCREMENTAL_LEVEL: usize = 2;
+
+/// A task responsible for populating the sparse trie.
+pub(super) struct SparseTrieTask {
+ /// Executor used to spawn subtasks.
+ pub(super) executor: WorkloadExecutor,
+ /// Receives updates from the state root task.
+ pub(super) updates: mpsc::Receiver,
+ // TODO: ideally we need a way to create multiple readers on demand.
+ pub(super) config: StateRootConfig,
+ pub(super) metrics: StateRootTaskMetrics,
+ /// How many sparse trie jobs should be executed in parallel
+ pub(super) max_concurrency: usize,
+}
+
+impl SparseTrieTask
+where
+ F: DatabaseProviderFactory + StateCommitmentProvider,
+{
+ /// Runs the sparse trie task to completion.
+ ///
+ /// This waits for new incoming [`SparseTrieUpdate`].
+ ///
+ /// This concludes once the last trie update has been received.
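The receive loop in `run` below first blocks for an update and then drains whatever is already queued, so bursts of multiproofs are folded into a single trie application. In isolation the pattern looks like this (a toy `u64` payload stands in for `SparseTrieUpdate`; `drain_batched` is an illustrative name):

use std::sync::mpsc::Receiver;

fn drain_batched(rx: &Receiver<u64>) -> Option<u64> {
    // block for the first update, like `self.updates.recv()`
    let mut batch = rx.recv().ok()?;
    // then opportunistically fold in everything already queued, like `try_recv`
    while let Ok(next) = rx.try_recv() {
        // stands in for `update.extend(next)`
        batch += next;
    }
    Some(batch)
}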
+ pub(super) fn run(self) -> StateRootResult { + let now = Instant::now(); + let provider_ro = self.config.consistent_view.provider_ro()?; + let in_memory_trie_cursor = InMemoryTrieCursorFactory::new( + DatabaseTrieCursorFactory::new(provider_ro.tx_ref()), + &self.config.nodes_sorted, + ); + let blinded_provider_factory = ProofBlindedProviderFactory::new( + in_memory_trie_cursor.clone(), + HashedPostStateCursorFactory::new( + DatabaseHashedCursorFactory::new(provider_ro.tx_ref()), + &self.config.state_sorted, + ), + self.config.prefix_sets.clone(), + ); + + let mut num_iterations = 0; + let mut trie = SparseStateTrie::new(blinded_provider_factory).with_updates(true); + + while let Ok(mut update) = self.updates.recv() { + num_iterations += 1; + let mut num_updates = 1; + while let Ok(next) = self.updates.try_recv() { + update.extend(next); + num_updates += 1; + } + + debug!( + target: "engine::root", + num_updates, + account_proofs = update.multiproof.account_subtree.len(), + storage_proofs = update.multiproof.storages.len(), + "Updating sparse trie" + ); + + let elapsed = update_sparse_trie(&mut trie, update).map_err(|e| { + ParallelStateRootError::Other(format!("could not calculate state root: {e:?}")) + })?; + self.metrics.sparse_trie_update_duration_histogram.record(elapsed); + trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed"); + } + + debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation"); + + let start = Instant::now(); + let (state_root, trie_updates) = trie.root_with_updates().map_err(|e| { + ParallelStateRootError::Other(format!("could not calculate state root: {e:?}")) + })?; + let elapsed = start.elapsed(); + + self.metrics.sparse_trie_final_update_duration_histogram.record(elapsed); + + Ok(StateRootComputeOutcome { + state_root: (state_root, trie_updates), + total_time: now.elapsed(), + time_from_last_update: elapsed, + }) + } +} + +/// Aliased for now to not introduce too many changes at once. +pub(super) type SparseTrieEvent = SparseTrieUpdate; + +// /// The event type the sparse trie task operates on. +// pub(crate) enum SparseTrieEvent { +// /// Updates received from the multiproof task. +// /// +// /// This represents a stream of [`SparseTrieUpdate`] where a `None` indicates that all +// updates /// have been received. +// Update(Option), +// } + +/// Updates the sparse trie with the given proofs and state, and returns the elapsed time. +pub(crate) fn update_sparse_trie( + trie: &mut SparseStateTrie, + SparseTrieUpdate { state, multiproof }: SparseTrieUpdate, +) -> SparseStateTrieResult +where + BPF: BlindedProviderFactory + Send + Sync, + BPF::AccountNodeProvider: BlindedProvider + Send + Sync, + BPF::StorageNodeProvider: BlindedProvider + Send + Sync, +{ + trace!(target: "engine::root::sparse", "Updating sparse trie"); + let started_at = Instant::now(); + + // Reveal new accounts and storage slots. + trie.reveal_multiproof(multiproof)?; + + // Update storage slots with new values and calculate storage roots. 
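// Each storage trie is taken out of the `SparseStateTrie`, updated in parallel via
// `par_bridge`, and sent back over an mpsc channel: `for_each_init` cannot
// short-circuit on an error, so the fallible results are funnelled to the calling
// thread, which propagates the first failure and re-inserts the successful tries.
// The same funnel pattern in isolation (an illustrative sketch, std + rayon only):
//
//     let (tx, rx) = std::sync::mpsc::channel();
//     (0u64..8)
//         .par_bridge()
//         .map(|i| Ok::<_, &str>(i * 2))
//         .for_each_init(|| tx.clone(), |tx, res| tx.send(res).unwrap());
//     drop(tx);
//     let doubled: Result<Vec<u64>, &str> = rx.into_iter().collect();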
+ let (tx, rx) = mpsc::channel(); + state + .storages + .into_iter() + .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address))) + .par_bridge() + .map(|(address, storage, storage_trie)| { + let span = trace_span!(target: "engine::root::sparse", "Storage trie", ?address); + let _enter = span.enter(); + trace!(target: "engine::root::sparse", "Updating storage"); + let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?; + + if storage.wiped { + trace!(target: "engine::root::sparse", "Wiping storage"); + storage_trie.wipe()?; + } + for (slot, value) in storage.storage { + let slot_nibbles = Nibbles::unpack(slot); + if value.is_zero() { + trace!(target: "engine::root::sparse", ?slot, "Removing storage slot"); + storage_trie.remove_leaf(&slot_nibbles)?; + } else { + trace!(target: "engine::root::sparse", ?slot, "Updating storage slot"); + storage_trie + .update_leaf(slot_nibbles, alloy_rlp::encode_fixed_size(&value).to_vec())?; + } + } + + storage_trie.root(); + + SparseStateTrieResult::Ok((address, storage_trie)) + }) + .for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap()); + drop(tx); + for result in rx { + let (address, storage_trie) = result?; + trie.insert_storage_trie(address, storage_trie); + } + + // Update accounts with new values + for (address, account) in state.accounts { + trace!(target: "engine::root::sparse", ?address, "Updating account"); + trie.update_account(address, account.unwrap_or_default())?; + } + + trie.calculate_below_level(SPARSE_TRIE_INCREMENTAL_LEVEL); + let elapsed = started_at.elapsed(); + + Ok(elapsed) +} + +fn extend_multi_proof_targets_ref(targets: &mut MultiProofTargets, other: &MultiProofTargets) { + for (address, slots) in other { + targets.entry(*address).or_default().extend(slots); + } +} diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index fceaab9c2f2b..a0cc06098fea 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -478,7 +478,7 @@ where #[derive(Metrics, Clone)] #[metrics(scope = "tree.root")] -struct StateRootTaskMetrics { +pub(crate) struct StateRootTaskMetrics { /// Histogram of proof calculation durations. pub proof_calculation_duration_histogram: Histogram, /// Histogram of proof calculation account targets.