diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index de1c3014ec4..b781efe8b27 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -44,7 +44,7 @@ use libp2p::{ noise, swarm::NetworkBehaviourEventProcess, tcp::TcpConfig, - yamux::Config as YamuxConfig, + yamux::YamuxConfig, Multiaddr, NetworkBehaviour, PeerId, Swarm, Transport, }; use std::{ diff --git a/muxers/mplex/CHANGELOG.md b/muxers/mplex/CHANGELOG.md index 0bde02c141c..63541349f22 100644 --- a/muxers/mplex/CHANGELOG.md +++ b/muxers/mplex/CHANGELOG.md @@ -1,5 +1,9 @@ # 0.24.0 [unreleased] +- Tweak the naming in the `MplexConfig` API for better + consistency with `libp2p-yamux`. + [PR 1822](https://github.com/libp2p/rust-libp2p/pull/1822). + - Update dependencies. # 0.23.1 [2020-10-28] diff --git a/muxers/mplex/src/config.rs b/muxers/mplex/src/config.rs index e5f9d8079ba..fe615ba48d5 100644 --- a/muxers/mplex/src/config.rs +++ b/muxers/mplex/src/config.rs @@ -55,7 +55,7 @@ impl MplexConfig { /// accumulates too quickly (judged by internal bounds), the /// connection is closed with an error due to the misbehaved /// remote. - pub fn max_substreams(&mut self, max: usize) -> &mut Self { + pub fn set_max_num_streams(&mut self, max: usize) -> &mut Self { self.max_substreams = max; self } @@ -63,7 +63,7 @@ impl MplexConfig { /// Sets the maximum number of frames buffered per substream. /// /// A limit is necessary in order to avoid DoS attacks. - pub fn max_buffer_len(&mut self, max: usize) -> &mut Self { + pub fn set_max_buffer_size(&mut self, max: usize) -> &mut Self { self.max_buffer_len = max; self } @@ -72,14 +72,14 @@ impl MplexConfig { /// for a substream. /// /// See the documentation of [`MaxBufferBehaviour`]. - pub fn max_buffer_len_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self { + pub fn set_max_buffer_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self { self.max_buffer_behaviour = behaviour; self } /// Sets the frame size used when sending data. Capped at 1Mbyte as per the /// Mplex spec. - pub fn split_send_size(&mut self, size: usize) -> &mut Self { + pub fn set_split_send_size(&mut self, size: usize) -> &mut Self { let size = cmp::min(size, MAX_FRAME_SIZE); self.split_send_size = size; self diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index 15a50e52f2e..aec332cab9a 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -1,5 +1,9 @@ # 0.27.0 [unreleased] +- Tweak the naming in the `MplexConfig` API for better + consistency with `libp2p-mplex`. + [PR 1822](https://github.com/libp2p/rust-libp2p/pull/1822). + - Update dependencies. # 0.26.0 [2020-10-16] diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index c7f093ef69f..0edadb0ebb0 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -22,18 +22,13 @@ //! [specification](https://github.com/hashicorp/yamux/blob/master/spec.md). use futures::{future, prelude::*, ready, stream::{BoxStream, LocalBoxStream}}; -use libp2p_core::muxing::StreamMuxerEvent; +use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use parking_lot::Mutex; -use std::{fmt, io, iter, ops::{Deref, DerefMut}, pin::Pin, task::Context}; +use std::{fmt, io, iter, pin::Pin, task::{Context, Poll}}; use thiserror::Error; -pub use yamux::{Mode, WindowUpdateMode}; - /// A Yamux connection. -/// -/// This implementation isn't capable of detecting when the underlying socket changes its address, -/// and no [`StreamMuxerEvent::AddressChange`] event is ever emitted. pub struct Yamux(Mutex>); impl fmt::Debug for Yamux { @@ -58,8 +53,7 @@ where C: AsyncRead + AsyncWrite + Send + Unpin + 'static { /// Create a new Yamux connection. - pub fn new(io: C, mut cfg: yamux::Config, mode: yamux::Mode) -> Self { - cfg.set_read_after_close(false); + fn new(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { let conn = yamux::Connection::new(io, cfg, mode); let ctrl = conn.control(); let inner = Inner { @@ -78,8 +72,7 @@ where C: AsyncRead + AsyncWrite + Unpin + 'static { /// Create a new Yamux connection (which is ![`Send`]). - pub fn local(io: C, mut cfg: yamux::Config, mode: yamux::Mode) -> Self { - cfg.set_read_after_close(false); + fn local(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { let conn = yamux::Connection::new(io, cfg, mode); let ctrl = conn.control(); let inner = Inner { @@ -93,9 +86,10 @@ where } } -type Poll = std::task::Poll>; +pub type YamuxResult = Result; -impl libp2p_core::StreamMuxer for Yamux +/// > **Note**: This implementation never emits [`StreamMuxerEvent::AddressChange`] events. +impl StreamMuxer for Yamux where S: Stream> + Unpin { @@ -103,7 +97,9 @@ where type OutboundSubstream = OpenSubstreamToken; type Error = YamuxError; - fn poll_event(&self, c: &mut Context<'_>) -> Poll> { + fn poll_event(&self, c: &mut Context<'_>) + -> Poll>> + { let mut inner = self.0.lock(); match ready!(inner.incoming.poll_next_unpin(c)) { Some(Ok(s)) => Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))), @@ -116,7 +112,9 @@ where OpenSubstreamToken(()) } - fn poll_outbound(&self, c: &mut Context<'_>, _: &mut OpenSubstreamToken) -> Poll { + fn poll_outbound(&self, c: &mut Context<'_>, _: &mut OpenSubstreamToken) + -> Poll> + { let mut inner = self.0.lock(); Pin::new(&mut inner.control).poll_open_stream(c).map_err(YamuxError) } @@ -125,25 +123,33 @@ where self.0.lock().control.abort_open_stream() } - fn read_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream, b: &mut [u8]) -> Poll { + fn read_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream, b: &mut [u8]) + -> Poll> + { Pin::new(s).poll_read(c, b).map_err(|e| YamuxError(e.into())) } - fn write_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream, b: &[u8]) -> Poll { + fn write_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream, b: &[u8]) + -> Poll> + { Pin::new(s).poll_write(c, b).map_err(|e| YamuxError(e.into())) } - fn flush_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream) -> Poll<()> { + fn flush_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream) + -> Poll> + { Pin::new(s).poll_flush(c).map_err(|e| YamuxError(e.into())) } - fn shutdown_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream) -> Poll<()> { + fn shutdown_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream) + -> Poll> + { Pin::new(s).poll_close(c).map_err(|e| YamuxError(e.into())) } fn destroy_substream(&self, _: Self::Substream) { } - fn close(&self, c: &mut Context<'_>) -> Poll<()> { + fn close(&self, c: &mut Context<'_>) -> Poll> { let mut inner = self.0.lock(); if let std::task::Poll::Ready(x) = Pin::new(&mut inner.control).poll_close(c) { return Poll::Ready(x.map_err(YamuxError)) @@ -158,62 +164,122 @@ where Poll::Pending } - fn flush_all(&self, _: &mut Context<'_>) -> Poll<()> { + fn flush_all(&self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } /// The yamux configuration. #[derive(Clone)] -pub struct Config { - config: yamux::Config, +pub struct YamuxConfig { + inner: yamux::Config, mode: Option } +/// The window update mode determines when window updates are +/// sent to the remote, giving it new credit to send more data. +pub struct WindowUpdateMode(yamux::WindowUpdateMode); + +impl WindowUpdateMode { + /// The window update mode whereby the remote is given + /// new credit via a window update whenever the current + /// receive window is exhausted when data is received, + /// i.e. this mode cannot exert back-pressure from application + /// code that is slow to read from a substream. + /// + /// > **Note**: The receive buffer may overflow with this + /// > strategy if the receiver is too slow in reading the + /// > data from the buffer. The maximum receive buffer + /// > size must be tuned appropriately for the desired + /// > throughput and level of tolerance for (temporarily) + /// > slow receivers. + pub fn on_receive() -> Self { + WindowUpdateMode(yamux::WindowUpdateMode::OnReceive) + } + + /// The window update mode whereby the remote is given new + /// credit only when the current receive window is exhausted + /// when data is read from the substream's receive buffer, + /// i.e. application code that is slow to read from a substream + /// exerts back-pressure on the remote. + /// + /// > **Note**: If the receive window of a substream on + /// > both peers is exhausted and both peers are blocked on + /// > sending data before reading from the stream, a deadlock + /// > occurs. To avoid this situation, reading from a substream + /// > should never be blocked on writing to the same substream. + /// + /// > **Note**: With this strategy, there is usually no point in the + /// > receive buffer being larger than the window size. + pub fn on_read() -> Self { + WindowUpdateMode(yamux::WindowUpdateMode::OnRead) + } +} + /// The yamux configuration for upgrading I/O resources which are ![`Send`]. #[derive(Clone)] -pub struct LocalConfig(Config); +pub struct YamuxLocalConfig(YamuxConfig); -impl Config { - pub fn new(cfg: yamux::Config) -> Self { - Config { config: cfg, mode: None } +impl YamuxConfig { + /// Creates a new `YamuxConfig` in client mode, regardless of whether + /// it will be used for an inbound or outbound upgrade. + pub fn client() -> Self { + let mut cfg = Self::default(); + cfg.mode = Some(yamux::Mode::Client); + cfg } - /// Override the connection mode. - /// - /// This will always use the provided mode during the connection upgrade, - /// irrespective of whether an inbound or outbound upgrade happens. - pub fn override_mode(&mut self, mode: yamux::Mode) { - self.mode = Some(mode) + /// Creates a new `YamuxConfig` in server mode, regardless of whether + /// it will be used for an inbound or outbound upgrade. + pub fn server() -> Self { + let mut cfg = Self::default(); + cfg.mode = Some(yamux::Mode::Server); + cfg } - /// Turn this into a [`LocalConfig`] for use with upgrades of ![`Send`] resources. - pub fn local(self) -> LocalConfig { - LocalConfig(self) + /// Sets the size (in bytes) of the receive window per substream. + pub fn set_receive_window_size(&mut self, num_bytes: u32) -> &mut Self { + self.inner.set_receive_window(num_bytes); + self } -} -impl Default for Config { - fn default() -> Self { - Config::new(yamux::Config::default()) + /// Sets the maximum size (in bytes) of the receive buffer per substream. + pub fn set_max_buffer_size(&mut self, num_bytes: usize) -> &mut Self { + self.inner.set_max_buffer_size(num_bytes); + self + } + + /// Sets the maximum number of concurrent substreams. + pub fn set_max_num_streams(&mut self, num_streams: usize) -> &mut Self { + self.inner.set_max_num_streams(num_streams); + self } -} -impl Deref for Config { - type Target = yamux::Config; + /// Sets the window update mode that determines when the remote + /// is given new credit for sending more data. + pub fn set_window_update_mode(&mut self, mode: WindowUpdateMode) -> &mut Self { + self.inner.set_window_update_mode(mode.0); + self + } - fn deref(&self) -> &Self::Target { - &self.config + /// Converts the config into a [`YamuxLocalConfig`] for use with upgrades + /// of I/O streams that are ![`Send`]. + pub fn into_local(self) -> YamuxLocalConfig { + YamuxLocalConfig(self) } } -impl DerefMut for Config { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.config +impl Default for YamuxConfig { + fn default() -> Self { + let mut inner = yamux::Config::default(); + // For conformity with mplex, read-after-close on a multiplexed + // connection is never permitted and not configurable. + inner.set_read_after_close(false); + YamuxConfig { inner, mode: None } } } -impl UpgradeInfo for Config { +impl UpgradeInfo for YamuxConfig { type Info = &'static [u8]; type InfoIter = iter::Once; @@ -222,7 +288,7 @@ impl UpgradeInfo for Config { } } -impl UpgradeInfo for LocalConfig { +impl UpgradeInfo for YamuxLocalConfig { type Info = &'static [u8]; type InfoIter = iter::Once; @@ -231,7 +297,7 @@ impl UpgradeInfo for LocalConfig { } } -impl InboundUpgrade for Config +impl InboundUpgrade for YamuxConfig where C: AsyncRead + AsyncWrite + Send + Unpin + 'static { @@ -240,11 +306,12 @@ where type Future = future::Ready>; fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future { - future::ready(Ok(Yamux::new(io, self.config, self.mode.unwrap_or(yamux::Mode::Server)))) + let mode = self.mode.unwrap_or(yamux::Mode::Server); + future::ready(Ok(Yamux::new(io, self.inner, mode))) } } -impl InboundUpgrade for LocalConfig +impl InboundUpgrade for YamuxLocalConfig where C: AsyncRead + AsyncWrite + Unpin + 'static { @@ -254,11 +321,12 @@ where fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future { let cfg = self.0; - future::ready(Ok(Yamux::local(io, cfg.config, cfg.mode.unwrap_or(yamux::Mode::Server)))) + let mode = cfg.mode.unwrap_or(yamux::Mode::Server); + future::ready(Ok(Yamux::local(io, cfg.inner, mode))) } } -impl OutboundUpgrade for Config +impl OutboundUpgrade for YamuxConfig where C: AsyncRead + AsyncWrite + Send + Unpin + 'static { @@ -267,11 +335,12 @@ where type Future = future::Ready>; fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future { - future::ready(Ok(Yamux::new(io, self.config, self.mode.unwrap_or(yamux::Mode::Client)))) + let mode = self.mode.unwrap_or(yamux::Mode::Client); + future::ready(Ok(Yamux::new(io, self.inner, mode))) } } -impl OutboundUpgrade for LocalConfig +impl OutboundUpgrade for YamuxLocalConfig where C: AsyncRead + AsyncWrite + Unpin + 'static { @@ -281,18 +350,22 @@ where fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future { let cfg = self.0; - future::ready(Ok(Yamux::local(io, cfg.config, cfg.mode.unwrap_or(yamux::Mode::Client)))) + let mode = cfg.mode.unwrap_or(yamux::Mode::Client); + future::ready(Ok(Yamux::local(io, cfg.inner, mode))) } } -/// The Yamux [`libp2p_core::StreamMuxer`] error type. +/// The Yamux [`StreamMuxer`] error type. #[derive(Debug, Error)] #[error("yamux error: {0}")] -pub struct YamuxError(#[from] pub yamux::ConnectionError); +pub struct YamuxError(#[from] yamux::ConnectionError); impl Into for YamuxError { fn into(self: YamuxError) -> io::Error { - io::Error::new(io::ErrorKind::Other, self.to_string()) + match self.0 { + yamux::ConnectionError::Io(e) => e, + e => io::Error::new(io::ErrorKind::Other, e) + } } } diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index f46bda4b280..0fad359be44 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -150,7 +150,7 @@ fn build_node() -> (Multiaddr, Swarm) { .authenticate(PlainText2Config { local_public_key: public_key.clone(), }) - .multiplex(yamux::Config::default()) + .multiplex(yamux::YamuxConfig::default()) .boxed(); let peer_id = public_key.clone().into_peer_id(); diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index db81ddf4675..d26f74a7d78 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -61,7 +61,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) { let transport = MemoryTransport::default() .upgrade(upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) - .multiplex(yamux::Config::default()) + .multiplex(yamux::YamuxConfig::default()) .boxed(); let local_id = local_public_key.clone().into_peer_id(); diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 4027f2d3564..52056be4801 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -207,7 +207,7 @@ fn mk_transport(muxer: MuxerChoice) -> ( .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) .multiplex(match muxer { MuxerChoice::Yamux => - upgrade::EitherUpgrade::A(yamux::Config::default()), + upgrade::EitherUpgrade::A(yamux::YamuxConfig::default()), MuxerChoice::Mplex => upgrade::EitherUpgrade::B(mplex::MplexConfig::default()), }) diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index c1aafbb6d34..9aa7f093300 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -221,7 +221,7 @@ fn mk_transport() -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox)>) { .nodelay(true) .upgrade(upgrade::Version::V1) .authenticate(NoiseConfig::xx(noise_keys).into_authenticated()) - .multiplex(libp2p_yamux::Config::default()) + .multiplex(libp2p_yamux::YamuxConfig::default()) .boxed()) } diff --git a/src/lib.rs b/src/lib.rs index 55a966ee6ae..df1b0ebd2cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -91,7 +91,7 @@ //! let id_keys = Keypair::generate_ed25519(); //! let noise_keys = noise::Keypair::::new().into_authentic(&id_keys).unwrap(); //! let noise = noise::NoiseConfig::xx(noise_keys).into_authenticated(); -//! let yamux = yamux::Config::default(); +//! let yamux = yamux::YamuxConfig::default(); //! let transport = tcp.upgrade(upgrade::Version::V1).authenticate(noise).multiplex(yamux); //! # } //! ``` @@ -302,7 +302,7 @@ pub fn build_tcp_ws_noise_mplex_yamux(keypair: identity::Keypair) Ok(transport .upgrade(core::upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) - .multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new())) + .multiplex(core::upgrade::SelectUpgrade::new(yamux::YamuxConfig::default(), mplex::MplexConfig::default())) .timeout(std::time::Duration::from_secs(20)) .boxed()) } @@ -334,7 +334,7 @@ pub fn build_tcp_ws_pnet_noise_mplex_yamux(keypair: identity::Keypair, psk: PreS .and_then(move |socket, _| PnetConfig::new(psk).handshake(socket)) .upgrade(core::upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) - .multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new())) + .multiplex(core::upgrade::SelectUpgrade::new(yamux::YamuxConfig::default(), mplex::MplexConfig::default())) .timeout(std::time::Duration::from_secs(20)) .boxed()) }