diff --git a/Cargo.lock b/Cargo.lock index e5beda6176a4..25895a352df5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7070,6 +7070,7 @@ dependencies = [ "reth-engine-primitives", "reth-errors", "reth-ethereum-consensus", + "reth-ethereum-engine-primitives", "reth-evm", "reth-exex-types", "reth-metrics", diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 38ab50877ac1..a8d29b020cee 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -63,6 +63,8 @@ reth-tracing = { workspace = true, optional = true } [dev-dependencies] # reth reth-db = { workspace = true, features = ["test-utils"] } +reth-ethereum-engine-primitives.workspace = true +reth-evm = { workspace = true, features = ["test-utils"] } reth-exex-types.workspace = true reth-network-p2p = { workspace = true, features = ["test-utils"] } reth-prune.workspace = true diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index e2fabd2c8e24..f63003a31ba3 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -286,17 +286,14 @@ impl PersistenceHandle { #[cfg(test)] mod tests { use super::*; + use crate::test_utils::get_executed_block_with_number; use reth_chainspec::MAINNET; use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir}; use reth_exex_types::FinishedExExHeight; - use reth_primitives::{ - Address, Block, Receipts, Requests, SealedBlockWithSenders, TransactionSigned, B256, - }; - use reth_provider::{providers::StaticFileProvider, ExecutionOutcome, ProviderFactory}; + use reth_primitives::B256; + use reth_provider::{providers::StaticFileProvider, ProviderFactory}; use reth_prune::Pruner; - use reth_trie::{updates::TrieUpdates, HashedPostState}; - use revm::db::BundleState; - use std::sync::{mpsc::channel, Arc}; + use std::sync::mpsc::channel; fn default_persistence_handle() -> PersistenceHandle { let db = create_test_rw_db(); @@ -340,29 +337,10 @@ mod tests { #[tokio::test] async fn test_save_blocks_single_block() { let persistence_handle = default_persistence_handle(); + let block_number = 5; + let executed = get_executed_block_with_number(block_number); + let block_hash = executed.block().hash(); - let mut block = Block::default(); - let sender = Address::random(); - let tx = TransactionSigned::default(); - block.body.push(tx); - let block_hash = block.hash_slow(); - let block_number = block.number; - let sealed = block.seal_slow(); - let sealed_with_senders = - SealedBlockWithSenders::new(sealed.clone(), vec![sender]).unwrap(); - - let executed = ExecutedBlock::new( - Arc::new(sealed), - Arc::new(sealed_with_senders.senders), - Arc::new(ExecutionOutcome::new( - BundleState::default(), - Receipts { receipt_vec: vec![] }, - block_number, - vec![Requests::default()], - )), - Arc::new(HashedPostState::default()), - Arc::new(TrieUpdates::default()), - ); let blocks = vec![executed]; let (tx, rx) = oneshot::channel(); diff --git a/crates/engine/tree/src/test_utils.rs b/crates/engine/tree/src/test_utils.rs index 0a5fbd5ad560..3ab7d31cdbaf 100644 --- a/crates/engine/tree/src/test_utils.rs +++ b/crates/engine/tree/src/test_utils.rs @@ -1,12 +1,18 @@ +use crate::tree::ExecutedBlock; use reth_chainspec::ChainSpec; use reth_db::{mdbx::DatabaseEnv, test_utils::TempDatabase}; use reth_network_p2p::test_utils::TestFullBlockClient; -use reth_primitives::{BlockBody, SealedHeader, B256}; +use reth_primitives::{ + Address, Block, BlockBody, BlockNumber, Receipts, Requests, SealedBlockWithSenders, + SealedHeader, TransactionSigned, B256, +}; use reth_provider::{test_utils::create_test_provider_factory_with_chain_spec, ExecutionOutcome}; use reth_prune_types::PruneModes; use reth_stages::{test_utils::TestStages, ExecOutput, StageError}; use reth_stages_api::Pipeline; use reth_static_file::StaticFileProducer; +use reth_trie::{updates::TrieUpdates, HashedPostState}; +use revm::db::BundleState; use std::{collections::VecDeque, ops::Range, sync::Arc}; use tokio::sync::watch; @@ -75,3 +81,33 @@ pub(crate) fn insert_headers_into_client( client.insert(sealed_header.clone(), body.clone()); } } + +pub(crate) fn get_executed_block_with_number(block_number: BlockNumber) -> ExecutedBlock { + let mut block = Block::default(); + let mut header = block.header.clone(); + header.number = block_number; + block.header = header; + + let sender = Address::random(); + let tx = TransactionSigned::default(); + block.body.push(tx); + let sealed = block.seal_slow(); + let sealed_with_senders = SealedBlockWithSenders::new(sealed.clone(), vec![sender]).unwrap(); + + ExecutedBlock::new( + Arc::new(sealed), + Arc::new(sealed_with_senders.senders), + Arc::new(ExecutionOutcome::new( + BundleState::default(), + Receipts { receipt_vec: vec![] }, + block_number, + vec![Requests::default()], + )), + Arc::new(HashedPostState::default()), + Arc::new(TrieUpdates::default()), + ) +} + +pub(crate) fn get_executed_blocks(number: u64) -> Vec { + (1..=number).map(get_executed_block_with_number).collect() +} diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 78e8cdbaeec7..81b69dfb1842 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -48,7 +48,7 @@ pub use memory_overlay::MemoryOverlayStateProvider; const PERSISTENCE_THRESHOLD: u64 = 256; /// Represents an executed block stored in-memory. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct ExecutedBlock { block: Arc, senders: Arc>, @@ -326,44 +326,39 @@ where } fn run(mut self) { - loop { - while let Ok(msg) = self.incoming.recv() { - match msg { - FromEngine::Event(event) => match event { - FromOrchestrator::BackfillSyncFinished => { - todo!() - } - FromOrchestrator::BackfillSyncStarted => { - todo!() - } - }, - FromEngine::Request(request) => match request { - BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { - let output = self.on_forkchoice_updated(state, payload_attrs); - if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into)) - { - error!("Failed to send event: {err:?}"); - } - } - BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => { - let output = self.on_new_payload(payload, cancun_fields); - if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| { - reth_beacon_consensus::BeaconOnNewPayloadError::Internal(Box::new( - e, - )) - })) { - error!("Failed to send event: {err:?}"); - } + while let Ok(msg) = self.incoming.recv() { + match msg { + FromEngine::Event(event) => match event { + FromOrchestrator::BackfillSyncFinished => { + todo!() + } + FromOrchestrator::BackfillSyncStarted => { + todo!() + } + }, + FromEngine::Request(request) => match request { + BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { + let output = self.on_forkchoice_updated(state, payload_attrs); + if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into)) { + error!("Failed to send event: {err:?}"); } - BeaconEngineMessage::TransitionConfigurationExchanged => { - todo!() + } + BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => { + let output = self.on_new_payload(payload, cancun_fields); + if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| { + reth_beacon_consensus::BeaconOnNewPayloadError::Internal(Box::new(e)) + })) { + error!("Failed to send event: {err:?}"); } - }, - FromEngine::DownloadedBlocks(blocks) => { - if let Some(event) = self.on_downloaded(blocks) { - if let Err(err) = self.outgoing.send(EngineApiEvent::FromTree(event)) { - error!("Failed to send event: {err:?}"); - } + } + BeaconEngineMessage::TransitionConfigurationExchanged => { + todo!() + } + }, + FromEngine::DownloadedBlocks(blocks) => { + if let Some(event) = self.on_downloaded(blocks) { + if let Err(err) = self.outgoing.send(EngineApiEvent::FromTree(event)) { + error!("Failed to send event: {err:?}"); } } } @@ -719,7 +714,8 @@ where type Engine = T; fn on_downloaded(&mut self, _blocks: Vec) -> Option { - todo!() + debug!("not implemented"); + None } fn on_new_payload( @@ -878,3 +874,102 @@ impl PersistenceState { self.last_persisted_block_hash = last_persisted_block_hash; } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{persistence::PersistenceAction, test_utils::get_executed_blocks}; + + use reth_beacon_consensus::EthBeaconConsensus; + use reth_chainspec::{ChainSpecBuilder, MAINNET}; + use reth_ethereum_engine_primitives::EthEngineTypes; + use reth_evm::test_utils::MockExecutorProvider; + use reth_provider::test_utils::MockEthProvider; + use std::sync::mpsc::{channel, Sender}; + use tokio::sync::mpsc::unbounded_channel; + + #[allow(clippy::type_complexity)] + fn get_default_tree( + persistence_handle: PersistenceHandle, + tree_state: TreeState, + ) -> ( + EngineApiTreeHandlerImpl, + Sender>>, + ) { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone())); + + let provider = MockEthProvider::default(); + let executor_factory = MockExecutorProvider::default(); + executor_factory.extend(vec![ExecutionOutcome::default()]); + + let payload_validator = ExecutionPayloadValidator::new(chain_spec); + + let (to_tree_tx, to_tree_rx) = channel(); + let (from_tree_tx, from_tree_rx) = unbounded_channel(); + + let engine_api_tree_state = EngineApiTreeState { + invalid_headers: InvalidHeaderCache::new(10), + buffer: BlockBuffer::new(10), + tree_state, + forkchoice_state_tracker: ForkchoiceStateTracker::default(), + }; + + ( + EngineApiTreeHandlerImpl::new( + provider, + executor_factory, + consensus, + payload_validator, + to_tree_rx, + from_tree_tx, + engine_api_tree_state, + persistence_handle, + ), + to_tree_tx, + ) + } + + #[tokio::test] + async fn test_tree_persist_blocks() { + // we need more than PERSISTENCE_THRESHOLD blocks to trigger the + // persistence task. + let mut blocks = get_executed_blocks(PERSISTENCE_THRESHOLD + 1); + + let mut blocks_by_hash = HashMap::new(); + let mut blocks_by_number = BTreeMap::new(); + for block in &blocks { + blocks_by_hash.insert(block.block().hash(), block.clone()); + blocks_by_number + .entry(block.block().number) + .or_insert_with(Vec::new) + .push(block.clone()); + } + let tree_state = TreeState { blocks_by_hash, blocks_by_number }; + + let (action_tx, action_rx) = channel(); + let persistence_handle = PersistenceHandle::new(action_tx); + + let (tree, to_tree_tx) = get_default_tree(persistence_handle, tree_state); + std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| tree.run()).unwrap(); + + // send a message to the tree + to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap(); + + let received_action = action_rx.recv().expect("Failed to receive saved blocks"); + if let PersistenceAction::SaveBlocks((saved_blocks, _)) = received_action { + // only PERSISTENCE_THRESHOLD will be persisted + blocks.pop(); + assert_eq!(saved_blocks, blocks); + assert_eq!(saved_blocks.len() as u64, PERSISTENCE_THRESHOLD); + } else { + panic!("unexpected action received {received_action:?}"); + } + } +}