req-resp: Replace SubstreamSet with FuturesStream #321

Merged: 4 commits, Feb 13, 2025

Changes from 2 commits
53 changes: 29 additions & 24 deletions src/protocol/request_response/mod.rs
@@ -27,8 +27,9 @@ use crate::{
request_response::handle::{InnerRequestResponseEvent, RequestResponseCommand},
Direction, TransportEvent, TransportService,
},
substream::{Substream, SubstreamSet},
substream::Substream,
types::{protocol::ProtocolName, RequestId, SubstreamId},
utils::futures_stream::FuturesStream,
PeerId,
};

@@ -162,7 +163,17 @@ pub(crate) struct RequestResponseProtocol {
pending_inbound: FuturesUnordered<BoxFuture<'static, PendingRequest>>,

/// Pending inbound requests.
pending_inbound_requests: SubstreamSet<(PeerId, RequestId), Substream>,
pending_inbound_requests: FuturesStream<
BoxFuture<
'static,
(
PeerId,
RequestId,
Result<BytesMut, SubstreamError>,
Substream,
),
>,
>,

/// Pending dials for outbound requests.
pending_dials: HashMap<PeerId, RequestContext>,
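
For readers skimming the diff: the new field above replaces keyed substream storage (SubstreamSet) with a stream of boxed futures, where each future reads exactly one request from its substream and then yields the substream back along with the peer and request id, so the response can later be written on the same substream. Below is a minimal, self-contained sketch of that shape, assuming simplified stand-ins for Substream, PeerId and RequestId; these are illustrative, not the crate's actual types.

```rust
use futures::{executor::block_on, future::BoxFuture, stream::FuturesUnordered, StreamExt};

// Hypothetical stand-ins for the crate's types, for illustration only.
type PeerId = u64;
type RequestId = u64;
type Request = Vec<u8>;

/// A toy substream that yields at most one inbound request.
struct Substream {
    inbound: Option<Request>,
}

impl Substream {
    async fn next(&mut self) -> Option<Result<Request, &'static str>> {
        self.inbound.take().map(Ok)
    }
}

fn main() {
    block_on(async {
        // Each queued future resolves to (peer, request id, read result, substream),
        // so the substream stays available for writing the response later.
        let mut pending: FuturesUnordered<
            BoxFuture<'static, (PeerId, RequestId, Result<Request, &'static str>, Substream)>,
        > = FuturesUnordered::new();

        let peer: PeerId = 1;
        let request_id: RequestId = 1;
        let mut substream = Substream { inbound: Some(b"ping".to_vec()) };

        pending.push(Box::pin(async move {
            let request = match substream.next().await {
                Some(Ok(request)) => Ok(request),
                Some(Err(error)) => Err(error),
                // A closed substream simply resolves the future with an error;
                // no keyed removal is needed when the connection goes away.
                None => Err("connection closed"),
            };

            (peer, request_id, request, substream)
        }));

        // In the protocol this is one branch of the main select! loop.
        while let Some((peer, request_id, request, _substream)) = pending.next().await {
            println!("peer {peer}, request {request_id}: {request:?}");
        }
    });
}
```

Compared to the previous SubstreamSet, the trade-off discussed in the review below is that nothing is removed by key on disconnect; stale entries simply resolve with an error and drain out of the stream.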
@@ -200,7 +211,7 @@ impl RequestResponseProtocol {
pending_outbound: HashMap::new(),
pending_inbound: FuturesUnordered::new(),
pending_outbound_cancels: HashMap::new(),
pending_inbound_requests: SubstreamSet::new(),
pending_inbound_requests: FuturesStream::new(),
pending_outbound_responses: FuturesUnordered::new(),
max_concurrent_inbound_requests: config.max_concurrent_inbound_request,
}
@@ -315,11 +326,6 @@ impl RequestResponseProtocol {
})
.await;
}

// remove all pending inbound requests
for (request_id, _) in context.active_inbound {
self.pending_inbound_requests.remove(&(peer, request_id));
}
Comment on lines -318 to -322

Collaborator:

So with the new logic we will handle the request anyway if the connection is closed, hitting an error when we try to send a response?

Collaborator (Author):

Yep exactly, if the connection gets closed, we'll still have the substream in the FuturesUnordered and let them hit an error (either when we send out the request, or read the response) 🙏

}

/// Local node opened a substream to remote node.
@@ -462,6 +468,7 @@ impl RequestResponseProtocol {
peer: PeerId,
request_id: RequestId,
request: Result<BytesMut, SubstreamError>,
mut substream: Substream,
) -> crate::Result<()> {
let fallback = self
.peers
@@ -480,18 +487,7 @@

Error::InvalidState
})?;
let mut substream =
self.pending_inbound_requests.remove(&(peer, request_id)).ok_or_else(|| {
tracing::debug!(
target: LOG_TARGET,
?peer,
protocol = %self.protocol,
?request_id,
"request doesn't exist in pending requests",
);

Error::InvalidState
})?;
let protocol = self.protocol.clone();

tracing::trace!(
@@ -589,7 +585,7 @@
&mut self,
peer: PeerId,
fallback: Option<ProtocolName>,
substream: Substream,
mut substream: Substream,
) -> crate::Result<()> {
tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "handle inbound substream");

@@ -622,7 +618,16 @@
.ok_or(Error::PeerDoesntExist(peer))?
.active_inbound
.insert(request_id, fallback);
self.pending_inbound_requests.insert((peer, request_id), substream);

self.pending_inbound_requests.push(Box::pin(async move {
let request = match substream.next().await {
Some(Ok(request)) => Ok(request),
Some(Err(error)) => Err(error),
None => Err(SubstreamError::ConnectionClosed),
};

(peer, request_id, request, substream)
}));

Ok(())
}
@@ -1057,9 +1062,9 @@ impl RequestResponseProtocol {
_ = self.pending_outbound_responses.next(), if !self.pending_outbound_responses.is_empty() => {}

// Inbound requests that are moved to `pending_outbound_responses`.
event = self.pending_inbound_requests.next() => match event {
Some(((peer, request_id), message)) => {
if let Err(error) = self.on_inbound_request(peer, request_id, message).await {
event = self.pending_inbound_requests.next(), if !self.pending_inbound_requests.is_empty() => match event {
Some((peer, request_id, request, substream)) => {
if let Err(error) = self.on_inbound_request(peer, request_id, request, substream).await {
Collaborator:

Is there a way to check here if substream points to an alive substream over an existing connection, and not process the request if not? Maybe we can do this check in on_inbound_request.

Collaborator (Author):

If the connection has been closed, we'll remove the peer-associated data, which will lead to no further polling on the substream:

    /// Connection closed to remote peer.
    async fn on_connection_closed(&mut self, peer: PeerId) {
        tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection closed");
        // Remove any pending outbound substreams for this peer.
        self.pending_outbound.retain(|_, context| context.peer != peer);

The inbound request is directly rejected as a first step here:

    .active_inbound
    .remove(&request_id)
    .ok_or_else(|| {
        tracing::debug!(
            target: LOG_TARGET,
            ?peer,
            protocol = %self.protocol,
            ?request_id,
            "no active inbound request",
        );
        Error::InvalidState
    })?;

If the connection has been closed and detected by the substream, we'll return a Substream::Closed error which will not propagate the message:

    let Ok(request) = request else {
        tracing::debug!(
            target: LOG_TARGET,
            ?peer,
            %protocol,
            ?request_id,
            ?request,
            "failed to read request from substream",
        );
        return Err(Error::InvalidData);
    };

Collaborator:

I mean, if we received a request and hold it in pending_inbound_requests but the peer gets disconnected before we poll pending_inbound_requests, will we process the request anyway and only hit the error when we try to send the response to a closed substream? Is there an easy way to not process the inbound request (as it may require resources on the client/runtime side etc.) in this case and fail early?

Collaborator (Author):

Had a chat offline: we won't process the inbound requests because we remove the peer state in on_connection_closed; I've added a comment to make the code easier to follow: 58d5742 🙏
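
To make the resolution above concrete, here is a small hedged sketch of the early-rejection path being described: removing the peer state on disconnect is what guarantees that a request still queued in pending_inbound_requests is rejected before any processing happens. All names below are simplified stand-ins, not the crate's real types.

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins illustrating the early-rejection path, for illustration only.
type PeerId = u64;
type RequestId = u64;

#[derive(Default)]
struct PeerContext {
    active_inbound: HashSet<RequestId>,
}

struct Protocol {
    peers: HashMap<PeerId, PeerContext>,
}

impl Protocol {
    /// Dropping the peer context on disconnect is the cleanup step; no keyed
    /// removal from the futures stream is needed.
    fn on_connection_closed(&mut self, peer: PeerId) {
        self.peers.remove(&peer);
    }

    /// First step of handling a queued inbound request: look up the peer's
    /// active inbound entry. If the connection was closed, the lookup fails
    /// and the request is never processed.
    fn on_inbound_request(&mut self, peer: PeerId, request_id: RequestId) -> Result<(), &'static str> {
        let context = self.peers.get_mut(&peer).ok_or("invalid state")?;
        if !context.active_inbound.remove(&request_id) {
            return Err("invalid state");
        }
        // ... read the request, build and send the response ...
        Ok(())
    }
}

fn main() {
    let mut protocol = Protocol { peers: HashMap::new() };
    protocol.peers.entry(7).or_default().active_inbound.insert(1);

    // The peer disconnects before the queued read future is polled.
    protocol.on_connection_closed(7);
    assert_eq!(protocol.on_inbound_request(7, 1), Err("invalid state"));
}
```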

tracing::debug!(
target: LOG_TARGET,
?peer,
17 changes: 5 additions & 12 deletions src/protocol/request_response/tests.rs
@@ -200,14 +200,12 @@ async fn inbound_substream_error() {
.await
.unwrap();

// verify the request has been registered for the peer
let request_id = *protocol.peers.get(&peer).unwrap().active_inbound.keys().next().unwrap();
assert!(protocol.pending_inbound_requests.get_mut(&(peer, request_id)).is_some());

// poll the substream and get the failure event
let ((peer, request_id), event) = protocol.pending_inbound_requests.next().await.unwrap();
assert_eq!(protocol.pending_inbound_requests.len(), 1);
let (peer, request_id, event, substream) =
protocol.pending_inbound_requests.next().await.unwrap();

match protocol.on_inbound_request(peer, request_id, event).await {
match protocol.on_inbound_request(peer, request_id, event, substream).await {
Err(Error::InvalidData) => {}
_ => panic!("invalid return value"),
}
@@ -241,9 +239,7 @@ async fn disconnect_peer_has_active_inbound_substream() {
.await
.unwrap();

// verify the request has been registered for the peer
let request_id = *protocol.peers.get(&peer).unwrap().active_inbound.keys().next().unwrap();
assert!(protocol.pending_inbound_requests.get_mut(&(peer, request_id)).is_some());
assert_eq!(protocol.pending_inbound_requests.len(), 1);

// disconnect the peer and verify that no events are read from the handle
// since no outbound request was initiated
@@ -254,9 +250,6 @@
event => panic!("read an unexpected event from handle: {event:?}"),
})
.await;

// verify the substream has been removed from `pending_inbound_requests`
assert!(protocol.pending_inbound_requests.get_mut(&(peer, request_id)).is_none());
}

// when user initiates an outbound request and `RequestResponseProtocol` tries to open an outbound
1 change: 0 additions & 1 deletion src/utils/futures_stream.rs
@@ -45,7 +45,6 @@ impl<F> FuturesStream<F> {
}

/// Number of futures in the stream.
#[cfg(test)]
pub fn len(&self) -> usize {
self.futures.len()
}
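
The #[cfg(test)] attribute comes off len() presumably because the emptiness check on the new select! branch in mod.rs now needs it outside of tests. For readers who haven't looked at the wrapper, the sketch below shows the general shape such a FuturesStream typically has: a thin layer over FuturesUnordered whose main job is to report Pending instead of ending the stream when the set is empty, so it can live inside a long-running select! loop. This is an assumption about the usual pattern, not a copy of the crate's implementation.

```rust
use std::{
    pin::Pin,
    task::{Context, Poll},
};

use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};

/// Minimal sketch of a FuturesUnordered wrapper that never terminates:
/// an empty set reports Pending instead of Ready(None), so a select!
/// branch polling it simply waits instead of seeing the stream end.
pub struct FuturesStream<F> {
    futures: FuturesUnordered<F>,
}

impl<F> FuturesStream<F> {
    pub fn new() -> Self {
        Self { futures: FuturesUnordered::new() }
    }

    /// Number of futures in the stream.
    pub fn len(&self) -> usize {
        self.futures.len()
    }

    pub fn is_empty(&self) -> bool {
        self.futures.is_empty()
    }

    pub fn push(&mut self, future: F) {
        self.futures.push(future);
    }
}

impl<F: Future> Stream for FuturesStream<F> {
    type Item = F::Output;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        match this.futures.poll_next_unpin(cx) {
            // FuturesUnordered reports Ready(None) when it is empty, which would
            // end the stream; report Pending instead. No waker is registered here,
            // so the surrounding task must be woken by another select! arm before
            // new futures are pushed and polled, which is why callers also guard
            // the branch with is_empty().
            Poll::Ready(None) => Poll::Pending,
            other => other,
        }
    }
}

fn main() {
    futures::executor::block_on(async {
        let mut stream = FuturesStream::new();
        assert!(stream.is_empty());

        stream.push(Box::pin(async { 42u32 }));
        assert_eq!(stream.len(), 1);
        assert_eq!(stream.next().await, Some(42));
        // An empty stream would now report Pending forever rather than None,
        // which is exactly why the caller guards the branch with is_empty().
    });
}
```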