diff --git a/Cargo.lock b/Cargo.lock index a92397fea46e..88b7b807c034 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2822,6 +2822,32 @@ dependencies = [ "reth-rpc-types", ] +[[package]] +name = "example-exex-discv5" +version = "0.0.0" +dependencies = [ + "clap", + "discv5", + "enr", + "eyre", + "futures", + "futures-util", + "reth", + "reth-chainspec", + "reth-discv5", + "reth-exex", + "reth-exex-test-utils", + "reth-network-peers", + "reth-node-api", + "reth-node-ethereum", + "reth-testing-utils", + "reth-tracing", + "serde_json", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "example-exex-in-memory-state" version = "0.0.0" diff --git a/examples/README.md b/examples/README.md index 423d9224be67..b24b7387f32d 100644 --- a/examples/README.md +++ b/examples/README.md @@ -30,6 +30,7 @@ to make a PR! | [Minimal](./exex/minimal) | Illustrates how to build a simple ExEx | | [OP Bridge](./exex/op-bridge) | Illustrates an ExEx that decodes Optimism deposit and withdrawal receipts from L1 | | [Rollup](./exex/rollup) | Illustrates a rollup ExEx that derives the state from L1 | +| [Discv5 as ExEx](./exex/discv5) | Illustrates an ExEx that runs discv5 discovery stack | ## RPC diff --git a/examples/exex/discv5/Cargo.toml b/examples/exex/discv5/Cargo.toml new file mode 100644 index 000000000000..b1777cfa1516 --- /dev/null +++ b/examples/exex/discv5/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "example-exex-discv5" +version = "0.0.0" +publish = false +edition.workspace = true +license.workspace = true + +[dependencies] +discv5.workspace = true +enr.workspace = true + +reth-discv5.workspace = true +reth.workspace = true +reth-exex.workspace = true +reth-node-api.workspace = true +reth-node-ethereum.workspace = true +reth-network-peers.workspace = true +reth-tracing.workspace = true +futures.workspace = true + +clap.workspace = true +reth-chainspec.workspace = true +serde_json.workspace = true +tokio.workspace = true +tokio-stream.workspace = true +futures-util.workspace = true + +tracing.workspace = true +eyre.workspace = true + +[dev-dependencies] +reth-exex-test-utils.workspace = true +reth-testing-utils.workspace = true diff --git a/examples/exex/discv5/src/exex/mod.rs b/examples/exex/discv5/src/exex/mod.rs new file mode 100644 index 000000000000..4631f392979c --- /dev/null +++ b/examples/exex/discv5/src/exex/mod.rs @@ -0,0 +1,70 @@ +use eyre::Result; +use futures::{Future, FutureExt}; +use reth_exex::{ExExContext, ExExEvent, ExExNotification}; +use reth_node_api::FullNodeComponents; +use reth_tracing::tracing::info; +use std::{ + pin::Pin, + task::{ready, Context, Poll}, +}; +use tracing::error; + +use crate::network::DiscV5ExEx; + +/// The ExEx struct, representing the initialization and execution of the ExEx. +pub struct ExEx { + exex: ExExContext, + disc_v5: DiscV5ExEx, +} + +impl ExEx { + pub fn new(exex: ExExContext, disc_v5: DiscV5ExEx) -> Self { + Self { exex, disc_v5 } + } +} + +impl Future for ExEx { + type Output = Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Poll the Discv5 future until its drained + loop { + match self.disc_v5.poll_unpin(cx) { + Poll::Ready(Ok(())) => { + info!("Discv5 task completed successfully"); + } + Poll::Ready(Err(e)) => { + error!(?e, "Discv5 task encountered an error"); + return Poll::Ready(Err(e)); + } + Poll::Pending => { + // Exit match and continue to poll notifications + break; + } + } + } + + // Continuously poll the ExExContext notifications + loop { + if let Some(notification) = ready!(self.exex.notifications.poll_recv(cx)) { + match ¬ification { + ExExNotification::ChainCommitted { new } => { + info!(committed_chain = ?new.range(), "Received commit"); + } + ExExNotification::ChainReorged { old, new } => { + info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg"); + } + ExExNotification::ChainReverted { old } => { + info!(reverted_chain = ?old.range(), "Received revert"); + } + } + + if let Some(committed_chain) = notification.committed_chain() { + self.exex + .events + .send(ExExEvent::FinishedHeight(committed_chain.tip().number))?; + } + } + } + } +} diff --git a/examples/exex/discv5/src/main.rs b/examples/exex/discv5/src/main.rs new file mode 100644 index 000000000000..2374326050b7 --- /dev/null +++ b/examples/exex/discv5/src/main.rs @@ -0,0 +1,29 @@ +use clap::Parser; + +use exex::ExEx; +use network::{cli_ext::Discv5ArgsExt, DiscV5ExEx}; +use reth_node_ethereum::EthereumNode; + +mod exex; +mod network; + +fn main() -> eyre::Result<()> { + reth::cli::Cli::::parse().run(|builder, args| async move { + let tcp_port = args.tcp_port; + let udp_port = args.udp_port; + + let handle = builder + .node(EthereumNode::default()) + .install_exex("exex-discv5", move |ctx| async move { + // start Discv5 task + let disc_v5 = DiscV5ExEx::new(tcp_port, udp_port).await?; + + // start exex task with discv5 + Ok(ExEx::new(ctx, disc_v5)) + }) + .launch() + .await?; + + handle.wait_for_node_exit().await + }) +} diff --git a/examples/exex/discv5/src/network/cli_ext.rs b/examples/exex/discv5/src/network/cli_ext.rs new file mode 100644 index 000000000000..1eb864de3611 --- /dev/null +++ b/examples/exex/discv5/src/network/cli_ext.rs @@ -0,0 +1,15 @@ +use clap::Args; + +pub const DEFAULT_DISCOVERY_PORT: u16 = 30304; +pub const DEFAULT_RLPX_PORT: u16 = 30303; + +#[derive(Debug, Clone, Args)] +pub(crate) struct Discv5ArgsExt { + /// TCP port used by RLPx + #[clap(long = "exex-discv5.tcp-port", default_value_t = DEFAULT_RLPX_PORT)] + pub tcp_port: u16, + + /// UDP port used for discovery + #[clap(long = "exex-discv5.udp-port", default_value_t = DEFAULT_DISCOVERY_PORT)] + pub udp_port: u16, +} diff --git a/examples/exex/discv5/src/network/mod.rs b/examples/exex/discv5/src/network/mod.rs new file mode 100644 index 000000000000..41f57bffc478 --- /dev/null +++ b/examples/exex/discv5/src/network/mod.rs @@ -0,0 +1,123 @@ +#![allow(dead_code)] + +use discv5::{enr::secp256k1::rand, Enr, Event, ListenConfig}; +use reth::network::config::SecretKey; +use reth_chainspec::net::NodeRecord; +use reth_discv5::{enr::EnrCombinedKeyWrapper, Config, Discv5}; +use reth_tracing::tracing::info; +use std::{ + future::Future, + net::SocketAddr, + pin::Pin, + task::{ready, Context, Poll}, +}; +use tokio::sync::mpsc; + +pub(crate) mod cli_ext; + +/// Helper struct to manage a discovery node using discv5. +pub(crate) struct DiscV5ExEx { + /// The inner discv5 instance. + inner: Discv5, + /// The node record of the discv5 instance. + node_record: NodeRecord, + /// The events stream of the discv5 instance. + events: mpsc::Receiver, +} + +impl DiscV5ExEx { + /// Starts a new discv5 node. + pub async fn new(udp_port: u16, tcp_port: u16) -> eyre::Result { + let secret_key = SecretKey::new(&mut rand::thread_rng()); + + let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port}").parse()?; + let rlpx_addr: SocketAddr = format!("127.0.0.1:{tcp_port}").parse()?; + + let discv5_listen_config = ListenConfig::from(discv5_addr); + let discv5_config = Config::builder(rlpx_addr) + .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build()) + .build(); + + let (discv5, events, node_record) = Discv5::start(&secret_key, discv5_config).await?; + Ok(Self { inner: discv5, events, node_record }) + } + + /// Adds a node to the table if its not already present. + pub fn add_node(&mut self, enr: Enr) -> eyre::Result<()> { + let reth_enr: enr::Enr = EnrCombinedKeyWrapper(enr.clone()).into(); + self.inner.add_node(reth_enr)?; + Ok(()) + } + + /// Returns the local ENR of the discv5 node. + pub fn local_enr(&self) -> Enr { + self.inner.with_discv5(|discv5| discv5.local_enr()) + } +} + +impl Future for DiscV5ExEx { + type Output = eyre::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.as_mut(); + loop { + match ready!(this.events.poll_recv(cx)) { + Some(evt) => { + if let Event::SessionEstablished(enr, socket_addr) = evt { + info!(?enr, ?socket_addr, "Session established with a new peer."); + } + } + None => return Poll::Ready(Ok(())), + } + } + } +} + +#[cfg(test)] +mod tests { + use crate::network::DiscV5ExEx; + use tracing::info; + + #[tokio::test] + async fn can_establish_discv5_session_with_peer() { + reth_tracing::init_test_tracing(); + let mut node_1 = DiscV5ExEx::new(30301, 30303).await.unwrap(); + let node_1_enr = node_1.local_enr(); + + let mut node_2 = DiscV5ExEx::new(30302, 30303).await.unwrap(); + + let node_2_enr = node_2.local_enr(); + + info!(?node_1_enr, ?node_2_enr, "Started discovery nodes."); + + // add node_2 to node_1 table + node_1.add_node(node_2_enr.clone()).unwrap(); + + // verify node_2 is in node_1 table + assert!(node_1 + .inner + .with_discv5(|discv5| discv5.table_entries_id().contains(&node_2_enr.node_id()))); + + // send ping from node_1 to node_2 + node_1.inner.with_discv5(|discv5| discv5.send_ping(node_2_enr.clone())).await.unwrap(); + + // verify they both established a session + let event_2_v5 = node_2.events.recv().await.unwrap(); + let event_1_v5 = node_1.events.recv().await.unwrap(); + assert!(matches!( + event_1_v5, + discv5::Event::SessionEstablished(node, socket) if node == node_2_enr && socket == node_2_enr.udp4_socket().unwrap().into() + )); + assert!(matches!( + event_2_v5, + discv5::Event::SessionEstablished(node, socket) if node == node_1_enr && socket == node_1_enr.udp4_socket().unwrap().into() + )); + + // verify node_1 is in + let event_2_v5 = node_2.events.recv().await.unwrap(); + assert!(matches!( + event_2_v5, + discv5::Event::NodeInserted { node_id, replaced } if node_id == node_1_enr.node_id() && replaced.is_none() + )); + } +}