Mplex stream termination issue #1814

Closed
AgeManning opened this issue Oct 24, 2020 · 12 comments

AgeManning (Contributor) commented Oct 24, 2020

We recently updated to the latest master of rust-libp2p.

After the update, it seems mplex no longer functions as we expect. Without changing any of the application logic, we see the error:

IO Error: bytes remaining on stream

This appears to be an mplex issue, as it doesn't occur when we switch to yamux. From the commit history, the main change to mplex between our previously working commit and now is #1784, plus the less substantial #1785.

We have yet to pinpoint the exact issue, but can share the logs we currently have for our protocol.

The protocol we are using is an RPC protocol with a single short-lived stream per request/response. A requester negotiates and opens a stream, sends a request, and awaits a response over that stream.
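
Roughly, the requester-side pattern looks like the following sketch (simplified, not our actual handler code; the helper name is just for illustration):

use futures::prelude::*; // AsyncReadExt / AsyncWriteExt

// Simplified sketch of our request/response pattern over one substream.
// `substream` is the negotiated mplex substream, `request` the encoded request.
async fn rpc_request<S>(mut substream: S, request: &[u8]) -> std::io::Result<Vec<u8>>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    // Send the request and half-close our write side (an mplex `Close` frame).
    substream.write_all(request).await?;
    substream.close().await?;

    // Await the response until the remote closes its side of the stream.
    let mut response = Vec::new();
    substream.read_to_end(&mut response).await?;
    Ok(response)
}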

It appears the receiving side now closes the substream early after receiving the request, and the requester then drops the data with "unknown substream".

Requester logs:

[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Received Data { stream_id: RemoteStreamId { num: 1, role: Listener }, data: b";/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy\n" }
[2020-10-23T19:38:53Z TRACE multistream_select::protocol] Received message: Protocol(Protocol(b"/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy"))
[2020-10-23T19:38:53Z DEBUG multistream_select::dialer_select] Dialer: Received confirmation for protocol: /eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Sending Data { stream_id: LocalStreamId { num: 1, role: Dialer }, data: b"\x18\xff\x06\0\0sNaPpY\x01\x1c\0\0\x856JJ\0\0\0\0\0\0\0\0\n\0\0\0\0\0\0\0\x01\0\0\0\0\0\0\0" }
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Flushed substream (1/initiator)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Flushed substream (1/initiator)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Flushed substream (1/initiator)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Flushed substream (1/initiator)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Sending Close { stream_id: LocalStreamId { num: 1, role: Dialer } }
[2020-10-23T19:38:53Z DEBUG libp2p_mplex::io] ddd59255bfc8a742: Closed substream (1/initiator) (half-close)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Flushed substream (1/initiator)
[2020-10-23T19:38:53Z DEBUG libp2p_core::upgrade::apply] Successfully applied negotiated protocol
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Sending Data { stream_id: LocalStreamId { num: 0, role: Listener }, data: b"\0\x10\xff\x06\0\0sNaPpY\x01\x14\0\0\xb9oW\xd8\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" }
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Flushed substream (0/receiver)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Flushed substream (0/receiver)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Flushed substream (0/receiver)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Flushed substream (0/receiver)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Sending Close { stream_id: LocalStreamId { num: 0, role: Listener } }
[2020-10-23T19:38:53Z DEBUG libp2p_mplex::io] ddd59255bfc8a742: Closed substream (0/receiver)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Flushed substream (0/receiver)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Received Data { stream_id: RemoteStreamId { num: 0, role: Listener }, data: b"\0\x10\xff\x06\0\0sNaPpY\x01\x14\0\0\xb9oW\xd8\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" }
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Buffering b"\0\x10\xff\x06\0\0sNaPpY\x01\x14\0\0\xb9oW\xd8\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" for stream (0/initiator) (total: 1)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Flushed substream (0/initiator)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Flushed substream (0/initiator)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Flushed substream (0/initiator)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Received Close { stream_id: RemoteStreamId { num: 0, role: Listener } }
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] ddd59255bfc8a742: Ignoring `Close` for unknown substream (0/initiator). Possibly dropped earlier.

Receiver logs:

[2020-10-23T19:38:53Z TRACE multistream_select::protocol] Received message: Protocol(Protocol(b"/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy"))
[2020-10-23T19:38:53Z DEBUG multistream_select::listener_select] Listener: confirming protocol: /eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Sending Data { stream_id: LocalStreamId { num: 1, role: Listener }, data: b";/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy\n" }
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Flushed substream (1/receiver)
[2020-10-23T19:38:53Z DEBUG multistream_select::listener_select] Listener: sent confirmed protocol: /eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Received Close { stream_id: RemoteStreamId { num: 0, role: Dialer } }
[2020-10-23T19:38:53Z DEBUG libp2p_mplex::io] d46b5a239ea3dccb: Substream (0/receiver) closed by remote (Open -> RecvClosed)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Received Data { stream_id: RemoteStreamId { num: 1, role: Dialer }, data: b"\x18\xff\x06\0\0sNaPpY\x01\x1c\0\0\x856JJ\0\0\0\0\0\0\0\0\n\0\0\0\0\0\0\0\x01\0\0\0\0\0\0\0" }
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Buffering b"\x18\xff\x06\0\0sNaPpY\x01\x1c\0\0\x856JJ\0\0\0\0\0\0\0\0\n\0\0\0\0\0\0\0\x01\0\0\0\0\0\0\0" for stream (1/receiver) (total: 1)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Received Close { stream_id: RemoteStreamId { num: 1, role: Dialer } }
[2020-10-23T19:38:53Z DEBUG libp2p_mplex::io] d46b5a239ea3dccb: Substream (1/receiver) closed by remote (Open -> RecvClosed)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Received Data { stream_id: RemoteStreamId { num: 0, role: Listener }, data: b"\0\x10\xff\x06\0\0sNaPpY\x01\x14\0\0\xb9oW\xd8\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" }
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Buffering b"\0\x10\xff\x06\0\0sNaPpY\x01\x14\0\0\xb9oW\xd8\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" for stream (0/initiator) (total: 1)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Received Close { stream_id: RemoteStreamId { num: 0, role: Listener } }
[2020-10-23T19:38:53Z DEBUG libp2p_mplex::io] d46b5a239ea3dccb: Substream (0/initiator) closed by remote (SendClosed -> Closed).
[2020-10-23T19:38:53Z DEBUG libp2p_core::upgrade::apply] Successfully applied negotiated protocol
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Sending Data { stream_id: LocalStreamId { num: 0, role: Listener }, data: b"\0\x10\xff\x06\0\0sNaPpY\x01\x14\0\0\xb9oW\xd8\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" }
Oct 24 01:08:53.858 INFO Receiver got request, req: BlocksByRange(BlocksByRangeRequest { start_slot: 0, count: 10, step: 1 })
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Flushed substream (0/receiver)
Oct 24 01:08:53.858 INFO [2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Flushed substream (0/receiver)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Flushed substream (0/receiver)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Flushed substream (0/receiver)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Sending Close { stream_id: LocalStreamId { num: 0, role: Listener } }
[2020-10-23T19:38:53Z DEBUG libp2p_mplex::io] d46b5a239ea3dccb: Closed substream (0/receiver)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Flushed substream (0/receiver)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Flushed substream (0/initiator)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Flushed substream (0/initiator)
[2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Flushed substream (0/initiator)

For comparison, here are logs from a working version.
Working requester:

[2020-10-23T19:36:41Z TRACE multistream_select::protocol] Received message: Protocol(Protocol(b"/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy"))
[2020-10-23T19:36:41Z DEBUG multistream_select::dialer_select] Dialer: Received confirmation for protocol: /eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Sending Data { stream_id: LocalStreamId { num: 1, role: Dialer }, data: b"\x18\xff\x06\0\0sNaPpY\x01\x1c\0\0\x856JJ\0\0\0\0\0\0\0\0\n\0\0\0\0\0\0\0\x01\0\0\0\0\0\0\0" }
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (1/initiator)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (1/initiator)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (1/initiator)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (1/initiator)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Sending Close { stream_id: LocalStreamId { num: 1, role: Dialer } }
[2020-10-23T19:36:41Z DEBUG libp2p_mplex::io] Closed substream (1/initiator) (half-close)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (1/initiator)
[2020-10-23T19:36:41Z DEBUG libp2p_core::upgrade::apply] Successfully applied negotiated protocol
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Sending Data { stream_id: LocalStreamId { num: 0, role: Listener }, data: b"\0\x10\xff\x06\0\0sNaPpY\x01\x14\0\0\xb9oW\xd8\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" }
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (0/receiver)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (0/receiver)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (0/receiver)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (0/receiver)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Sending Close { stream_id: LocalStreamId { num: 0, role: Listener } }
[2020-10-23T19:36:41Z DEBUG libp2p_mplex::io] Closed substream (0/receiver)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (0/receiver)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Received Data { stream_id: RemoteStreamId { num: 0, role: Listener }, data: b"\0\x10\xff\x06\0\0sNaPpY\x01\x14\0\0\xb9oW\xd8\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" }
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Buffering Data { stream_id: RemoteStreamId { num: 0, role: Listener }, data: b"\0\x10\xff\x06\0\0sNaPpY\x01\x14\0\0\xb9oW\xd8\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" } (total: 1)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Received Close { stream_id: RemoteStreamId { num: 0, role: Listener } }
[2020-10-23T19:36:41Z DEBUG libp2p_mplex::io] Substream (0/initiator) closed by remote (SendClosed -> Closed).
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (0/initiator)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (0/initiator)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (0/initiator)

Working receiver:

[2020-10-23T19:36:41Z TRACE multistream_select::protocol] Received message: Protocol(Protocol(b"/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy"))
[2020-10-23T19:36:41Z DEBUG multistream_select::listener_select] Listener: confirming protocol: /eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Sending Data { stream_id: LocalStreamId { num: 1, role: Listener }, data: b";/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy\n" }
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (1/receiver)
[2020-10-23T19:36:41Z DEBUG multistream_select::listener_select] Listener: sent confirmed protocol: /eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Sending Data { stream_id: LocalStreamId { num: 0, role: Listener }, data: b"\0\x10\xff\x06\0\0sNaPpY\x01\x14\0\0\xb9oW\xd8\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" }
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (0/receiver)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (0/receiver)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (0/receiver)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (0/receiver)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Sending Close { stream_id: LocalStreamId { num: 0, role: Listener } }
[2020-10-23T19:36:41Z DEBUG libp2p_mplex::io] Closed substream (0/receiver)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Flushed substream (0/receiver)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Received Data { stream_id: RemoteStreamId { num: 1, role: Dialer }, data: b"\x18\xff\x06\0\0sNaPpY\x01\x1c\0\0\x856JJ\0\0\0\0\0\0\0\0\n\0\0\0\0\0\0\0\x01\0\0\0\0\0\0\0" }
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Buffering Data { stream_id: RemoteStreamId { num: 1, role: Dialer }, data: b"\x18\xff\x06\0\0sNaPpY\x01\x1c\0\0\x856JJ\0\0\0\0\0\0\0\0\n\0\0\0\0\0\0\0\x01\0\0\0\0\0\0\0" } (total: 1)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Received Close { stream_id: RemoteStreamId { num: 1, role: Dialer } }
[2020-10-23T19:36:41Z DEBUG libp2p_mplex::io] Substream (1/receiver) closed by remote (Open -> RecvClosed)
[2020-10-23T19:36:41Z DEBUG libp2p_core::upgrade::apply] Successfully applied negotiated protocol
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Received Data { stream_id: RemoteStreamId { num: 0, role: Listener }, data: b"\0\x10\xff\x06\0\0sNaPpY\x01\x14\0\0\xb9oW\xd8\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" }
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Buffering Data { stream_id: RemoteStreamId { num: 0, role: Listener }, data: b"\0\x10\xff\x06\0\0sNaPpY\x01\x14\0\0\xb9oW\xd8\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" } (total: 1)
[2020-10-23T19:36:41Z TRACE libp2p_mplex::io] Received Close { stream_id: RemoteStreamId { num: 0, role: Listener } }
[2020-10-23T19:36:41Z DEBUG libp2p_mplex::io] Substream (0/initiator) closed by remote (SendClosed -> Closed).

cc @pawanjay176 @romanb

AgeManning changed the title from "Mplex backwards compatibility broken" to "Mplex stream termination issue" on Oct 24, 2020
romanb (Contributor) commented Oct 26, 2020

The protocol we are using is an RPC protocol with a single short-lived stream for a request/response. A requester negotiates and opens a stream, sends a request and awaits a response over that stream.

You mention the error "IO Error: bytes remaining on stream", but that is not included in the logs. Could you share or point me to the code you are using for this request/response protocol?

It appears from looking at the commit history that the main change to mplex from our working commit to now is: #1784 and the less substantial: #1785

Does that mean the problem did not occur after #1769 but only after #1784 and / or #1785?

It appears the receiving side is now closing the substream (early) after receiving the request [..]

Could you point me to where you see that in the logs?

[..] and requester now drops the data with "unknown substream".

Could you also point me to where you see data being dropped in the logs? This line

ddd59255bfc8a742: Ignoring `Close` for unknown substream (0/initiator). Possibly dropped earlier.

only says that a received Close frame has been ignored because the substream has already been dropped locally. That can be a normal occurrence if a substream is already in SendClosed and the application is just waiting for a reply. If, having read the reply, the application drops the substream, that may happen before the final Close from the remote is received, leading to this message, in which case it is insignificant. Looking only at the logs for the substream 0/initiator ("Requester") and 0/receiver ("Receiver"), I see this on the "Requester":

...
The logs don't show the request sent on this substream
...
ddd59255bfc8a742: Received Data { stream_id: RemoteStreamId { num: 0, role: Listener }, data: b"\0\x10\xff\x06\0\0sNaPpY\x01\x14\0\0\xb9oW\xd8\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" }
ddd59255bfc8a742: Buffering b"\0\x10\xff\x06\0\0sNaPpY\x01\x14\0\0\xb9oW\xd8\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" for stream (0/initiator) (total: 1)
ddd59255bfc8a742: Flushed substream (0/initiator)
... The buffered data is likely read by the application and the substream dropped ...
ddd59255bfc8a742: Received Close { stream_id: RemoteStreamId { num: 0, role: Listener } }
ddd59255bfc8a742: Ignoring `Close` for unknown substream (0/initiator). Possibly dropped earlier.

and this on the "Receiver":

...
The logs don't show the request sent by the "Requester" on this substream.
...
d46b5a239ea3dccb: Received Close { stream_id: RemoteStreamId { num: 0, role: Dialer } }
d46b5a239ea3dccb: Substream (0/receiver) closed by remote (Open -> RecvClosed)
d46b5a239ea3dccb: Sending Data { stream_id: LocalStreamId { num: 0, role: Listener }, data: b"\0\x10\xff\x06\0\0sNaPpY\x01\x14\0\0\xb9oW\xd8\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" }
d46b5a239ea3dccb: Flushed substream (0/receiver)
d46b5a239ea3dccb: Sending Close { stream_id: LocalStreamId { num: 0, role: Listener } }
d46b5a239ea3dccb: Closed substream (0/receiver)
d46b5a239ea3dccb: Flushed substream (0/receiver)

which doesn't look overly suspicious at first sight. In particular, there are logs from the following three substreams in your excerpts, one of which was actually initiated by the "Receiver" node:

  1. 0/initiator (Requester) - 0/receiver (Receiver)
  2. 0/receiver (Requester) - 0/initiator (Receiver)
  3. 1/initiator (Requester) - 1/receiver (Receiver)

I'm not sure which of these you are referring to w.r.t. the problem description. The logs don't seem to contain a full request-response exchange for any of these. Furthermore, in the "Receiver" logs there are these lines interleaved:

Oct 24 01:08:53.858 INFO Receiver got request, req: BlocksByRange(BlocksByRangeRequest { start_slot: 0, count: 10, step: 1 })
...
Oct 24 01:08:53.858 INFO [2020-10-23T19:38:53Z TRACE libp2p_mplex::io] d46b5a239ea3dccb: Flushed substream (0/receiver)

These are prefix-dated Oct 24th, and I'm not sure what to make of these at the moment.

Thanks for your report and your help in looking into this, I'm just not yet able to see the problem clearly. I'm still hoping for clearer logs and more insight into the code you're using to better diagnose the problem.

AgeManning (Contributor, Author) commented

Hey @romanb, thanks for the quick reply. I had been meaning to get back to you earlier but wanted more substantial info to help diagnose this. I saw you sorted out sigp/lighthouse#1832, thanks! I thought that might be related, but it occurs on an earlier libp2p version than the issue here.

You mention the error "IO Error: bytes remaining on stream", but that is not included in the logs. Could you share or point me to the code you are using for this request/response protocol?

I wanted to spare you the RPC code and ideally get us to do the leg-work to isolate it further. But the RPC code is here: https://github.com/sigp/lighthouse/tree/master/beacon_node/eth2_libp2p/src/rpc

The important parts would be establishing a stream: https://github.com/sigp/lighthouse/blob/master/beacon_node/eth2_libp2p/src/rpc/protocol.rs#L291

And then the various stream management (which is somewhat complex): https://github.com/sigp/lighthouse/blob/master/beacon_node/eth2_libp2p/src/rpc/handler.rs#L628 (this is the inbound side, but the equivalent outbound handling is in there).

With all the RPC logic aside, the fundamental error occurs from polling the substream: https://github.com/sigp/lighthouse/blob/master/beacon_node/eth2_libp2p/src/rpc/handler.rs#L808

This should give the IO error I quoted. The underlying substream is a framed substream with a timeout, but I don't think all of this complexity is particularly relevant.

The logs I posted were provided by @pawanjay176 (perhaps his system clock is out?). I've been meaning to set up an isolated, reproducible test that can give you the full logs (and that you can run yourself) but have yet to find the time. @pawanjay176 has, however, done some further investigation and has the following thoughts:

  1. Sender sends an RPC request on substream id 1 where sender is initiator.
  2. Sender closes substream id 1 (SendClosed)
  3. Receiver sends the identify protocol request on the same substream id 1 where receiver is the initiator.
  4. Receiver closes substream id 1 (SendClosed for receiver)
  5. In the old libp2p code, the sender receives this close from the receiver as Received Close { stream_id: RemoteStreamId { num: 1, role: Dialer } }, then closes the identify substream
  6. In the new libp2p code, the sender does not receive this close message and ends up closing the RPC request stream (which has the same id)
  7. Now when receiver sends the RPC response, that stream has been dropped so all data gets dropped

I believe @pawanjay176 may have more to add when he is online next. As soon as I get the time, I'll also try and build some isolated tests for this. Perhaps this extra info may help a little in the meantime.

pawanjay176 (Contributor) commented

@romanb sorry for the late reply here. I'll try to describe the setup from which I got the above logs, building on what @AgeManning has already described.

We have two nodes running with a custom RPC behaviour plus the libp2p identify behaviour. Both nodes exchange an RPC metadata request, followed by Node1 sending a blocks_by_range RPC request. After that, both nodes exchange the libp2p identify protocol messages, which are interleaved with the blocks_by_range messages. Node2 is expected to respond to the blocks_by_range RPC request, and the program ends correctly upon receiving the correct response.

From the logs, it looks like with the newer version of libp2p, mplex on Node1 somehow closes the stream before receiving the response from Node2 and hence ends up dropping the entire response.

Full logs for Node1 and Node2 with the older version of libp2p, where it works as expected (commit: 8cec457).
Full logs for the newer version, where it stopped working (commit: b766d45). I'm running the same code in both cases; only the libp2p version differs.

Below is a summary of the issue, with snippets from Node1's logs for the failing case.

  1. Node1 sends an RPC request on substream id 1 where Node1 is initiator.
[2020-10-28T18:39:13Z DEBUG libp2p_mplex::io] New outbound substream: (1/initiator) (total 3)
[2020-10-28T18:39:13Z DEBUG multistream_select::dialer_select] Dialer: Proposed protocol: /eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Sending Data { stream_id: LocalStreamId { num: 1, role: Dialer }, data: b"\x13/multistream/1.0.0\n;/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy\n" }
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Flushed substream (1/initiator)
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Received Data { stream_id: RemoteStreamId { num: 1, role: Listener }, data: b"\x13/multistream/1.0.0\n" }
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Buffering b"\x13/multistream/1.0.0\n" for stream (1/initiator) (total: 1)
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Received Data { stream_id: RemoteStreamId { num: 1, role: Listener }, data: b";/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy\n" }
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Buffering b";/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy\n" for stream (1/initiator) (total: 1)
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Sending Data { stream_id: LocalStreamId { num: 1, role: Dialer }, data: b"\x18\xff\x06\0\0sNaPpY\x01\x1c\0\0v\xde<\x04\0\0\0\0\0\0\0\0\x05\0\0\0\0\0\0\0\x01\0\0\0\0\0\0\0" }
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Flushed substream (1/initiator)
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Flushed substream (1/initiator)
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Flushed substream (1/initiator)
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Flushed substream (1/initiator)
  2. Node1 closes substream id 1 (SendClosed)
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Sending Close { stream_id: LocalStreamId { num: 1, role: Dialer } }
[2020-10-28T18:39:13Z DEBUG libp2p_mplex::io] Closed substream (1/initiator) (half-close)
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Flushed substream (1/initiator)
  3. Node1 receives the libp2p identify protocol request on the same substream id 1 (now Node2 is the initiator).
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Received Open { stream_id: RemoteStreamId { num: 1, role: Dialer } }
[2020-10-28T18:39:13Z DEBUG libp2p_mplex::io] New inbound substream: (1/receiver) (total 3)
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Received Data { stream_id: RemoteStreamId { num: 1, role: Dialer }, data: b"\x13/multistream/1.0.0\n\x0f/ipfs/id/1.0.0\n" }
[2020-10-28T18:39:13Z TRACE multistream_select::protocol] Received message: Header(V1)
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Sending Data { stream_id: LocalStreamId { num: 1, role: Listener }, data: b"\x13/multistream/1.0.0\n" }
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Flushed substream (1/receiver)
[2020-10-28T18:39:13Z TRACE multistream_select::protocol] Received message: Protocol(Protocol(b"/ipfs/id/1.0.0"))
[2020-10-28T18:39:13Z DEBUG multistream_select::listener_select] Listener: confirming protocol: /ipfs/id/1.0.0
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Sending Data { stream_id: LocalStreamId { num: 1, role: Listener }, data: b"\x0f/ipfs/id/1.0.0\n" }
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Flushed substream (1/receiver)
[2020-10-28T18:39:13Z DEBUG multistream_select::listener_select] Listener: sent confirmed protocol: /ipfs/id/1.0.0
[2020-10-28T18:39:13Z DEBUG libp2p_core::upgrade::apply] Successfully applied negotiated protocol
  4. Node1 sends the response on substream id 1, where it is the listener
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Received Close { stream_id: RemoteStreamId { num: 1, role: Dialer } }
[2020-10-28T18:39:13Z DEBUG libp2p_mplex::io] Substream (1/receiver) closed by remote (Open -> RecvClosed)
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Sending Data { stream_id: LocalStreamId { num: 1, role: Listener }, data: b"\x99\x04" }
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Sending Data { stream_id: LocalStreamId { num: 1, role: Listener }, data: 'some libp2p identify data'
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Flushed substream (1/receiver)
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Flushed substream (1/receiver)
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Sending Close { stream_id: LocalStreamId { num: 1, role: Listener } }
[2020-10-28T18:39:13Z DEBUG libp2p_mplex::io] Closed substream (1/receiver)
[2020-10-28T18:39:13Z TRACE libp2p_mplex::io] Flushed substream (1/receiver)
  5. Node1 receives data for the originally negotiated /eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy protocol, but drops it as the stream is closed
[2020-10-28T18:39:23Z TRACE libp2p_mplex::io] Received Data { stream_id: RemoteStreamId { num: 1, role: Listener }, data: b"\0\x94\x03\xff\x06\0\0sNaPpY\0'\0\0!\xe88l\x94\x03\x04d\0\xfe\x01\0\xfe\x01\0\xc6\x01\0\0T\xfe\xb3\0\xfe\xb3\0\xc6\xb3\0b\x01\0\x08\xdc\0\0B\x04\0" }
[2020-10-28T18:39:23Z TRACE libp2p_mplex::io] Dropping data b"\0\x94\x03\xff\x06\0\0sNaPpY\0'\0\0!\xe88l\x94\x03\x04d\0\xfe\x01\0\xfe\x01\0\xc6\x01\0\0T\xfe\xb3\0\xfe\xb3\0\xc6\xb3\0b\x01\0\x08\xdc\0\0B\x04\0" for unknown substream (1/initiator)
[2020-10-28T18:39:23Z TRACE libp2p_mplex::io] Received Close { stream_id: RemoteStreamId { num: 1, role: Listener } }
[2020-10-28T18:39:23Z TRACE libp2p_mplex::io] Ignoring `Close` for unknown substream (1/initiator). Possibly dropped earlier.

My guess is that Node1's mplex closes the substream for blocks_by_range when trying to close the substream for libp2p identify (since both have the same substream id 1).

romanb (Contributor) commented Oct 29, 2020

Thanks for the further details.

Node1 receives the libp2p identify protocol request on the same substream id 1 (now Node2 is the initiator).

There is an important misconception here, as these are not the same substreams. In Mplex a substream ID is a tuple of <number> and <role> (dialer/listener or equivalently initiator/receiver). So the IDs 1/initiator and 1/receiver are two different substreams on a particular node and identified by the remote as 1/receiver and 1/initiator respectively. Mplex does this to avoid splitting the number space into even/odd between the two nodes, as yamux does. So, to make this even clearer, maybe this helps (each row is a substream and the entries are the substream "IDs" by which that substream is identified by a particular peer):

  Node A            Node B 
=============================
 1/initiator      1/receiver
 1/receiver       1/initiator
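
To illustrate the same mapping in code (illustration only, not the actual libp2p-mplex types):

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Role { Initiator, Receiver }

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct StreamId { num: u64, role: Role }

// A frame arriving for the remote's (num, Initiator) substream belongs to our
// (num, Receiver) substream and vice versa; the number alone is never a key.
fn local_id_for(remote: StreamId) -> StreamId {
    StreamId {
        num: remote.num,
        role: match remote.role {
            Role::Initiator => Role::Receiver,
            Role::Receiver => Role::Initiator,
        },
    }
}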

So there is no substream mix-up going on here, as far as I can tell. But your logs are very helpful. In particular what they show is the following:

  1. Node 1 sends the RPC request at ~ 2020-10-28T18:39:13Z.
  2. Node 2 receives the RPC request and substream close at ~ 2020-10-28T18:39:13Z and that substream is now RecvClosed.
  3. Node 2 sends the RPC response at ~ 2020-10-28T18:39:23Z.

Note the 10 second delay of Node 2. At this point Node 1 has likely dropped the substream due to a timeout (better logs may be needed to make this more visible - 10 seconds is the default timeout for inbound/outbound substream upgrades) and hence when it receives the response at ~ 2020-10-28T18:39:23Z it drops the data. So what is going on here looks like a missing task wakeup on Node 2, which I now need to look into.

romanb (Contributor) commented Oct 29, 2020

It's probably unrelated, but I noticed that the RPC connection handler in lighthouse, when it has a response to send for a new, idle inbound substream, switches the substream state to Busy(f) with a future f that sends out the response. But that future is only polled for the first time when the connection handler is polled again, if I see that right? If so, then there is at least a possible situation where the connection handler returns Pending and is only polled again when the inbound_substreams_delay fires after your 10 second RESPONSE_TIMEOUT.
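
If that is the case, one way to avoid relying on an unrelated wakeup is to poll a newly created response future within the same poll invocation that creates it. A rough sketch, with InboundState as a stand-in for the handler's own state type (not the actual lighthouse definitions):

use futures::{Future, FutureExt};
use std::task::{Context, Poll};

// Stand-in for the handler's per-substream state.
enum InboundState<F> {
    Idle,
    Busy(F),
}

// Poll the response future as soon as it exists. If the handler only stored
// the future and returned Pending without polling it, no waker would be
// registered for it, and the next poll might only come from an unrelated
// timer such as `inbound_substreams_delay`.
fn drive<F>(state: &mut InboundState<F>, cx: &mut Context<'_>)
where
    F: Future<Output = ()> + Unpin,
{
    if let InboundState::Busy(fut) = state {
        if let Poll::Ready(()) = fut.poll_unpin(cx) {
            *state = InboundState::Idle;
        }
    }
}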

Since I didn't find any obvious mistakes when re-reviewing the libp2p changes between the two mentioned versions, I'm at a point where I need to reproduce this myself and experiment with the code, especially since that range also includes #1775, which may trigger similar effects. So I would really appreciate short instructions on how to run lighthouse to reproduce the error(s). I'm sure that once I can play around with the code myself, I can get to the bottom of it. Thanks in advance for your help.

AgeManning (Contributor, Author) commented

I've had a quick crack at building a reproducible example for you.
Locally, it appears that two nodes on the latest libp2p work as expected without issue.

However, running lighthouse on our testnet, I immediately see conflicts with older versions. It seems the update to mplex may not be backwards compatible.

I think there's a lot of noise in running lighthouse on our testnet and it may not be the best way to debug this. @pawanjay176 is working on an isolated example that we can work off.

AgeManning (Contributor, Author) commented Nov 5, 2020

We have managed to make some progress here, but would appreciate your help in further pinpointing the issue, @romanb.

We have an isolated test you can run to reproduce the errors.

Pull this repo: https://github.com/pawanjay176/libp2p_test.git and use the stripping-down branch.

The test works by first running the recv binary:

cargo run --bin recv

Then, in another terminal, run the sender:

cargo run --bin sender

You can enable whatever logging you need via the RUST_LOG env variable.

In these tests, we've noticed the error occurs for large RPC request/responses.

The basic test is for a sender to request a large number of responses (256). The recv attempts to respond to the request by sending all these responses.
The sender only receives some of these and the stream ends.

Looking at the logs, it seems the sender starts receiving the responses until the frame buffer fills up. It then sends a reset and drops the remaining data. I would have expected polling the stream to pull the data off as it is received, so that we wouldn't hit this buffer limit.

Also: Thanks for pointing out the non-polling of the Busy() state. That has been corrected :) (but doesn't seem related to this)

pawanjay176 (Contributor) commented

To add to @AgeManning's comment above, these are the logs we get on the sender (which receives the responses), where the stream gets reset because the buffer fills up and data then starts being dropped.

[2020-11-05T06:59:31Z TRACE libp2p_mplex::io] 38f409a94ece6b85: Received Data { stream_id: RemoteStreamId { num: 1, role: Listener }, data: b"\0*\xff\x06\0\0sNaPpY\0\n\0\0\\\xb6\xd9O*\0*\xa2\x01\0" }
[2020-11-05T06:59:31Z TRACE libp2p_mplex::io] 38f409a94ece6b85: Buffering b"\0*\xff\x06\0\0sNaPpY\0\n\0\0\\\xb6\xd9O*\0*\xa2\x01\0" for stream (1/initiator) (total: 32)
[2020-11-05T06:59:31Z TRACE libp2p_mplex::io] 38f409a94ece6b85: Received Data { stream_id: RemoteStreamId { num: 1, role: Listener }, data: b"\0*\xff\x06\0\0sNaPpY\0\n\0\0\\\xb6\xd9O*\0*\xa2\x01\0" }
[2020-11-05T06:59:31Z TRACE libp2p_mplex::io] 38f409a94ece6b85: Buffering b"\0*\xff\x06\0\0sNaPpY\0\n\0\0\\\xb6\xd9O*\0*\xa2\x01\0" for stream (1/initiator) (total: 33)
[2020-11-05T06:59:31Z DEBUG libp2p_mplex::io] 38f409a94ece6b85: Frame buffer of stream (1/initiator) is full.
[2020-11-05T06:59:31Z DEBUG libp2p_mplex::io] 38f409a94ece6b85: Pending reset for stream (1/initiator)
[2020-11-05T06:59:31Z TRACE libp2p_mplex::io] 38f409a94ece6b85: Sending Reset { stream_id: LocalStreamId { num: 1, role: Dialer } }
[2020-11-05T06:59:31Z TRACE libp2p_mplex::io] 38f409a94ece6b85: Received Data { stream_id: RemoteStreamId { num: 1, role: Listener }, data: b"\0*\xff\x06\0\0sNaPpY\0\n\0\0\\\xb6\xd9O*\0*\xa2\x01\0" }
[2020-11-05T06:59:31Z TRACE libp2p_mplex::io] 38f409a94ece6b85: Dropping data b"\0*\xff\x06\0\0sNaPpY\0\n\0\0\\\xb6\xd9O*\0*\xa2\x01\0" for closed or reset substream (1/initiator)

I believe we got the original error in lighthouse that we mentioned in the issue

IO Error: bytes remaining on stream

because we didn't get the complete data at the application level: the stream got reset and the rest of the data was dropped.

divagant-martian (Contributor) commented

Some clarifications:

  • In the test both the sender and the receiver use the same version of libp2p (latest)
  • both issues (not receiving all responses, and the "bytes remaining on stream" error) occur when polling outbound substreams (see the sketch after this list):
    • not receiving all responses manifests without an error, as Poll::Ready(None)
    • "bytes remaining on stream" manifests as Poll::Ready(Some(Err(e))), but we have not been able to reproduce this scenario in a new-vs-new setup
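
To make the two cases concrete, this is roughly the shape of the match on the framed outbound substream where we observe them (simplified, not the actual lighthouse handler code):

use futures::{Stream, StreamExt};
use std::task::{Context, Poll};

// `substream` is the framed outbound substream, i.e. a Stream of decoded
// response items; T and E are whatever the codec produces.
fn poll_responses<S, T, E>(substream: &mut S, cx: &mut Context<'_>)
where
    S: Stream<Item = Result<T, E>> + Unpin,
    E: std::fmt::Display,
{
    match substream.poll_next_unpin(cx) {
        // A response arrived as expected; hand it to the application.
        Poll::Ready(Some(Ok(_response))) => {}
        // This is where "IO Error: bytes remaining on stream" shows up.
        Poll::Ready(Some(Err(e))) => eprintln!("IO Error: {}", e),
        // Stream ended early: the remaining responses never arrive.
        Poll::Ready(None) => {}
        Poll::Pending => {}
    }
}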

romanb (Contributor) commented Nov 5, 2020

Thanks for all the new information.

[..] I would have expected the polling of the stream to pull the data off as we received them such that we wouldn't hit this buffer limit.

When a task tries to read from a particular substream, the multiplexer eagerly reads all the frames from the underlying connection until it finds one for the substream in question (or the connection is Pending). All frames of other substreams are buffered in this process. So a single call to StreamMuxer::read_substream() can potentially buffer any number of frames for other substreams. This behaviour isn't really new and there isn't much of a choice for the implementation, but I guess the reason you run into it more frequently now is the combination of MaxBufferBehaviour::ResetStream and max_buffer_len: 32. We drastically reduced the default max_buffer_len in #1784 because the buffer is no longer shared between substreams, in order to retain the same maximum total buffer size. This means the previous default allowed buffering up to 4096 frames for a single substream (if none are buffered for any others). This change in configuration leads to the resets in the test setup you provided, especially because the network I/O has basically no latency.
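
As a self-contained toy model of that behaviour (not the real libp2p_mplex code, just the buffering logic described above):

use std::collections::{HashMap, VecDeque};

type StreamNum = u64;

// Toy multiplexer: `incoming` stands in for frames read off the connection.
struct ToyMux {
    incoming: VecDeque<(StreamNum, Vec<u8>)>,
    buffers: HashMap<StreamNum, VecDeque<Vec<u8>>>,
    max_buffer_len: usize,
}

impl ToyMux {
    // Read the next frame for `target`, buffering frames of other substreams.
    fn read_substream(&mut self, target: StreamNum) -> Option<Vec<u8>> {
        // Serve already-buffered frames first.
        if let Some(data) = self.buffers.get_mut(&target).and_then(|b| b.pop_front()) {
            return Some(data);
        }
        while let Some((num, data)) = self.incoming.pop_front() {
            if num == target {
                return Some(data);
            }
            let buf = self.buffers.entry(num).or_default();
            if buf.len() >= self.max_buffer_len {
                // This is where MaxBufferBehaviour applies in the real code:
                // ResetStream resets substream `num`, Block stops reading.
                println!("buffer of substream {} is full", num);
            } else {
                buf.push_back(data);
            }
        }
        None
    }
}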

The options for this problem are the following, both via MplexConfig (a configuration sketch follows the list):

  1. Increase the max_buffer_len, e.g. to the previous 4096. We should probably change the default back to 4096 in any case, to retain the old behaviour for a single substream, even if that means the absolute total buffer size across all substreams is naturally larger. The default combination of MaxBufferBehaviour::ResetStream with a very small per-stream buffer size does not seem to be such a good idea.

  2. Use MaxBufferBehaviour::Block. This will stall reading from the underlying connection until frames are read from the offending full substream buffer. If you try your test with this configuration, even with a low buffer size like 32, it should always work. MaxBufferBehaviour::Block may even be the better default for rust-libp2p.
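
For example, roughly (the exact setter names have changed between libp2p-mplex releases, so treat these as approximate and check the docs of the version you are using):

use libp2p::mplex::{MaxBufferBehaviour, MplexConfig};

// Sketch only: setter names are approximate and may differ per release.
fn mplex_config() -> MplexConfig {
    let mut config = MplexConfig::new();
    // Option 1: allow up to 4096 buffered frames per substream again.
    config.max_buffer_len(4096);
    // Option 2: block reading from the connection instead of resetting the
    // substream when its buffer is full.
    config.max_buffer_len_behaviour(MaxBufferBehaviour::Block);
    config
}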

However, the problem described in earlier comments, whose logs were analyzed in #1814 (comment), seems to be different, i.e. a scenario where the sender of the responses suffers a 10 second delay before it starts sending the responses. I suppose that is not something you have been able to reproduce yet?

divagant-martian (Contributor) commented

Hi @romanb, that's been addressed by @AgeManning (he mentioned it in the last line of his last comment), but it doesn't look like it is related. I'll try running a LH node with the new libp2p and MaxBufferBehaviour::Block to check whether we still get the "bytes remaining on stream" issue.

AgeManning (Contributor, Author) commented

Thanks @romanb

It appears this was the issue. Increasing the buffer size and changing the max buffer behaviour has resolved it.
