Skip to content

Commit

Permalink
correcting cargo run command
Browse files Browse the repository at this point in the history
  • Loading branch information
loocapro committed Jun 26, 2024
1 parent 3d2b933 commit 32b12c6
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 22 deletions.
7 changes: 4 additions & 3 deletions examples/custom-rlpx-subprotocol/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! Run with
//!
//! ```not_rust
//! cargo run -p custom-rlpx-subprotocol -- node
//! cargo run -p example-custom-rlpx-subprotocol -- node
//! ```
//!
//! This launch a regular reth node with a custom rlpx subprotocol.
Expand Down Expand Up @@ -47,6 +47,7 @@ fn main() -> eyre::Result<()> {
let custom_rlpx_handler_2 = CustomRlpxProtoHandler { state: ProtocolState { events: tx } };
let net_cfg = NetworkConfig::builder(secret_key)
.listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
.disable_discovery()
.add_rlpx_sub_protocol(custom_rlpx_handler_2.into_rlpx_sub_protocol())
.build(NoopProvider::default());

Expand Down Expand Up @@ -84,14 +85,14 @@ fn main() -> eyre::Result<()> {

// send a ping message from peer0 to peer1
let (tx, rx) = oneshot::channel();
peer0_conn.send(CustomCommand::Message { msg: "hello!".to_owned(), response: tx })?;
peer0_conn.send(CustomCommand::Message { msg: "hello!".to_string(), response: tx })?;
let response = rx.await?;
assert_eq!(response, "hello!");
info!(target:"rlpx-subprotocol", ?response, "New message received");

// send a ping message from peer1 to peer0
let (tx, rx) = oneshot::channel();
peer1_conn.send(CustomCommand::Message { msg: "world!".to_owned(), response: tx })?;
peer1_conn.send(CustomCommand::Message { msg: "world!".to_string(), response: tx })?;
let response = rx.await?;
assert_eq!(response, "world!");
info!(target:"rlpx-subprotocol", ?response, "New message received");
Expand Down
26 changes: 15 additions & 11 deletions examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::protocol::proto::{CustomRlpxProtoMessage, CustomRlpxProtoMessageKind};
use futures::{Stream, StreamExt};
use reth_eth_wire::multiplex::ProtocolConnection;
use reth_primitives::BytesMut;
Expand All @@ -8,15 +9,12 @@ use std::{
use tokio::sync::oneshot;
use tokio_stream::wrappers::UnboundedReceiverStream;

use super::protocol::proto::{CustomRlpxProtoMessage, CustomRlpxProtoMessageKind};

pub(crate) mod handler;

/// We define some custom commands that the subprotocol supports.
pub(crate) enum CustomCommand {
/// Sends a message to the peer
Message {
#[allow(dead_code)]
msg: String,
/// The response will be sent to this channel.
response: oneshot::Sender<String>,
Expand All @@ -37,36 +35,42 @@ impl Stream for CustomRlpxConnection {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(initial_ping) = this.initial_ping.take() {
return Poll::Ready(Some(initial_ping.encoded()));
return Poll::Ready(Some(initial_ping.encoded()))
}

loop {
if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) {
return match cmd {
CustomCommand::Message { msg: _, response } => {
CustomCommand::Message { msg, response } => {
this.pending_pong = Some(response);
Poll::Ready(Some(CustomRlpxProtoMessage::ping().encoded()))
Poll::Ready(Some(CustomRlpxProtoMessage::ping_message(msg).encoded()))
}
};
}
}

let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) };

let Some(msg) = CustomRlpxProtoMessage::decode_message(&mut &msg[..]) else {
return Poll::Ready(None);
return Poll::Ready(None)
};

match msg.message {
CustomRlpxProtoMessageKind::Ping => {
return Poll::Ready(Some(CustomRlpxProtoMessage::pong().encoded()))
}
CustomRlpxProtoMessageKind::Pong => {}
CustomRlpxProtoMessageKind::CustomMessage(msg) => {
CustomRlpxProtoMessageKind::PingMessage(msg) => {
return Poll::Ready(Some(CustomRlpxProtoMessage::pong_message(msg).encoded()))
}
CustomRlpxProtoMessageKind::PongMessage(msg) => {
if let Some(sender) = this.pending_pong.take() {
sender.send(msg).ok();
}
continue;
continue
}
}
return Poll::Pending;

return Poll::Pending
}
}
}
38 changes: 30 additions & 8 deletions examples/custom-rlpx-subprotocol/src/subprotocol/protocol/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ use reth_primitives::{Buf, BufMut, BytesMut};
pub(crate) enum CustomRlpxProtoMessageId {
Ping = 0x00,
Pong = 0x01,
CustomMessage = 0x02,
PingMessage = 0x02,
PongMessage = 0x03,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum CustomRlpxProtoMessageKind {
Ping,
Pong,
CustomMessage(String),
PingMessage(String),
PongMessage(String),
}

#[derive(Clone, Debug, PartialEq, Eq)]
Expand All @@ -33,7 +35,22 @@ impl CustomRlpxProtoMessage {

/// Returns the protocol for the `custom_rlpx` protocol.
pub fn protocol() -> Protocol {
Protocol::new(Self::capability(), 3)
Protocol::new(Self::capability(), 4)
}

/// Creates a ping message
pub fn ping_message(msg: impl Into<String>) -> Self {
Self {
message_type: CustomRlpxProtoMessageId::PingMessage,
message: CustomRlpxProtoMessageKind::PingMessage(msg.into()),
}
}
/// Creates a ping message
pub fn pong_message(msg: impl Into<String>) -> Self {
Self {
message_type: CustomRlpxProtoMessageId::PongMessage,
message: CustomRlpxProtoMessageKind::PongMessage(msg.into()),
}
}

/// Creates a ping message
Expand All @@ -57,9 +74,9 @@ impl CustomRlpxProtoMessage {
let mut buf = BytesMut::new();
buf.put_u8(self.message_type as u8);
match &self.message {
CustomRlpxProtoMessageKind::Ping => {}
CustomRlpxProtoMessageKind::Pong => {}
CustomRlpxProtoMessageKind::CustomMessage(msg) => {
CustomRlpxProtoMessageKind::Ping | CustomRlpxProtoMessageKind::Pong => {}
CustomRlpxProtoMessageKind::PingMessage(msg) |
CustomRlpxProtoMessageKind::PongMessage(msg) => {
buf.put(msg.as_bytes());
}
}
Expand All @@ -76,16 +93,21 @@ impl CustomRlpxProtoMessage {
let message_type = match id {
0x00 => CustomRlpxProtoMessageId::Ping,
0x01 => CustomRlpxProtoMessageId::Pong,
0x02 => CustomRlpxProtoMessageId::CustomMessage,
0x02 => CustomRlpxProtoMessageId::PingMessage,
0x03 => CustomRlpxProtoMessageId::PongMessage,
_ => return None,
};
let message = match message_type {
CustomRlpxProtoMessageId::Ping => CustomRlpxProtoMessageKind::Ping,
CustomRlpxProtoMessageId::Pong => CustomRlpxProtoMessageKind::Pong,
CustomRlpxProtoMessageId::CustomMessage => CustomRlpxProtoMessageKind::CustomMessage(
CustomRlpxProtoMessageId::PingMessage => CustomRlpxProtoMessageKind::PingMessage(
String::from_utf8_lossy(&buf[..]).into_owned(),
),
CustomRlpxProtoMessageId::PongMessage => CustomRlpxProtoMessageKind::PongMessage(
String::from_utf8_lossy(&buf[..]).into_owned(),
),
};

Some(Self { message_type, message })
}
}

0 comments on commit 32b12c6

Please sign in to comment.