-
Notifications
You must be signed in to change notification settings - Fork 13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
req-resp: Replace SubstreamSet with FuturesStream #321
Conversation
Signed-off-by: Alexandru Vasile <[email protected]>
Signed-off-by: Alexandru Vasile <[email protected]>
if let Err(error) = self.on_inbound_request(peer, request_id, message).await { | ||
event = self.pending_inbound_requests.next(), if !self.pending_inbound_requests.is_empty() => match event { | ||
Some((peer, request_id, request, substream)) => { | ||
if let Err(error) = self.on_inbound_request(peer, request_id, request, substream).await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way to check here if substream
points to alive substream over an existing connection, and not process the request if not? May be we can do this check in on_inbound_request
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the connection has been closed, we'll remove the peer-associated data, which will lead to no further polling on the substream:
litep2p/src/protocol/request_response/mod.rs
Lines 302 to 307 in 4bc31d0
/// Connection closed to remote peer. | |
async fn on_connection_closed(&mut self, peer: PeerId) { | |
tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection closed"); | |
// Remove any pending outbound substreams for this peer. | |
self.pending_outbound.retain(|_, context| context.peer != peer); |
The inbound request is directly rejected as a first step here:
litep2p/src/protocol/request_response/mod.rs
Lines 477 to 489 in 4bc31d0
.active_inbound | |
.remove(&request_id) | |
.ok_or_else(|| { | |
tracing::debug!( | |
target: LOG_TARGET, | |
?peer, | |
protocol = %self.protocol, | |
?request_id, | |
"no active inbound request", | |
); | |
Error::InvalidState | |
})?; |
If the connection has been closed and detected by the substream, we'll return a Substream::Closed
error which will not propagate the message:
litep2p/src/protocol/request_response/mod.rs
Lines 501 to 511 in 4bc31d0
let Ok(request) = request else { | |
tracing::debug!( | |
target: LOG_TARGET, | |
?peer, | |
%protocol, | |
?request_id, | |
?request, | |
"failed to read request from substream", | |
); | |
return Err(Error::InvalidData); | |
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i mean if we received a request and hold it in pending_inbound_requests
, but the peer gets disconnected before we poll pending_inbound_requests
, we will process the request anyway and only hit the error when try to send the response to a closed substream? Is there an easy way to not process the inbound request (as it may require resources on the client/runtime side etc.) in this case and fail early?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had a chat offline: we'll not process the inbound requests because we remove the peer state in on_connection_closed
, have added a comment to make the code easier to follow : 58d5742 🙏
|
||
// remove all pending inbound requests | ||
for (request_id, _) in context.active_inbound { | ||
self.pending_inbound_requests.remove(&(peer, request_id)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So with the new logic we will handle the request anyway if the connection is closed, hitting an error when we try to send a response?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep exactly, if the connection gets closed, we'll still have the substream in the FuturesUnordered and let them hit an error (either when we send out the request, or read the response) 🙏
Signed-off-by: Alexandru Vasile <[email protected]>
## [0.9.1] - 2025-01-19 This release enhances compatibility between litep2p and libp2p by backporting the latest Yamux updates. Additionally, it includes various improvements and fixes to boost the stability and performance of the WebSocket stream and the multistream-select protocol. ### Changed - yamux: Switch to upstream implementation while keeping the controller API ([#320](#320)) - req-resp: Replace SubstreamSet with FuturesStream ([#321](#321)) - cargo: Bring up to date multiple dependencies ([#324](#324)) - build(deps): bump hickory-proto from 0.24.1 to 0.24.3 ([#323](#323)) - build(deps): bump openssl from 0.10.66 to 0.10.70 ([#322](#322)) ### Fixed - websocket/stream: Fix unexpected EOF on `Poll::Pending` state poisoning ([#327](#327)) - websocket/stream: Avoid memory allocations on flushing ([#325](#325)) - multistream-select: Enforce `io::error` instead of empty protocols ([#318](#318)) - multistream: Do not wait for negotiation in poll_close ([#319](#319)) cc @paritytech/networking --------- Signed-off-by: Alexandru Vasile <[email protected]>
This PR updates litep2p to version 0.9.1. The yamux config is entirely removed to mirror the libp2p yamux upstream version. While at it, I had to bump indexmap and URL as well. ## [0.9.1] - 2025-01-19 This release enhances compatibility between litep2p and libp2p by using the latest Yamux upstream version. Additionally, it includes various improvements and fixes to boost the stability and performance of the WebSocket stream and the multistream-select protocol. ### Changed - yamux: Switch to upstream implementation while keeping the controller API ([#320](paritytech/litep2p#320)) - req-resp: Replace SubstreamSet with FuturesStream ([#321](paritytech/litep2p#321)) - cargo: Bring up to date multiple dependencies ([#324](paritytech/litep2p#324)) - build(deps): bump hickory-proto from 0.24.1 to 0.24.3 ([#323](paritytech/litep2p#323)) - build(deps): bump openssl from 0.10.66 to 0.10.70 ([#322](paritytech/litep2p#322)) ### Fixed - websocket/stream: Fix unexpected EOF on `Poll::Pending` state poisoning ([#327](paritytech/litep2p#327)) - websocket/stream: Avoid memory allocations on flushing ([#325](paritytech/litep2p#325)) - multistream-select: Enforce `io::error` instead of empty protocols ([#318](paritytech/litep2p#318)) - multistream: Do not wait for negotiation in poll_close ([#319](paritytech/litep2p#319)) cc @paritytech/networking --------- Signed-off-by: Alexandru Vasile <[email protected]>
This PR updates litep2p to version 0.9.1. The yamux config is entirely removed to mirror the libp2p yamux upstream version. While at it, I had to bump indexmap and URL as well. ## [0.9.1] - 2025-01-19 This release enhances compatibility between litep2p and libp2p by using the latest Yamux upstream version. Additionally, it includes various improvements and fixes to boost the stability and performance of the WebSocket stream and the multistream-select protocol. ### Changed - yamux: Switch to upstream implementation while keeping the controller API ([#320](paritytech/litep2p#320)) - req-resp: Replace SubstreamSet with FuturesStream ([#321](paritytech/litep2p#321)) - cargo: Bring up to date multiple dependencies ([#324](paritytech/litep2p#324)) - build(deps): bump hickory-proto from 0.24.1 to 0.24.3 ([#323](paritytech/litep2p#323)) - build(deps): bump openssl from 0.10.66 to 0.10.70 ([#322](paritytech/litep2p#322)) ### Fixed - websocket/stream: Fix unexpected EOF on `Poll::Pending` state poisoning ([#327](paritytech/litep2p#327)) - websocket/stream: Avoid memory allocations on flushing ([#325](paritytech/litep2p#325)) - multistream-select: Enforce `io::error` instead of empty protocols ([#318](paritytech/litep2p#318)) - multistream: Do not wait for negotiation in poll_close ([#319](paritytech/litep2p#319)) cc @paritytech/networking --------- Signed-off-by: Alexandru Vasile <[email protected]> (cherry picked from commit 42e9de7)
This PR replaces the in-house built
SubstreamSet
with theFuturesStream
.FuturesUnordered
implementationSubstreams are no longer removed from the
FuturesStream
and instead they are left to expire with theConnectionClosed
event. This has the side-effect of emitting adebug
log and it simplifies state tracking.cc @paritytech/networking