Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ExEx Discv5 #8873

Merged
merged 10 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ to make a PR!
| [OP Bridge](./exex/op-bridge) | Illustrates an ExEx that decodes Optimism deposit and withdrawal receipts from L1 |
| [Remote](./exex/remote) | Illustrates an ExEx that emits notifications using a gRPC server, and a consumer that receives them |
| [Rollup](./exex/rollup) | Illustrates a rollup ExEx that derives the state from L1 |
| [Discv5 as ExEx](./exex/discv5) | Illustrates an ExEx that runs discv5 discovery stack |

## RPC

Expand Down
33 changes: 33 additions & 0 deletions examples/exex/discv5/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
name = "example-exex-discv5"
version = "0.0.0"
publish = false
edition.workspace = true
license.workspace = true

[dependencies]
discv5.workspace = true
enr.workspace = true

reth-discv5.workspace = true
reth.workspace = true
reth-exex.workspace = true
reth-node-api.workspace = true
reth-node-ethereum.workspace = true
reth-network-peers.workspace = true
reth-tracing.workspace = true
futures.workspace = true

clap.workspace = true
reth-chainspec.workspace = true
serde_json.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
futures-util.workspace = true

tracing.workspace = true
eyre.workspace = true

[dev-dependencies]
reth-exex-test-utils.workspace = true
reth-testing-utils.workspace = true
69 changes: 69 additions & 0 deletions examples/exex/discv5/src/exex/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::{
pin::Pin,
task::{Context, Poll},
};

use eyre::Result;
use futures::Future;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_tracing::tracing::info;

use crate::network::DiscV5ExEx;

/// The ExEx struct, representing the initialization and execution of the ExEx.
pub struct ExEx<Node: FullNodeComponents> {
exex: ExExContext<Node>,
disc_v5: DiscV5ExEx,
}

impl<Node: FullNodeComponents> ExEx<Node> {
pub fn new(exex: ExExContext<Node>, disc_v5: DiscV5ExEx) -> Self {
Self { exex, disc_v5 }
}
}

impl<Node: FullNodeComponents> Future for ExEx<Node> {
type Output = Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Poll the Discv5 future
match Pin::new(&mut self.disc_v5).poll(cx) {
Poll::Ready(Ok(())) => {
info!("Discv5 task completed successfully");
}
Poll::Ready(Err(e)) => {
info!(error = ?e, "Discv5 task encountered an error");
return Poll::Ready(Err(e));
}
Poll::Pending => {
// If the future is still pending, we yield control back to the executor
return Poll::Pending;
}
}

// Continuously poll the ExExContext notifications
while let Poll::Ready(Some(notification)) =
Pin::new(&mut self.exex.notifications).poll_recv(cx)
{
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
info!(reverted_chain = ?old.range(), "Received revert");
}
}

if let Some(committed_chain) = notification.committed_chain() {
self.exex.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
}
}

// If there are no more notifications and disc_v5 is not yet ready, return Poll::Pending
Poll::Pending
}
}
29 changes: 29 additions & 0 deletions examples/exex/discv5/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use clap::Parser;

use exex::ExEx;
use network::{cli_ext::Discv5ArgsExt, DiscV5ExEx};
use reth_node_ethereum::EthereumNode;

mod exex;
mod network;

fn main() -> eyre::Result<()> {
reth::cli::Cli::<Discv5ArgsExt>::parse().run(|builder, args| async move {
let tcp_port = args.tcp_port;
let udp_port = args.udp_port;

let handle = builder
.node(EthereumNode::default())
.install_exex("exex-discv5", move |ctx| async move {
// start Discv5 task
let disc_v5 = DiscV5ExEx::new(tcp_port, udp_port).await?;

// start exex task with discv5
Ok(ExEx::new(ctx, disc_v5))
})
.launch()
.await?;

handle.wait_for_node_exit().await
})
}
15 changes: 15 additions & 0 deletions examples/exex/discv5/src/network/cli_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use clap::Args;

pub const DEFAULT_DISCOVERY_PORT: u16 = 30304;
pub const DEFAULT_RLPX_PORT: u16 = 30303;

#[derive(Debug, Clone, Args)]
pub(crate) struct Discv5ArgsExt {
/// TCP port used by RLPx
#[clap(long = "exex-discv5.tcp-port", default_value_t = DEFAULT_RLPX_PORT)]
pub tcp_port: u16,

/// UDP port used for discovery
#[clap(long = "exex-discv5.udp-port", default_value_t = DEFAULT_DISCOVERY_PORT)]
pub udp_port: u16,
}
123 changes: 123 additions & 0 deletions examples/exex/discv5/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#![allow(dead_code)]

use discv5::{enr::secp256k1::rand, Enr, Event, ListenConfig};
use reth::network::config::SecretKey;
use reth_chainspec::net::NodeRecord;
use reth_discv5::{enr::EnrCombinedKeyWrapper, Config, Discv5};
use reth_tracing::tracing::info;
use std::{
future::Future,
net::SocketAddr,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::mpsc;

pub(crate) mod cli_ext;

/// Helper struct to manage a discovery node using discv5.
pub(crate) struct DiscV5ExEx {
/// The inner discv5 instance.
inner: Discv5,
/// The node record of the discv5 instance.
node_record: NodeRecord,
/// The events stream of the discv5 instance.
events: mpsc::Receiver<discv5::Event>,
}

impl DiscV5ExEx {
/// Starts a new discv5 node.
pub async fn new(udp_port: u16, tcp_port: u16) -> eyre::Result<DiscV5ExEx> {
let secret_key = SecretKey::new(&mut rand::thread_rng());

let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port}").parse()?;
let rlpx_addr: SocketAddr = format!("127.0.0.1:{tcp_port}").parse()?;

let discv5_listen_config = ListenConfig::from(discv5_addr);
let discv5_config = Config::builder(rlpx_addr)
.discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build())
.build();

let (discv5, events, node_record) = Discv5::start(&secret_key, discv5_config).await?;
Ok(Self { inner: discv5, events, node_record })
}

/// Adds a node to the table if its not already present.
pub fn add_node(&mut self, enr: Enr) -> eyre::Result<()> {
let reth_enr: enr::Enr<SecretKey> = EnrCombinedKeyWrapper(enr.clone()).into();
self.inner.add_node(reth_enr)?;
Ok(())
}

/// Returns the local ENR of the discv5 node.
pub fn local_enr(&self) -> Enr {
self.inner.with_discv5(|discv5| discv5.local_enr())
}
}

impl Future for DiscV5ExEx {
type Output = eyre::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
loop {
match ready!(this.events.poll_recv(cx)) {
Some(evt) => {
if let Event::SessionEstablished(enr, socket_addr) = evt {
info!(?enr, ?socket_addr, "Session established with a new peer.");
}
}
None => return Poll::Ready(Ok(())),
}
}
}
}

#[cfg(test)]
mod tests {
use crate::network::DiscV5ExEx;
use tracing::info;

#[tokio::test]
async fn can_establish_discv5_session_with_peer() {
reth_tracing::init_test_tracing();
let mut node_1 = DiscV5ExEx::new(30301, 30303).await.unwrap();
let node_1_enr = node_1.local_enr();

let mut node_2 = DiscV5ExEx::new(30302, 30303).await.unwrap();

let node_2_enr = node_2.local_enr();

info!(?node_1_enr, ?node_2_enr, "Started discovery nodes.");

// add node_2 to node_1 table
node_1.add_node(node_2_enr.clone()).unwrap();

// verify node_2 is in node_1 table
assert!(node_1
.inner
.with_discv5(|discv5| discv5.table_entries_id().contains(&node_2_enr.node_id())));

// send ping from node_1 to node_2
node_1.inner.with_discv5(|discv5| discv5.send_ping(node_2_enr.clone())).await.unwrap();

// verify they both established a session
let event_2_v5 = node_2.events.recv().await.unwrap();
let event_1_v5 = node_1.events.recv().await.unwrap();
assert!(matches!(
event_1_v5,
discv5::Event::SessionEstablished(node, socket) if node == node_2_enr && socket == node_2_enr.udp4_socket().unwrap().into()
));
assert!(matches!(
event_2_v5,
discv5::Event::SessionEstablished(node, socket) if node == node_1_enr && socket == node_1_enr.udp4_socket().unwrap().into()
));

// verify node_1 is in
let event_2_v5 = node_2.events.recv().await.unwrap();
assert!(matches!(
event_2_v5,
discv5::Event::NodeInserted { node_id, replaced } if node_id == node_1_enr.node_id() && replaced.is_none()
));
}
}
Loading