This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Graph-based Replay Algorithm and Other Latency/Throughput Changes #27720

Closed
wants to merge 13 commits
16 changes: 15 additions & 1 deletion core/benches/banking_stage.rs
@@ -19,11 +19,13 @@ use {
solana_ledger::{
blockstore::Blockstore,
blockstore_processor::process_entries_for_tests,
executor::{Replayer, ReplayerHandle},
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path,
},
solana_perf::{packet::to_packet_batches, test_tx::test_tx},
solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{bank::Bank, bank_forks::BankForks, cost_model::CostModel},
solana_sdk::{
genesis_config::GenesisConfig,
@@ -305,6 +307,7 @@ fn simulate_process_entries(
keypairs: &[Keypair],
initial_lamports: u64,
num_accounts: usize,
replayer_handle: &ReplayerHandle,
) {
let bank = Arc::new(Bank::new_for_benches(genesis_config));

@@ -331,7 +334,15 @@
hash: next_hash(&bank.last_blockhash(), 1, &tx_vector),
transactions: tx_vector,
};
process_entries_for_tests(&bank, vec![entry], randomize_txs, None, None).unwrap();
process_entries_for_tests(
&bank,
vec![entry],
randomize_txs,
None,
None,
replayer_handle,
)
.unwrap();
}

#[allow(clippy::same_item_push)]
@@ -349,6 +360,8 @@ fn bench_process_entries(randomize_txs: bool, bencher: &mut Bencher) {
mint_keypair,
..
} = create_genesis_config((num_accounts + 1) as u64 * initial_lamports);
let replayer = Replayer::new(get_thread_count());
let replayer_handle = replayer.handle();

let mut keypairs: Vec<Keypair> = vec![];
let tx_vector: Vec<VersionedTransaction> = Vec::with_capacity(num_accounts / 2);
@@ -367,6 +380,7 @@
&keypairs,
initial_lamports,
num_accounts,
&replayer_handle,
);
});
}
Contributor

I think I made a similar comment on the previous PR, but it'd be good if we could join the replayer threads here.

Contributor Author

sg! will def do before cleaning up; would like to get high-level comments out of the way before going in and fixing/adding tests, just to make sure I'm on the right path
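
Context for this thread: `Replayer` and `ReplayerHandle` come from `solana_ledger::executor`, which this diff does not include, so the sketch below is only a guess at the shape the call sites (`Replayer::new(get_thread_count())`, `replayer.handle()`) suggest — a fixed worker pool fed through cloneable handles — plus the kind of `join` the reviewer is asking for. Every name and detail beyond `Replayer`, `ReplayerHandle`, `new`, and `handle` is an assumption, not the PR's actual implementation.

```rust
use std::{
    sync::{
        mpsc::{channel, Sender},
        Arc, Mutex,
    },
    thread::{self, JoinHandle},
};

// A unit of replay work in this sketch; the real executor presumably sends
// something richer than a boxed closure.
type Job = Box<dyn FnOnce() + Send + 'static>;

pub struct Replayer {
    sender: Sender<Job>,
    workers: Vec<JoinHandle<()>>,
}

// Cheap, cloneable handle that feeds work into the shared pool.
#[derive(Clone)]
pub struct ReplayerHandle {
    sender: Sender<Job>,
}

impl ReplayerHandle {
    pub fn submit(&self, job: Job) {
        // After shutdown, submissions are silently dropped in this sketch.
        let _ = self.sender.send(job);
    }
}

impl Replayer {
    pub fn new(num_threads: usize) -> Self {
        let (sender, receiver) = channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..num_threads)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // Simplification: the mutex guard is held across the
                    // blocking recv(), so job pickup is serialized.
                    let job = receiver.lock().unwrap().recv();
                    match job {
                        Ok(job) => job(),
                        Err(_) => break, // all senders dropped: shut down
                    }
                })
            })
            .collect();
        Self { sender, workers }
    }

    pub fn handle(&self) -> ReplayerHandle {
        ReplayerHandle {
            sender: self.sender.clone(),
        }
    }

    // The join being requested: drop the pool's own sender (callers must
    // drop any outstanding ReplayerHandles too), which makes recv() return
    // Err, then join every worker thread.
    pub fn join(self) {
        drop(self.sender);
        for worker in self.workers {
            worker.join().expect("replayer worker panicked");
        }
    }
}
```

Under a design like this, the bench would drop `replayer_handle` and call `replayer.join()` at the end so the worker threads exit before the process does.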

1 change: 1 addition & 0 deletions core/src/progress_map.rs
@@ -64,6 +64,7 @@ impl ReplaySlotStats {
i64
),
("replay_time", self.replay_elapsed as i64, i64),
("planning_elapsed", self.planning_elapsed as i64, i64),
("execute_batches_us", self.execute_batches_us as i64, i64),
(
"replay_total_elapsed",
85 changes: 59 additions & 26 deletions core/src/replay_stage.rs
@@ -40,13 +40,15 @@ use {
blockstore_processor::{
self, BlockstoreProcessorError, ConfirmationProgress, TransactionStatusSender,
},
executor::{Replayer, ReplayerHandle},
leader_schedule_cache::LeaderScheduleCache,
leader_schedule_utils::first_of_consecutive_leader_slots,
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_info,
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
solana_program_runtime::timings::ExecuteTimings,
solana_rayon_threadlimit::get_thread_count,
solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender},
rpc_subscriptions::RpcSubscriptions,
@@ -467,6 +469,9 @@ impl ReplayStage {
let mut latest_validator_votes_for_frozen_banks: LatestValidatorVotesForFrozenBanks = LatestValidatorVotesForFrozenBanks::default();
let mut voted_signatures = Vec::new();
let mut has_new_vote_been_rooted = !wait_for_vote_to_start_leader;

let replayer = Replayer::new(get_thread_count());

let mut last_vote_refresh_time = LastVoteRefreshTime {
last_refresh_time: Instant::now(),
last_print_time: Instant::now(),
@@ -535,6 +540,7 @@
log_messages_bytes_limit,
replay_slots_concurrently,
&prioritization_fee_cache,
&replayer,
);
replay_active_banks_time.stop();

@@ -668,7 +674,7 @@
last_vote_refresh_time,
&voting_sender,
wait_to_vote_slot,
);
}
}

@@ -701,7 +707,7 @@
for r in heaviest_fork_failures {
if let HeaviestForkFailures::NoPropagatedConfirmation(slot) = r {
if let Some(latest_leader_slot) =
progress.get_latest_leader_slot_must_exist(slot)
{
progress.log_propagated_stats(latest_leader_slot, &bank_forks);
}
@@ -714,7 +720,7 @@
// Vote on a fork
if let Some((ref vote_bank, ref switch_fork_decision)) = vote_bank {
if let Some(votable_leader) =
leader_schedule_cache.slot_leader_at(vote_bank.slot(), Some(vote_bank))
{
Self::log_leader_change(
&my_pubkey,
@@ -1186,8 +1192,8 @@
*duplicate_slot,
);
true
// TODO: Send signal to the repair service to repair the correct version of
// `duplicate_slot` with hash == `correct_hash`
} else {
warn!(
"PoH bank for slot {} is building on duplicate slot {}",
@@ -1710,6 +1716,7 @@
}
}

#[allow(clippy::too_many_arguments)]
fn replay_blockstore_into_bank(
bank: &Arc<Bank>,
blockstore: &Blockstore,
@@ -1720,27 +1727,37 @@
verify_recyclers: &VerifyRecyclers,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
replayer_handle: &ReplayerHandle,
) -> result::Result<usize, BlockstoreProcessorError> {
let mut w_replay_stats = replay_stats.write().unwrap();
let mut w_replay_progress = replay_progress.write().unwrap();
let tx_count_before = w_replay_progress.num_txs;
// All errors must lead to marking the slot as dead, otherwise,
// the `check_slot_agrees_with_cluster()` called by `replay_active_banks()`
// will break!
blockstore_processor::confirm_slot(
blockstore,
bank,
&mut w_replay_stats,
&mut w_replay_progress,
false,
transaction_status_sender,
Some(replay_vote_sender),
None,
verify_recyclers,
false,
log_messages_bytes_limit,
prioritization_fee_cache,
)?;

let mut did_process_entries = true;

// More entries may have been received while replaying this slot.
// Looping here ensures slots are processed as soon as entries arrive,
// for the lowest possible replay latency.
while did_process_entries {

Doesn't this give preference to the current bank over other banks?

Contributor Author

seems ideal? you need to finish a bank before you can vote and work on the next one

// All errors must lead to marking the slot as dead, otherwise,
// the `check_slot_agrees_with_cluster()` called by `replay_active_banks()`
// will break!
did_process_entries = blockstore_processor::confirm_slot(
blockstore,
bank,
&mut w_replay_stats,
&mut w_replay_progress,
false,
transaction_status_sender,
Some(replay_vote_sender),
None,
verify_recyclers,
false,
log_messages_bytes_limit,
prioritization_fee_cache,
replayer_handle,
)?;
}
let tx_count_after = w_replay_progress.num_txs;
let tx_count = tx_count_after - tx_count_before;
Ok(tx_count)
@@ -2091,7 +2108,7 @@
}
if my_latest_landed_vote >= last_voted_slot
|| heaviest_bank_on_same_fork
.is_hash_valid_for_age(&tower.last_vote_tx_blockhash(), MAX_PROCESSING_AGE)
// In order to avoid voting on multiple forks all past MAX_PROCESSING_AGE that don't
// include the last voted blockhash
|| last_vote_refresh_time.last_refresh_time.elapsed().as_millis() < MAX_VOTE_REFRESH_INTERVAL_MILLIS as u128
@@ -2243,16 +2260,22 @@
log_messages_bytes_limit: Option<usize>,
active_bank_slots: &[Slot],
prioritization_fee_cache: &PrioritizationFeeCache,
replayer: &Replayer,
) -> Vec<ReplaySlotFromBlockstore> {
// Make mutable shared structures thread safe.
let progress = RwLock::new(progress);
let longest_replay_time_us = AtomicU64::new(0);

let bank_slots_replayers: Vec<_> = active_bank_slots
Contributor

naming nit: makes it seem like there are multiple replayers, but these are actually just different handles to the same one (a possible rename is sketched after this snippet)

.iter()
.map(|s| (s, replayer.handle()))
.collect();
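
Along the lines of that nit, a cosmetic rename sketch (the name `slots_with_handles` is hypothetical; this is just the statement above restated, not standalone code):

```rust
// One ReplayerHandle per slot, all pointing at the same Replayer pool, so
// the name avoids implying that multiple replayers exist.
let slots_with_handles: Vec<(&Slot, ReplayerHandle)> = active_bank_slots
    .iter()
    .map(|slot| (slot, replayer.handle()))
    .collect();
```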

// Allow for concurrent replaying of slots from different forks.
let replay_result_vec: Vec<ReplaySlotFromBlockstore> = PAR_THREAD_POOL.install(|| {
active_bank_slots
bank_slots_replayers
.into_par_iter()
.map(|bank_slot| {
.map(|(bank_slot, replayer_handle)| {
let bank_slot = *bank_slot;
let mut replay_result = ReplaySlotFromBlockstore {
is_slot_dead: false,
@@ -2319,6 +2342,7 @@
&verify_recyclers.clone(),
log_messages_bytes_limit,
prioritization_fee_cache,
&replayer_handle,
);
replay_blockstore_time.stop();
replay_result.replay_result = Some(blockstore_result);
@@ -2350,6 +2374,7 @@
log_messages_bytes_limit: Option<usize>,
bank_slot: Slot,
prioritization_fee_cache: &PrioritizationFeeCache,
replayer: &Replayer,
) -> ReplaySlotFromBlockstore {
let mut replay_result = ReplaySlotFromBlockstore {
is_slot_dead: false,
Expand Down Expand Up @@ -2389,6 +2414,7 @@ impl ReplayStage {
});

if bank.collector_id() != my_pubkey {
let replayer_handle = replayer.handle();
let mut replay_blockstore_time = Measure::start("replay_blockstore_into_bank");
let blockstore_result = Self::replay_blockstore_into_bank(
bank,
Expand All @@ -2400,6 +2426,7 @@ impl ReplayStage {
&verify_recyclers.clone(),
log_messages_bytes_limit,
prioritization_fee_cache,
&replayer_handle,
);
replay_blockstore_time.stop();
replay_result.replay_result = Some(blockstore_result);
@@ -2621,6 +2648,7 @@
log_messages_bytes_limit: Option<usize>,
replay_slots_concurrently: bool,
prioritization_fee_cache: &PrioritizationFeeCache,
replayer: &Replayer,
) -> bool /* completed a bank */ {
let active_bank_slots = bank_forks.read().unwrap().active_bank_slots();
let num_active_banks = active_bank_slots.len();
@@ -2644,6 +2672,7 @@
log_messages_bytes_limit,
&active_bank_slots,
prioritization_fee_cache,
replayer,
)
} else {
active_bank_slots
@@ -2662,6 +2691,7 @@
log_messages_bytes_limit,
*bank_slot,
prioritization_fee_cache,
replayer,
)
})
.collect()
@@ -3119,7 +3149,7 @@
// newly achieved threshold, then there's no further
// information to propagate backwards to past leader blocks
(newly_voted_pubkeys.is_empty() && cluster_slot_pubkeys.is_empty() &&
!did_newly_reach_threshold)
{
break;
}
@@ -4153,6 +4183,8 @@ pub(crate) mod tests {
validator_keypairs,
..
} = vote_simulator;
let replayer = Replayer::new(get_thread_count());
let replayer_handle = replayer.handle();

let bank0 = bank_forks.read().unwrap().get(0).unwrap();
assert!(bank0.is_frozen());
@@ -4180,6 +4212,7 @@
&VerifyRecyclers::default(),
None,
&PrioritizationFeeCache::new(0u64),
&replayer_handle,
);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(