From 08b1e882727992a58adfa920df2c5a188ed43645 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Mon, 24 Jun 2024 12:21:52 +0200 Subject: [PATCH] feat: integrate Node traits into LaunchContextWith (#8993) --- crates/node/builder/src/launch/common.rs | 232 +++++++++++++++++++---- crates/node/builder/src/launch/mod.rs | 119 ++++-------- 2 files changed, 234 insertions(+), 117 deletions(-) diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 45c5fe01cbe4..1c48fd763041 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -1,18 +1,28 @@ //! Helper types that can be used by launchers. +use crate::{ + components::{NodeComponents, NodeComponentsBuilder}, + hooks::OnComponentInitializedHook, + BuilderContext, NodeAdapter, +}; use backon::{ConstantBuilder, Retryable}; use eyre::Context; use rayon::ThreadPoolBuilder; use reth_auto_seal_consensus::MiningMode; use reth_beacon_consensus::EthBeaconConsensus; -use reth_blockchain_tree::{noop::NoopBlockchainTree, BlockchainTreeConfig}; +use reth_blockchain_tree::{ + noop::NoopBlockchainTree, BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, + TreeExternals, +}; use reth_chainspec::{Chain, ChainSpec}; use reth_config::{config::EtlConfig, PruneConfig}; +use reth_consensus::Consensus; use reth_db_api::{database::Database, database_metrics::DatabaseMetrics}; use reth_db_common::init::{init_genesis, InitDatabaseError}; use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader}; use reth_evm::noop::NoopBlockExecutorProvider; use reth_network_p2p::headers::client::HeadersClient; +use reth_node_api::FullNodeTypes; use reth_node_core::{ dirs::{ChainPath, DataDirPath}, node_config::NodeConfig, @@ -29,7 +39,7 @@ use reth_stages::{sets::DefaultStages, MetricEvent, Pipeline, PipelineTarget}; use reth_static_file::StaticFileProducer; use reth_tasks::TaskExecutor; use reth_tracing::tracing::{debug, error, info, warn}; -use std::{sync::Arc, thread::available_parallelism}; +use std::{marker::PhantomData, sync::Arc, thread::available_parallelism}; use tokio::sync::{ mpsc::{unbounded_channel, Receiver, UnboundedSender}, oneshot, watch, @@ -509,9 +519,12 @@ where } /// Creates a `BlockchainProvider` and attaches it to the launch context. - pub async fn with_blockchain_db( + pub async fn with_blockchain_db( self, - ) -> eyre::Result>>> { + ) -> eyre::Result>>> + where + T: FullNodeTypes::DB>>, + { let tree_config = BlockchainTreeConfig::default(); // NOTE: This is a temporary workaround to provide the canon state notification sender to the components builder because there's a cyclic dependency between the blockchain provider and the tree component. This will be removed once the Blockchain provider no longer depends on an instance of the tree: @@ -526,11 +539,15 @@ where )?; let metered_providers = WithMeteredProviders { - provider_factory: self.provider_factory().clone(), + db_provider_container: WithMeteredProvider { + provider_factory: self.provider_factory().clone(), + metrics_sender: self.sync_metrics_tx(), + }, blockchain_db, - metrics_sender: self.sync_metrics_tx(), tree_config, canon_state_notification_sender, + // we store here a reference to T. + phantom_data: PhantomData, }; let ctx = LaunchContextWith { @@ -542,9 +559,10 @@ where } } -impl LaunchContextWith>> +impl LaunchContextWith>> where DB: Database + DatabaseMetrics + Send + Sync + Clone + 'static, + T: FullNodeTypes>, { /// Returns access to the underlying database. pub fn database(&self) -> &DB { @@ -553,20 +571,122 @@ where /// Returns the configured `ProviderFactory`. pub const fn provider_factory(&self) -> &ProviderFactory { - &self.right().provider_factory + &self.right().db_provider_container.provider_factory } - /// Returns the static file provider to interact with the static files. - pub fn static_file_provider(&self) -> StaticFileProvider { - self.provider_factory().static_file_provider() + /// Fetches the head block from the database. + /// + /// If the database is empty, returns the genesis block. + pub fn lookup_head(&self) -> eyre::Result { + self.node_config() + .lookup_head(self.provider_factory().clone()) + .wrap_err("the head block is missing") } - /// Creates a new [`StaticFileProducer`] with the attached database. - pub fn static_file_producer(&self) -> StaticFileProducer { - StaticFileProducer::new( + /// Returns the metrics sender. + pub fn sync_metrics_tx(&self) -> UnboundedSender { + self.right().db_provider_container.metrics_sender.clone() + } + + /// Returns a reference to the `BlockchainProvider`. + pub const fn blockchain_db(&self) -> &BlockchainProvider { + &self.right().blockchain_db + } + + /// Returns a reference to the `BlockchainTreeConfig`. + pub const fn tree_config(&self) -> &BlockchainTreeConfig { + &self.right().tree_config + } + + /// Returns the `CanonStateNotificationSender`. + pub fn canon_state_notification_sender(&self) -> CanonStateNotificationSender { + self.right().canon_state_notification_sender.clone() + } + + /// Creates a `NodeAdapter` and attaches it to the launch context. + pub async fn with_components( + self, + components_builder: CB, + on_component_initialized: Box< + dyn OnComponentInitializedHook>, + >, + ) -> eyre::Result>>> + where + CB: NodeComponentsBuilder, + { + // fetch the head block from the database + let head = self.lookup_head()?; + + let builder_ctx = BuilderContext::new( + head, + self.blockchain_db().clone(), + self.task_executor().clone(), + self.configs().clone(), + ); + + debug!(target: "reth::cli", "creating components"); + let components = components_builder.build_components(&builder_ctx).await?; + + let consensus: Arc = Arc::new(components.consensus().clone()); + + let tree_externals = TreeExternals::new( self.provider_factory().clone(), - self.prune_modes().unwrap_or_default(), - ) + consensus.clone(), + components.block_executor().clone(), + ); + let tree = BlockchainTree::new(tree_externals, *self.tree_config(), self.prune_modes())? + .with_sync_metrics_tx(self.sync_metrics_tx()) + // Note: This is required because we need to ensure that both the components and the + // tree are using the same channel for canon state notifications. This will be removed + // once the Blockchain provider no longer depends on an instance of the tree + .with_canon_state_notification_sender(self.canon_state_notification_sender()); + + let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree)); + + // Replace the tree component with the actual tree + let blockchain_db = self.blockchain_db().clone().with_tree(blockchain_tree); + + debug!(target: "reth::cli", "configured blockchain tree"); + + let node_adapter = NodeAdapter { + components, + task_executor: self.task_executor().clone(), + provider: blockchain_db.clone(), + }; + + debug!(target: "reth::cli", "calling on_component_initialized hook"); + on_component_initialized.on_event(node_adapter.clone())?; + + let components_container = WithComponents { + db_provider_container: WithMeteredProvider { + provider_factory: self.provider_factory().clone(), + metrics_sender: self.sync_metrics_tx(), + }, + blockchain_db, + tree_config: self.right().tree_config, + node_adapter, + head, + consensus, + }; + + let ctx = LaunchContextWith { + inner: self.inner, + attachment: self.attachment.map_right(|_| components_container), + }; + + Ok(ctx) + } +} + +impl LaunchContextWith>> +where + DB: Database + DatabaseMetrics + Send + Sync + Clone + 'static, + T: FullNodeTypes>, + CB: NodeComponentsBuilder, +{ + /// Returns the configured `ProviderFactory`. + pub const fn provider_factory(&self) -> &ProviderFactory { + &self.right().db_provider_container.provider_factory } /// Returns the max block that the node should run to, looking it up from the network if @@ -578,18 +698,27 @@ where self.node_config().max_block(client, self.provider_factory().clone()).await } - /// Fetches the head block from the database. - /// - /// If the database is empty, returns the genesis block. - pub fn lookup_head(&self) -> eyre::Result { - self.node_config() - .lookup_head(self.provider_factory().clone()) - .wrap_err("the head block is missing") + /// Returns the static file provider to interact with the static files. + pub fn static_file_provider(&self) -> StaticFileProvider { + self.provider_factory().static_file_provider() } - /// Returns the metrics sender. - pub fn sync_metrics_tx(&self) -> UnboundedSender { - self.right().metrics_sender.clone() + /// Creates a new [`StaticFileProducer`] with the attached database. + pub fn static_file_producer(&self) -> StaticFileProducer { + StaticFileProducer::new( + self.provider_factory().clone(), + self.prune_modes().unwrap_or_default(), + ) + } + + /// Returns the current head block. + pub const fn head(&self) -> Head { + self.right().head + } + + /// Returns the configured `NodeAdapter`. + pub const fn node_adapter(&self) -> &NodeAdapter { + &self.right().node_adapter } /// Returns a reference to the `BlockchainProvider`. @@ -597,14 +726,24 @@ where &self.right().blockchain_db } + /// Returns the configured `Consensus`. + pub fn consensus(&self) -> Arc { + self.right().consensus.clone() + } + + /// Returns the metrics sender. + pub fn sync_metrics_tx(&self) -> UnboundedSender { + self.right().db_provider_container.metrics_sender.clone() + } + /// Returns a reference to the `BlockchainTreeConfig`. pub const fn tree_config(&self) -> &BlockchainTreeConfig { &self.right().tree_config } - /// Returns the `CanonStateNotificationSender`. - pub fn canon_state_notification_sender(&self) -> CanonStateNotificationSender { - self.right().canon_state_notification_sender.clone() + /// Returns the node adapter components. + pub const fn components(&self) -> &CB::Components { + &self.node_adapter().components } } @@ -668,23 +807,40 @@ pub struct WithConfigs { pub toml_config: reth_config::Config, } +/// Helper container type to bundle the [`ProviderFactory`] and the metrics +/// sender. +#[derive(Debug, Clone)] +pub struct WithMeteredProvider { + provider_factory: ProviderFactory, + metrics_sender: UnboundedSender, +} + /// Helper container to bundle the [`ProviderFactory`], [`BlockchainProvider`] /// and a metrics sender. #[allow(missing_debug_implementations)] -pub struct WithMeteredProviders { - provider_factory: ProviderFactory, +pub struct WithMeteredProviders { + db_provider_container: WithMeteredProvider, blockchain_db: BlockchainProvider, - metrics_sender: UnboundedSender, canon_state_notification_sender: CanonStateNotificationSender, tree_config: BlockchainTreeConfig, + // this field is used to store a reference to the FullNodeTypes so that we + // can build the components in `with_components` method. + phantom_data: PhantomData, } -/// Helper container type to bundle athe [`ProviderFactory`] and the metrics -/// sender. -#[derive(Debug)] -pub struct WithMeteredProvider { - provider_factory: ProviderFactory, - metrics_sender: UnboundedSender, +/// Helper container to bundle the metered providers container and [`NodeAdapter`]. +#[allow(missing_debug_implementations)] +pub struct WithComponents +where + T: FullNodeTypes>, + CB: NodeComponentsBuilder, +{ + db_provider_container: WithMeteredProvider, + tree_config: BlockchainTreeConfig, + blockchain_db: BlockchainProvider, + node_adapter: NodeAdapter, + head: Head, + consensus: Arc, } #[cfg(test)] diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 9326e3b14c04..9795cead4155 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -5,19 +5,17 @@ use crate::{ components::{NodeComponents, NodeComponentsBuilder}, hooks::NodeHooks, node::FullNode, - BuilderContext, NodeBuilderWithComponents, NodeHandle, + NodeBuilderWithComponents, NodeHandle, }; use futures::{future::Either, stream, stream_select, StreamExt}; use reth_beacon_consensus::{ hooks::{EngineHooks, PruneHook, StaticFileHook}, BeaconConsensusEngine, }; -use reth_blockchain_tree::{BlockchainTree, ShareableBlockchainTree, TreeExternals}; -use reth_consensus::Consensus; use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider}; use reth_exex::ExExManagerHandle; use reth_network::NetworkEvents; -use reth_node_api::{FullNodeComponents, FullNodeTypes}; +use reth_node_api::FullNodeTypes; use reth_node_core::{ dirs::{ChainPath, DataDirPath}, engine::EngineMessageStreamExt, @@ -101,6 +99,7 @@ where add_ons: NodeAddOns { hooks, rpc, exexs: installed_exex }, config, } = target; + let NodeHooks { on_component_initialized, on_node_started, .. } = hooks; // setup the launch context let ctx = ctx @@ -127,61 +126,23 @@ where info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks()); }) .with_metrics() - .with_blockchain_db().await?; - - // fetch the head block from the database - let head = ctx.lookup_head()?; - - let builder_ctx = BuilderContext::new( - head, - ctx.blockchain_db().clone(), - ctx.task_executor().clone(), - ctx.configs().clone(), - ); - - debug!(target: "reth::cli", "creating components"); - let components = components_builder.build_components(&builder_ctx).await?; - - let consensus: Arc = Arc::new(components.consensus().clone()); - - let tree_externals = TreeExternals::new( - ctx.provider_factory().clone(), - consensus.clone(), - components.block_executor().clone(), - ); - let tree = BlockchainTree::new(tree_externals, *ctx.tree_config(), ctx.prune_modes())? - .with_sync_metrics_tx(ctx.sync_metrics_tx()) - // Note: This is required because we need to ensure that both the components and the - // tree are using the same channel for canon state notifications. This will be removed - // once the Blockchain provider no longer depends on an instance of the tree - .with_canon_state_notification_sender(ctx.canon_state_notification_sender()); - - let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree)); - - // Replace the tree component with the actual tree - let blockchain_db = ctx.blockchain_db().clone().with_tree(blockchain_tree); - - debug!(target: "reth::cli", "configured blockchain tree"); - - let NodeHooks { on_component_initialized, on_node_started, .. } = hooks; - - let node_adapter = NodeAdapter { - components, - task_executor: ctx.task_executor().clone(), - provider: blockchain_db.clone(), - }; - - debug!(target: "reth::cli", "calling on_component_initialized hook"); - on_component_initialized.on_event(node_adapter.clone())?; + // passing FullNodeTypes as type parameter here so that we can build + // later the components. + .with_blockchain_db::().await? + .with_components(components_builder, on_component_initialized).await?; // spawn exexs - let exex_manager_handle = - ExExLauncher::new(head, node_adapter.clone(), installed_exex, ctx.configs().clone()) - .launch() - .await; + let exex_manager_handle = ExExLauncher::new( + ctx.head(), + ctx.node_adapter().clone(), + installed_exex, + ctx.configs().clone(), + ) + .launch() + .await; // create pipeline - let network_client = node_adapter.network().fetch_client().await?; + let network_client = ctx.components().network().fetch_client().await?; let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); let node_config = ctx.node_config(); @@ -216,30 +177,30 @@ where // install auto-seal let mining_mode = - ctx.dev_mining_mode(node_adapter.components.pool().pending_transactions_listener()); + ctx.dev_mining_mode(ctx.components().pool().pending_transactions_listener()); info!(target: "reth::cli", mode=%mining_mode, "configuring dev mining mode"); let (_, client, mut task) = reth_auto_seal_consensus::AutoSealBuilder::new( ctx.chain_spec(), - blockchain_db.clone(), - node_adapter.components.pool().clone(), + ctx.blockchain_db().clone(), + ctx.components().pool().clone(), consensus_engine_tx.clone(), mining_mode, - node_adapter.components.block_executor().clone(), + ctx.components().block_executor().clone(), ) .build(); let pipeline = crate::setup::build_networked_pipeline( &ctx.toml_config().stages, client.clone(), - consensus.clone(), + ctx.consensus(), ctx.provider_factory().clone(), ctx.task_executor(), ctx.sync_metrics_tx(), ctx.prune_config(), max_block, static_file_producer, - node_adapter.components.block_executor().clone(), + ctx.components().block_executor().clone(), pipeline_exex_handle, ) .await?; @@ -254,14 +215,14 @@ where let pipeline = crate::setup::build_networked_pipeline( &ctx.toml_config().stages, network_client.clone(), - consensus.clone(), + ctx.consensus(), ctx.provider_factory().clone(), ctx.task_executor(), ctx.sync_metrics_tx(), ctx.prune_config(), max_block, static_file_producer, - node_adapter.components.block_executor().clone(), + ctx.components().block_executor().clone(), pipeline_exex_handle, ) .await?; @@ -290,11 +251,11 @@ where let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( client, pipeline, - blockchain_db.clone(), + ctx.blockchain_db().clone(), Box::new(ctx.task_executor().clone()), - Box::new(node_adapter.components.network().clone()), + Box::new(ctx.components().network().clone()), max_block, - node_adapter.components.payload_builder().clone(), + ctx.components().payload_builder().clone(), initial_target, reth_beacon_consensus::MIN_BLOCKS_FOR_PIPELINE_RUN, consensus_engine_tx, @@ -304,12 +265,12 @@ where info!(target: "reth::cli", "Consensus engine initialized"); let events = stream_select!( - node_adapter.components.network().event_listener().map(Into::into), + ctx.components().network().event_listener().map(Into::into), beacon_engine_handle.event_listener().map(Into::into), pipeline_events.map(Into::into), if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() { Either::Left( - ConsensusLayerHealthEvents::new(Box::new(blockchain_db.clone())) + ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone())) .map(Into::into), ) } else { @@ -321,8 +282,8 @@ where ctx.task_executor().spawn_critical( "events task", node::handle_events( - Some(node_adapter.components.network().clone()), - Some(head.number), + Some(ctx.components().network().clone()), + Some(ctx.head().number), events, database.clone(), ), @@ -335,10 +296,10 @@ where commit: VERGEN_GIT_SHA.to_string(), }; let engine_api = EngineApi::new( - blockchain_db.clone(), + ctx.blockchain_db().clone(), ctx.chain_spec(), beacon_engine_handle, - node_adapter.components.payload_builder().clone().into(), + ctx.components().payload_builder().clone().into(), Box::new(ctx.task_executor().clone()), client, ); @@ -349,7 +310,7 @@ where // Start RPC servers let (rpc_server_handles, mut rpc_registry) = crate::rpc::launch_rpc_servers( - node_adapter.clone(), + ctx.node_adapter().clone(), engine_api, ctx.node_config(), jwt_secret, @@ -413,12 +374,12 @@ where } let full_node = FullNode { - evm_config: node_adapter.components.evm_config().clone(), - block_executor: node_adapter.components.block_executor().clone(), - pool: node_adapter.components.pool().clone(), - network: node_adapter.components.network().clone(), - provider: node_adapter.provider.clone(), - payload_builder: node_adapter.components.payload_builder().clone(), + evm_config: ctx.components().evm_config().clone(), + block_executor: ctx.components().block_executor().clone(), + pool: ctx.components().pool().clone(), + network: ctx.components().network().clone(), + provider: ctx.node_adapter().provider.clone(), + payload_builder: ctx.components().payload_builder().clone(), task_executor: ctx.task_executor().clone(), rpc_server_handles, rpc_registry,