From 3210007f519df742740e3d4588b3cbd800fd7f3c Mon Sep 17 00:00:00 2001 From: Gregory Edison Date: Thu, 27 Jun 2024 19:12:43 +0200 Subject: [PATCH 1/4] update `NodeExitFuture` --- crates/node-core/Cargo.toml | 2 +- crates/node-core/src/exit.rs | 35 +++++++++++---------------- crates/node/builder/src/handle.rs | 11 +-------- crates/node/builder/src/launch/mod.rs | 5 +++- 4 files changed, 20 insertions(+), 33 deletions(-) diff --git a/crates/node-core/Cargo.toml b/crates/node-core/Cargo.toml index ec86bb440893..cc29e3d2acf2 100644 --- a/crates/node-core/Cargo.toml +++ b/crates/node-core/Cargo.toml @@ -36,7 +36,7 @@ reth-net-nat.workspace = true reth-network-peers.workspace = true reth-tasks.workspace = true reth-consensus-common.workspace = true -reth-beacon-consensus.workspace = true +reth-beacon-consensus = { workspace = true, optional = true } reth-prune-types.workspace = true reth-stages-types.workspace = true diff --git a/crates/node-core/src/exit.rs b/crates/node-core/src/exit.rs index 7957af1854fc..26a5f1cc992d 100644 --- a/crates/node-core/src/exit.rs +++ b/crates/node-core/src/exit.rs @@ -1,20 +1,18 @@ //! Helper types for waiting for the node to exit. use futures::FutureExt; -use reth_beacon_consensus::BeaconConsensusEngineError; use std::{ future::Future, pin::Pin, task::{ready, Context, Poll}, }; -use tokio::sync::oneshot; /// A Future which resolves when the node exits -#[derive(Debug)] +#[allow(missing_debug_implementations)] pub struct NodeExitFuture { - /// The receiver half of the channel for the consensus engine. - /// This can be used to wait for the consensus engine to exit. - consensus_engine_rx: Option>>, + /// The consensus engine future. + /// This can be polled to wait for the consensus engine to exit. + consensus_engine_fut: Option>>>>, /// Flag indicating whether the node should be terminated after the pipeline sync. terminate: bool, @@ -23,10 +21,10 @@ pub struct NodeExitFuture { impl NodeExitFuture { /// Create a new `NodeExitFuture`. pub const fn new( - consensus_engine_rx: oneshot::Receiver>, + consensus_engine_fut: Pin>>>, terminate: bool, ) -> Self { - Self { consensus_engine_rx: Some(consensus_engine_rx), terminate } + Self { consensus_engine_fut: Some(consensus_engine_fut), terminate } } } @@ -35,18 +33,17 @@ impl Future for NodeExitFuture { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - if let Some(rx) = this.consensus_engine_rx.as_mut() { + if let Some(rx) = this.consensus_engine_fut.as_mut() { match ready!(rx.poll_unpin(cx)) { - Ok(res) => { - this.consensus_engine_rx.take(); - res?; + Ok(_) => { + this.consensus_engine_fut.take(); if this.terminate { Poll::Ready(Ok(())) } else { Poll::Pending } } - Err(err) => Poll::Ready(Err(err.into())), + Err(err) => Poll::Ready(Err(err)), } } else { Poll::Pending @@ -61,11 +58,9 @@ mod tests { #[tokio::test] async fn test_node_exit_future_terminate_true() { - let (tx, rx) = oneshot::channel::>(); + let fut = Box::pin(async { Ok(()) }); - let _ = tx.send(Ok(())); - - let node_exit_future = NodeExitFuture::new(rx, true); + let node_exit_future = NodeExitFuture::new(fut, true); let res = node_exit_future.await; @@ -74,11 +69,9 @@ mod tests { #[tokio::test] async fn test_node_exit_future_terminate_false() { - let (tx, rx) = oneshot::channel::>(); - - let _ = tx.send(Ok(())); + let fut = Box::pin(async { Ok(()) }); - let mut node_exit_future = NodeExitFuture::new(rx, false); + let mut node_exit_future = NodeExitFuture::new(fut, false); poll_fn(|cx| { assert!(node_exit_future.poll_unpin(cx).is_pending()); Poll::Ready(()) diff --git a/crates/node/builder/src/handle.rs b/crates/node/builder/src/handle.rs index cbdce0c8b59f..fa070b0b0ebe 100644 --- a/crates/node/builder/src/handle.rs +++ b/crates/node/builder/src/handle.rs @@ -1,10 +1,10 @@ use crate::node::FullNode; use reth_node_api::FullNodeComponents; use reth_node_core::exit::NodeExitFuture; -use std::fmt; /// A Handle to the launched node. #[must_use = "Needs to await the node exit future"] +#[allow(missing_debug_implementations)] pub struct NodeHandle { /// All node components. pub node: FullNode, @@ -18,12 +18,3 @@ impl NodeHandle { self.node_exit_future.await } } - -impl fmt::Debug for NodeHandle { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("NodeHandle") - .field("node", &"...") - .field("node_exit_future", &self.node_exit_future) - .finish() - } -} diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 532e87fecba5..7b71a72c76b8 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -389,7 +389,10 @@ where on_node_started.on_event(full_node.clone())?; let handle = NodeHandle { - node_exit_future: NodeExitFuture::new(rx, full_node.config.debug.terminate), + node_exit_future: NodeExitFuture::new( + Box::pin(async { rx.await?.map_err(Into::into) }), + full_node.config.debug.terminate, + ), node: full_node, }; From 88ec2634fa3b98c90e8d4de64d19e2fdc958fb01 Mon Sep 17 00:00:00 2001 From: Gregory Edison Date: Fri, 28 Jun 2024 09:24:35 +0200 Subject: [PATCH 2/4] answer comments --- Cargo.lock | 1 - crates/node-core/Cargo.toml | 2 -- crates/node-core/src/exit.rs | 29 ++++++++++++++++++--------- crates/node/builder/src/handle.rs | 11 +++++++++- crates/node/builder/src/launch/mod.rs | 2 +- 5 files changed, 30 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 326850d6eb8b..d8237a516c73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7587,7 +7587,6 @@ dependencies = [ "procfs", "proptest", "rand 0.8.5", - "reth-beacon-consensus", "reth-chainspec", "reth-config", "reth-consensus-common", diff --git a/crates/node-core/Cargo.toml b/crates/node-core/Cargo.toml index cc29e3d2acf2..db72a5e00ee7 100644 --- a/crates/node-core/Cargo.toml +++ b/crates/node-core/Cargo.toml @@ -36,7 +36,6 @@ reth-net-nat.workspace = true reth-network-peers.workspace = true reth-tasks.workspace = true reth-consensus-common.workspace = true -reth-beacon-consensus = { workspace = true, optional = true } reth-prune-types.workspace = true reth-stages-types.workspace = true @@ -102,7 +101,6 @@ optimism = [ "reth-primitives/optimism", "reth-provider/optimism", "reth-rpc-types-compat/optimism", - "reth-beacon-consensus/optimism", "reth-rpc-eth-api/optimism", "reth-rpc-eth-types/optimism" ] diff --git a/crates/node-core/src/exit.rs b/crates/node-core/src/exit.rs index 26a5f1cc992d..5dc6e5638d80 100644 --- a/crates/node-core/src/exit.rs +++ b/crates/node-core/src/exit.rs @@ -1,30 +1,39 @@ //! Helper types for waiting for the node to exit. -use futures::FutureExt; +use futures::{future::BoxFuture, FutureExt}; use std::{ + fmt, future::Future, pin::Pin, task::{ready, Context, Poll}, }; /// A Future which resolves when the node exits -#[allow(missing_debug_implementations)] pub struct NodeExitFuture { /// The consensus engine future. /// This can be polled to wait for the consensus engine to exit. - consensus_engine_fut: Option>>>>, + consensus_engine_fut: Option>>, /// Flag indicating whether the node should be terminated after the pipeline sync. terminate: bool, } +impl fmt::Debug for NodeExitFuture { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("NodeExitFuture") + .field("consensus_engine_fut", &"...") + .field("terminate", &self.terminate) + .finish() + } +} + impl NodeExitFuture { /// Create a new `NodeExitFuture`. - pub const fn new( - consensus_engine_fut: Pin>>>, - terminate: bool, - ) -> Self { - Self { consensus_engine_fut: Some(consensus_engine_fut), terminate } + pub fn new(consensus_engine_fut: F, terminate: bool) -> Self + where + F: Future> + 'static + Send, + { + Self { consensus_engine_fut: Some(Box::pin(consensus_engine_fut)), terminate } } } @@ -58,7 +67,7 @@ mod tests { #[tokio::test] async fn test_node_exit_future_terminate_true() { - let fut = Box::pin(async { Ok(()) }); + let fut = async { Ok(()) }; let node_exit_future = NodeExitFuture::new(fut, true); @@ -69,7 +78,7 @@ mod tests { #[tokio::test] async fn test_node_exit_future_terminate_false() { - let fut = Box::pin(async { Ok(()) }); + let fut = async { Ok(()) }; let mut node_exit_future = NodeExitFuture::new(fut, false); poll_fn(|cx| { diff --git a/crates/node/builder/src/handle.rs b/crates/node/builder/src/handle.rs index fa070b0b0ebe..cbdce0c8b59f 100644 --- a/crates/node/builder/src/handle.rs +++ b/crates/node/builder/src/handle.rs @@ -1,10 +1,10 @@ use crate::node::FullNode; use reth_node_api::FullNodeComponents; use reth_node_core::exit::NodeExitFuture; +use std::fmt; /// A Handle to the launched node. #[must_use = "Needs to await the node exit future"] -#[allow(missing_debug_implementations)] pub struct NodeHandle { /// All node components. pub node: FullNode, @@ -18,3 +18,12 @@ impl NodeHandle { self.node_exit_future.await } } + +impl fmt::Debug for NodeHandle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("NodeHandle") + .field("node", &"...") + .field("node_exit_future", &self.node_exit_future) + .finish() + } +} diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 7b71a72c76b8..77aff046fd12 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -390,7 +390,7 @@ where let handle = NodeHandle { node_exit_future: NodeExitFuture::new( - Box::pin(async { rx.await?.map_err(Into::into) }), + async { rx.await?.map_err(Into::into) }, full_node.config.debug.terminate, ), node: full_node, From e64ca8d38a0a0857b8af4ed0f707dda5d80de83b Mon Sep 17 00:00:00 2001 From: Gregory Edison Date: Fri, 28 Jun 2024 09:54:26 +0200 Subject: [PATCH 3/4] lint --- crates/net/eth-wire/src/p2pstream.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/net/eth-wire/src/p2pstream.rs b/crates/net/eth-wire/src/p2pstream.rs index 23f106da9a42..aa8770d058c6 100644 --- a/crates/net/eth-wire/src/p2pstream.rs +++ b/crates/net/eth-wire/src/p2pstream.rs @@ -478,11 +478,10 @@ where // // It's possible we already tried to RLP decode this, but it was snappy // compressed, so we need to RLP decode it again. - let reason = DisconnectReason::decode(&mut &decompress_buf[1..]).map_err(|err| { + let reason = DisconnectReason::decode(&mut &decompress_buf[1..]).inspect_err(|err| { debug!( %err, msg=%hex::encode(&decompress_buf[1..]), "Failed to decode disconnect message from peer" ); - err })?; return Poll::Ready(Some(Err(P2PStreamError::Disconnected(reason)))) } From 1e07c4add6c1234be05c33b2ac436f16e63d4ae4 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 28 Jun 2024 10:00:52 +0200 Subject: [PATCH 4/4] smol touchup --- crates/node/builder/src/launch/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 77aff046fd12..99dc04310d82 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -390,7 +390,7 @@ where let handle = NodeHandle { node_exit_future: NodeExitFuture::new( - async { rx.await?.map_err(Into::into) }, + async { Ok(rx.await??) }, full_node.config.debug.terminate, ), node: full_node,