-
Notifications
You must be signed in to change notification settings - Fork 13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
req-resp: Replace SubstreamSet with FuturesStream #321
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -27,8 +27,9 @@ use crate::{ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
request_response::handle::{InnerRequestResponseEvent, RequestResponseCommand}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Direction, TransportEvent, TransportService, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
substream::{Substream, SubstreamSet}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
substream::Substream, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
types::{protocol::ProtocolName, RequestId, SubstreamId}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
utils::futures_stream::FuturesStream, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
PeerId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -162,7 +163,17 @@ pub(crate) struct RequestResponseProtocol { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pending_inbound: FuturesUnordered<BoxFuture<'static, PendingRequest>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/// Pending inbound requests. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pending_inbound_requests: SubstreamSet<(PeerId, RequestId), Substream>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pending_inbound_requests: FuturesStream< | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
BoxFuture< | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
'static, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
PeerId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
RequestId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Result<BytesMut, SubstreamError>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Substream, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/// Pending dials for outbound requests. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pending_dials: HashMap<PeerId, RequestContext>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -200,7 +211,7 @@ impl RequestResponseProtocol { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pending_outbound: HashMap::new(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pending_inbound: FuturesUnordered::new(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pending_outbound_cancels: HashMap::new(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pending_inbound_requests: SubstreamSet::new(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pending_inbound_requests: FuturesStream::new(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pending_outbound_responses: FuturesUnordered::new(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
max_concurrent_inbound_requests: config.max_concurrent_inbound_request, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -315,11 +326,6 @@ impl RequestResponseProtocol { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.await; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// remove all pending inbound requests | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (request_id, _) in context.active_inbound { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.pending_inbound_requests.remove(&(peer, request_id)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/// Local node opened a substream to remote node. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -462,6 +468,7 @@ impl RequestResponseProtocol { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
peer: PeerId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
request_id: RequestId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
request: Result<BytesMut, SubstreamError>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
mut substream: Substream, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) -> crate::Result<()> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let fallback = self | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.peers | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -480,18 +487,7 @@ impl RequestResponseProtocol { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Error::InvalidState | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
})?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut substream = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.pending_inbound_requests.remove(&(peer, request_id)).ok_or_else(|| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tracing::debug!( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
target: LOG_TARGET, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
?peer, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
protocol = %self.protocol, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
?request_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"request doesn't exist in pending requests", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Error::InvalidState | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
})?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let protocol = self.protocol.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tracing::trace!( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -589,7 +585,7 @@ impl RequestResponseProtocol { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
&mut self, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
peer: PeerId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
fallback: Option<ProtocolName>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
substream: Substream, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
mut substream: Substream, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) -> crate::Result<()> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "handle inbound substream"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -622,7 +618,16 @@ impl RequestResponseProtocol { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.ok_or(Error::PeerDoesntExist(peer))? | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.active_inbound | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.insert(request_id, fallback); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.pending_inbound_requests.insert((peer, request_id), substream); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.pending_inbound_requests.push(Box::pin(async move { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let request = match substream.next().await { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Some(Ok(request)) => Ok(request), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Some(Err(error)) => Err(error), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
None => Err(SubstreamError::ConnectionClosed), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(peer, request_id, request, substream) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
})); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -1057,9 +1062,9 @@ impl RequestResponseProtocol { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
_ = self.pending_outbound_responses.next(), if !self.pending_outbound_responses.is_empty() => {} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Inbound requests that are moved to `pending_outbound_responses`. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event = self.pending_inbound_requests.next() => match event { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Some(((peer, request_id), message)) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Err(error) = self.on_inbound_request(peer, request_id, message).await { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event = self.pending_inbound_requests.next(), if !self.pending_inbound_requests.is_empty() => match event { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Some((peer, request_id, request, substream)) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Err(error) = self.on_inbound_request(peer, request_id, request, substream).await { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a way to check here if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the connection has been closed, we'll remove the peer-associated data, which will lead to no further polling on the substream: litep2p/src/protocol/request_response/mod.rs Lines 302 to 307 in 4bc31d0
The inbound request is directly rejected as a first step here: litep2p/src/protocol/request_response/mod.rs Lines 477 to 489 in 4bc31d0
If the connection has been closed and detected by the substream, we'll return a litep2p/src/protocol/request_response/mod.rs Lines 501 to 511 in 4bc31d0
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i mean if we received a request and hold it in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Had a chat offline: we'll not process the inbound requests because we remove the peer state in |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tracing::debug!( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
target: LOG_TARGET, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
?peer, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So with the new logic we will handle the request anyway if the connection is closed, hitting an error when we try to send a response?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep exactly, if the connection gets closed, we'll still have the substream in the FuturesUnordered and let them hit an error (either when we send out the request, or read the response) 🙏