Write helper for multithread update (#18808)
(cherry picked from commit 84e7831)

# Conflicts:
#	core/benches/cluster_info.rs
#	core/src/broadcast_stage.rs
#	core/src/broadcast_stage/broadcast_duplicates_run.rs
#	core/src/broadcast_stage/standard_broadcast_run.rs
sakridge authored and mergify-bot committed Aug 18, 2021
1 parent 5042808 commit bfd42a7
Showing 10 changed files with 127 additions and 105 deletions.
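The `AtomicInterval` helper itself lives in `solana_sdk::timing` and is not part of this diff, but the deleted call sites below pin down its contract: it wraps a millisecond timestamp in an `AtomicU64` and funnels every racing thread through a single `compare_exchange`, so at most one caller per interval gets `true`. A minimal sketch reconstructed from those call sites — the field name, exact signatures, and the `skip_first` semantics are assumptions, not the committed code:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

/// Milliseconds since the Unix epoch; stand-in for solana_sdk::timing::timestamp().
fn timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

#[derive(Debug, Default)]
pub struct AtomicInterval {
    last_update: AtomicU64,
}

impl AtomicInterval {
    /// Returns true at most once per `interval_time_ms` across all threads.
    /// The very first call only arms the timer (see the old ShrinkStats
    /// comment below about `last == 0`).
    pub fn should_update(&self, interval_time_ms: u64) -> bool {
        self.should_update_ext(interval_time_ms, true)
    }

    /// `skip_first = false` lets the first call win immediately, which the
    /// peer-update call sites in this diff rely on.
    pub fn should_update_ext(&self, interval_time_ms: u64, skip_first: bool) -> bool {
        let now = timestamp();
        let last = self.last_update.load(Ordering::Relaxed);
        // The compare_exchange succeeds for exactly one of the racing
        // threads; the losers see the fresh timestamp and back off.
        now.saturating_sub(last) > interval_time_ms
            && self
                .last_update
                .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
                == Ok(last)
            && !(skip_first && last == 0)
    }
}
```

`Relaxed` ordering is sufficient here because the timestamp only gates metrics reporting; it never guards access to other shared data.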
19 changes: 18 additions & 1 deletion core/benches/cluster_info.rs
@@ -11,13 +11,30 @@ use solana_gossip::{
cluster_info::{ClusterInfo, Node},
contact_info::ContactInfo,
};
<<<<<<< HEAD
use solana_ledger::shred::Shred;
use solana_sdk::{pubkey, signature::Keypair, timing::timestamp};
=======
use solana_ledger::{
genesis_utils::{create_genesis_config, GenesisConfigInfo},
shred::Shred,
};
use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::{
pubkey,
signature::Keypair,
timing::{timestamp, AtomicInterval},
};
>>>>>>> 84e78316b (Write helper for multithread update (#18808))
use solana_streamer::socket::SocketAddrSpace;
use std::{
collections::HashMap,
net::UdpSocket,
<<<<<<< HEAD
sync::{atomic::AtomicU64, Arc},
=======
sync::{Arc, RwLock},
>>>>>>> 84e78316b (Write helper for multithread update (#18808))
};
use test::Bencher;

@@ -46,7 +63,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
let cluster_info = Arc::new(cluster_info);
let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
let shreds = Arc::new(shreds);
let last_datapoint = Arc::new(AtomicU64::new(0));
let last_datapoint = Arc::new(AtomicInterval::default());
bencher.iter(move || {
let shreds = shreds.clone();
broadcast_shreds(
18 changes: 3 additions & 15 deletions core/src/banking_stage.rs
@@ -36,7 +36,7 @@ use solana_sdk::{
pubkey::Pubkey,
short_vec::decode_shortu16_len,
signature::Signature,
timing::{duration_as_ms, timestamp},
timing::{duration_as_ms, timestamp, AtomicInterval},
transaction::{self, Transaction, TransactionError},
};
use solana_transaction_status::token_balances::{
@@ -78,7 +78,7 @@ const DEFAULT_LRU_SIZE: usize = 200_000;

#[derive(Debug, Default)]
pub struct BankingStageStats {
last_report: AtomicU64,
last_report: AtomicInterval,
id: u32,
process_packets_count: AtomicUsize,
new_tx_count: AtomicUsize,
@@ -107,19 +107,7 @@ impl BankingStageStats {
}

fn report(&self, report_interval_ms: u64) {
let should_report = {
let last = self.last_report.load(Ordering::Relaxed);
let now = solana_sdk::timing::timestamp();
now.saturating_sub(last) > report_interval_ms
&& self.last_report.compare_exchange(
last,
now,
Ordering::Relaxed,
Ordering::Relaxed,
) == Ok(last)
};

if should_report {
if self.last_report.should_update(report_interval_ms) {
datapoint_info!(
"banking_stage-loop-stats",
("id", self.id as i64, i64),
21 changes: 13 additions & 8 deletions core/src/broadcast_stage.rs
@@ -19,11 +19,21 @@ use solana_ledger::{blockstore::Blockstore, shred::Shred};
use solana_measure::measure::Measure;
use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
use solana_poh::poh_recorder::WorkingBankEntry;
<<<<<<< HEAD
use solana_runtime::bank::Bank;
use solana_sdk::timing::timestamp;
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use solana_streamer::{sendmmsg::send_mmsg, socket::SocketAddrSpace};
use std::sync::atomic::AtomicU64;
=======
use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::timing::{timestamp, AtomicInterval};
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair};
use solana_streamer::{
sendmmsg::{batch_send, SendPktsError},
socket::SocketAddrSpace,
};
>>>>>>> 84e78316b (Write helper for multithread update (#18808))
use std::{
collections::HashMap,
net::UdpSocket,
@@ -362,14 +372,9 @@ impl BroadcastStage {
fn update_peer_stats(
num_live_peers: i64,
broadcast_len: i64,
last_datapoint_submit: &Arc<AtomicU64>,
last_datapoint_submit: &Arc<AtomicInterval>,
) {
let now = timestamp();
let last = last_datapoint_submit.load(Ordering::Relaxed);
#[allow(deprecated)]
if now.saturating_sub(last) > 1000
&& last_datapoint_submit.compare_and_swap(last, now, Ordering::Relaxed) == last
{
if last_datapoint_submit.should_update(1000) {
datapoint_info!(
"cluster_info-num_nodes",
("live_count", num_live_peers, i64),
@@ -384,7 +389,7 @@ pub fn broadcast_shreds(
s: &UdpSocket,
shreds: &[Shred],
cluster_nodes: &ClusterNodes<BroadcastStage>,
last_datapoint_submit: &Arc<AtomicU64>,
last_datapoint_submit: &Arc<AtomicInterval>,
transmit_stats: &mut TransmitShredsStats,
socket_addr_space: &SocketAddrSpace,
) -> Result<()> {
18 changes: 18 additions & 0 deletions core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -300,6 +300,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
};

let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
<<<<<<< HEAD
let stakes = stakes.unwrap();
let socket_addr_space = cluster_info.socket_addr_space();
for peer in cluster_info.tvu_peers() {
@@ -323,6 +324,23 @@ impl BroadcastRun for BroadcastDuplicatesRun {
}
}
}
=======
// Broadcast data
let cluster_nodes = ClusterNodes::<BroadcastStage>::new(
cluster_info,
stakes.as_deref().unwrap_or(&HashMap::default()),
);
broadcast_shreds(
sock,
&shreds,
&cluster_nodes,
&Arc::new(AtomicInterval::default()),
&mut TransmitShredsStats::default(),
cluster_info.id(),
bank_forks,
cluster_info.socket_addr_space(),
)?;
>>>>>>> 84e78316b (Write helper for multithread update (#18808))

Ok(())
}
core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -143,7 +143,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
sock,
&shreds,
&cluster_nodes,
&Arc::new(AtomicU64::new(0)),
&Arc::new(AtomicInterval::default()),
&mut TransmitShredsStats::default(),
cluster_info.socket_addr_space(),
)?;
26 changes: 15 additions & 11 deletions core/src/broadcast_stage/standard_broadcast_run.rs
@@ -12,8 +12,17 @@ use solana_ledger::{
SHRED_TICK_REFERENCE_MASK,
},
};
<<<<<<< HEAD
use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us};
use std::{collections::HashMap, ops::Deref, sync::RwLock, time::Duration};
=======
use solana_sdk::{
pubkey::Pubkey,
signature::Keypair,
timing::{duration_as_us, AtomicInterval},
};
use std::{collections::HashMap, sync::RwLock, time::Duration};
>>>>>>> 84e78316b (Write helper for multithread update (#18808))

#[derive(Clone)]
pub struct StandardBroadcastRun {
@@ -25,10 +34,10 @@ pub struct StandardBroadcastRun {
slot_broadcast_start: Option<Instant>,
keypair: Arc<Keypair>,
shred_version: u16,
last_datapoint_submit: Arc<AtomicU64>,
last_datapoint_submit: Arc<AtomicInterval>,
num_batches: usize,
cluster_nodes: Arc<RwLock<ClusterNodes<BroadcastStage>>>,
last_peer_update: Arc<AtomicU64>,
last_peer_update: Arc<AtomicInterval>,
}

impl StandardBroadcastRun {
@@ -45,7 +54,7 @@ impl StandardBroadcastRun {
last_datapoint_submit: Arc::default(),
num_batches: 0,
cluster_nodes: Arc::default(),
last_peer_update: Arc::default(),
last_peer_update: Arc::new(AtomicInterval::default()),
}
}

@@ -339,14 +348,9 @@ impl StandardBroadcastRun {
trace!("Broadcasting {:?} shreds", shreds.len());
// Get the list of peers to broadcast to
let mut get_peers_time = Measure::start("broadcast::get_peers");
let now = timestamp();
let last = self.last_peer_update.load(Ordering::Relaxed);
#[allow(deprecated)]
if now - last > BROADCAST_PEER_UPDATE_INTERVAL_MS
&& self
.last_peer_update
.compare_and_swap(last, now, Ordering::Relaxed)
== last
if self
.last_peer_update
.should_update_ext(BROADCAST_PEER_UPDATE_INTERVAL_MS, false)
{
*self.cluster_nodes.write().unwrap() = ClusterNodes::<BroadcastStage>::new(
cluster_info,
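Note the contrast in this file: peer refresh calls `should_update_ext(BROADCAST_PEER_UPDATE_INTERVAL_MS, false)` while the metrics call sites use plain `should_update`. Reading the removed code, the likely reason is that `cluster_nodes` must be populated on the very first broadcast pass, whereas skipping the very first datapoint is harmless. A quick check of those first-call semantics, assuming the hypothetical sketch given earlier:

```rust
fn main() {
    let metrics = AtomicInterval::default();
    let peers = AtomicInterval::default();

    // Default behavior: the first call only arms the timer, so no datapoint
    // fires the moment a stats struct is created.
    assert!(!metrics.should_update(1000));

    // skip_first = false: the first call wins immediately, so the peer list
    // is built on the very first pass through broadcast.
    assert!(peers.should_update_ext(1000, false));
}
```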
28 changes: 12 additions & 16 deletions core/src/retransmit_stage.rs
@@ -28,7 +28,12 @@ use solana_rpc::{
rpc_subscriptions::RpcSubscriptions,
};
use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
use solana_sdk::{
clock::Slot,
epoch_schedule::EpochSchedule,
pubkey::Pubkey,
timing::{timestamp, AtomicInterval},
};
use solana_streamer::streamer::PacketReceiver;
use std::{
collections::hash_set::HashSet,
@@ -61,7 +66,7 @@ struct RetransmitStats {
repair_total: AtomicU64,
discard_total: AtomicU64,
retransmit_total: AtomicU64,
last_ts: AtomicU64,
last_ts: AtomicInterval,
compute_turbine_peers_total: AtomicU64,
retransmit_tree_mismatch: AtomicU64,
packets_by_slot: Mutex<BTreeMap<Slot, usize>>,
@@ -121,12 +126,7 @@ fn update_retransmit_stats(
}
}

let now = timestamp();
let last = stats.last_ts.load(Ordering::Relaxed);
#[allow(deprecated)]
if now.saturating_sub(last) > 2000
&& stats.last_ts.compare_and_swap(last, now, Ordering::Relaxed) == last
{
if stats.last_ts.should_update(2000) {
datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64));
datapoint_info!(
"retransmit-stage",
@@ -284,7 +284,7 @@ fn retransmit(
id: u32,
stats: &RetransmitStats,
cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
last_peer_update: &AtomicU64,
last_peer_update: &AtomicInterval,
shreds_received: &Mutex<ShredFilterAndHasher>,
max_slots: &MaxSlots,
first_shreds_received: &Mutex<BTreeSet<Slot>>,
@@ -314,12 +314,7 @@ fn retransmit(
epoch_fetch.stop();

let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
let now = timestamp();
let last = last_peer_update.load(Ordering::Relaxed);
#[allow(deprecated)]
if now.saturating_sub(last) > 1000
&& last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last
{
if last_peer_update.should_update_ext(1000, false) {
let epoch_staked_nodes = r_bank.epoch_staked_nodes(bank_epoch);
*cluster_nodes.write().unwrap() = ClusterNodes::<RetransmitStage>::new(
cluster_info,
@@ -478,7 +473,7 @@ pub fn retransmitter(
let cluster_info = cluster_info.clone();
let stats = stats.clone();
let cluster_nodes = Arc::default();
let last_peer_update = Arc::new(AtomicU64::new(0));
let last_peer_update = Arc::new(AtomicInterval::default());
let shreds_received = shreds_received.clone();
let max_slots = max_slots.clone();
let first_shreds_received = first_shreds_received.clone();
@@ -671,6 +666,7 @@ mod tests {
let me_retransmit = UdpSocket::bind(format!("127.0.0.1:{}", port)).unwrap();
// need to make sure tvu and tpu are valid addresses
me.tvu_forwards = me_retransmit.local_addr().unwrap();

let port = find_available_port_in_range(ip_addr, (8000, 10000)).unwrap();
me.tvu = UdpSocket::bind(format!("127.0.0.1:{}", port))
.unwrap()
47 changes: 7 additions & 40 deletions runtime/src/accounts_db.rs
@@ -52,6 +52,7 @@ use solana_sdk::{
genesis_config::ClusterType,
hash::{Hash, Hasher},
pubkey::Pubkey,
timing::AtomicInterval,
};
use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
use std::{
@@ -960,7 +961,7 @@ struct AccountsStats {
delta_hash_accumulate_time_total_us: AtomicU64,
delta_hash_num: AtomicU64,

last_store_report: AtomicU64,
last_store_report: AtomicInterval,
store_hash_accounts: AtomicU64,
calc_stored_meta: AtomicU64,
store_accounts: AtomicU64,
@@ -980,7 +981,7 @@ struct PurgeStats {

#[derive(Debug, Default)]
struct PurgeStats {
last_report: AtomicU64,
last_report: AtomicInterval,
safety_checks_elapsed: AtomicU64,
remove_cache_elapsed: AtomicU64,
remove_storage_entries_elapsed: AtomicU64,
@@ -999,18 +1000,7 @@ struct PurgeStats {
impl PurgeStats {
fn report(&self, metric_name: &'static str, report_interval_ms: Option<u64>) {
let should_report = report_interval_ms
.map(|report_interval_ms| {
let last = self.last_report.load(Ordering::Relaxed);
let now = solana_sdk::timing::timestamp();
now.saturating_sub(last) > report_interval_ms
&& self.last_report.compare_exchange(
last,
now,
Ordering::Relaxed,
Ordering::Relaxed,
) == Ok(last)
&& last != 0
})
.map(|report_interval_ms| self.last_report.should_update(report_interval_ms))
.unwrap_or(true);

if should_report {
@@ -1184,7 +1174,7 @@ impl CleanAccountsStats {

#[derive(Debug, Default)]
struct ShrinkStats {
last_report: AtomicU64,
last_report: AtomicInterval,
num_slots_shrunk: AtomicUsize,
storage_read_elapsed: AtomicU64,
index_read_elapsed: AtomicU64,
@@ -1205,20 +1195,7 @@ struct ShrinkStats {

impl ShrinkStats {
fn report(&self) {
let last = self.last_report.load(Ordering::Relaxed);
let now = solana_sdk::timing::timestamp();

// last is initialized to 0 by ::default()
// thus, the first 'report' call would always log.
// Instead, the first call now initialializes 'last_report' to now.
let is_first_call = last == 0;
let should_report = now.saturating_sub(last) > 1000
&& self
.last_report
.compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
== Ok(last);

if !is_first_call && should_report {
if self.last_report.should_update(1000) {
datapoint_info!(
"shrink_stats",
(
@@ -5494,17 +5471,7 @@ impl AccountsDb {
}

fn report_store_timings(&self) {
let last = self.stats.last_store_report.load(Ordering::Relaxed);
let now = solana_sdk::timing::timestamp();

if now.saturating_sub(last) > 1000
&& self.stats.last_store_report.compare_exchange(
last,
now,
Ordering::Relaxed,
Ordering::Relaxed,
) == Ok(last)
{
if self.stats.last_store_report.should_update(1000) {
let (read_only_cache_hits, read_only_cache_misses) =
self.read_only_accounts_cache.get_and_reset_stats();
datapoint_info!(
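The commit title promises a helper for "multithread update", and the deleted blocks above show the motivating pattern: one stats struct shared by many worker threads, each racing to emit the same datapoint. A toy demonstration that the `compare_exchange` admits a single winner per interval, under the same assumptions as the earlier sketch:

```rust
use std::sync::Arc;
use std::thread;

fn main() {
    let interval = Arc::new(AtomicInterval::default());

    // Eight threads race to claim the same interval at startup.
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let interval = Arc::clone(&interval);
            thread::spawn(move || interval.should_update_ext(1000, false))
        })
        .collect();

    let winners = handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&won| won)
        .count();

    // compare_exchange lets one thread win; the rest observe the fresh
    // timestamp and back off until the interval elapses again.
    println!("threads that reported: {}", winners); // expected: 1
}
```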