Skip to content

Commit

Permalink
add metrics to handle_snapshot_requests (solana-labs#17937)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington committed Jun 14, 2021
1 parent d52632f commit d286ae2
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 72 deletions.
2 changes: 1 addition & 1 deletion core/tests/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ mod tests {
// set_root should send a snapshot request
bank_forks.set_root(bank.slot(), &request_sender, None);
bank.update_accounts_hash();
snapshot_request_handler.handle_snapshot_requests(false, false, false);
snapshot_request_handler.handle_snapshot_requests(false, false, false, 0);
}
}

Expand Down
163 changes: 92 additions & 71 deletions runtime/src/accounts_background_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,13 @@ impl SnapshotRequestHandler {
accounts_db_caching_enabled: bool,
test_hash_calculation: bool,
use_index_hash_calculation: bool,
non_snapshot_time_us: u128,
) -> Option<u64> {
self.snapshot_request_receiver
.try_iter()
.last()
.map(|snapshot_request| {
let mut total_time = Measure::start("wallclock time elapsed");
let SnapshotRequest {
snapshot_root_bank,
status_cache_slot_deltas,
Expand Down Expand Up @@ -188,6 +190,7 @@ impl SnapshotRequestHandler {
let mut purge_old_snapshots_time = Measure::start("purge_old_snapshots_time");
snapshot_utils::purge_old_snapshots(&self.snapshot_config.snapshot_path);
purge_old_snapshots_time.stop();
total_time.stop();

datapoint_info!(
"handle_snapshot_requests-timing",
Expand All @@ -205,6 +208,8 @@ impl SnapshotRequestHandler {
purge_old_snapshots_time.as_us(),
i64
),
("total_us", total_time.as_us(), i64),
("non_snapshot_time_us", non_snapshot_time_us, i64),
);
snapshot_root_bank.block_height()
})
Expand Down Expand Up @@ -251,6 +256,7 @@ impl AbsRequestHandler {
accounts_db_caching_enabled: bool,
test_hash_calculation: bool,
use_index_hash_calculation: bool,
non_snapshot_time_us: u128,
) -> Option<u64> {
self.snapshot_request_handler
.as_ref()
Expand All @@ -259,6 +265,7 @@ impl AbsRequestHandler {
accounts_db_caching_enabled,
test_hash_calculation,
use_index_hash_calculation,
non_snapshot_time_us,
)
})
}
Expand Down Expand Up @@ -297,87 +304,101 @@ impl AccountsBackgroundService {
let mut last_expiration_check_time = Instant::now();
let t_background = Builder::new()
.name("solana-bg-accounts".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
.spawn(move || {
let mut last_snapshot_end_time = None;
loop {
if exit.load(Ordering::Relaxed) {
break;
}

// Grab the current root bank
let bank = bank_forks.read().unwrap().root_bank().clone();
// Grab the current root bank
let bank = bank_forks.read().unwrap().root_bank().clone();

// Purge accounts of any dead slots
Self::remove_dead_slots(
&bank,
&request_handler,
&mut removed_slots_count,
&mut total_remove_slots_time,
);
// Purge accounts of any dead slots
Self::remove_dead_slots(
&bank,
&request_handler,
&mut removed_slots_count,
&mut total_remove_slots_time,
);

Self::expire_old_recycle_stores(&bank, &mut last_expiration_check_time);

// Check to see if there were any requests for snapshotting banks
// < the current root bank `bank` above.

// Claim: Any snapshot request for slot `N` found here implies that the last cleanup
// slot `M` satisfies `M < N`
//
// Proof: Assume for contradiction that we find a snapshot request for slot `N` here,
// but cleanup has already happened on some slot `M >= N`. Because the call to
// `bank.clean_accounts(true)` (in the code below) implies we only clean slots `<= bank - 1`,
// then that means in some *previous* iteration of this loop, we must have gotten a root
// bank for slot some slot `R` where `R > N`, but did not see the snapshot for `N` in the
// snapshot request channel.
//
// However, this is impossible because BankForks.set_root() will always flush the snapshot
// request for `N` to the snapshot request channel before setting a root `R > N`, and
// snapshot_request_handler.handle_requests() will always look for the latest
// available snapshot in the channel.
let snapshot_block_height = request_handler.handle_snapshot_requests(
accounts_db_caching_enabled,
test_hash_calculation,
use_index_hash_calculation,
);
if accounts_db_caching_enabled {
// Note that the flush will do an internal clean of the
// cache up to bank.slot(), so should be safe as long
// as any later snapshots that are taken are of
// slots >= bank.slot()
bank.flush_accounts_cache_if_needed();
}
Self::expire_old_recycle_stores(&bank, &mut last_expiration_check_time);

let non_snapshot_time = last_snapshot_end_time
.map(|last_snapshot_end_time: Instant| {
last_snapshot_end_time.elapsed().as_micros()
})
.unwrap_or_default();

// Check to see if there were any requests for snapshotting banks
// < the current root bank `bank` above.

// Claim: Any snapshot request for slot `N` found here implies that the last cleanup
// slot `M` satisfies `M < N`
//
// Proof: Assume for contradiction that we find a snapshot request for slot `N` here,
// but cleanup has already happened on some slot `M >= N`. Because the call to
// `bank.clean_accounts(true)` (in the code below) implies we only clean slots `<= bank - 1`,
// then that means in some *previous* iteration of this loop, we must have gotten a root
// bank for slot some slot `R` where `R > N`, but did not see the snapshot for `N` in the
// snapshot request channel.
//
// However, this is impossible because BankForks.set_root() will always flush the snapshot
// request for `N` to the snapshot request channel before setting a root `R > N`, and
// snapshot_request_handler.handle_requests() will always look for the latest
// available snapshot in the channel.
let snapshot_block_height = request_handler.handle_snapshot_requests(
accounts_db_caching_enabled,
test_hash_calculation,
use_index_hash_calculation,
non_snapshot_time,
);
if snapshot_block_height.is_some() {
last_snapshot_end_time = Some(Instant::now());
}

if let Some(snapshot_block_height) = snapshot_block_height {
// Safe, see proof above
assert!(last_cleaned_block_height <= snapshot_block_height);
last_cleaned_block_height = snapshot_block_height;
} else {
if accounts_db_caching_enabled {
bank.shrink_candidate_slots();
} else {
// under sustained writes, shrink can lag behind so cap to
// SHRUNKEN_ACCOUNT_PER_INTERVAL (which is based on INTERVAL_MS,
// which in turn roughly associated block time)
consumed_budget = bank
.process_stale_slot_with_budget(
consumed_budget,
SHRUNKEN_ACCOUNT_PER_INTERVAL,
)
.min(SHRUNKEN_ACCOUNT_PER_INTERVAL);
// Note that the flush will do an internal clean of the
// cache up to bank.slot(), so should be safe as long
// as any later snapshots that are taken are of
// slots >= bank.slot()
bank.flush_accounts_cache_if_needed();
}
if bank.block_height() - last_cleaned_block_height
> (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0, 10))
{

if let Some(snapshot_block_height) = snapshot_block_height {
// Safe, see proof above
assert!(last_cleaned_block_height <= snapshot_block_height);
last_cleaned_block_height = snapshot_block_height;
} else {
if accounts_db_caching_enabled {
// Note that the flush will do an internal clean of the
// cache up to bank.slot(), so should be safe as long
// as any later snapshots that are taken are of
// slots >= bank.slot()
bank.force_flush_accounts_cache();
bank.shrink_candidate_slots();
} else {
// under sustained writes, shrink can lag behind so cap to
// SHRUNKEN_ACCOUNT_PER_INTERVAL (which is based on INTERVAL_MS,
// which in turn roughly associated block time)
consumed_budget = bank
.process_stale_slot_with_budget(
consumed_budget,
SHRUNKEN_ACCOUNT_PER_INTERVAL,
)
.min(SHRUNKEN_ACCOUNT_PER_INTERVAL);
}
if bank.block_height() - last_cleaned_block_height
> (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0, 10))
{
if accounts_db_caching_enabled {
// Note that the flush will do an internal clean of the
// cache up to bank.slot(), so should be safe as long
// as any later snapshots that are taken are of
// slots >= bank.slot()
bank.force_flush_accounts_cache();
}
bank.clean_accounts(true, false);
last_cleaned_block_height = bank.block_height();
}
bank.clean_accounts(true, false);
last_cleaned_block_height = bank.block_height();
}
sleep(Duration::from_millis(INTERVAL_MS));
}
sleep(Duration::from_millis(INTERVAL_MS));
})
.unwrap();
Self { t_background }
Expand Down

0 comments on commit d286ae2

Please sign in to comment.