encapsulates turbine peers computations of broadcast & retransmit stages (solana-labs#18238) (solana-labs#18464)

Broadcast stage and retransmit stage should arrange nodes on the turbine
broadcast tree in exactly the same order. Additionally, any change to this
ordering (e.g. updating how unstaked nodes are handled) requires feature
gating to keep the cluster in sync.

The current implementation is scattered across several public methods and
exposes too many implementation details (e.g. usize indices into the peers
vector), which makes code changes and checking for feature activations more
difficult.

This commit encapsulates turbine peer computations into a new struct and
exposes only two public methods, get_broadcast_peer and get_retransmit_peers,
to call-sites.

(cherry picked from commit 04787be)

Co-authored-by: behzad nouri <[email protected]>
mergify[bot] and behzadnouri authored Jul 7, 2021
1 parent 0d0478c commit c534c92
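
Before the per-file diffs, a rough sketch of the ClusterNodes surface that the updated call-sites rely on may help orient the reader. The type and method names (ClusterNodes::<BroadcastStage>::new, get_broadcast_peer, num_peers, num_peers_live) are taken from the diffs below; the field layout, the seed type, and the stubbed bodies are assumptions for illustration only, not part of this commit.

use std::{collections::HashMap, marker::PhantomData};

use solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo};
use solana_sdk::pubkey::Pubkey;

use crate::broadcast_stage::BroadcastStage;

// Hypothetical layout; only the method signatures mirror the call-sites below.
pub struct ClusterNodes<T> {
    // tvu peers held in one fixed, feature-gated order shared by the
    // broadcast and retransmit stages.
    nodes: Vec<ContactInfo>,
    // (stake, index into `nodes`) pairs used for stake-weighted selection.
    index: Vec<(u64, usize)>,
    _stage: PhantomData<T>,
}

impl<T> ClusterNodes<T> {
    pub fn num_peers(&self) -> usize {
        self.index.len()
    }

    // A peer counts as live if its contact-info wallclock is recent relative
    // to `now` (compare the removed num_live_peers helper further down).
    pub fn num_peers_live(&self, _now: u64) -> usize {
        todo!()
    }
}

impl ClusterNodes<BroadcastStage> {
    pub fn new(_cluster_info: &ClusterInfo, _stakes: &HashMap<Pubkey, u64>) -> Self {
        todo!()
    }

    // Deterministically maps a shred's seed to the single broadcast target so
    // that every node derives the same turbine tree.
    pub fn get_broadcast_peer(&self, _shred_seed: [u8; 32]) -> Option<&ContactInfo> {
        todo!()
    }
}

// The commit message also names get_retransmit_peers on the retransmit-stage
// specialization; its signature is not visible in the files loaded here.

Call-sites then rebuild this struct on a refresh interval and query it, instead of threading the parallel peers / peers_and_stakes vectors through broadcast_shreds.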
Showing 12 changed files with 589 additions and 273 deletions.
17 changes: 10 additions & 7 deletions core/benches/cluster_info.rs
@@ -3,10 +3,14 @@
extern crate test;

use rand::{thread_rng, Rng};
use solana_core::broadcast_stage::broadcast_metrics::TransmitShredsStats;
use solana_core::broadcast_stage::{broadcast_shreds, get_broadcast_peers};
use solana_gossip::cluster_info::{ClusterInfo, Node};
use solana_gossip::contact_info::ContactInfo;
use solana_core::{
broadcast_stage::{broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastStage},
cluster_nodes::ClusterNodes,
};
use solana_gossip::{
cluster_info::{ClusterInfo, Node},
contact_info::ContactInfo,
};
use solana_ledger::shred::Shred;
use solana_sdk::pubkey;
use solana_sdk::timing::timestamp;
@@ -36,16 +40,15 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64);
}
let cluster_info = Arc::new(cluster_info);
let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(&stakes));
let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
let shreds = Arc::new(shreds);
let last_datapoint = Arc::new(AtomicU64::new(0));
bencher.iter(move || {
let shreds = shreds.clone();
broadcast_shreds(
&socket,
&shreds,
&peers_and_stakes,
&peers,
&cluster_nodes,
&last_datapoint,
&mut TransmitShredsStats::default(),
)
67 changes: 11 additions & 56 deletions core/src/broadcast_stage.rs
@@ -6,17 +6,15 @@ use self::{
fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
standard_broadcast_run::StandardBroadcastRun,
};
use crate::result::{Error, Result};
use crate::{
cluster_nodes::ClusterNodes,
result::{Error, Result},
};
use crossbeam_channel::{
Receiver as CrossbeamReceiver, RecvTimeoutError as CrossbeamRecvTimeoutError,
Sender as CrossbeamSender,
};
use solana_gossip::{
cluster_info::{self, ClusterInfo, ClusterInfoError},
contact_info::ContactInfo,
crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
weighted_shuffle::weighted_best,
};
use solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError};
use solana_ledger::{blockstore::Blockstore, shred::Shred};
use solana_measure::measure::Measure;
use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
@@ -380,37 +378,26 @@ fn update_peer_stats(
}
}

pub fn get_broadcast_peers(
cluster_info: &ClusterInfo,
stakes: Option<&HashMap<Pubkey, u64>>,
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
let mut peers = cluster_info.tvu_peers();
let peers_and_stakes = cluster_info::stake_weight_peers(&mut peers, stakes);
(peers, peers_and_stakes)
}

/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
pub fn broadcast_shreds(
s: &UdpSocket,
shreds: &[Shred],
peers_and_stakes: &[(u64, usize)],
peers: &[ContactInfo],
cluster_nodes: &ClusterNodes<BroadcastStage>,
last_datapoint_submit: &Arc<AtomicU64>,
transmit_stats: &mut TransmitShredsStats,
) -> Result<()> {
let broadcast_len = peers_and_stakes.len();
let broadcast_len = cluster_nodes.num_peers();
if broadcast_len == 0 {
update_peer_stats(1, 1, last_datapoint_submit);
return Ok(());
}
let mut shred_select = Measure::start("shred_select");
let packets: Vec<_> = shreds
.iter()
.map(|shred| {
let broadcast_index = weighted_best(peers_and_stakes, shred.seed());

(&shred.payload, &peers[broadcast_index].tvu)
.filter_map(|shred| {
let node = cluster_nodes.get_broadcast_peer(shred.seed())?;
Some((&shred.payload, &node.tvu))
})
.collect();
shred_select.stop();
@@ -429,7 +416,7 @@ pub fn broadcast_shreds(
send_mmsg_time.stop();
transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();

let num_live_peers = num_live_peers(peers);
let num_live_peers = cluster_nodes.num_peers_live(timestamp()) as i64;
update_peer_stats(
num_live_peers,
broadcast_len as i64 + 1,
@@ -438,25 +425,6 @@ pub fn broadcast_shreds(
Ok(())
}

fn distance(a: u64, b: u64) -> u64 {
if a > b {
a - b
} else {
b - a
}
}

fn num_live_peers(peers: &[ContactInfo]) -> i64 {
let mut num_live_peers = 1i64;
peers.iter().for_each(|p| {
// A peer is considered live if they generated their contact info recently
if distance(timestamp(), p.wallclock) <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS {
num_live_peers += 1;
}
});
num_live_peers
}

#[cfg(test)]
pub mod test {
use super::*;
@@ -540,19 +508,6 @@ pub mod test {
assert_eq!(num_expected_coding_shreds, coding_index);
}

#[test]
fn test_num_live_peers() {
let mut ci = ContactInfo {
wallclock: std::u64::MAX,
..ContactInfo::default()
};
assert_eq!(num_live_peers(&[ci.clone()]), 1);
ci.wallclock = timestamp() - 1;
assert_eq!(num_live_peers(&[ci.clone()]), 2);
ci.wallclock = timestamp() - CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS - 1;
assert_eq!(num_live_peers(&[ci]), 1);
}

#[test]
fn test_duplicate_retransmit_signal() {
// Setup
core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -1,4 +1,5 @@
use super::*;
use crate::cluster_nodes::ClusterNodes;
use solana_ledger::shred::Shredder;
use solana_sdk::hash::Hash;
use solana_sdk::signature::Keypair;
@@ -134,13 +135,14 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
) -> Result<()> {
let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
// Broadcast data
let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes.as_deref());

let cluster_nodes = ClusterNodes::<BroadcastStage>::new(
cluster_info,
stakes.as_deref().unwrap_or(&HashMap::default()),
);
broadcast_shreds(
sock,
&shreds,
&peers_and_stakes,
&peers,
&cluster_nodes,
&Arc::new(AtomicU64::new(0)),
&mut TransmitShredsStats::default(),
)?;
27 changes: 10 additions & 17 deletions core/src/broadcast_stage/standard_broadcast_run.rs
@@ -4,7 +4,7 @@ use super::{
broadcast_utils::{self, ReceiveResults},
*,
};
use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo;
use crate::{broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodes};
use solana_ledger::{
entry::Entry,
shred::{
@@ -27,16 +27,10 @@ pub struct StandardBroadcastRun {
shred_version: u16,
last_datapoint_submit: Arc<AtomicU64>,
num_batches: usize,
broadcast_peer_cache: Arc<RwLock<BroadcastPeerCache>>,
cluster_nodes: Arc<RwLock<ClusterNodes<BroadcastStage>>>,
last_peer_update: Arc<AtomicU64>,
}

#[derive(Default)]
struct BroadcastPeerCache {
peers: Vec<ContactInfo>,
peers_and_stakes: Vec<(u64, usize)>,
}

impl StandardBroadcastRun {
pub(super) fn new(keypair: Arc<Keypair>, shred_version: u16) -> Self {
Self {
@@ -50,7 +44,7 @@ impl StandardBroadcastRun {
shred_version,
last_datapoint_submit: Arc::default(),
num_batches: 0,
broadcast_peer_cache: Arc::default(),
cluster_nodes: Arc::default(),
last_peer_update: Arc::default(),
}
}
@@ -354,26 +348,25 @@
.compare_and_swap(now, last, Ordering::Relaxed)
== last
{
let mut w_broadcast_peer_cache = self.broadcast_peer_cache.write().unwrap();
let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
w_broadcast_peer_cache.peers = peers;
w_broadcast_peer_cache.peers_and_stakes = peers_and_stakes;
*self.cluster_nodes.write().unwrap() = ClusterNodes::<BroadcastStage>::new(
cluster_info,
stakes.unwrap_or(&HashMap::default()),
);
}
get_peers_time.stop();
let r_broadcast_peer_cache = self.broadcast_peer_cache.read().unwrap();
let cluster_nodes = self.cluster_nodes.read().unwrap();

let mut transmit_stats = TransmitShredsStats::default();
// Broadcast the shreds
let mut transmit_time = Measure::start("broadcast_shreds");
broadcast_shreds(
sock,
&shreds,
&r_broadcast_peer_cache.peers_and_stakes,
&r_broadcast_peer_cache.peers,
&cluster_nodes,
&self.last_datapoint_submit,
&mut transmit_stats,
)?;
drop(r_broadcast_peer_cache);
drop(cluster_nodes);
transmit_time.stop();

transmit_stats.transmit_elapsed = transmit_time.as_us();

