diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs
index 54750a249704f1..9f526eb59b8831 100644
--- a/core/benches/cluster_info.rs
+++ b/core/benches/cluster_info.rs
@@ -3,10 +3,14 @@
 extern crate test;
 
 use rand::{thread_rng, Rng};
-use solana_core::broadcast_stage::broadcast_metrics::TransmitShredsStats;
-use solana_core::broadcast_stage::{broadcast_shreds, get_broadcast_peers};
-use solana_gossip::cluster_info::{ClusterInfo, Node};
-use solana_gossip::contact_info::ContactInfo;
+use solana_core::{
+    broadcast_stage::{broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastStage},
+    cluster_nodes::ClusterNodes,
+};
+use solana_gossip::{
+    cluster_info::{ClusterInfo, Node},
+    contact_info::ContactInfo,
+};
 use solana_ledger::shred::Shred;
 use solana_sdk::pubkey;
 use solana_sdk::timing::timestamp;
@@ -36,7 +40,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
         stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64);
     }
     let cluster_info = Arc::new(cluster_info);
-    let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(&stakes));
+    let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
     let shreds = Arc::new(shreds);
     let last_datapoint = Arc::new(AtomicU64::new(0));
     bencher.iter(move || {
@@ -44,8 +48,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
         broadcast_shreds(
             &socket,
             &shreds,
-            &peers_and_stakes,
-            &peers,
+            &cluster_nodes,
             &last_datapoint,
             &mut TransmitShredsStats::default(),
         )
diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index a0953aee628e3e..ede892b320fd32 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -6,17 +6,15 @@ use self::{
     fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
     standard_broadcast_run::StandardBroadcastRun,
 };
-use crate::result::{Error, Result};
+use crate::{
+    cluster_nodes::ClusterNodes,
+    result::{Error, Result},
+};
 use crossbeam_channel::{
     Receiver as CrossbeamReceiver, RecvTimeoutError as CrossbeamRecvTimeoutError,
     Sender as CrossbeamSender,
 };
-use solana_gossip::{
-    cluster_info::{self, ClusterInfo, ClusterInfoError},
-    contact_info::ContactInfo,
-    crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
-    weighted_shuffle::weighted_best,
-};
+use solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError};
 use solana_ledger::{blockstore::Blockstore, shred::Shred};
 use solana_measure::measure::Measure;
 use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
@@ -380,26 +378,16 @@ fn update_peer_stats(
     }
 }
 
-pub fn get_broadcast_peers(
-    cluster_info: &ClusterInfo,
-    stakes: Option<&HashMap<Pubkey, u64>>,
-) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
-    let mut peers = cluster_info.tvu_peers();
-    let peers_and_stakes = cluster_info::stake_weight_peers(&mut peers, stakes);
-    (peers, peers_and_stakes)
-}
-
 /// broadcast messages from the leader to layer 1 nodes
 /// # Remarks
 pub fn broadcast_shreds(
     s: &UdpSocket,
     shreds: &[Shred],
-    peers_and_stakes: &[(u64, usize)],
-    peers: &[ContactInfo],
+    cluster_nodes: &ClusterNodes<BroadcastStage>,
     last_datapoint_submit: &Arc<AtomicU64>,
     transmit_stats: &mut TransmitShredsStats,
 ) -> Result<()> {
-    let broadcast_len = peers_and_stakes.len();
+    let broadcast_len = cluster_nodes.num_peers();
     if broadcast_len == 0 {
         update_peer_stats(1, 1, last_datapoint_submit);
         return Ok(());
@@ -407,10 +395,9 @@ pub fn broadcast_shreds(
     let mut shred_select = Measure::start("shred_select");
     let packets: Vec<_> = shreds
         .iter()
-        .map(|shred| {
-            let broadcast_index = weighted_best(peers_and_stakes, shred.seed());
-
-            (&shred.payload, &peers[broadcast_index].tvu)
+        .filter_map(|shred| {
+            let node = cluster_nodes.get_broadcast_peer(shred.seed())?;
+            Some((&shred.payload, &node.tvu))
         })
         .collect();
     shred_select.stop();
@@ -429,7 +416,7 @@ pub fn broadcast_shreds(
     send_mmsg_time.stop();
     transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();
 
-    let num_live_peers = num_live_peers(peers);
+    let num_live_peers = cluster_nodes.num_peers_live(timestamp()) as i64;
     update_peer_stats(
         num_live_peers,
         broadcast_len as i64 + 1,
@@ -438,25 +425,6 @@ pub fn broadcast_shreds(
     Ok(())
 }
 
-fn distance(a: u64, b: u64) -> u64 {
-    if a > b {
-        a - b
-    } else {
-        b - a
-    }
-}
-
-fn num_live_peers(peers: &[ContactInfo]) -> i64 {
-    let mut num_live_peers = 1i64;
-    peers.iter().for_each(|p| {
-        // A peer is considered live if they generated their contact info recently
-        if distance(timestamp(), p.wallclock) <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS {
-            num_live_peers += 1;
-        }
-    });
-    num_live_peers
-}
-
 #[cfg(test)]
 pub mod test {
     use super::*;
@@ -540,19 +508,6 @@ pub mod test {
         assert_eq!(num_expected_coding_shreds, coding_index);
     }
 
-    #[test]
-    fn test_num_live_peers() {
-        let mut ci = ContactInfo {
-            wallclock: std::u64::MAX,
-            ..ContactInfo::default()
-        };
-        assert_eq!(num_live_peers(&[ci.clone()]), 1);
-        ci.wallclock = timestamp() - 1;
-        assert_eq!(num_live_peers(&[ci.clone()]), 2);
-        ci.wallclock = timestamp() - CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS - 1;
-        assert_eq!(num_live_peers(&[ci]), 1);
-    }
-
     #[test]
     fn test_duplicate_retransmit_signal() {
         // Setup
diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
index b66681786d6f7d..720aec8d21deb1 100644
--- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
+++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -1,4 +1,5 @@
 use super::*;
+use crate::cluster_nodes::ClusterNodes;
 use solana_ledger::shred::Shredder;
 use solana_sdk::hash::Hash;
 use solana_sdk::signature::Keypair;
@@ -134,13 +135,14 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
     ) -> Result<()> {
         let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
         // Broadcast data
-        let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes.as_deref());
-
+        let cluster_nodes = ClusterNodes::<BroadcastStage>::new(
+            cluster_info,
+            stakes.as_deref().unwrap_or(&HashMap::default()),
+        );
         broadcast_shreds(
             sock,
             &shreds,
-            &peers_and_stakes,
-            &peers,
+            &cluster_nodes,
             &Arc::new(AtomicU64::new(0)),
             &mut TransmitShredsStats::default(),
         )?;
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
index 8b9cf78e27f961..f618d6a0a787c1 100644
--- a/core/src/broadcast_stage/standard_broadcast_run.rs
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -4,7 +4,7 @@ use super::{
     broadcast_utils::{self, ReceiveResults},
     *,
 };
-use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo;
+use crate::{broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodes};
 use solana_ledger::{
     entry::Entry,
     shred::{
@@ -27,16 +27,10 @@ pub struct StandardBroadcastRun {
     shred_version: u16,
     last_datapoint_submit: Arc<AtomicU64>,
     num_batches: usize,
-    broadcast_peer_cache: Arc<RwLock<BroadcastPeerCache>>,
+    cluster_nodes: Arc<RwLock<ClusterNodes<BroadcastStage>>>,
     last_peer_update: Arc<AtomicU64>,
 }
 
-#[derive(Default)]
-struct BroadcastPeerCache {
-    peers: Vec<ContactInfo>,
-    peers_and_stakes: Vec<(u64, usize)>,
-}
-
 impl StandardBroadcastRun {
     pub(super) fn new(keypair: Arc<Keypair>, shred_version: u16) -> Self {
         Self {
@@ -50,7 +44,7 @@ impl StandardBroadcastRun {
             shred_version,
             last_datapoint_submit: Arc::default(),
             num_batches: 0,
-            broadcast_peer_cache: Arc::default(),
+            cluster_nodes: Arc::default(),
             last_peer_update: Arc::default(),
         }
     }
@@ -354,13 +348,13 @@ impl StandardBroadcastRun {
                 .compare_and_swap(now, last, Ordering::Relaxed)
                 == last
         {
-            let mut w_broadcast_peer_cache = self.broadcast_peer_cache.write().unwrap();
-            let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
-            w_broadcast_peer_cache.peers = peers;
-            w_broadcast_peer_cache.peers_and_stakes = peers_and_stakes;
+            *self.cluster_nodes.write().unwrap() = ClusterNodes::<BroadcastStage>::new(
+                cluster_info,
+                stakes.unwrap_or(&HashMap::default()),
+            );
         }
         get_peers_time.stop();
-        let r_broadcast_peer_cache = self.broadcast_peer_cache.read().unwrap();
+        let cluster_nodes = self.cluster_nodes.read().unwrap();
 
         let mut transmit_stats = TransmitShredsStats::default();
         // Broadcast the shreds
@@ -368,12 +362,11 @@ impl StandardBroadcastRun {
         broadcast_shreds(
             sock,
             &shreds,
-            &r_broadcast_peer_cache.peers_and_stakes,
-            &r_broadcast_peer_cache.peers,
+            &cluster_nodes,
             &self.last_datapoint_submit,
             &mut transmit_stats,
         )?;
-        drop(r_broadcast_peer_cache);
+        drop(cluster_nodes);
         transmit_time.stop();
         transmit_stats.transmit_elapsed = transmit_time.as_us();
diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs
new file mode 100644
index 00000000000000..c19f841b8638e5
--- /dev/null
+++ b/core/src/cluster_nodes.rs
@@ -0,0 +1,441 @@
+use {
+    crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage},
+    itertools::Itertools,
+    solana_gossip::{
+        cluster_info::{compute_retransmit_peers, ClusterInfo},
+        contact_info::ContactInfo,
+        crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
+        weighted_shuffle::{weighted_best, weighted_shuffle},
+    },
+    solana_sdk::pubkey::Pubkey,
+    std::{any::TypeId, cmp::Reverse, collections::HashMap, marker::PhantomData},
+};
+
+enum NodeId {
+    // TVU node obtained through gossip (staked or not).
+    ContactInfo(ContactInfo),
+    // Staked node with no contact-info in gossip table.
+    Pubkey(Pubkey),
+}
+
+struct Node {
+    node: NodeId,
+    stake: u64,
+}
+
+pub struct ClusterNodes<T> {
+    pubkey: Pubkey, // The local node itself.
+    // All staked nodes + other known tvu-peers + the node itself;
+    // sorted by (stake, pubkey) in descending order.
+    nodes: Vec<Node>,
+    // Weights and indices for sampling peers. weighted_{shuffle,best} expect
+    // weights >= 1. For backward compatibility we use max(1, stake) for
+    // weights and exclude nodes with no contact-info.
+    index: Vec<(/*weight:*/ u64, /*index:*/ usize)>,
+    _phantom: PhantomData<T>,
+}
+
+impl Node {
+    #[inline]
+    fn pubkey(&self) -> Pubkey {
+        match &self.node {
+            NodeId::Pubkey(pubkey) => *pubkey,
+            NodeId::ContactInfo(node) => node.id,
+        }
+    }
+
+    #[inline]
+    fn contact_info(&self) -> Option<&ContactInfo> {
+        match &self.node {
+            NodeId::Pubkey(_) => None,
+            NodeId::ContactInfo(node) => Some(node),
+        }
+    }
+}
+
+impl<T> ClusterNodes<T> {
+    pub fn num_peers(&self) -> usize {
+        self.index.len()
+    }
+
+    // A peer is considered live if they generated their contact info recently.
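+    // Wallclock is the peer's self-reported timestamp, so the elapsed time is
+    // computed as an absolute difference to tolerate clock skew either way.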
+    pub fn num_peers_live(&self, now: u64) -> usize {
+        self.index
+            .iter()
+            .filter_map(|(_, index)| self.nodes[*index].contact_info())
+            .filter(|node| {
+                let elapsed = if node.wallclock < now {
+                    now - node.wallclock
+                } else {
+                    node.wallclock - now
+                };
+                elapsed < CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
+            })
+            .count()
+    }
+}
+
+impl ClusterNodes<BroadcastStage> {
+    pub fn new(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Self {
+        new_cluster_nodes(cluster_info, stakes)
+    }
+
+    /// Returns the root of turbine broadcast tree, which the leader sends the
+    /// shred to.
+    pub fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> {
+        if self.index.is_empty() {
+            None
+        } else {
+            let index = weighted_best(&self.index, shred_seed);
+            match &self.nodes[index].node {
+                NodeId::ContactInfo(node) => Some(node),
+                // self.index only references nodes with contact-info.
+                NodeId::Pubkey(_) => panic!("this should not happen!"),
+            }
+        }
+    }
+}
+
+impl ClusterNodes<RetransmitStage> {
+    pub fn new(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Self {
+        new_cluster_nodes(cluster_info, stakes)
+    }
+
+    pub fn get_retransmit_peers(
+        &self,
+        shred_seed: [u8; 32],
+        fanout: usize,
+        slot_leader: Option<Pubkey>,
+    ) -> (
+        Vec<&ContactInfo>, // neighbors
+        Vec<&ContactInfo>, // children
+    ) {
+        // Exclude leader from list of nodes.
+        let index = self.index.iter().copied();
+        let (weights, index): (Vec<u64>, Vec<usize>) = match slot_leader {
+            None => {
+                error!("unknown leader for shred slot");
+                index.unzip()
+            }
+            Some(slot_leader) if slot_leader == self.pubkey => {
+                error!("retransmit from slot leader: {}", slot_leader);
+                index.unzip()
+            }
+            Some(slot_leader) => index
+                .filter(|(_, i)| self.nodes[*i].pubkey() != slot_leader)
+                .unzip(),
+        };
+        let index: Vec<_> = {
+            let shuffle = weighted_shuffle(&weights, shred_seed);
+            shuffle.into_iter().map(|i| index[i]).collect()
+        };
+        let self_index = index
+            .iter()
+            .position(|i| self.nodes[*i].pubkey() == self.pubkey)
+            .unwrap();
+        let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &index);
+        // Assert that the node itself is included in the set of neighbors, at
+        // the right offset.
+        debug_assert_eq!(
+            self.nodes[neighbors[self_index % fanout]].pubkey(),
+            self.pubkey
+        );
+        let get_contact_infos = |index: Vec<usize>| -> Vec<&ContactInfo> {
+            index
+                .into_iter()
+                .map(|i| self.nodes[i].contact_info().unwrap())
+                .collect()
+        };
+        (get_contact_infos(neighbors), get_contact_infos(children))
+    }
+}
+
+fn new_cluster_nodes<T: 'static>(
+    cluster_info: &ClusterInfo,
+    stakes: &HashMap<Pubkey, u64>,
+) -> ClusterNodes<T> {
+    let self_pubkey = cluster_info.id();
+    let nodes = get_nodes(cluster_info, stakes);
+    let broadcast = TypeId::of::<T>() == TypeId::of::<BroadcastStage>();
+    // For backward compatibility:
+    // * nodes which do not have contact-info are excluded.
+    // * stakes are floored at 1.
+    // The sorting key here should be equivalent to
+    // solana_gossip::deprecated::sorted_stakes_with_index.
+    // Leader itself is excluded when sampling broadcast peers.
+    let index = nodes
+        .iter()
+        .enumerate()
+        .filter(|(_, node)| node.contact_info().is_some())
+        .filter(|(_, node)| !broadcast || node.pubkey() != self_pubkey)
+        .sorted_by_key(|(_, node)| Reverse((node.stake.max(1), node.pubkey())))
+        .map(|(index, node)| (node.stake.max(1), index))
+        .collect();
+    ClusterNodes {
+        pubkey: self_pubkey,
+        nodes,
+        index,
+        _phantom: PhantomData::default(),
+    }
+}
+
+// All staked nodes + other known tvu-peers + the node itself;
+// sorted by (stake, pubkey) in descending order.
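+// When a pubkey appears both in gossip and in the stakes map, the stable sort
+// plus dedup below keeps the copy with contact-info, since gossip entries are
+// chained in first.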
+fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<Node> {
+    let self_pubkey = cluster_info.id();
+    // The local node itself.
+    std::iter::once({
+        let stake = stakes.get(&self_pubkey).copied().unwrap_or_default();
+        let node = NodeId::from(cluster_info.my_contact_info());
+        Node { node, stake }
+    })
+    // All known tvu-peers from gossip.
+    .chain(cluster_info.tvu_peers().into_iter().map(|node| {
+        let stake = stakes.get(&node.id).copied().unwrap_or_default();
+        let node = NodeId::from(node);
+        Node { node, stake }
+    }))
+    // All staked nodes.
+    .chain(
+        stakes
+            .iter()
+            .filter(|(_, stake)| **stake > 0)
+            .map(|(&pubkey, &stake)| Node {
+                node: NodeId::from(pubkey),
+                stake,
+            }),
+    )
+    .sorted_by_key(|node| Reverse((node.stake, node.pubkey())))
+    // Since sorted_by_key is stable, in case of duplicates, this
+    // will keep nodes with contact-info.
+    .dedup_by(|a, b| a.pubkey() == b.pubkey())
+    .collect()
+}
+
+impl From<ContactInfo> for NodeId {
+    fn from(node: ContactInfo) -> Self {
+        NodeId::ContactInfo(node)
+    }
+}
+
+impl From<Pubkey> for NodeId {
+    fn from(pubkey: Pubkey) -> Self {
+        NodeId::Pubkey(pubkey)
+    }
+}
+
+impl<T> Default for ClusterNodes<T> {
+    fn default() -> Self {
+        Self {
+            pubkey: Pubkey::default(),
+            nodes: Vec::default(),
+            index: Vec::default(),
+            _phantom: PhantomData::default(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use {
+        super::*,
+        rand::{seq::SliceRandom, Rng},
+        solana_gossip::{
+            crds_value::{CrdsData, CrdsValue},
+            deprecated::{
+                shuffle_peers_and_index, sorted_retransmit_peers_and_stakes,
+                sorted_stakes_with_index,
+            },
+        },
+        solana_sdk::timing::timestamp,
+        std::iter::repeat_with,
+    };
+
+    // Legacy methods copied for testing backward compatibility.
+
+    fn get_broadcast_peers(
+        cluster_info: &ClusterInfo,
+        stakes: Option<&HashMap<Pubkey, u64>>,
+    ) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
+        let mut peers = cluster_info.tvu_peers();
+        let peers_and_stakes = stake_weight_peers(&mut peers, stakes);
+        (peers, peers_and_stakes)
+    }
+
+    fn stake_weight_peers(
+        peers: &mut Vec<ContactInfo>,
+        stakes: Option<&HashMap<Pubkey, u64>>,
+    ) -> Vec<(u64, usize)> {
+        peers.dedup();
+        sorted_stakes_with_index(peers, stakes)
+    }
+
+    fn make_cluster<R: Rng>(
+        rng: &mut R,
+    ) -> (
+        Vec<ContactInfo>,
+        HashMap<Pubkey, u64>, // stakes
+        ClusterInfo,
+    ) {
+        let mut nodes: Vec<_> = repeat_with(|| ContactInfo::new_rand(rng, None))
+            .take(1000)
+            .collect();
+        nodes.shuffle(rng);
+        let this_node = nodes[0].clone();
+        let mut stakes: HashMap<Pubkey, u64> = nodes
+            .iter()
+            .filter_map(|node| {
+                if rng.gen_ratio(1, 7) {
+                    None // No stake for some of the nodes.
+                } else {
+                    Some((node.id, rng.gen_range(0, 20)))
+                }
+            })
+            .collect();
+        // Add some staked nodes with no contact-info.
+        stakes.extend(repeat_with(|| (Pubkey::new_unique(), rng.gen_range(0, 20))).take(100));
+        let cluster_info = ClusterInfo::new_with_invalid_keypair(this_node);
+        {
+            let now = timestamp();
+            let mut gossip = cluster_info.gossip.write().unwrap();
+            // First node is pushed to crds table by ClusterInfo constructor.
+            for node in nodes.iter().skip(1) {
+                let node = CrdsData::ContactInfo(node.clone());
+                let node = CrdsValue::new_unsigned(node);
+                assert_eq!(gossip.crds.insert(node, now), Ok(()));
+            }
+        }
+        (nodes, stakes, cluster_info)
+    }
+
+    #[test]
+    fn test_cluster_nodes_retransmit() {
+        let mut rng = rand::thread_rng();
+        let (nodes, stakes, cluster_info) = make_cluster(&mut rng);
+        let this_node = cluster_info.my_contact_info();
+        // ClusterInfo::tvu_peers excludes the node itself.
+        assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
+        let cluster_nodes = ClusterNodes::<RetransmitStage>::new(&cluster_info, &stakes);
+        // All nodes with contact-info should be in the index.
+        assert_eq!(cluster_nodes.index.len(), nodes.len());
+        // Staked nodes with no contact-info should be included.
+        assert!(cluster_nodes.nodes.len() > nodes.len());
+        // Assert that all nodes keep their contact-info,
+        // and all staked nodes are also included.
+        {
+            let cluster_nodes: HashMap<_, _> = cluster_nodes
+                .nodes
+                .iter()
+                .map(|node| (node.pubkey(), node))
+                .collect();
+            for node in &nodes {
+                assert_eq!(cluster_nodes[&node.id].contact_info().unwrap().id, node.id);
+            }
+            for (pubkey, stake) in &stakes {
+                if *stake > 0 {
+                    assert_eq!(cluster_nodes[pubkey].stake, *stake);
+                }
+            }
+        }
+        let (peers, stakes_and_index) =
+            sorted_retransmit_peers_and_stakes(&cluster_info, Some(&stakes));
+        assert_eq!(stakes_and_index.len(), peers.len());
+        assert_eq!(cluster_nodes.index.len(), peers.len());
+        for (i, node) in cluster_nodes
+            .index
+            .iter()
+            .map(|(_, i)| &cluster_nodes.nodes[*i])
+            .enumerate()
+        {
+            let (stake, index) = stakes_and_index[i];
+            // Wallclock may be updated by ClusterInfo::push_self.
+            if node.pubkey() == this_node.id {
+                assert_eq!(this_node.id, peers[index].id)
+            } else {
+                assert_eq!(node.contact_info().unwrap(), &peers[index]);
+            }
+            assert_eq!(node.stake.max(1), stake);
+        }
+        let slot_leader = nodes[1..].choose(&mut rng).unwrap().id;
+        // Remove slot leader from peers indices.
+        let stakes_and_index: Vec<_> = stakes_and_index
+            .into_iter()
+            .filter(|(_stake, index)| peers[*index].id != slot_leader)
+            .collect();
+        assert_eq!(peers.len(), stakes_and_index.len() + 1);
+        let mut shred_seed = [0u8; 32];
+        rng.fill(&mut shred_seed[..]);
+        let (self_index, shuffled_peers_and_stakes) =
+            shuffle_peers_and_index(&this_node.id, &peers, &stakes_and_index, shred_seed);
+        let shuffled_index: Vec<_> = shuffled_peers_and_stakes
+            .into_iter()
+            .map(|(_, index)| index)
+            .collect();
+        assert_eq!(this_node.id, peers[shuffled_index[self_index]].id);
+        for fanout in 1..200 {
+            let (neighbors_indices, children_indices) =
+                compute_retransmit_peers(fanout, self_index, &shuffled_index);
+            let (neighbors, children) =
+                cluster_nodes.get_retransmit_peers(shred_seed, fanout, Some(slot_leader));
+            assert_eq!(children.len(), children_indices.len());
+            for (node, index) in children.into_iter().zip(children_indices) {
+                assert_eq!(*node, peers[index]);
+            }
+            assert_eq!(neighbors.len(), neighbors_indices.len());
+            assert_eq!(neighbors[0].id, peers[neighbors_indices[0]].id);
+            for (node, index) in neighbors.into_iter().zip(neighbors_indices).skip(1) {
+                assert_eq!(*node, peers[index]);
+            }
+        }
+    }
+
+    #[test]
+    fn test_cluster_nodes_broadcast() {
+        let mut rng = rand::thread_rng();
+        let (nodes, stakes, cluster_info) = make_cluster(&mut rng);
+        // ClusterInfo::tvu_peers excludes the node itself.
+        assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
+        let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
+        // All nodes with contact-info should be in the index,
+        // excluding this node itself.
+        assert_eq!(cluster_nodes.index.len() + 1, nodes.len());
+        // Staked nodes with no contact-info should be included.
+        assert!(cluster_nodes.nodes.len() > nodes.len());
+        // Assert that all nodes keep their contact-info,
+        // and all staked nodes are also included.
+        {
+            let cluster_nodes: HashMap<_, _> = cluster_nodes
+                .nodes
+                .iter()
+                .map(|node| (node.pubkey(), node))
+                .collect();
+            for node in &nodes {
+                assert_eq!(cluster_nodes[&node.id].contact_info().unwrap().id, node.id);
+            }
+            for (pubkey, stake) in &stakes {
+                if *stake > 0 {
+                    assert_eq!(cluster_nodes[pubkey].stake, *stake);
+                }
+            }
+        }
+        let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(&stakes));
+        assert_eq!(peers_and_stakes.len(), peers.len());
+        assert_eq!(cluster_nodes.index.len(), peers.len());
+        for (i, node) in cluster_nodes
+            .index
+            .iter()
+            .map(|(_, i)| &cluster_nodes.nodes[*i])
+            .enumerate()
+        {
+            let (stake, index) = peers_and_stakes[i];
+            assert_eq!(node.contact_info().unwrap(), &peers[index]);
+            assert_eq!(node.stake.max(1), stake);
+        }
+        for _ in 0..100 {
+            let mut shred_seed = [0u8; 32];
+            rng.fill(&mut shred_seed[..]);
+            let index = weighted_best(&peers_and_stakes, shred_seed);
+            let peer = cluster_nodes.get_broadcast_peer(shred_seed).unwrap();
+            assert_eq!(*peer, peers[index]);
+        }
+    }
+}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index a9a17bfddec9a1..b0a2055d42bf24 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -12,6 +12,7 @@ pub mod banking_stage;
 pub mod broadcast_stage;
 pub mod cache_block_meta_service;
 pub mod cluster_info_vote_listener;
+pub mod cluster_nodes;
 pub mod cluster_slot_state_verifier;
 pub mod cluster_slots;
 pub mod cluster_slots_service;
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 1910fc5594e7d8..440228a42b39ee 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -3,6 +3,7 @@
 use crate::{
     cluster_info_vote_listener::VerifiedVoteReceiver,
+    cluster_nodes::ClusterNodes,
     cluster_slots::ClusterSlots,
     cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver},
     completed_data_sets_service::CompletedDataSetsSender,
@@ -13,10 +14,7 @@ use crate::{
 use crossbeam_channel::{Receiver, Sender};
 use lru::LruCache;
 use solana_client::rpc_response::SlotUpdate;
-use solana_gossip::{
-    cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
-    contact_info::ContactInfo,
-};
+use solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT};
 use solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats};
 use solana_ledger::{
     blockstore::{Blockstore, CompletedSlotsReceiver},
@@ -33,7 +31,6 @@
 use solana_runtime::{bank::Bank, bank_forks::BankForks};
 use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
 use solana_streamer::streamer::PacketReceiver;
 use std::{
-    cmp,
     collections::hash_set::HashSet,
     collections::{BTreeMap, BTreeSet, HashMap},
     net::UdpSocket,
@@ -217,12 +214,6 @@ fn update_retransmit_stats(
     }
 }
 
-#[derive(Default)]
-struct EpochStakesCache {
-    peers: Vec<ContactInfo>,
-    stakes_and_index: Vec<(u64, usize)>,
-}
-
 use crate::packet_hasher::PacketHasher;
 // Map of shred (slot, index, is_data) => list of hash values seen for that key.
 pub type ShredFilter = LruCache<(Slot, u32, bool), Vec<u64>>;
@@ -283,33 +274,6 @@ fn check_if_first_shred_received(
     }
 }
 
-// Drops shred slot leader from retransmit peers.
-// TODO: decide which bank should be used here.
-fn get_retransmit_peers(
-    self_pubkey: Pubkey,
-    shred_slot: Slot,
-    leader_schedule_cache: &LeaderScheduleCache,
-    bank: &Bank,
-    stakes_cache: &EpochStakesCache,
-) -> Vec<(u64 /*stakes*/, usize /*index*/)> {
-    match leader_schedule_cache.slot_leader_at(shred_slot, Some(bank)) {
-        None => {
-            error!("unknown leader for shred slot");
-            stakes_cache.stakes_and_index.clone()
-        }
-        Some(pubkey) if pubkey == self_pubkey => {
-            error!("retransmit from slot leader: {}", pubkey);
-            stakes_cache.stakes_and_index.clone()
-        }
-        Some(pubkey) => stakes_cache
-            .stakes_and_index
-            .iter()
-            .filter(|(_, i)| stakes_cache.peers[*i].id != pubkey)
-            .copied()
-            .collect(),
-    }
-}
-
 #[allow(clippy::too_many_arguments)]
 fn retransmit(
     bank_forks: &RwLock<BankForks>,
@@ -319,7 +283,7 @@ fn retransmit(
     sock: &UdpSocket,
     id: u32,
     stats: &RetransmitStats,
-    epoch_stakes_cache: &RwLock<EpochStakesCache>,
+    cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
     last_peer_update: &AtomicU64,
     shreds_received: &Mutex<ShredsReceived>,
     max_slots: &MaxSlots,
@@ -357,20 +321,17 @@ fn retransmit(
         && last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last
     {
         let epoch_staked_nodes = r_bank.epoch_staked_nodes(bank_epoch);
-        let (peers, stakes_and_index) =
-            cluster_info.sorted_retransmit_peers_and_stakes(epoch_staked_nodes.as_ref());
-        {
-            let mut epoch_stakes_cache = epoch_stakes_cache.write().unwrap();
-            epoch_stakes_cache.peers = peers;
-            epoch_stakes_cache.stakes_and_index = stakes_and_index;
-        }
+        *cluster_nodes.write().unwrap() = ClusterNodes::<RetransmitStage>::new(
+            cluster_info,
+            &epoch_staked_nodes.unwrap_or_default(),
+        );
         {
             let mut sr = shreds_received.lock().unwrap();
             sr.0.clear();
             sr.1.reset();
         }
     }
-    let r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
+    let cluster_nodes = cluster_nodes.read().unwrap();
     let mut peers_len = 0;
     epoch_cache_update.stop();
@@ -411,52 +372,19 @@ fn retransmit(
         }
         let mut compute_turbine_peers = Measure::start("turbine_start");
-        let stakes_and_index = get_retransmit_peers(
-            my_id,
-            shred_slot,
-            leader_schedule_cache,
-            r_bank.deref(),
-            r_epoch_stakes_cache.deref(),
-        );
-        let (my_index, shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
-            &my_id,
-            &r_epoch_stakes_cache.peers,
-            &stakes_and_index,
-            packet.meta.seed,
-        );
+        let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(r_bank.deref()));
+        let (neighbors, children) =
+            cluster_nodes.get_retransmit_peers(packet.meta.seed, DATA_PLANE_FANOUT, slot_leader);
         // If the node is on the critical path (i.e. the first node in each
         // neighborhood), then we expect that the packet arrives at tvu socket
         // as opposed to tvu-forwards. If this is not the case, then the
         // turbine broadcast/retransmit tree is mismatched across nodes.
-        let anchor_node = my_index % DATA_PLANE_FANOUT == 0;
+        let anchor_node = neighbors[0].id == my_id;
         if packet.meta.forward == anchor_node {
             // TODO: Consider forwarding the packet to the root node here.
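+            // Metric only; the shred is still retransmitted below.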
             retransmit_tree_mismatch += 1;
         }
-        peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
-        // split off the indexes, we don't need the stakes anymore
-        let indexes: Vec<_> = shuffled_stakes_and_index
-            .into_iter()
-            .map(|(_, index)| index)
-            .collect();
-        debug_assert_eq!(my_id, r_epoch_stakes_cache.peers[indexes[my_index]].id);
-
-        let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, &indexes);
-        let neighbors: Vec<_> = neighbors
-            .into_iter()
-            .filter_map(|index| {
-                let peer = &r_epoch_stakes_cache.peers[index];
-                if peer.id == my_id {
-                    None
-                } else {
-                    Some(peer)
-                }
-            })
-            .collect();
-        let children: Vec<_> = children
-            .into_iter()
-            .map(|index| &r_epoch_stakes_cache.peers[index])
-            .collect();
+        peers_len = peers_len.max(cluster_nodes.num_peers());
         compute_turbine_peers.stop();
         compute_turbine_peers_total += compute_turbine_peers.as_us();
@@ -471,7 +399,13 @@ fn retransmit(
         // children and also tvu_forward socket of its neighbors. Otherwise it
         // should only forward to tvu_forward socket of its children.
         if anchor_node {
-            ClusterInfo::retransmit_to(&neighbors, packet, sock, /*forward socket=*/ true);
+            // First neighbor is this node itself, so skip it.
+            ClusterInfo::retransmit_to(
+                &neighbors[1..],
+                packet,
+                sock,
+                /*forward socket=*/ true,
+            );
         }
         ClusterInfo::retransmit_to(
             &children,
@@ -541,7 +475,7 @@ pub fn retransmitter(
             let r = r.clone();
             let cluster_info = cluster_info.clone();
             let stats = stats.clone();
-            let epoch_stakes_cache = Arc::new(RwLock::new(EpochStakesCache::default()));
+            let cluster_nodes = Arc::default();
             let last_peer_update = Arc::new(AtomicU64::new(0));
             let shreds_received = shreds_received.clone();
             let max_slots = max_slots.clone();
@@ -561,7 +495,7 @@ pub fn retransmitter(
                         &sockets[s],
                         s as u32,
                         &stats,
-                        &epoch_stakes_cache,
+                        &cluster_nodes,
                         &last_peer_update,
                         &shreds_received,
                         &max_slots,
diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs
index 7a2060426d25e4..cf6eda727eb30a 100644
--- a/gossip/src/cluster_info.rs
+++ b/gossip/src/cluster_info.rs
@@ -1304,80 +1304,6 @@ impl ClusterInfo {
             || !ContactInfo::is_valid_address(&contact_info.tvu)
     }
 
-    fn sorted_stakes_with_index(
-        peers: &[ContactInfo],
-        stakes: Option<&HashMap<Pubkey, u64>>,
-    ) -> Vec<(u64, usize)> {
-        let stakes_and_index: Vec<_> = peers
-            .iter()
-            .enumerate()
-            .map(|(i, c)| {
-                // For stake weighted shuffle a valid weight is atleast 1. Weight 0 is
-                // assumed to be missing entry. So let's make sure stake weights are atleast 1
-                let stake = 1.max(
-                    stakes
-                        .as_ref()
-                        .map_or(1, |stakes| *stakes.get(&c.id).unwrap_or(&1)),
-                );
-                (stake, i)
-            })
-            .sorted_by(|(l_stake, l_info), (r_stake, r_info)| {
-                if r_stake == l_stake {
-                    peers[*r_info].id.cmp(&peers[*l_info].id)
-                } else {
-                    r_stake.cmp(l_stake)
-                }
-            })
-            .collect();
-
-        stakes_and_index
-    }
-
-    fn stake_weighted_shuffle(
-        stakes_and_index: &[(u64, usize)],
-        seed: [u8; 32],
-    ) -> Vec<(u64, usize)> {
-        let stake_weights: Vec<_> = stakes_and_index.iter().map(|(w, _)| *w).collect();
-
-        let shuffle = weighted_shuffle(&stake_weights, seed);
-
-        shuffle.iter().map(|x| stakes_and_index[*x]).collect()
-    }
-
-    // Return sorted_retransmit_peers(including self) and their stakes
-    pub fn sorted_retransmit_peers_and_stakes(
-        &self,
-        stakes: Option<&HashMap<Pubkey, u64>>,
-    ) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
-        let mut peers = self.tvu_peers();
-        // insert "self" into this list for the layer and neighborhood computation
-        peers.push(self.my_contact_info());
-        let stakes_and_index = ClusterInfo::sorted_stakes_with_index(&peers, stakes);
-        (peers, stakes_and_index)
-    }
-
-    /// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list
-    pub fn shuffle_peers_and_index(
-        id: &Pubkey,
-        peers: &[ContactInfo],
-        stakes_and_index: &[(u64, usize)],
-        seed: [u8; 32],
-    ) -> (usize, Vec<(u64, usize)>) {
-        let shuffled_stakes_and_index = ClusterInfo::stake_weighted_shuffle(stakes_and_index, seed);
-        let self_index = shuffled_stakes_and_index
-            .iter()
-            .enumerate()
-            .find_map(|(i, (_stake, index))| {
-                if peers[*index].id == *id {
-                    Some(i)
-                } else {
-                    None
-                }
-            })
-            .unwrap();
-        (self_index, shuffled_stakes_and_index)
-    }
-
     /// compute broadcast table
     pub fn tpu_peers(&self) -> Vec<ContactInfo> {
         self.gossip
@@ -3042,14 +2968,6 @@ pub fn push_messages_to_peer(
     Ok(())
 }
 
-pub fn stake_weight_peers(
-    peers: &mut Vec<ContactInfo>,
-    stakes: Option<&HashMap<Pubkey, u64>>,
-) -> Vec<(u64, usize)> {
-    peers.dedup();
-    ClusterInfo::sorted_stakes_with_index(peers, stakes)
-}
-
 // Filters out values from nodes with different shred-version.
 fn filter_on_shred_version(
     mut msg: Protocol,
@@ -4033,15 +3951,6 @@ mod tests {
         assert_ne!(contact_info.shred_version, d.shred_version);
         cluster_info.insert_info(contact_info);
         stakes.insert(id4, 10);
-
-        let mut peers = cluster_info.tvu_peers();
-        let peers_and_stakes = stake_weight_peers(&mut peers, Some(&stakes));
-        assert_eq!(peers.len(), 2);
-        assert_eq!(peers[0].id, id);
-        assert_eq!(peers[1].id, id2);
-        assert_eq!(peers_and_stakes.len(), 2);
-        assert_eq!(peers_and_stakes[0].0, 10);
-        assert_eq!(peers_and_stakes[1].0, 1);
     }
 
     #[test]
diff --git a/gossip/src/contact_info.rs b/gossip/src/contact_info.rs
index 4485e8e1cc88f6..0143c0b3a670ce 100644
--- a/gossip/src/contact_info.rs
+++ b/gossip/src/contact_info.rs
@@ -105,7 +105,7 @@ impl ContactInfo {
     }
 
     /// New random ContactInfo for tests and simulations.
-    pub(crate) fn new_rand<R: rand::Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
+    pub fn new_rand<R: rand::Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
         let delay = 10 * 60 * 1000; // 10 minutes
         let now = timestamp() - delay + rng.gen_range(0, 2 * delay);
         let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
diff --git a/gossip/src/deprecated.rs b/gossip/src/deprecated.rs
index 57a7a8315cb237..120c69ffc1f53c 100644
--- a/gossip/src/deprecated.rs
+++ b/gossip/src/deprecated.rs
@@ -1,4 +1,11 @@
-use solana_sdk::clock::Slot;
+use {
+    crate::{
+        cluster_info::ClusterInfo, contact_info::ContactInfo, weighted_shuffle::weighted_shuffle,
+    },
+    itertools::Itertools,
+    solana_sdk::{clock::Slot, pubkey::Pubkey},
+    std::collections::HashMap,
+};
 
 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, AbiExample, AbiEnumVisitor)]
 enum CompressionType {
@@ -19,3 +26,74 @@ pub(crate) struct EpochIncompleteSlots {
     compression: CompressionType,
     compressed_list: Vec<u8>,
 }
+
+// Legacy methods copied for testing backward compatibility.
+
+pub fn sorted_retransmit_peers_and_stakes(
+    cluster_info: &ClusterInfo,
+    stakes: Option<&HashMap<Pubkey, u64>>,
+) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
+    let mut peers = cluster_info.tvu_peers();
+    // insert "self" into this list for the layer and neighborhood computation
+    peers.push(cluster_info.my_contact_info());
+    let stakes_and_index = sorted_stakes_with_index(&peers, stakes);
+    (peers, stakes_and_index)
+}
+
+pub fn sorted_stakes_with_index(
+    peers: &[ContactInfo],
+    stakes: Option<&HashMap<Pubkey, u64>>,
+) -> Vec<(u64, usize)> {
+    let stakes_and_index: Vec<_> = peers
+        .iter()
+        .enumerate()
+        .map(|(i, c)| {
+            // For stake weighted shuffle a valid weight is atleast 1. Weight 0 is
+            // assumed to be missing entry. So let's make sure stake weights are atleast 1
+            let stake = 1.max(
+                stakes
+                    .as_ref()
+                    .map_or(1, |stakes| *stakes.get(&c.id).unwrap_or(&1)),
+            );
+            (stake, i)
+        })
+        .sorted_by(|(l_stake, l_info), (r_stake, r_info)| {
+            if r_stake == l_stake {
+                peers[*r_info].id.cmp(&peers[*l_info].id)
+            } else {
+                r_stake.cmp(l_stake)
+            }
+        })
+        .collect();
+
+    stakes_and_index
+}
+
+pub fn shuffle_peers_and_index(
+    id: &Pubkey,
+    peers: &[ContactInfo],
+    stakes_and_index: &[(u64, usize)],
+    seed: [u8; 32],
+) -> (usize, Vec<(u64, usize)>) {
+    let shuffled_stakes_and_index = stake_weighted_shuffle(stakes_and_index, seed);
+    let self_index = shuffled_stakes_and_index
+        .iter()
+        .enumerate()
+        .find_map(|(i, (_stake, index))| {
+            if peers[*index].id == *id {
+                Some(i)
+            } else {
+                None
+            }
+        })
+        .unwrap();
+    (self_index, shuffled_stakes_and_index)
+}
+
+fn stake_weighted_shuffle(stakes_and_index: &[(u64, usize)], seed: [u8; 32]) -> Vec<(u64, usize)> {
+    let stake_weights: Vec<_> = stakes_and_index.iter().map(|(w, _)| *w).collect();
+
+    let shuffle = weighted_shuffle(&stake_weights, seed);
+
+    shuffle.iter().map(|x| stakes_and_index[*x]).collect()
+}
diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs
index 7562674a0cd861..e5b2d7ccce8e4e 100644
--- a/gossip/src/lib.rs
+++ b/gossip/src/lib.rs
@@ -13,7 +13,7 @@ pub mod crds_gossip_push;
 pub mod crds_shards;
 pub mod crds_value;
 pub mod data_budget;
-mod deprecated;
+pub mod deprecated;
 pub mod duplicate_shred;
 pub mod epoch_slots;
 pub mod gossip_error;
diff --git a/gossip/tests/cluster_info.rs b/gossip/tests/cluster_info.rs
index 921a842c321791..8d31a142f0f143 100644
--- a/gossip/tests/cluster_info.rs
+++ b/gossip/tests/cluster_info.rs
@@ -5,6 +5,7 @@ use {
     solana_gossip::{
         cluster_info::{compute_retransmit_peers, ClusterInfo},
         contact_info::ContactInfo,
+        deprecated::{shuffle_peers_and_index, sorted_retransmit_peers_and_stakes},
     },
     solana_sdk::pubkey::Pubkey,
     std::{
@@ -118,14 +119,13 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
         .map(|i| {
             let mut seed = [0; 32];
             seed[0..4].copy_from_slice(&i.to_le_bytes());
+            // TODO: Ideally these should use the new methods in
+            // solana_core::cluster_nodes, however that would add a build
+            // dependency on solana_core, which is not desired.
             let (peers, stakes_and_index) =
-                cluster_info.sorted_retransmit_peers_and_stakes(Some(&staked_nodes));
-            let (_, shuffled_stakes_and_indexes) = ClusterInfo::shuffle_peers_and_index(
-                &cluster_info.id(),
-                &peers,
-                &stakes_and_index,
-                seed,
-            );
+                sorted_retransmit_peers_and_stakes(&cluster_info, Some(&staked_nodes));
+            let (_, shuffled_stakes_and_indexes) =
+                shuffle_peers_and_index(&cluster_info.id(), &peers, &stakes_and_index, seed);
             shuffled_stakes_and_indexes
                 .into_iter()
                 .map(|(_, i)| peers[i].clone())