Skip to content

Commit

Permalink
Schedule immediate wake-up instead of looping
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Nov 11, 2022
1 parent b5612dc commit ca139c1
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 9 deletions.
9 changes: 9 additions & 0 deletions muxers/yamux/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
# 0.41.1 [unreleased]

- Yield from `StreamMuxer::poll` as soon as we receive a single substream.
This fixes [issue 3041].
See [PR 3071].

[PR 3071]: https://github.com/libp2p/rust-libp2p/pull/3071/
[issue 3041]: https://github.com/libp2p/rust-libp2p/issues/3041/

# 0.41.0

- Update to `libp2p-core` `v0.37.0`.
Expand Down
2 changes: 1 addition & 1 deletion muxers/yamux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-yamux"
edition = "2021"
rust-version = "1.56.1"
description = "Yamux multiplexing protocol for libp2p"
version = "0.41.0"
version = "0.41.1"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
28 changes: 20 additions & 8 deletions muxers/yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use futures::{
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use std::collections::VecDeque;
use std::task::Waker;
use std::{
fmt, io, iter, mem,
pin::Pin,
Expand All @@ -54,6 +55,8 @@ pub struct Yamux<S> {
/// This buffer stores inbound streams that are created whilst [`StreamMuxer::poll`] is called.
/// Once the buffer is full, new inbound streams are dropped.
inbound_stream_buffer: VecDeque<yamux::Stream>,
/// Waker to be called when new inbound streams are available.
inbound_stream_waker: Option<Waker>,
}

const MAX_BUFFERED_INBOUND_STREAMS: usize = 25;
Expand All @@ -80,6 +83,7 @@ where
},
control: ctrl,
inbound_stream_buffer: VecDeque::default(),
inbound_stream_waker: None,
}
}
}
Expand All @@ -100,6 +104,7 @@ where
},
control: ctrl,
inbound_stream_buffer: VecDeque::default(),
inbound_stream_waker: None,
}
}
}
Expand All @@ -121,6 +126,8 @@ where
return Poll::Ready(Ok(stream));
}

self.inbound_stream_waker = Some(cx.waker().clone());

self.poll_inner(cx)
}

Expand All @@ -139,17 +146,22 @@ where
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
let this = self.get_mut();

loop {
let inbound_stream = ready!(this.poll_inner(cx))?;

if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS {
log::warn!("dropping {inbound_stream} because buffer is full");
drop(inbound_stream);
continue;
}
let inbound_stream = ready!(this.poll_inner(cx))?;

if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS {
log::warn!("dropping {inbound_stream} because buffer is full");
drop(inbound_stream);
} else {
this.inbound_stream_buffer.push_back(inbound_stream);

if let Some(waker) = this.inbound_stream_waker.take() {
waker.wake()
}
}

// Schedule an immediate wake-up, allowing other code to run.
cx.waker().wake_by_ref();
Poll::Pending
}

fn poll_close(mut self: Pin<&mut Self>, c: &mut Context<'_>) -> Poll<YamuxResult<()>> {
Expand Down

0 comments on commit ca139c1

Please sign in to comment.