Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

req-resp: Replace SubstreamSet with FuturesStream #321

Merged
merged 4 commits into from
Feb 13, 2025

Conversation

lexnv
Copy link
Collaborator

@lexnv lexnv commented Feb 3, 2025

This PR replaces the in-house built SubstreamSet with the FuturesStream.

  • provide fairness for substreams that produce elements based on the FuturesUnordered implementation
  • clone free implementation for peerId and requested
  • substream object is readily available once the substream produces a request (ie there's no need to pay for the hashmap element retrieval)

Substreams are no longer removed from the FuturesStream and instead they are left to expire with the ConnectionClosed event. This has the side-effect of emitting a debug log and it simplifies state tracking.

cc @paritytech/networking

@lexnv lexnv self-assigned this Feb 3, 2025
if let Err(error) = self.on_inbound_request(peer, request_id, message).await {
event = self.pending_inbound_requests.next(), if !self.pending_inbound_requests.is_empty() => match event {
Some((peer, request_id, request, substream)) => {
if let Err(error) = self.on_inbound_request(peer, request_id, request, substream).await {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to check here if substream points to alive substream over an existing connection, and not process the request if not? May be we can do this check in on_inbound_request.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the connection has been closed, we'll remove the peer-associated data, which will lead to no further polling on the substream:

/// Connection closed to remote peer.
async fn on_connection_closed(&mut self, peer: PeerId) {
tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection closed");
// Remove any pending outbound substreams for this peer.
self.pending_outbound.retain(|_, context| context.peer != peer);

The inbound request is directly rejected as a first step here:

.active_inbound
.remove(&request_id)
.ok_or_else(|| {
tracing::debug!(
target: LOG_TARGET,
?peer,
protocol = %self.protocol,
?request_id,
"no active inbound request",
);
Error::InvalidState
})?;

If the connection has been closed and detected by the substream, we'll return a Substream::Closed error which will not propagate the message:

let Ok(request) = request else {
tracing::debug!(
target: LOG_TARGET,
?peer,
%protocol,
?request_id,
?request,
"failed to read request from substream",
);
return Err(Error::InvalidData);
};

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i mean if we received a request and hold it in pending_inbound_requests, but the peer gets disconnected before we poll pending_inbound_requests, we will process the request anyway and only hit the error when try to send the response to a closed substream? Is there an easy way to not process the inbound request (as it may require resources on the client/runtime side etc.) in this case and fail early?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had a chat offline: we'll not process the inbound requests because we remove the peer state in on_connection_closed, have added a comment to make the code easier to follow : 58d5742 🙏

Comment on lines -318 to -322

// remove all pending inbound requests
for (request_id, _) in context.active_inbound {
self.pending_inbound_requests.remove(&(peer, request_id));
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So with the new logic we will handle the request anyway if the connection is closed, hitting an error when we try to send a response?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep exactly, if the connection gets closed, we'll still have the substream in the FuturesUnordered and let them hit an error (either when we send out the request, or read the response) 🙏

@lexnv lexnv merged commit dde6161 into master Feb 13, 2025
8 checks passed
@lexnv lexnv deleted the lexnv/req-resp-improvement branch February 13, 2025 10:49
lexnv added a commit that referenced this pull request Feb 20, 2025
## [0.9.1] - 2025-01-19

This release enhances compatibility between litep2p and libp2p by
backporting the latest Yamux updates. Additionally, it includes various
improvements and fixes to boost the stability and performance of the
WebSocket stream and the multistream-select protocol.

### Changed

- yamux: Switch to upstream implementation while keeping the controller
API ([#320](#320))
- req-resp: Replace SubstreamSet with FuturesStream
([#321](#321))
- cargo: Bring up to date multiple dependencies
([#324](#324))
- build(deps): bump hickory-proto from 0.24.1 to 0.24.3
([#323](#323))
- build(deps): bump openssl from 0.10.66 to 0.10.70
([#322](#322))

### Fixed

- websocket/stream: Fix unexpected EOF on `Poll::Pending` state
poisoning ([#327](#327))
- websocket/stream: Avoid memory allocations on flushing
([#325](#325))
- multistream-select: Enforce `io::error` instead of empty protocols
([#318](#318))
- multistream: Do not wait for negotiation in poll_close
([#319](#319))

cc @paritytech/networking

---------

Signed-off-by: Alexandru Vasile <[email protected]>
github-merge-queue bot pushed a commit to paritytech/polkadot-sdk that referenced this pull request Feb 20, 2025
This PR updates litep2p to version 0.9.1. The yamux config is entirely
removed to mirror the libp2p yamux upstream version.
While at it, I had to bump indexmap and URL as well. 


## [0.9.1] - 2025-01-19

This release enhances compatibility between litep2p and libp2p by using
the latest Yamux upstream version. Additionally, it includes various
improvements and fixes to boost the stability and performance of the
WebSocket stream and the multistream-select protocol.

### Changed

- yamux: Switch to upstream implementation while keeping the controller
API ([#320](paritytech/litep2p#320))
- req-resp: Replace SubstreamSet with FuturesStream
([#321](paritytech/litep2p#321))
- cargo: Bring up to date multiple dependencies
([#324](paritytech/litep2p#324))
- build(deps): bump hickory-proto from 0.24.1 to 0.24.3
([#323](paritytech/litep2p#323))
- build(deps): bump openssl from 0.10.66 to 0.10.70
([#322](paritytech/litep2p#322))

### Fixed

- websocket/stream: Fix unexpected EOF on `Poll::Pending` state
poisoning ([#327](paritytech/litep2p#327))
- websocket/stream: Avoid memory allocations on flushing
([#325](paritytech/litep2p#325))
- multistream-select: Enforce `io::error` instead of empty protocols
([#318](paritytech/litep2p#318))
- multistream: Do not wait for negotiation in poll_close
([#319](paritytech/litep2p#319))

cc @paritytech/networking

---------

Signed-off-by: Alexandru Vasile <[email protected]>
github-actions bot pushed a commit to paritytech/polkadot-sdk that referenced this pull request Feb 20, 2025
This PR updates litep2p to version 0.9.1. The yamux config is entirely
removed to mirror the libp2p yamux upstream version.
While at it, I had to bump indexmap and URL as well.

## [0.9.1] - 2025-01-19

This release enhances compatibility between litep2p and libp2p by using
the latest Yamux upstream version. Additionally, it includes various
improvements and fixes to boost the stability and performance of the
WebSocket stream and the multistream-select protocol.

### Changed

- yamux: Switch to upstream implementation while keeping the controller
API ([#320](paritytech/litep2p#320))
- req-resp: Replace SubstreamSet with FuturesStream
([#321](paritytech/litep2p#321))
- cargo: Bring up to date multiple dependencies
([#324](paritytech/litep2p#324))
- build(deps): bump hickory-proto from 0.24.1 to 0.24.3
([#323](paritytech/litep2p#323))
- build(deps): bump openssl from 0.10.66 to 0.10.70
([#322](paritytech/litep2p#322))

### Fixed

- websocket/stream: Fix unexpected EOF on `Poll::Pending` state
poisoning ([#327](paritytech/litep2p#327))
- websocket/stream: Avoid memory allocations on flushing
([#325](paritytech/litep2p#325))
- multistream-select: Enforce `io::error` instead of empty protocols
([#318](paritytech/litep2p#318))
- multistream: Do not wait for negotiation in poll_close
([#319](paritytech/litep2p#319))

cc @paritytech/networking

---------

Signed-off-by: Alexandru Vasile <[email protected]>
(cherry picked from commit 42e9de7)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants