From 136a8227bf9e2326484455e33755bfc0adccb05e Mon Sep 17 00:00:00 2001 From: greged93 <82421016+greged93@users.noreply.github.com> Date: Fri, 27 Sep 2024 11:44:52 +0200 Subject: [PATCH] feat: canonical state for local engine (#11245) --- crates/engine/local/Cargo.toml | 1 + crates/engine/local/src/service.rs | 162 ++++++++++++++++++++--------- 2 files changed, 114 insertions(+), 49 deletions(-) diff --git a/crates/engine/local/Cargo.toml b/crates/engine/local/Cargo.toml index 286b9f836aa4..f045bb6fda1d 100644 --- a/crates/engine/local/Cargo.toml +++ b/crates/engine/local/Cargo.toml @@ -11,6 +11,7 @@ exclude.workspace = true [dependencies] # reth reth-beacon-consensus.workspace = true +reth-chain-state.workspace = true reth-engine-tree.workspace = true reth-node-types.workspace = true reth-payload-builder.workspace = true diff --git a/crates/engine/local/src/service.rs b/crates/engine/local/src/service.rs index d276dc5c1f8a..c9794ecfabb0 100644 --- a/crates/engine/local/src/service.rs +++ b/crates/engine/local/src/service.rs @@ -7,8 +7,9 @@ //! building at a fixed interval. use crate::miner::MiningMode; -use alloy_primitives::B256; +use eyre::eyre; use reth_beacon_consensus::EngineNodeTypes; +use reth_chain_state::{CanonicalInMemoryState, ExecutedBlock, NewCanonicalChain}; use reth_engine_tree::persistence::PersistenceHandle; use reth_payload_builder::PayloadBuilderHandle; use reth_payload_primitives::{ @@ -17,12 +18,12 @@ use reth_payload_primitives::{ use reth_provider::ProviderFactory; use reth_prune::PrunerWithFactory; use reth_stages_api::MetricEventsSender; -use std::fmt::Formatter; use tokio::sync::oneshot; use tracing::debug; /// Provides a local dev service engine that can be used to drive the /// chain forward. +#[derive(Debug)] pub struct LocalEngineService where N: EngineNodeTypes, @@ -32,30 +33,14 @@ where payload_builder: PayloadBuilderHandle, /// The payload attribute builder for the engine payload_attributes_builder: B, + /// Keep track of the Canonical chain state that isn't persisted on disk yet + canonical_in_memory_state: CanonicalInMemoryState, /// A handle to the persistence layer persistence_handle: PersistenceHandle, - /// The hash of the current head - head: B256, /// The mining mode for the engine mode: MiningMode, } -impl std::fmt::Debug for LocalEngineService -where - N: EngineNodeTypes, - B: PayloadAttributesBuilder::PayloadAttributes>, -{ - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LocalEngineService") - .field("payload_builder", &self.payload_builder) - .field("payload_attributes_builder", &self.payload_attributes_builder) - .field("persistence_handle", &self.persistence_handle) - .field("head", &self.head) - .field("mode", &self.mode) - .finish() - } -} - impl LocalEngineService where N: EngineNodeTypes, @@ -67,14 +52,20 @@ where payload_attributes_builder: B, provider: ProviderFactory, pruner: PrunerWithFactory>, + canonical_in_memory_state: CanonicalInMemoryState, sync_metrics_tx: MetricEventsSender, - head: B256, mode: MiningMode, ) -> Self { let persistence_handle = PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx); - Self { payload_builder, payload_attributes_builder, persistence_handle, head, mode } + Self { + payload_builder, + payload_attributes_builder, + canonical_in_memory_state, + persistence_handle, + mode, + } } /// Spawn the [`LocalEngineService`] on a tokio green thread. The service will poll the payload @@ -86,8 +77,8 @@ where payload_attributes_builder: B, provider: ProviderFactory, pruner: PrunerWithFactory>, + canonical_in_memory_state: CanonicalInMemoryState, sync_metrics_tx: MetricEventsSender, - head: B256, mode: MiningMode, ) { let engine = Self::new( @@ -95,8 +86,8 @@ where payload_attributes_builder, provider, pruner, + canonical_in_memory_state, sync_metrics_tx, - head, mode, ); @@ -112,26 +103,29 @@ where (&mut self.mode).await; // Start a new payload building job - let new_head = self.build_and_save_payload().await; + let executed_block = self.build_and_save_payload().await; - if new_head.is_err() { - debug!(target: "local_engine", err = ?new_head.unwrap_err(), "failed payload building"); + if executed_block.is_err() { + debug!(target: "local_engine", err = ?executed_block.unwrap_err(), "failed payload building"); continue } + let block = executed_block.expect("not error"); - // Update the head - self.head = new_head.expect("not error"); + let res = self.update_canonical_in_memory_state(block); + if res.is_err() { + debug!(target: "local_engine", err = ?res.unwrap_err(), "failed canonical state update"); + } } } /// Builds a payload by initiating a new payload job via the [`PayloadBuilderHandle`], - /// saving the execution outcome to persistence and returning the current head of the - /// chain. - async fn build_and_save_payload(&self) -> eyre::Result { + /// saving the execution outcome to persistence and returning the executed block. + async fn build_and_save_payload(&self) -> eyre::Result { let payload_attributes = self.payload_attributes_builder.build()?; + let parent = self.canonical_in_memory_state.get_canonical_head().hash(); let payload_builder_attributes = ::PayloadBuilderAttributes::try_new( - self.head, + parent, payload_attributes, ) .map_err(|_| eyre::eyre!("failed to fetch payload attributes"))?; @@ -142,22 +136,38 @@ where .await? .await?; - let block = payload.executed_block().map(|block| vec![block]).unwrap_or_default(); + let executed_block = + payload.executed_block().ok_or_else(|| eyre!("missing executed block"))?; let (tx, rx) = oneshot::channel(); - let _ = self.persistence_handle.save_blocks(block, tx); + let _ = self.persistence_handle.save_blocks(vec![executed_block.clone()], tx); // Wait for the persistence_handle to complete - let new_head = rx.await?.ok_or_else(|| eyre::eyre!("missing new head"))?; + let _ = rx.await?.ok_or_else(|| eyre!("missing new head"))?; + + Ok(executed_block) + } + + /// Update the canonical in memory state and send notification for a new canon state to + /// all the listeners. + fn update_canonical_in_memory_state(&self, executed_block: ExecutedBlock) -> eyre::Result<()> { + let chain = NewCanonicalChain::Commit { new: vec![executed_block] }; + let tip = chain.tip().header.clone(); + let notification = chain.to_chain_notification(); - Ok(new_head.hash) + // Update the tracked in-memory state with the new chain + self.canonical_in_memory_state.update_chain(chain); + self.canonical_in_memory_state.set_canonical_head(tip); + + // Sends an event to all active listeners about the new canonical chain + self.canonical_in_memory_state.notify_canon_state(notification); + Ok(()) } } #[cfg(test)] mod tests { use super::*; - use alloy_primitives::B256; use reth_chainspec::MAINNET; use reth_config::PruneConfig; use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir}; @@ -201,20 +211,20 @@ mod tests { let provider = ProviderFactory::>::new( create_test_rw_db(), MAINNET.clone(), - StaticFileProvider::read_write(static_dir_path).unwrap(), + StaticFileProvider::read_write(static_dir_path)?, ); let pruner = PrunerBuilder::new(PruneConfig::default()) .build_with_provider_factory(provider.clone()); + // Create an empty canonical in memory state + let canonical_in_memory_state = CanonicalInMemoryState::empty(); + // Start the payload builder service let payload_handle = spawn_test_payload_service::(); // Sync metric channel let (sync_metrics_tx, _) = unbounded_channel(); - // Get the attributes for start of block building - let genesis_hash = B256::random(); - // Launch the LocalEngineService in interval mode let period = Duration::from_secs(1); LocalEngineService::spawn_new( @@ -222,13 +232,17 @@ mod tests { TestPayloadAttributesBuilder, provider.clone(), pruner, + canonical_in_memory_state, sync_metrics_tx, - genesis_hash, MiningMode::interval(period), ); + // Check that we have no block for now + let block = provider.block_by_number(0)?; + assert!(block.is_none()); + // Wait 4 intervals - tokio::time::sleep(4 * period).await; + tokio::time::sleep(2 * period).await; // Assert a block has been build let block = provider.block_by_number(0)?; @@ -246,11 +260,14 @@ mod tests { let provider = ProviderFactory::>::new( create_test_rw_db(), MAINNET.clone(), - StaticFileProvider::read_write(static_dir_path).unwrap(), + StaticFileProvider::read_write(static_dir_path)?, ); let pruner = PrunerBuilder::new(PruneConfig::default()) .build_with_provider_factory(provider.clone()); + // Create an empty canonical in memory state + let canonical_in_memory_state = CanonicalInMemoryState::empty(); + // Start the payload builder service let payload_handle = spawn_test_payload_service::(); @@ -260,17 +277,14 @@ mod tests { // Sync metric channel let (sync_metrics_tx, _) = unbounded_channel(); - // Get the attributes for start of block building - let genesis_hash = B256::random(); - // Launch the LocalEngineService in instant mode LocalEngineService::spawn_new( payload_handle, TestPayloadAttributesBuilder, provider.clone(), pruner, + canonical_in_memory_state, sync_metrics_tx, - genesis_hash, MiningMode::instant(pool.clone()), ); @@ -295,4 +309,54 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_canonical_chain_subscription() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + // Start the provider and the pruner + let (_, static_dir_path) = create_test_static_files_dir(); + let provider = ProviderFactory::>::new( + create_test_rw_db(), + MAINNET.clone(), + StaticFileProvider::read_write(static_dir_path)?, + ); + let pruner = PrunerBuilder::new(PruneConfig::default()) + .build_with_provider_factory(provider.clone()); + + // Create an empty canonical in memory state + let canonical_in_memory_state = CanonicalInMemoryState::empty(); + let mut notifications = canonical_in_memory_state.subscribe_canon_state(); + + // Start the payload builder service + let payload_handle = spawn_test_payload_service::(); + + // Start a transaction pool + let pool = testing_pool(); + + // Sync metric channel + let (sync_metrics_tx, _) = unbounded_channel(); + + // Launch the LocalEngineService in instant mode + LocalEngineService::spawn_new( + payload_handle, + TestPayloadAttributesBuilder, + provider.clone(), + pruner, + canonical_in_memory_state, + sync_metrics_tx, + MiningMode::instant(pool.clone()), + ); + + // Add a transaction to the pool + let transaction = MockTransaction::legacy().with_gas_price(10); + pool.add_transaction(Default::default(), transaction).await?; + + // Check a notification is received for block 0 + let res = notifications.recv().await?; + + assert_eq!(res.tip().number, 0); + + Ok(()) + } }