Skip to content

Commit

Permalink
ExEx Discv5 (#8873)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
loocapro and mattsse authored Jun 25, 2024
1 parent a3a472a commit e1af75d
Show file tree
Hide file tree
Showing 7 changed files with 297 additions and 0 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ to make a PR!
| [Minimal](./exex/minimal) | Illustrates how to build a simple ExEx |
| [OP Bridge](./exex/op-bridge) | Illustrates an ExEx that decodes Optimism deposit and withdrawal receipts from L1 |
| [Rollup](./exex/rollup) | Illustrates a rollup ExEx that derives the state from L1 |
| [Discv5 as ExEx](./exex/discv5) | Illustrates an ExEx that runs discv5 discovery stack |

## RPC

Expand Down
33 changes: 33 additions & 0 deletions examples/exex/discv5/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
name = "example-exex-discv5"
version = "0.0.0"
publish = false
edition.workspace = true
license.workspace = true

[dependencies]
discv5.workspace = true
enr.workspace = true

reth-discv5.workspace = true
reth.workspace = true
reth-exex.workspace = true
reth-node-api.workspace = true
reth-node-ethereum.workspace = true
reth-network-peers.workspace = true
reth-tracing.workspace = true
futures.workspace = true

clap.workspace = true
reth-chainspec.workspace = true
serde_json.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
futures-util.workspace = true

tracing.workspace = true
eyre.workspace = true

[dev-dependencies]
reth-exex-test-utils.workspace = true
reth-testing-utils.workspace = true
70 changes: 70 additions & 0 deletions examples/exex/discv5/src/exex/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use eyre::Result;
use futures::{Future, FutureExt};
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_tracing::tracing::info;
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tracing::error;

use crate::network::DiscV5ExEx;

/// The ExEx struct, representing the initialization and execution of the ExEx.
pub struct ExEx<Node: FullNodeComponents> {
exex: ExExContext<Node>,
disc_v5: DiscV5ExEx,
}

impl<Node: FullNodeComponents> ExEx<Node> {
pub fn new(exex: ExExContext<Node>, disc_v5: DiscV5ExEx) -> Self {
Self { exex, disc_v5 }
}
}

impl<Node: FullNodeComponents> Future for ExEx<Node> {
type Output = Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Poll the Discv5 future until its drained
loop {
match self.disc_v5.poll_unpin(cx) {
Poll::Ready(Ok(())) => {
info!("Discv5 task completed successfully");
}
Poll::Ready(Err(e)) => {
error!(?e, "Discv5 task encountered an error");
return Poll::Ready(Err(e));
}
Poll::Pending => {
// Exit match and continue to poll notifications
break;
}
}
}

// Continuously poll the ExExContext notifications
loop {
if let Some(notification) = ready!(self.exex.notifications.poll_recv(cx)) {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
info!(reverted_chain = ?old.range(), "Received revert");
}
}

if let Some(committed_chain) = notification.committed_chain() {
self.exex
.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
}
}
}
}
}
29 changes: 29 additions & 0 deletions examples/exex/discv5/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use clap::Parser;

use exex::ExEx;
use network::{cli_ext::Discv5ArgsExt, DiscV5ExEx};
use reth_node_ethereum::EthereumNode;

mod exex;
mod network;

fn main() -> eyre::Result<()> {
reth::cli::Cli::<Discv5ArgsExt>::parse().run(|builder, args| async move {
let tcp_port = args.tcp_port;
let udp_port = args.udp_port;

let handle = builder
.node(EthereumNode::default())
.install_exex("exex-discv5", move |ctx| async move {
// start Discv5 task
let disc_v5 = DiscV5ExEx::new(tcp_port, udp_port).await?;

// start exex task with discv5
Ok(ExEx::new(ctx, disc_v5))
})
.launch()
.await?;

handle.wait_for_node_exit().await
})
}
15 changes: 15 additions & 0 deletions examples/exex/discv5/src/network/cli_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use clap::Args;

pub const DEFAULT_DISCOVERY_PORT: u16 = 30304;
pub const DEFAULT_RLPX_PORT: u16 = 30303;

#[derive(Debug, Clone, Args)]
pub(crate) struct Discv5ArgsExt {
/// TCP port used by RLPx
#[clap(long = "exex-discv5.tcp-port", default_value_t = DEFAULT_RLPX_PORT)]
pub tcp_port: u16,

/// UDP port used for discovery
#[clap(long = "exex-discv5.udp-port", default_value_t = DEFAULT_DISCOVERY_PORT)]
pub udp_port: u16,
}
123 changes: 123 additions & 0 deletions examples/exex/discv5/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#![allow(dead_code)]

use discv5::{enr::secp256k1::rand, Enr, Event, ListenConfig};
use reth::network::config::SecretKey;
use reth_chainspec::net::NodeRecord;
use reth_discv5::{enr::EnrCombinedKeyWrapper, Config, Discv5};
use reth_tracing::tracing::info;
use std::{
future::Future,
net::SocketAddr,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::mpsc;

pub(crate) mod cli_ext;

/// Helper struct to manage a discovery node using discv5.
pub(crate) struct DiscV5ExEx {
/// The inner discv5 instance.
inner: Discv5,
/// The node record of the discv5 instance.
node_record: NodeRecord,
/// The events stream of the discv5 instance.
events: mpsc::Receiver<discv5::Event>,
}

impl DiscV5ExEx {
/// Starts a new discv5 node.
pub async fn new(udp_port: u16, tcp_port: u16) -> eyre::Result<DiscV5ExEx> {
let secret_key = SecretKey::new(&mut rand::thread_rng());

let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port}").parse()?;
let rlpx_addr: SocketAddr = format!("127.0.0.1:{tcp_port}").parse()?;

let discv5_listen_config = ListenConfig::from(discv5_addr);
let discv5_config = Config::builder(rlpx_addr)
.discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build())
.build();

let (discv5, events, node_record) = Discv5::start(&secret_key, discv5_config).await?;
Ok(Self { inner: discv5, events, node_record })
}

/// Adds a node to the table if its not already present.
pub fn add_node(&mut self, enr: Enr) -> eyre::Result<()> {
let reth_enr: enr::Enr<SecretKey> = EnrCombinedKeyWrapper(enr.clone()).into();
self.inner.add_node(reth_enr)?;
Ok(())
}

/// Returns the local ENR of the discv5 node.
pub fn local_enr(&self) -> Enr {
self.inner.with_discv5(|discv5| discv5.local_enr())
}
}

impl Future for DiscV5ExEx {
type Output = eyre::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
loop {
match ready!(this.events.poll_recv(cx)) {
Some(evt) => {
if let Event::SessionEstablished(enr, socket_addr) = evt {
info!(?enr, ?socket_addr, "Session established with a new peer.");
}
}
None => return Poll::Ready(Ok(())),
}
}
}
}

#[cfg(test)]
mod tests {
use crate::network::DiscV5ExEx;
use tracing::info;

#[tokio::test]
async fn can_establish_discv5_session_with_peer() {
reth_tracing::init_test_tracing();
let mut node_1 = DiscV5ExEx::new(30301, 30303).await.unwrap();
let node_1_enr = node_1.local_enr();

let mut node_2 = DiscV5ExEx::new(30302, 30303).await.unwrap();

let node_2_enr = node_2.local_enr();

info!(?node_1_enr, ?node_2_enr, "Started discovery nodes.");

// add node_2 to node_1 table
node_1.add_node(node_2_enr.clone()).unwrap();

// verify node_2 is in node_1 table
assert!(node_1
.inner
.with_discv5(|discv5| discv5.table_entries_id().contains(&node_2_enr.node_id())));

// send ping from node_1 to node_2
node_1.inner.with_discv5(|discv5| discv5.send_ping(node_2_enr.clone())).await.unwrap();

// verify they both established a session
let event_2_v5 = node_2.events.recv().await.unwrap();
let event_1_v5 = node_1.events.recv().await.unwrap();
assert!(matches!(
event_1_v5,
discv5::Event::SessionEstablished(node, socket) if node == node_2_enr && socket == node_2_enr.udp4_socket().unwrap().into()
));
assert!(matches!(
event_2_v5,
discv5::Event::SessionEstablished(node, socket) if node == node_1_enr && socket == node_1_enr.udp4_socket().unwrap().into()
));

// verify node_1 is in
let event_2_v5 = node_2.events.recv().await.unwrap();
assert!(matches!(
event_2_v5,
discv5::Event::NodeInserted { node_id, replaced } if node_id == node_1_enr.node_id() && replaced.is_none()
));
}
}

0 comments on commit e1af75d

Please sign in to comment.