
Write helper for multithread update #18808

Merged · 1 commit · Jul 29, 2021
10 changes: 7 additions & 3 deletions core/benches/cluster_info.rs
@@ -16,12 +16,16 @@ use solana_ledger::{
     shred::Shred,
 };
 use solana_runtime::{bank::Bank, bank_forks::BankForks};
-use solana_sdk::{pubkey, signature::Keypair, timing::timestamp};
+use solana_sdk::{
+    pubkey,
+    signature::Keypair,
+    timing::{timestamp, AtomicInterval},
+};
 use solana_streamer::socket::SocketAddrSpace;
 use std::{
     collections::HashMap,
     net::UdpSocket,
-    sync::{atomic::AtomicU64, Arc, RwLock},
+    sync::{Arc, RwLock},
 };
 use test::Bencher;

@@ -54,7 +58,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
     let cluster_info = Arc::new(cluster_info);
     let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
     let shreds = Arc::new(shreds);
-    let last_datapoint = Arc::new(AtomicU64::new(0));
+    let last_datapoint = Arc::new(AtomicInterval::default());
     bencher.iter(move || {
         let shreds = shreds.clone();
         broadcast_shreds(
18 changes: 3 additions & 15 deletions core/src/banking_stage.rs
@@ -37,7 +37,7 @@ use solana_sdk::{
     sanitized_transaction::SanitizedTransaction,
     short_vec::decode_shortu16_len,
     signature::Signature,
-    timing::{duration_as_ms, timestamp},
+    timing::{duration_as_ms, timestamp, AtomicInterval},
     transaction::{self, Transaction, TransactionError},
 };
 use solana_transaction_status::token_balances::{

@@ -79,7 +79,7 @@ const DEFAULT_LRU_SIZE: usize = 200_000;
 
 #[derive(Debug, Default)]
 pub struct BankingStageStats {
-    last_report: AtomicU64,
+    last_report: AtomicInterval,
     id: u32,
     process_packets_count: AtomicUsize,
     new_tx_count: AtomicUsize,

@@ -115,19 +115,7 @@ impl BankingStageStats {
     }
 
     fn report(&self, report_interval_ms: u64) {
-        let should_report = {
-            let last = self.last_report.load(Ordering::Relaxed);
-            let now = solana_sdk::timing::timestamp();
-            now.saturating_sub(last) > report_interval_ms
-                && self.last_report.compare_exchange(
-                    last,
-                    now,
-                    Ordering::Relaxed,
-                    Ordering::Relaxed,
-                ) == Ok(last)
-        };
-
-        if should_report {
+        if self.last_report.should_update(report_interval_ms) {
             datapoint_info!(
                 "banking_stage-loop-stats",
                 ("id", self.id as i64, i64),
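The AtomicInterval helper itself is not shown in this diff. A minimal sketch consistent with the logic it replaces above (a relaxed compare_exchange on a millisecond wallclock timestamp, so at most one thread wins each interval) might look like the following; the field name and doc comments are assumptions, not the actual solana-sdk source:

use solana_sdk::timing::timestamp;
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Debug, Default)]
pub struct AtomicInterval {
    last_update: AtomicU64,
}

impl AtomicInterval {
    /// True at most once per `interval_time_ms`; the first call only seeds
    /// the timestamp (matching the `last != 0` / `is_first_call` guards that
    /// this PR removes elsewhere).
    pub fn should_update(&self, interval_time_ms: u64) -> bool {
        self.should_update_ext(interval_time_ms, true)
    }

    /// Pass `skip_first = false` to also return true on the very first call,
    /// preserving the behavior of call sites that reported immediately.
    pub fn should_update_ext(&self, interval_time_ms: u64, skip_first: bool) -> bool {
        let now = timestamp();
        let last = self.last_update.load(Ordering::Relaxed);
        // Only one thread can move `last_update` forward per interval, so at
        // most one caller observes `true`.
        now.saturating_sub(last) > interval_time_ms
            && self
                .last_update
                .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
                == Ok(last)
            && !(skip_first && last == 0)
    }
}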
14 changes: 4 additions & 10 deletions core/src/broadcast_stage.rs
@@ -21,13 +21,12 @@ use solana_measure::measure::Measure;
 use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
 use solana_poh::poh_recorder::WorkingBankEntry;
 use solana_runtime::{bank::Bank, bank_forks::BankForks};
-use solana_sdk::timing::timestamp;
+use solana_sdk::timing::{timestamp, AtomicInterval};
 use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair};
 use solana_streamer::{
     sendmmsg::{batch_send, SendPktsError},
     socket::SocketAddrSpace,
 };
-use std::sync::atomic::AtomicU64;
 use std::{
     collections::HashMap,
     net::UdpSocket,

@@ -378,14 +377,9 @@ impl BroadcastStage {
 fn update_peer_stats(
     num_live_peers: i64,
     broadcast_len: i64,
-    last_datapoint_submit: &Arc<AtomicU64>,
+    last_datapoint_submit: &Arc<AtomicInterval>,
 ) {
-    let now = timestamp();
-    let last = last_datapoint_submit.load(Ordering::Relaxed);
-    #[allow(deprecated)]
-    if now.saturating_sub(last) > 1000
-        && last_datapoint_submit.compare_and_swap(last, now, Ordering::Relaxed) == last
-    {
+    if last_datapoint_submit.should_update(1000) {
         datapoint_info!(
             "cluster_info-num_nodes",
             ("live_count", num_live_peers, i64),

@@ -400,7 +394,7 @@ pub fn broadcast_shreds(
     s: &UdpSocket,
     shreds: &[Shred],
     cluster_nodes: &ClusterNodes<BroadcastStage>,
-    last_datapoint_submit: &Arc<AtomicU64>,
+    last_datapoint_submit: &Arc<AtomicInterval>,
     transmit_stats: &mut TransmitShredsStats,
     self_pubkey: Pubkey,
     bank_forks: &Arc<RwLock<BankForks>>,
2 changes: 1 addition & 1 deletion core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -291,7 +291,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
             sock,
             &shreds,
             &cluster_nodes,
-            &Arc::new(AtomicU64::new(0)),
+            &Arc::new(AtomicInterval::default()),
             &mut TransmitShredsStats::default(),
             cluster_info.id(),
             bank_forks,
core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -143,7 +143,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
             sock,
             &shreds,
             &cluster_nodes,
-            &Arc::new(AtomicU64::new(0)),
+            &Arc::new(AtomicInterval::default()),
             &mut TransmitShredsStats::default(),
             cluster_info.id(),
             bank_forks,
23 changes: 11 additions & 12 deletions core/src/broadcast_stage/standard_broadcast_run.rs
@@ -9,7 +9,11 @@ use solana_entry::entry::Entry;
 use solana_ledger::shred::{
     ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, SHRED_TICK_REFERENCE_MASK,
 };
-use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us};
+use solana_sdk::{
+    pubkey::Pubkey,
+    signature::Keypair,
+    timing::{duration_as_us, AtomicInterval},
+};
 use std::{collections::HashMap, sync::RwLock, time::Duration};
 
 #[derive(Clone)]

@@ -21,10 +25,10 @@ pub struct StandardBroadcastRun {
     current_slot_and_parent: Option<(u64, u64)>,
     slot_broadcast_start: Option<Instant>,
     shred_version: u16,
-    last_datapoint_submit: Arc<AtomicU64>,
+    last_datapoint_submit: Arc<AtomicInterval>,
     num_batches: usize,
     cluster_nodes: Arc<RwLock<ClusterNodes<BroadcastStage>>>,
-    last_peer_update: Arc<AtomicU64>,
+    last_peer_update: Arc<AtomicInterval>,
 }
 
 impl StandardBroadcastRun {

@@ -40,7 +44,7 @@ impl StandardBroadcastRun {
             last_datapoint_submit: Arc::default(),
             num_batches: 0,
             cluster_nodes: Arc::default(),
-            last_peer_update: Arc::default(),
+            last_peer_update: Arc::new(AtomicInterval::default()),
         }
     }

@@ -338,14 +342,9 @@ impl StandardBroadcastRun {
         trace!("Broadcasting {:?} shreds", shreds.len());
         // Get the list of peers to broadcast to
         let mut get_peers_time = Measure::start("broadcast::get_peers");
-        let now = timestamp();
-        let last = self.last_peer_update.load(Ordering::Relaxed);
-        #[allow(deprecated)]
-        if now - last > BROADCAST_PEER_UPDATE_INTERVAL_MS
-            && self
-                .last_peer_update
-                .compare_and_swap(last, now, Ordering::Relaxed)
-                == last
+        if self
+            .last_peer_update
+            .should_update_ext(BROADCAST_PEER_UPDATE_INTERVAL_MS, false)
         {
             *self.cluster_nodes.write().unwrap() = ClusterNodes::<BroadcastStage>::new(
                 cluster_info,
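If the sketch above matches the real helper, the `false` flag at this call site preserves its old behavior of also refreshing the peer cache on the very first invocation: the removed code compared against a zero-initialized timestamp, so the first check always passed.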
28 changes: 12 additions & 16 deletions core/src/retransmit_stage.rs
@@ -22,7 +22,12 @@ use solana_metrics::inc_new_counter_error;
 use solana_perf::packet::{Packet, Packets};
 use solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions};
 use solana_runtime::{bank::Bank, bank_forks::BankForks};
-use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
+use solana_sdk::{
+    clock::Slot,
+    epoch_schedule::EpochSchedule,
+    pubkey::Pubkey,
+    timing::{timestamp, AtomicInterval},
+};
 use solana_streamer::streamer::PacketReceiver;
 use std::{
     collections::hash_set::HashSet,

@@ -55,7 +60,7 @@ struct RetransmitStats {
     repair_total: AtomicU64,
     discard_total: AtomicU64,
     retransmit_total: AtomicU64,
-    last_ts: AtomicU64,
+    last_ts: AtomicInterval,
     compute_turbine_peers_total: AtomicU64,
     retransmit_tree_mismatch: AtomicU64,
     packets_by_slot: Mutex<BTreeMap<Slot, usize>>,

@@ -115,12 +120,7 @@ fn update_retransmit_stats(
         }
     }
 
-    let now = timestamp();
-    let last = stats.last_ts.load(Ordering::Relaxed);
-    #[allow(deprecated)]
-    if now.saturating_sub(last) > 2000
-        && stats.last_ts.compare_and_swap(last, now, Ordering::Relaxed) == last
-    {
+    if stats.last_ts.should_update(2000) {
         datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64));
         datapoint_info!(
             "retransmit-stage",

@@ -278,7 +278,7 @@ fn retransmit(
     id: u32,
     stats: &RetransmitStats,
     cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
-    last_peer_update: &AtomicU64,
+    last_peer_update: &AtomicInterval,
     shreds_received: &Mutex<ShredFilterAndHasher>,
     max_slots: &MaxSlots,
     first_shreds_received: &Mutex<BTreeSet<Slot>>,

@@ -308,12 +308,7 @@ fn retransmit(
     epoch_fetch.stop();
 
     let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
-    let now = timestamp();
-    let last = last_peer_update.load(Ordering::Relaxed);
-    #[allow(deprecated)]
-    if now.saturating_sub(last) > 1000
-        && last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last
-    {
+    if last_peer_update.should_update_ext(1000, false) {
         let epoch_staked_nodes = r_bank.epoch_staked_nodes(bank_epoch);
         *cluster_nodes.write().unwrap() = ClusterNodes::<RetransmitStage>::new(
             cluster_info,

@@ -473,7 +468,7 @@ pub fn retransmitter(
     let cluster_info = cluster_info.clone();
     let stats = stats.clone();
     let cluster_nodes = Arc::default();
-    let last_peer_update = Arc::new(AtomicU64::new(0));
+    let last_peer_update = Arc::new(AtomicInterval::default());
     let shreds_received = shreds_received.clone();
     let max_slots = max_slots.clone();
     let first_shreds_received = first_shreds_received.clone();

@@ -658,6 +653,7 @@ mod tests {
         let me_retransmit = UdpSocket::bind(format!("127.0.0.1:{}", port)).unwrap();
         // need to make sure tvu and tpu are valid addresses
         me.tvu_forwards = me_retransmit.local_addr().unwrap();
+
         let port = find_available_port_in_range(ip_addr, (8000, 10000)).unwrap();
         me.tvu = UdpSocket::bind(format!("127.0.0.1:{}", port))
             .unwrap()
47 changes: 7 additions & 40 deletions runtime/src/accounts_db.rs
@@ -52,6 +52,7 @@ use solana_sdk::{
     genesis_config::ClusterType,
     hash::{Hash, Hasher},
     pubkey::Pubkey,
+    timing::AtomicInterval,
 };
 use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
 use std::{

@@ -977,7 +978,7 @@ struct AccountsStats {
     delta_hash_accumulate_time_total_us: AtomicU64,
     delta_hash_num: AtomicU64,
 
-    last_store_report: AtomicU64,
+    last_store_report: AtomicInterval,
     store_hash_accounts: AtomicU64,
     calc_stored_meta: AtomicU64,
     store_accounts: AtomicU64,

@@ -997,7 +998,7 @@ struct AccountsStats {
 
 #[derive(Debug, Default)]
 struct PurgeStats {
-    last_report: AtomicU64,
+    last_report: AtomicInterval,
     safety_checks_elapsed: AtomicU64,
     remove_cache_elapsed: AtomicU64,
     remove_storage_entries_elapsed: AtomicU64,

@@ -1016,18 +1017,7 @@ struct PurgeStats {
 impl PurgeStats {
     fn report(&self, metric_name: &'static str, report_interval_ms: Option<u64>) {
         let should_report = report_interval_ms
-            .map(|report_interval_ms| {
-                let last = self.last_report.load(Ordering::Relaxed);
-                let now = solana_sdk::timing::timestamp();
-                now.saturating_sub(last) > report_interval_ms
-                    && self.last_report.compare_exchange(
-                        last,
-                        now,
-                        Ordering::Relaxed,
-                        Ordering::Relaxed,
-                    ) == Ok(last)
-                    && last != 0
-            })
+            .map(|report_interval_ms| self.last_report.should_update(report_interval_ms))
             .unwrap_or(true);
 
         if should_report {

@@ -1201,7 +1191,7 @@ impl CleanAccountsStats {
 
 #[derive(Debug, Default)]
 struct ShrinkStats {
-    last_report: AtomicU64,
+    last_report: AtomicInterval,
     num_slots_shrunk: AtomicUsize,
     storage_read_elapsed: AtomicU64,
     index_read_elapsed: AtomicU64,

@@ -1222,20 +1212,7 @@ struct ShrinkStats {
 
 impl ShrinkStats {
     fn report(&self) {
-        let last = self.last_report.load(Ordering::Relaxed);
-        let now = solana_sdk::timing::timestamp();
-
-        // last is initialized to 0 by ::default()
-        // thus, the first 'report' call would always log.
-        // Instead, the first call now initializes 'last_report' to now.
-        let is_first_call = last == 0;
-        let should_report = now.saturating_sub(last) > 1000
-            && self
-                .last_report
-                .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
-                == Ok(last);
-
-        if !is_first_call && should_report {
+        if self.last_report.should_update(1000) {
             datapoint_info!(
                 "shrink_stats",
                 (

@@ -5512,17 +5489,7 @@ impl AccountsDb {
     }
 
     fn report_store_timings(&self) {
-        let last = self.stats.last_store_report.load(Ordering::Relaxed);
-        let now = solana_sdk::timing::timestamp();
-
-        if now.saturating_sub(last) > 1000
-            && self.stats.last_store_report.compare_exchange(
-                last,
-                now,
-                Ordering::Relaxed,
-                Ordering::Relaxed,
-            ) == Ok(last)
-        {
+        if self.stats.last_store_report.should_update(1000) {
             let (read_only_cache_hits, read_only_cache_misses) =
                 self.read_only_accounts_cache.get_and_reset_stats();
             datapoint_info!(
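Assuming the sketched semantics, the first-call handling that PurgeStats and ShrinkStats previously hand-rolled (seed the timestamp on the first call, log on later ones) folds directly into should_update. A hypothetical illustration, not from this PR:

use solana_sdk::timing::AtomicInterval;
use std::{thread, time::Duration};

fn example() {
    let interval = AtomicInterval::default();
    // First call only seeds the internal timestamp and returns false.
    assert!(!interval.should_update(1000));
    thread::sleep(Duration::from_millis(1100));
    // Once the interval elapses, exactly one caller observes `true`.
    assert!(interval.should_update(1000));
}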
16 changes: 3 additions & 13 deletions runtime/src/secondary_index.rs
@@ -1,5 +1,5 @@
 use dashmap::{mapref::entry::Entry::Occupied, DashMap};
-use solana_sdk::pubkey::Pubkey;
+use solana_sdk::{pubkey::Pubkey, timing::AtomicInterval};
 use std::{
     collections::HashSet,
     fmt::Debug,

@@ -26,7 +26,7 @@ pub trait SecondaryIndexEntry: Debug {
 
 #[derive(Debug, Default)]
 pub struct SecondaryIndexStats {
-    last_report: AtomicU64,
+    last_report: AtomicInterval,
     num_inner_keys: AtomicU64,
 }
 
@@ -142,17 +142,7 @@ impl<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send>
         }
     }
 
-        let now = solana_sdk::timing::timestamp();
-        let last = self.stats.last_report.load(Ordering::Relaxed);
-        let should_report = now.saturating_sub(last) > 1000
-            && self.stats.last_report.compare_exchange(
-                last,
-                now,
-                Ordering::Relaxed,
-                Ordering::Relaxed,
-            ) == Ok(last);
-
-        if should_report {
+        if self.stats.last_report.should_update(1000) {
             datapoint_info!(
                 self.metrics_name,
                 ("num_secondary_keys", self.index.len() as i64, i64),