Skip to content

Commit

Permalink
chore: convert connection type to stream (#5384)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Nov 11, 2023
1 parent 55db2dc commit e8729dd
Showing 1 changed file with 9 additions and 55 deletions.
64 changes: 9 additions & 55 deletions crates/net/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ use reth_primitives::BytesMut;
use reth_rpc_types::PeerId;
use std::{
fmt,
future::Future,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::mpsc::UnboundedSender;

use tokio_stream::wrappers::UnboundedReceiverStream;

/// A trait that allows to offer additional RLPx-based application-level protocols when establishing
Expand Down Expand Up @@ -42,10 +41,12 @@ pub trait ProtocolHandler: fmt::Debug + Send + Sync + 'static {

/// A trait that allows to authenticate a protocol after the RLPx connection was established.
pub trait ConnectionHandler: Send + Sync + 'static {
/// The future that handles the connection
type Connection: Future<Output = ()> + Send + 'static;
/// The connection that yields messages to send to the remote.
///
/// The connection will be closed when this stream resolves.
type Connection: Stream<Item = BytesMut> + Send + 'static;

/// Returns the protocols to announce when the RLPx connection will be established.
/// Returns the protocol to announce when the RLPx connection will be established.
///
/// This will be negotiated with the remote peer.
fn protocol(&self) -> Protocol;
Expand Down Expand Up @@ -80,27 +81,12 @@ pub enum OnNotSupported {
Disconnect,
}

/// A connection channel to send and receive messages for a specific protocols.
/// A connection channel to receive messages for the negotiated protocol.
///
/// This is a [Stream] that returns raw bytes of the received messages for this protocol.
#[derive(Debug)]
pub struct ProtocolConnection {
from_wire: UnboundedReceiverStream<BytesMut>,
to_wire: UnboundedSender<ProtocolMessage>,
}

impl ProtocolConnection {
/// Sends a message to the remote.
///
/// Returns an error if the connection has been disconnected.
pub fn send(&self, msg: BytesMut) {
self.to_wire.send(ProtocolMessage::Message(msg)).ok();
}

/// Disconnects the connection.
pub fn disconnect(&self) {
let _ = self.to_wire.send(ProtocolMessage::Disconnect);
}
}

impl Stream for ProtocolConnection {
Expand All @@ -111,38 +97,6 @@ impl Stream for ProtocolConnection {
}
}

/// Messages that can be sent from a protocol connection
#[derive(Debug)]
pub(crate) enum ProtocolMessage {
/// New message to send to the remote.
Message(BytesMut),
/// Disconnect the connection.
Disconnect,
}

/// Errors that can occur when handling a protocol.
#[derive(Debug, thiserror::Error)]
pub enum ProtocolError {
/// custom error message
#[error("{0}")]
Message(String),
/// Ayn other error
#[error(transparent)]
Other(Box<dyn std::error::Error + Send + Sync + 'static>),
}

impl ProtocolError {
/// Creates a new error with the given message.
pub fn msg(msg: impl fmt::Display) -> Self {
ProtocolError::Message(msg.to_string())
}

/// Wraps the given error in a `ProtocolError`.
pub fn new(err: impl std::error::Error + Send + Sync + 'static) -> Self {
ProtocolError::Other(Box::new(err))
}
}

/// A wrapper type for a RLPx sub-protocol.
#[derive(Debug)]
pub struct RlpxSubProtocol(Box<dyn DynProtocolHandler>);
Expand Down Expand Up @@ -221,7 +175,7 @@ pub(crate) trait DynConnectionHandler: Send + Sync + 'static {
direction: Direction,
peer_id: PeerId,
conn: ProtocolConnection,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
) -> Pin<Box<dyn Stream<Item = BytesMut> + Send + 'static>>;
}

impl<T> DynConnectionHandler for T
Expand All @@ -246,7 +200,7 @@ where
direction: Direction,
peer_id: PeerId,
conn: ProtocolConnection,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
) -> Pin<Box<dyn Stream<Item = BytesMut> + Send + 'static>> {
Box::pin(T::into_connection(self, direction, peer_id, conn))
}
}

0 comments on commit e8729dd

Please sign in to comment.