From 7d50bcaa9e15c8dba3da5b073e8cfdf783ea41a3 Mon Sep 17 00:00:00 2001
From: Andrei Eres
Date: Tue, 16 Jul 2024 19:56:25 +0200
Subject: [PATCH] Send PeerViewChange with high priority (#4755)

Closes https://github.com/paritytech/polkadot-sdk/issues/577

### Changed
- `orchestra` updated to 0.4.0
- `PeerViewChange` is now sent with high priority and is processed first in the queue.
- To count these messages in tests, a counter was added to the test sender and test overseer; it acts more like a smoke test, though.

### Testing on Versi

The changes were tested on Versi with two objectives:
1. Make sure the node functionality does not change.
2. See how the changes affect performance.

Test setup:
- 2.5 hours for each case
- 100 validators
- 50 parachains
- validatorsPerCore = 2
- neededApprovals = 100
- nDelayTranches = 89
- relayVrfModuloSamples = 50

During the test period, all nodes ran without any crashes, which satisfies the first objective.

To estimate the performance impact, we used time-of-flight (ToF) charts. They no longer show the spikes at the top that were present before, which supports our hypothesis.

### Normalized charts with ToF
![image](https://github.com/user-attachments/assets/0d49d0db-8302-4a8c-a557-501856805ff5)
[Before](https://grafana.teleport.parity.io/goto/ZoR53ClSg?orgId=1)

![image](https://github.com/user-attachments/assets/9cc73784-7e45-49d9-8212-152373c05880)
[After](https://grafana.teleport.parity.io/goto/6ux5qC_IR?orgId=1)

### Conclusion
Prioritizing subsystem messages reduces the ToF of the networking subsystem, which speeds up the propagation of gossip messages.
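### Example: sending with priority

For illustration, the sender-side API that orchestra 0.4.0 adds and that the network bridge uses below looks roughly like this. This is a minimal sketch: the `forward_event` helper, its bounds, and the `is_urgent` flag are illustrative; only the `send_message`/`send_message_with_priority` calls and the `HighPriority` marker come from this change.

```rust
use polkadot_node_subsystem::{messages::ApprovalDistributionMessage, overseer};

// Sketch only: forwards one message, placing it ahead of normal-priority
// messages in the receiving subsystem's queue when `is_urgent` is true.
async fn forward_event<Sender>(
	sender: &mut Sender,
	message: ApprovalDistributionMessage,
	is_urgent: bool,
) where
	Sender: overseer::SubsystemSender<ApprovalDistributionMessage>,
{
	if is_urgent {
		// Only effective if the receiving subsystem is declared with
		// `can_receive_priority_messages` in the overseer.
		sender.send_message_with_priority::<overseer::HighPriority>(message).await;
	} else {
		// Unchanged behaviour: enqueued with normal priority.
		sender.send_message(message).await;
	}
}
```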
---
 Cargo.lock                                   | 24 ++++++--
 Cargo.toml                                   |  2 +-
 polkadot/node/malus/src/interceptor.rs       | 13 +++-
 .../src/tests/state.rs                       |  2 +-
 polkadot/node/network/bridge/src/rx/mod.rs   | 32 ++++++++--
 polkadot/node/network/bridge/src/rx/tests.rs | 17 ++++++
 polkadot/node/overseer/src/lib.rs            | 17 +++---
 .../node/subsystem-test-helpers/src/lib.rs   | 61 +++++++++++++++++--
 prdoc/pr_4755.prdoc                          | 24 ++++++++
 9 files changed, 165 insertions(+), 27 deletions(-)
 create mode 100644 prdoc/pr_4755.prdoc

diff --git a/Cargo.lock b/Cargo.lock
index 9972285780f35..3866a74b3f215 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5376,12 +5376,14 @@ dependencies = [

 [[package]]
 name = "expander"
-version = "2.0.0"
+version = "2.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f86a749cf851891866c10515ef6c299b5c69661465e9c3bbe7e07a2b77fb0f7"
+checksum = "e2c470c71d91ecbd179935b24170459e926382eaaa86b590b78814e180d8a8e2"
 dependencies = [
  "blake2 0.10.6",
+ "file-guard",
  "fs-err",
+ "prettyplease 0.2.12",
  "proc-macro2 1.0.82",
  "quote 1.0.35",
  "syn 2.0.61",
@@ -5514,6 +5516,16 @@ version = "0.2.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "27573eac26f4dd11e2b1916c3fe1baa56407c83c71a773a8ba17ec0bca03b6b7"

+[[package]]
+name = "file-guard"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "21ef72acf95ec3d7dbf61275be556299490a245f017cf084bd23b4f68cf9407c"
+dependencies = [
+ "libc",
+ "winapi",
+]
+
 [[package]]
 name = "file-per-thread-logger"
 version = "0.1.6"
@@ -9554,9 +9566,9 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"

 [[package]]
 name = "orchestra"
-version = "0.3.6"
+version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "92829eef0328a3d1cd22a02c0e51deb92a5362df3e7d21a4e9bdc38934694e66"
+checksum = "41f6bbacc8c189a3f2e45e0fd0436e5d97f194db888e721bdbc3973e7dbed4c2"
 dependencies = [
  "async-trait",
  "dyn-clonable",
@@ -9571,9 +9583,9 @@ dependencies = [

 [[package]]
 name = "orchestra-proc-macro"
-version = "0.3.6"
+version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1344346d5af32c95bbddea91b18a88cc83eac394192d20ef2fc4c40a74332355"
+checksum = "f7b1d40dd8f367db3c65bec8d3dd47d4a604ee8874480738f93191bddab4e0e0"
 dependencies = [
  "expander",
  "indexmap 2.2.3",
diff --git a/Cargo.toml b/Cargo.toml
index 5c2677fffeb22..0999d63040130 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -855,7 +855,7 @@ num-rational = { version = "0.4.1" }
 num-traits = { version = "0.2.17", default-features = false }
 num_cpus = { version = "1.13.1" }
 once_cell = { version = "1.19.0" }
-orchestra = { version = "0.3.5", default-features = false }
+orchestra = { version = "0.4.0", default-features = false }
 pallet-alliance = { path = "substrate/frame/alliance", default-features = false }
 pallet-asset-conversion = { path = "substrate/frame/asset-conversion", default-features = false }
 pallet-asset-conversion-ops = { path = "substrate/frame/asset-conversion/ops", default-features = false }
diff --git a/polkadot/node/malus/src/interceptor.rs b/polkadot/node/malus/src/interceptor.rs
index b44ffc8956b52..2181118646d56 100644
--- a/polkadot/node/malus/src/interceptor.rs
+++ b/polkadot/node/malus/src/interceptor.rs
@@ -90,6 +90,10 @@ where
 	>::Error: std::fmt::Debug,
 {
 	async fn send_message(&mut self, msg: OutgoingMessage) {
+		self.send_message_with_priority::<overseer::NormalPriority>(msg).await;
+	}
+
+	async fn send_message_with_priority<P: overseer::Priority>(&mut self, msg: OutgoingMessage) {
 		let msg = <
 			<>::Message as overseer::AssociateOutgoing
 		>::OutgoingMessages as From>::from(msg);
@@ -103,7 +107,14 @@ where
 		}
 	}

-	fn try_send_message(&mut self, msg: OutgoingMessage) -> Result<(), TrySendError> {
+	fn try_send_message(
+		&mut self,
+		msg: OutgoingMessage,
+	) -> Result<(), polkadot_node_subsystem_util::metered::TrySendError> {
+		self.try_send_message_with_priority::<overseer::NormalPriority>(msg)
+	}
+
+	fn try_send_message_with_priority<P: overseer::Priority>(&mut self, msg: OutgoingMessage) -> Result<(), TrySendError> {
 		let msg = <
 			<>::Message as overseer::AssociateOutgoing
 		>::OutgoingMessages as From>::from(msg);
diff --git a/polkadot/node/network/availability-distribution/src/tests/state.rs b/polkadot/node/network/availability-distribution/src/tests/state.rs
index befbff0a2f27e..97e616f79fb75 100644
--- a/polkadot/node/network/availability-distribution/src/tests/state.rs
+++ b/polkadot/node/network/availability-distribution/src/tests/state.rs
@@ -216,7 +216,7 @@ impl TestState {
 		// Test will fail if this does not happen until timeout.
 		let mut remaining_stores = self.valid_chunks.len();

-		let TestSubsystemContextHandle { tx, mut rx } = harness.virtual_overseer;
+		let TestSubsystemContextHandle { tx, mut rx, .. } = harness.virtual_overseer;

 		// Spawning necessary as incoming queue can only hold a single item, we don't want to dead
 		// lock ;-)
diff --git a/polkadot/node/network/bridge/src/rx/mod.rs b/polkadot/node/network/bridge/src/rx/mod.rs
index 84e935366d0cb..56965ce6ba404 100644
--- a/polkadot/node/network/bridge/src/rx/mod.rs
+++ b/polkadot/node/network/bridge/src/rx/mod.rs
@@ -1135,13 +1135,33 @@ async fn dispatch_validation_events_to_all(
 	I: IntoIterator>,
 	I::IntoIter: Send,
 {
+	macro_rules! send_message {
+		($event:expr, $message:ident) => {
+			if let Ok(event) = $event.focus() {
+				let has_high_priority = matches!(
+					event,
+					// NetworkBridgeEvent::OurViewChange(..) must also be here,
+					// but it is sent via an unbounded channel.
+					// See https://github.com/paritytech/polkadot-sdk/issues/824
+					NetworkBridgeEvent::PeerConnected(..) |
+						NetworkBridgeEvent::PeerDisconnected(..) |
+						NetworkBridgeEvent::PeerViewChange(..)
+				);
+				let message = $message::from(event);
+				if has_high_priority {
+					sender.send_message_with_priority::<overseer::HighPriority>(message).await;
+				} else {
+					sender.send_message(message).await;
+				}
+			}
+		};
+	}
+
 	for event in events {
-		sender
-			.send_messages(event.focus().map(StatementDistributionMessage::from))
-			.await;
-		sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await;
-		sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await;
-		sender.send_messages(event.focus().map(GossipSupportMessage::from)).await;
+		send_message!(event, StatementDistributionMessage);
+		send_message!(event, BitfieldDistributionMessage);
+		send_message!(event, ApprovalDistributionMessage);
+		send_message!(event, GossipSupportMessage);
 	}
 }
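Note: for readability, here is roughly what one expansion of the `send_message!` macro above produces, using `ApprovalDistributionMessage` as an example. This is an illustrative expansion, not code from the patch:

```rust
// Approximate expansion of `send_message!(event, ApprovalDistributionMessage)`.
// Only events the subsystem is interested in (`focus()` succeeds) are forwarded;
// peer connect/disconnect/view-change events jump ahead in the queue.
if let Ok(event) = event.focus() {
	let has_high_priority = matches!(
		event,
		NetworkBridgeEvent::PeerConnected(..) |
			NetworkBridgeEvent::PeerDisconnected(..) |
			NetworkBridgeEvent::PeerViewChange(..)
	);
	let message = ApprovalDistributionMessage::from(event);
	if has_high_priority {
		sender.send_message_with_priority::<overseer::HighPriority>(message).await;
	} else {
		sender.send_message(message).await;
	}
}
```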
diff --git a/polkadot/node/network/bridge/src/rx/tests.rs b/polkadot/node/network/bridge/src/rx/tests.rs
index 6182bf3d883b5..392ff7391a1c1 100644
--- a/polkadot/node/network/bridge/src/rx/tests.rs
+++ b/polkadot/node/network/bridge/src/rx/tests.rs
@@ -880,6 +880,8 @@ fn peer_view_updates_sent_via_overseer() {
 			&mut virtual_overseer,
 		)
 		.await;
+
+		assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
 	}

 	network_handle
@@ -895,6 +897,7 @@ fn peer_view_updates_sent_via_overseer() {
 			&mut virtual_overseer,
 		)
 		.await;
+		assert_eq!(virtual_overseer.message_counter.with_high_priority(), 12);

 		virtual_overseer
 	});
 }
@@ -930,6 +933,8 @@ fn peer_messages_sent_via_overseer() {
 			&mut virtual_overseer,
 		)
 		.await;
+
+		assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
 	}

 	let approval_distribution_message =
@@ -970,6 +975,7 @@
 			&mut virtual_overseer,
 		)
 		.await;
+		assert_eq!(virtual_overseer.message_counter.with_high_priority(), 12);

 		virtual_overseer
 	});
 }
@@ -1008,6 +1014,8 @@ fn peer_disconnect_from_just_one_peerset() {
 			&mut virtual_overseer,
 		)
 		.await;
+
+		assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
 	}

 	{
@@ -1036,6 +1044,7 @@
 			&mut virtual_overseer,
 		)
 		.await;
+		assert_eq!(virtual_overseer.message_counter.with_high_priority(), 12);

 	// to show that we're still connected on the collation protocol, send a view update.
@@ -1094,6 +1103,8 @@ fn relays_collation_protocol_messages() {
 			&mut virtual_overseer,
 		)
 		.await;
+
+		assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
 	}

 	{
@@ -1201,6 +1212,8 @@ fn different_views_on_different_peer_sets() {
 			&mut virtual_overseer,
 		)
 		.await;
+
+		assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
 	}

 	{
@@ -1247,6 +1260,8 @@
 		)
 		.await;

+		assert_eq!(virtual_overseer.message_counter.with_high_priority(), 12);
+
 		assert_sends_collation_event_to_all(
 			NetworkBridgeEvent::PeerViewChange(peer, view_b.clone()),
 			&mut virtual_overseer,
 		)
@@ -1481,6 +1496,8 @@ fn network_protocol_versioning_subsystem_msg() {
 			&mut virtual_overseer,
 		)
 		.await;
+
+		assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);
 	}

 	let approval_distribution_message =
diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs
index 24985a99913d8..4e13d5eda76f6 100644
--- a/polkadot/node/overseer/src/lib.rs
+++ b/polkadot/node/overseer/src/lib.rs
@@ -105,10 +105,11 @@ pub use polkadot_node_metrics::{

 pub use orchestra as gen;
 pub use orchestra::{
-	contextbounds, orchestra, subsystem, FromOrchestra, MapSubsystem, MessagePacket,
-	OrchestraError as OverseerError, SignalsReceived, Spawner, Subsystem, SubsystemContext,
-	SubsystemIncomingMessages, SubsystemInstance, SubsystemMeterReadouts, SubsystemMeters,
-	SubsystemSender, TimeoutExt, ToOrchestra, TrySendError,
+	contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
+	NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
+	Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
+	SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
+	TrySendError,
 };

 #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
@@ -495,7 +496,7 @@ pub struct Overseer {
 		RuntimeApiMessage,
 		ProspectiveParachainsMessage,
 		ChainApiMessage,
-	])]
+	], can_receive_priority_messages)]
 	statement_distribution: StatementDistribution,

 	#[subsystem(AvailabilityDistributionMessage, sends: [
@@ -524,7 +525,7 @@ pub struct Overseer {
 		RuntimeApiMessage,
 		NetworkBridgeTxMessage,
 		ProvisionerMessage,
-	])]
+	], can_receive_priority_messages)]
 	bitfield_distribution: BitfieldDistribution,

 	#[subsystem(ProvisionerMessage, sends: [
@@ -580,7 +581,7 @@ pub struct Overseer {
 	#[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
 		NetworkBridgeTxMessage,
 		ApprovalVotingMessage,
-	])]
+	], can_receive_priority_messages)]
 	approval_distribution: ApprovalDistribution,

 	#[subsystem(blocking, ApprovalVotingMessage, sends: [
@@ -599,7 +600,7 @@ pub struct Overseer {
 		NetworkBridgeRxMessage, // TODO
 		RuntimeApiMessage,
 		ChainSelectionMessage,
-	])]
+	], can_receive_priority_messages)]
 	gossip_support: GossipSupport,

 	#[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs
index 375121c374637..bdb0647fee6f5 100644
--- a/polkadot/node/subsystem-test-helpers/src/lib.rs
+++ b/polkadot/node/subsystem-test-helpers/src/lib.rs
@@ -36,7 +36,7 @@ use std::{
 	convert::Infallible,
 	future::Future,
 	pin::Pin,
-	sync::Arc,
+	sync::{atomic::AtomicUsize, Arc},
 	task::{Context, Poll, Waker},
 	time::Duration,
 };
@@ -146,12 +146,13 @@ pub fn single_item_sink() -> (SingleItemSink, SingleItemStream) {

 #[derive(Clone)]
 pub struct TestSubsystemSender {
 	tx: mpsc::UnboundedSender,
+	message_counter: MessageCounter,
 }

 /// Construct a sender/receiver pair.
 pub fn sender_receiver() -> (TestSubsystemSender, mpsc::UnboundedReceiver) {
 	let (tx, rx) = mpsc::unbounded();
-	(TestSubsystemSender { tx }, rx)
+	(TestSubsystemSender { tx, message_counter: MessageCounter::default() }, rx)
 }

 #[async_trait::async_trait]
@@ -161,6 +162,11 @@ where
 	OutgoingMessage: Send + 'static,
 {
 	async fn send_message(&mut self, msg: OutgoingMessage) {
+		self.send_message_with_priority::<overseer::NormalPriority>(msg).await;
+	}
+
+	async fn send_message_with_priority<P: overseer::Priority>(&mut self, msg: OutgoingMessage) {
+		self.message_counter.increment(P::priority());
 		self.tx.send(msg.into()).await.expect("test overseer no longer live");
 	}

@@ -168,6 +174,14 @@ where
 		&mut self,
 		msg: OutgoingMessage,
 	) -> Result<(), TrySendError> {
+		self.try_send_message_with_priority::<overseer::NormalPriority>(msg)
+	}
+
+	fn try_send_message_with_priority<P: overseer::Priority>(
+		&mut self,
+		msg: OutgoingMessage,
+	) -> Result<(), TrySendError> {
+		self.message_counter.increment(P::priority());
 		self.tx.unbounded_send(msg.into()).expect("test overseer no longer live");
 		Ok(())
 	}
@@ -277,6 +291,9 @@ pub struct TestSubsystemContextHandle {

 	/// Direct access to the receiver.
 	pub rx: mpsc::UnboundedReceiver,
+
+	/// Message counter over subsystems.
+	pub message_counter: MessageCounter,
 }

 impl TestSubsystemContextHandle {
@@ -322,6 +339,34 @@ pub fn make_subsystem_context(
 	make_buffered_subsystem_context(spawner, 0)
 }

+/// Message counter over subsystems.
+#[derive(Default, Clone)]
+pub struct MessageCounter {
+	total: Arc<AtomicUsize>,
+	with_high_priority: Arc<AtomicUsize>,
+}
+
+impl MessageCounter {
+	/// Increment the message counter.
+	pub fn increment(&mut self, priority_level: overseer::PriorityLevel) {
+		self.total.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+		if matches!(priority_level, overseer::PriorityLevel::High) {
+			self.with_high_priority.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+		}
+	}
+
+	/// Reset the message counter.
+	pub fn reset(&mut self) {
+		self.total.store(0, std::sync::atomic::Ordering::SeqCst);
+		self.with_high_priority.store(0, std::sync::atomic::Ordering::SeqCst);
+	}
+
+	/// Get the messages with high priority count.
+	pub fn with_high_priority(&self) -> usize {
+		self.with_high_priority.load(std::sync::atomic::Ordering::SeqCst)
+	}
+}
+
 /// Make a test subsystem context with buffered overseer channel. Some tests (e.g.
 /// `dispute-coordinator`) create too many parallel operations and deadlock unless
 /// the channel is buffered. Usually `buffer_size=1` is enough.
@@ -331,15 +376,23 @@ pub fn make_buffered_subsystem_context(
 ) -> (TestSubsystemContext>, TestSubsystemContextHandle) {
 	let (overseer_tx, overseer_rx) = mpsc::channel(buffer_size);
 	let (all_messages_tx, all_messages_rx) = mpsc::unbounded();
+	let message_counter = MessageCounter::default();

 	(
 		TestSubsystemContext {
-			tx: TestSubsystemSender { tx: all_messages_tx },
+			tx: TestSubsystemSender {
+				tx: all_messages_tx,
+				message_counter: message_counter.clone(),
+			},
 			rx: overseer_rx,
 			spawn: SpawnGlue(spawner),
 			message_buffer: VecDeque::new(),
 		},
-		TestSubsystemContextHandle { tx: overseer_tx, rx: all_messages_rx },
+		TestSubsystemContextHandle {
+			tx: overseer_tx,
+			rx: all_messages_rx,
+			message_counter: message_counter.clone(),
+		},
 	)
 }
diff --git a/prdoc/pr_4755.prdoc b/prdoc/pr_4755.prdoc
new file mode 100644
index 0000000000000..1018446cb67e7
--- /dev/null
+++ b/prdoc/pr_4755.prdoc
@@ -0,0 +1,24 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Send PeerViewChange with high priority
+
+doc:
+  - audience: Node Dev
+    description: |
+      - orchestra updated to 0.4.0, which introduces support for prioritizing subsystem messages.
+      - PeerViewChange is sent with high priority and is processed first in the queue.
+      - To count these messages in tests, a counter was added to the test sender and test overseer; it acts more like a smoke test, though.
+
+
+crates:
+  - name: polkadot-overseer
+    bump: minor
+  - name: polkadot-network-bridge
+    bump: patch
+  - name: polkadot-availability-distribution
+    bump: patch
+  - name: polkadot-test-malus
+    bump: patch
+  - name: polkadot-node-subsystem-test-helpers
+    bump: patch
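As a usage note, the new counter can be asserted on in subsystem tests roughly as follows. This is a sketch only: the `test_harness` helper and the expected count of 8 are illustrative; the `message_counter` field and the `with_high_priority()` accessor are the ones added above.

```rust
// Sketch: asserting on prioritized messages in a network-bridge style test.
// `virtual_overseer` is the `TestSubsystemContextHandle` returned by
// `make_buffered_subsystem_context` above.
test_harness(|mut virtual_overseer| async move {
	// ... drive the subsystem: connect peers, send peer view changes ...

	// Peer connect/disconnect/view-change events are dispatched to four
	// subsystems with high priority, so the counter grows in steps of four.
	assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8);

	virtual_overseer
});
```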