Detect closed notification substreams instead of evicting all peers #3983

Merged · 8 commits · Apr 9, 2024 (diff shows changes from 7 commits)
18 changes: 18 additions & 0 deletions prdoc/pr_3983.prdoc
@@ -0,0 +1,18 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Detect closed notification substreams instead of evicting all peers

doc:
- audience: Node Operator
description: |
Replace eviction of all peers when syncing has completely stalled with a more granular
detection of notification substreams closed by remote peers.

This change is expected to lower the _reported_ peer count, because the previous count
also included already closed substreams. The number reported now should be seen as the
real number of working connections.

crates:
- name: sc-network-sync
bump: minor
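
The core of the change is that the outbound notification substream's flush path now also watches the receive side of the substream, so a close initiated by the remote surfaces as an error even if we never wrote anything to it. Below is a minimal sketch of that idea, not the actual `NotificationsOutSubstream` implementation shown further down; `OutboundSketch`, `OutError` and the `mpsc` channel pair are hypothetical stand-ins.

```rust
// Minimal sketch of the detection idea: poll the receive side before flushing the
// send side, so a remote close is reported even when nothing was ever written.
// The `mpsc` channel pair is a hypothetical stand-in for the real substream.
use futures::{channel::mpsc, Sink, Stream};
use std::{
    pin::Pin,
    task::{Context, Poll},
};

/// Error surfaced when the remote end closed the substream.
#[derive(Debug)]
enum OutError {
    Terminated,
}

struct OutboundSketch {
    tx: mpsc::Sender<Vec<u8>>,
    rx: mpsc::Receiver<Vec<u8>>,
}

impl OutboundSketch {
    fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), OutError>> {
        // A plain `Sink::poll_flush` would report success here even if the remote
        // already hung up; polling the receive side first surfaces the closure.
        match Pin::new(&mut self.rx).poll_next(cx) {
            Poll::Ready(None) => return Poll::Ready(Err(OutError::Terminated)),
            Poll::Ready(Some(_)) | Poll::Pending => {},
        }
        Pin::new(&mut self.tx).poll_flush(cx).map_err(|_| OutError::Terminated)
    }
}
```

The `can_detect_dropped_out_substream_without_writing_data` test added in this PR exercises the same behaviour on the real type.
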
@@ -799,6 +799,9 @@ impl ConnectionHandler for NotifsHandler {
// performed before the code paths that can produce `Ready` (with some rare exceptions).
// Importantly, however, the flush is performed *after* notifications are queued with
// `Sink::start_send`.
// Note that we must call `poll_flush` on all substreams and not only on those we
// have called `Sink::start_send` on, because `NotificationsOutSubstream::poll_flush`
// also reports the substream termination (even if no data was written into it).
for protocol_index in 0..self.protocols.len() {
match &mut self.protocols[protocol_index].state {
State::Open { out_substream: out_substream @ Some(_), .. } => {
@@ -841,7 +844,7 @@
State::OpenDesiredByRemote { in_substream, pending_opening } =>
match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) {
Poll::Pending => {},
Poll::Ready(Ok(void)) => match void {},
Poll::Ready(Ok(())) => {},
Poll::Ready(Err(_)) => {
self.protocols[protocol_index].state =
State::Closed { pending_opening: *pending_opening };
@@ -857,7 +860,7 @@
cx,
) {
Poll::Pending => {},
Poll::Ready(Ok(void)) => match void {},
Poll::Ready(Ok(())) => {},
Poll::Ready(Err(_)) => *in_substream = None,
},
}
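To show why the comment above insists on flushing every open substream, not only the ones we wrote to, here is a hypothetical sketch of such a flush loop; `ProtocolState`, `OutSubstream` and `flush_all` are stand-ins, not the actual `NotifsHandler` types. An error from `poll_flush` is the only signal that the remote closed a substream we never wrote to, and it moves that protocol to a closed state instead of being silently skipped.

```rust
// Hypothetical sketch of the flush loop described above; none of these names are
// polkadot-sdk types.
use std::task::{Context, Poll};

/// Simplified per-protocol state.
enum ProtocolState {
    Open { out_substream: Option<OutSubstream> },
    Closed,
}

/// Stand-in for `NotificationsOutSubstream`.
struct OutSubstream;

impl OutSubstream {
    fn poll_flush(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
        // The real implementation also polls the receive side; see the
        // `NotificationsOutSubstream::poll_flush` change further down.
        Poll::Ready(Ok(()))
    }
}

/// Flush every open outbound substream, even ones we never wrote to, so that a
/// substream closed by the remote is noticed and recorded as closed.
fn flush_all(protocols: &mut [ProtocolState], cx: &mut Context<'_>) {
    for index in 0..protocols.len() {
        let terminated = match &mut protocols[index] {
            ProtocolState::Open { out_substream: Some(substream) } =>
                matches!(substream.poll_flush(cx), Poll::Ready(Err(()))),
            _ => false,
        };
        if terminated {
            protocols[index] = ProtocolState::Closed;
        }
    }
}
```
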
@@ -44,7 +44,6 @@ use log::{error, warn};
use unsigned_varint::codec::UviBytes;

use std::{
convert::Infallible,
io, mem,
pin::Pin,
task::{Context, Poll},
@@ -223,10 +222,7 @@ where

/// Equivalent to `Stream::poll_next`, except that it only drives the handshake and is
/// guaranteed to not generate any notification.
pub fn poll_process(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<Infallible, io::Error>> {
pub fn poll_process(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
let mut this = self.project();

loop {
@@ -248,8 +244,10 @@
},
NotificationsInSubstreamHandshake::Flush => {
match Sink::poll_flush(this.socket.as_mut(), cx)? {
Poll::Ready(()) =>
*this.handshake = NotificationsInSubstreamHandshake::Sent,
Poll::Ready(()) => {
*this.handshake = NotificationsInSubstreamHandshake::Sent;
return Poll::Ready(Ok(()));
},
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::Flush;
return Poll::Pending
@@ -262,7 +260,7 @@
st @ NotificationsInSubstreamHandshake::ClosingInResponseToRemote |
st @ NotificationsInSubstreamHandshake::BothSidesClosed => {
*this.handshake = st;
return Poll::Pending
return Poll::Ready(Ok(()));
},
}
}
@@ -445,6 +443,21 @@ where

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();

// `Sink::poll_flush` does not expose stream closed error until we write something into
// the stream, so the code below makes sure we detect that the substream was closed
// even if we don't write anything into it.
match Stream::poll_next(this.socket.as_mut(), cx) {
Poll::Pending => {},
Poll::Ready(Some(_)) => {
error!(
target: "sub-libp2p",
"Unexpected incoming data in `NotificationsOutSubstream`",
);
},
Poll::Ready(None) => return Poll::Ready(Err(NotificationsOutError::Terminated)),
}

Sink::poll_flush(this.socket.as_mut(), cx).map_err(NotificationsOutError::Io)
}

@@ -494,13 +507,19 @@ pub enum NotificationsOutError {
/// I/O error on the substream.
#[error(transparent)]
Io(#[from] io::Error),
#[error("substream was closed/reset")]
Terminated,
}

#[cfg(test)]
mod tests {
use super::{NotificationsIn, NotificationsInOpen, NotificationsOut, NotificationsOutOpen};
use futures::{channel::oneshot, prelude::*};
use super::{
NotificationsIn, NotificationsInOpen, NotificationsOut, NotificationsOutError,
NotificationsOutOpen,
};
use futures::{channel::oneshot, future, prelude::*};
use libp2p::core::upgrade;
use std::{pin::Pin, task::Poll};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::compat::TokioAsyncReadCompatExt;

@@ -693,4 +712,95 @@ mod tests {

client.await.unwrap();
}

#[tokio::test]
async fn send_handshake_without_polling_for_incoming_data() {
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();

let client = tokio::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let NotificationsOutOpen { handshake, .. } = upgrade::apply_outbound(
socket.compat(),
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
upgrade::Version::V1,
)
.await
.unwrap();

assert_eq!(handshake, b"hello world");
});

let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();

let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();

assert_eq!(handshake, b"initial message");
substream.send_handshake(&b"hello world"[..]);

// Actually send the handshake.
future::poll_fn(|cx| Pin::new(&mut substream).poll_process(cx)).await.unwrap();

client.await.unwrap();
}

#[tokio::test]
async fn can_detect_dropped_out_substream_without_writing_data() {
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();

let client = tokio::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound(
socket.compat(),
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
upgrade::Version::V1,
)
.await
.unwrap();

assert_eq!(handshake, b"hello world");

future::poll_fn(|cx| match Pin::new(&mut substream).poll_flush(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => {
cx.waker().wake_by_ref();
Poll::Pending
},
Poll::Ready(Err(e)) => {
assert!(matches!(e, NotificationsOutError::Terminated));
Poll::Ready(())
},
})
.await;
});

let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();

let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();

assert_eq!(handshake, b"initial message");

// Send the handshake.
substream.send_handshake(&b"hello world"[..]);
future::poll_fn(|cx| Pin::new(&mut substream).poll_process(cx)).await.unwrap();

drop(substream);

client.await.unwrap();
}
}
68 changes: 0 additions & 68 deletions substrate/client/network/sync/src/engine.rs
@@ -90,7 +90,6 @@ use std::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
};

/// Interval at which we perform time based maintenance
@@ -99,23 +98,6 @@ const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100);
/// Maximum number of known block hashes to keep for a peer.
const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead

/// If the block announces stream to peer has been inactive for 30 seconds meaning local node
/// has not sent or received block announcements to/from the peer, report the node for inactivity,
/// disconnect it and attempt to establish connection to some other peer.
const INACTIVITY_EVICT_THRESHOLD: Duration = Duration::from_secs(30);

/// When `SyncingEngine` is started, wait two minutes before actually staring to count peers as
/// evicted.
///
/// Parachain collator may incorrectly get evicted because it's waiting to receive a number of
/// relaychain blocks before it can start creating parachain blocks. During this wait,
/// `SyncingEngine` still counts it as active and as the peer is not sending blocks, it may get
/// evicted if a block is not received within the first 30 seconds since the peer connected.
///
/// To prevent this from happening, define a threshold for how long `SyncingEngine` should wait
/// before it starts evicting peers.
const INITIAL_EVICTION_WAIT_PERIOD: Duration = Duration::from_secs(2 * 60);
Contributor:
dq: The worst that can happen now is to stall indefinitely? And we presume that previously we evicted peers because they closed their notification substreams but we didn't notice it in time (i.e. before the eviction timer triggered and all peers were dropped)? 🤔

I think this is related to: #3346

Contributor Author:
In order for this to happen, all peers have to keep the notification channel open and misbehave by not providing any useful data. This seems unlikely to me.

Yes, the peer eviction kicked in only if all notification substreams were half-closed but we didn't notice it.

I think we can revisit peer eviction and maybe come up with a solution that tracks individual peers (which was done at some point in the past).

Also, people complained that the number of connected peers suddenly dropped to zero due to eviction. Now it should work on an individual peer level.
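
As a purely illustrative aside (not part of this PR), the per-peer tracking mentioned here could look roughly like the sketch below; every name in it is hypothetical and nothing like this exists in `sc-network-sync`.

```rust
// Illustrative sketch of per-peer inactivity tracking: each peer carries its own
// last-activity timestamp and is evicted individually, instead of evicting everyone.
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical per-peer inactivity threshold.
const PER_PEER_INACTIVITY_THRESHOLD: Duration = Duration::from_secs(30);

/// Hypothetical peer identifier for the sketch.
type PeerId = u64;

#[derive(Default)]
struct PerPeerActivity {
    last_io: HashMap<PeerId, Instant>,
}

impl PerPeerActivity {
    /// Record that a notification was sent to or received from `peer`.
    fn note_io(&mut self, peer: PeerId) {
        self.last_io.insert(peer, Instant::now());
    }

    /// Return the peers whose block-announce substream has been idle too long,
    /// removing them from the table so each is only reported once.
    fn take_idle_peers(&mut self) -> Vec<PeerId> {
        let now = Instant::now();
        let idle: Vec<PeerId> = self
            .last_io
            .iter()
            .filter(|(_, last)| now.duration_since(**last) > PER_PEER_INACTIVITY_THRESHOLD)
            .map(|(peer, _)| *peer)
            .collect();
        for peer in &idle {
            self.last_io.remove(peer);
        }
        idle
    }
}
```
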

Contributor Author:
Also, we can keep the eviction as-is for now, merge only the closed-substream detection, and handle eviction in a separate PR.

@lexnv @bkchr what do you think?

Member:
We only added this to work around the bug that this PR is fixing, so I think it is fine to remove it, especially as we have already seen issues with it.


/// Maximum allowed size for a block announce.
const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024;

@@ -125,8 +107,6 @@ mod rep {
pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
/// Peer send us a block announcement that failed at validation.
pub const BAD_BLOCK_ANNOUNCEMENT: Rep = Rep::new(-(1 << 12), "Bad block announcement");
/// Block announce substream with the peer has been inactive too long
pub const INACTIVE_SUBSTREAM: Rep = Rep::new(-(1 << 10), "Inactive block announce substream");
/// We received a message that failed to decode.
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
/// Peer is on unsupported protocol version.
@@ -289,18 +269,9 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Handle that is used to communicate with `sc_network::Notifications`.
notification_service: Box<dyn NotificationService>,

/// When the syncing was started.
///
/// Stored as an `Option<Instant>` so once the initial wait has passed, `SyncingEngine`
/// can reset the peer timers and continue with the normal eviction process.
syncing_started: Option<Instant>,

/// Handle to `PeerStore`.
peer_store_handle: PeerStoreHandle,

/// Instant when the last notification was sent or received.
last_notification_io: Instant,

/// Pending responses
pending_responses: PendingResponses<B>,

@@ -490,9 +461,7 @@
event_streams: Vec::new(),
notification_service,
tick_timeout,
syncing_started: None,
peer_store_handle,
last_notification_io: Instant::now(),
metrics: if let Some(r) = metrics_registry {
match Metrics::register(r, is_major_syncing.clone()) {
Ok(metrics) => Some(metrics),
@@ -638,15 +607,12 @@
data: Some(data.clone()),
};

self.last_notification_io = Instant::now();
let _ = self.notification_service.send_sync_notification(peer_id, message.encode());
}
}
}

pub async fn run(mut self) {
self.syncing_started = Some(Instant::now());

loop {
tokio::select! {
_ = self.tick_timeout.tick() => self.perform_periodic_actions(),
@@ -780,39 +746,6 @@

fn perform_periodic_actions(&mut self) {
self.report_metrics();

// if `SyncingEngine` has just started, don't evict seemingly inactive peers right away
// as they may not have produced blocks not because they've disconnected but because
// they're still waiting to receive enough relaychain blocks to start producing blocks.
if let Some(started) = self.syncing_started {
if started.elapsed() < INITIAL_EVICTION_WAIT_PERIOD {
return
}

self.syncing_started = None;
self.last_notification_io = Instant::now();
}

// if syncing hasn't sent or received any blocks within `INACTIVITY_EVICT_THRESHOLD`,
// it means the local node has stalled and is connected to peers who either don't
// consider it connected or are also all stalled. In order to unstall the node,
// disconnect all peers and allow `ProtocolController` to establish new connections.
if self.last_notification_io.elapsed() > INACTIVITY_EVICT_THRESHOLD {
log::debug!(
target: LOG_TARGET,
"syncing has halted due to inactivity, evicting all peers",
);

for peer in self.peers.keys() {
self.network_service.report_peer(*peer, rep::INACTIVE_SUBSTREAM);
self.network_service
.disconnect_peer(*peer, self.block_announce_protocol_name.clone());
}

// after all the peers have been evicted, start timer again to prevent evicting
// new peers that join after the old peer have been evicted
self.last_notification_io = Instant::now();
}
}

fn process_service_command(&mut self, command: ToServiceCommand<B>) {
Expand Down Expand Up @@ -947,7 +880,6 @@ where
return
};

self.last_notification_io = Instant::now();
self.push_block_announce_validation(peer, announce);
},
}