diff --git a/examples/custom-rlpx-subprotocol/src/main.rs b/examples/custom-rlpx-subprotocol/src/main.rs index 5b0543101a82..3a198c38d285 100644 --- a/examples/custom-rlpx-subprotocol/src/main.rs +++ b/examples/custom-rlpx-subprotocol/src/main.rs @@ -3,7 +3,7 @@ //! Run with //! //! ```not_rust -//! cargo run -p custom-rlpx-subprotocol -- node +//! cargo run -p example-custom-rlpx-subprotocol -- node //! ``` //! //! This launch a regular reth node with a custom rlpx subprotocol. @@ -47,6 +47,7 @@ fn main() -> eyre::Result<()> { let custom_rlpx_handler_2 = CustomRlpxProtoHandler { state: ProtocolState { events: tx } }; let net_cfg = NetworkConfig::builder(secret_key) .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))) + .disable_discovery() .add_rlpx_sub_protocol(custom_rlpx_handler_2.into_rlpx_sub_protocol()) .build(NoopProvider::default()); @@ -84,14 +85,14 @@ fn main() -> eyre::Result<()> { // send a ping message from peer0 to peer1 let (tx, rx) = oneshot::channel(); - peer0_conn.send(CustomCommand::Message { msg: "hello!".to_owned(), response: tx })?; + peer0_conn.send(CustomCommand::Message { msg: "hello!".to_string(), response: tx })?; let response = rx.await?; assert_eq!(response, "hello!"); info!(target:"rlpx-subprotocol", ?response, "New message received"); // send a ping message from peer1 to peer0 let (tx, rx) = oneshot::channel(); - peer1_conn.send(CustomCommand::Message { msg: "world!".to_owned(), response: tx })?; + peer1_conn.send(CustomCommand::Message { msg: "world!".to_string(), response: tx })?; let response = rx.await?; assert_eq!(response, "world!"); info!(target:"rlpx-subprotocol", ?response, "New message received"); diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs index 06b862544566..a6d835b70c26 100644 --- a/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs @@ -1,3 +1,4 @@ +use super::protocol::proto::{CustomRlpxProtoMessage, CustomRlpxProtoMessageKind}; use futures::{Stream, StreamExt}; use reth_eth_wire::multiplex::ProtocolConnection; use reth_primitives::BytesMut; @@ -8,15 +9,12 @@ use std::{ use tokio::sync::oneshot; use tokio_stream::wrappers::UnboundedReceiverStream; -use super::protocol::proto::{CustomRlpxProtoMessage, CustomRlpxProtoMessageKind}; - pub(crate) mod handler; /// We define some custom commands that the subprotocol supports. pub(crate) enum CustomCommand { /// Sends a message to the peer Message { - #[allow(dead_code)] msg: String, /// The response will be sent to this channel. response: oneshot::Sender, @@ -37,21 +35,23 @@ impl Stream for CustomRlpxConnection { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); if let Some(initial_ping) = this.initial_ping.take() { - return Poll::Ready(Some(initial_ping.encoded())); + return Poll::Ready(Some(initial_ping.encoded())) } loop { if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) { return match cmd { - CustomCommand::Message { msg: _, response } => { + CustomCommand::Message { msg, response } => { this.pending_pong = Some(response); - Poll::Ready(Some(CustomRlpxProtoMessage::ping().encoded())) + Poll::Ready(Some(CustomRlpxProtoMessage::ping_message(msg).encoded())) } - }; + } } + let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) }; + let Some(msg) = CustomRlpxProtoMessage::decode_message(&mut &msg[..]) else { - return Poll::Ready(None); + return Poll::Ready(None) }; match msg.message { @@ -59,14 +59,18 @@ impl Stream for CustomRlpxConnection { return Poll::Ready(Some(CustomRlpxProtoMessage::pong().encoded())) } CustomRlpxProtoMessageKind::Pong => {} - CustomRlpxProtoMessageKind::CustomMessage(msg) => { + CustomRlpxProtoMessageKind::PingMessage(msg) => { + return Poll::Ready(Some(CustomRlpxProtoMessage::pong_message(msg).encoded())) + } + CustomRlpxProtoMessageKind::PongMessage(msg) => { if let Some(sender) = this.pending_pong.take() { sender.send(msg).ok(); } - continue; + continue } } - return Poll::Pending; + + return Poll::Pending } } } diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/proto.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/proto.rs index 63e09fd40c47..8b179a447d9f 100644 --- a/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/proto.rs +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/proto.rs @@ -9,14 +9,16 @@ use reth_primitives::{Buf, BufMut, BytesMut}; pub(crate) enum CustomRlpxProtoMessageId { Ping = 0x00, Pong = 0x01, - CustomMessage = 0x02, + PingMessage = 0x02, + PongMessage = 0x03, } #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) enum CustomRlpxProtoMessageKind { Ping, Pong, - CustomMessage(String), + PingMessage(String), + PongMessage(String), } #[derive(Clone, Debug, PartialEq, Eq)] @@ -33,7 +35,22 @@ impl CustomRlpxProtoMessage { /// Returns the protocol for the `custom_rlpx` protocol. pub fn protocol() -> Protocol { - Protocol::new(Self::capability(), 3) + Protocol::new(Self::capability(), 4) + } + + /// Creates a ping message + pub fn ping_message(msg: impl Into) -> Self { + Self { + message_type: CustomRlpxProtoMessageId::PingMessage, + message: CustomRlpxProtoMessageKind::PingMessage(msg.into()), + } + } + /// Creates a ping message + pub fn pong_message(msg: impl Into) -> Self { + Self { + message_type: CustomRlpxProtoMessageId::PongMessage, + message: CustomRlpxProtoMessageKind::PongMessage(msg.into()), + } } /// Creates a ping message @@ -57,9 +74,9 @@ impl CustomRlpxProtoMessage { let mut buf = BytesMut::new(); buf.put_u8(self.message_type as u8); match &self.message { - CustomRlpxProtoMessageKind::Ping => {} - CustomRlpxProtoMessageKind::Pong => {} - CustomRlpxProtoMessageKind::CustomMessage(msg) => { + CustomRlpxProtoMessageKind::Ping | CustomRlpxProtoMessageKind::Pong => {} + CustomRlpxProtoMessageKind::PingMessage(msg) | + CustomRlpxProtoMessageKind::PongMessage(msg) => { buf.put(msg.as_bytes()); } } @@ -76,16 +93,21 @@ impl CustomRlpxProtoMessage { let message_type = match id { 0x00 => CustomRlpxProtoMessageId::Ping, 0x01 => CustomRlpxProtoMessageId::Pong, - 0x02 => CustomRlpxProtoMessageId::CustomMessage, + 0x02 => CustomRlpxProtoMessageId::PingMessage, + 0x03 => CustomRlpxProtoMessageId::PongMessage, _ => return None, }; let message = match message_type { CustomRlpxProtoMessageId::Ping => CustomRlpxProtoMessageKind::Ping, CustomRlpxProtoMessageId::Pong => CustomRlpxProtoMessageKind::Pong, - CustomRlpxProtoMessageId::CustomMessage => CustomRlpxProtoMessageKind::CustomMessage( + CustomRlpxProtoMessageId::PingMessage => CustomRlpxProtoMessageKind::PingMessage( + String::from_utf8_lossy(&buf[..]).into_owned(), + ), + CustomRlpxProtoMessageId::PongMessage => CustomRlpxProtoMessageKind::PongMessage( String::from_utf8_lossy(&buf[..]).into_owned(), ), }; + Some(Self { message_type, message }) } }