This repository has been archived by the owner on Jan 22, 2025. It is now read-only.
Graph-based Replay Algorithm and Other Latency/Throughput Changes #27720
Closed
Changes from all commits (13 commits):
b619a08  start to add replay logic (buffalu)
f2e6a12  more (buffalu)
064e301  add comment (buffalu)
6a557a2  tests compiling (buffalu)
52b82ae  more cleanup (buffalu)
3fef5ee  more cleanup (buffalu)
6494570  add tests for dependency graph (buffalu)
4bc8b55  fix ci (buffalu)
2fd4d03  fix ci (buffalu)
68bb9d2  ci (buffalu)
5209b28  ci (buffalu)
e7fdf6d  fix bug (buffalu)
19f909d  need to fix more tests (buffalu)
@@ -40,13 +40,15 @@ use {
         blockstore_processor::{
             self, BlockstoreProcessorError, ConfirmationProgress, TransactionStatusSender,
         },
+        executor::{Replayer, ReplayerHandle},
         leader_schedule_cache::LeaderScheduleCache,
         leader_schedule_utils::first_of_consecutive_leader_slots,
     },
     solana_measure::measure::Measure,
     solana_metrics::inc_new_counter_info,
     solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
     solana_program_runtime::timings::ExecuteTimings,
+    solana_rayon_threadlimit::get_thread_count,
     solana_rpc::{
         optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender},
         rpc_subscriptions::RpcSubscriptions,
@@ -467,6 +469,9 @@ impl ReplayStage {
         let mut latest_validator_votes_for_frozen_banks: LatestValidatorVotesForFrozenBanks = LatestValidatorVotesForFrozenBanks::default();
         let mut voted_signatures = Vec::new();
         let mut has_new_vote_been_rooted = !wait_for_vote_to_start_leader;
+
+        let replayer = Replayer::new(get_thread_count());
+
         let mut last_vote_refresh_time = LastVoteRefreshTime {
             last_refresh_time: Instant::now(),
             last_print_time: Instant::now(),
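The `Replayer` constructed above and the `ReplayerHandle`s it hands out come from the new `executor` module, whose definition is not part of this diff. As a rough mental model only (everything beyond the `new`/`handle` signatures is an assumption, not the PR's actual code), a pool-with-handles can be sketched as worker threads draining a shared job channel:

```rust
// Illustrative sketch, NOT the PR's executor module: assumes a
// crossbeam-channel worker pool where a handle is a cheap clone of the
// job-submission sender.
use crossbeam_channel::{unbounded, Sender};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

pub struct Replayer {
    sender: Sender<Job>,
}

#[derive(Clone)]
pub struct ReplayerHandle {
    sender: Sender<Job>,
}

impl Replayer {
    pub fn new(num_threads: usize) -> Self {
        let (sender, receiver) = unbounded::<Job>();
        for _ in 0..num_threads {
            let receiver = receiver.clone();
            // Each worker runs until every sender (pool and handles) is dropped.
            thread::spawn(move || {
                while let Ok(job) = receiver.recv() {
                    job();
                }
            });
        }
        Self { sender }
    }

    // One pool, many handles: a handle is just another sender into the same
    // queue, which is why the code above can cheaply call `replayer.handle()`
    // once per slot.
    pub fn handle(&self) -> ReplayerHandle {
        ReplayerHandle {
            sender: self.sender.clone(),
        }
    }
}

impl ReplayerHandle {
    pub fn submit(&self, job: Job) {
        let _ = self.sender.send(job);
    }
}
```

Sizing the pool with `get_thread_count()` matches how other thread pools in the validator are capped via `solana_rayon_threadlimit`.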
@@ -535,6 +540,7 @@
                         log_messages_bytes_limit,
                         replay_slots_concurrently,
                         &prioritization_fee_cache,
+                        &replayer,
                     );
                     replay_active_banks_time.stop();
@@ -668,7 +674,7 @@
                         last_vote_refresh_time,
                         &voting_sender,
                         wait_to_vote_slot,
                     );
                 }
             }
@@ -701,7 +707,7 @@
                 for r in heaviest_fork_failures {
                     if let HeaviestForkFailures::NoPropagatedConfirmation(slot) = r {
                         if let Some(latest_leader_slot) =
                             progress.get_latest_leader_slot_must_exist(slot)
                         {
                             progress.log_propagated_stats(latest_leader_slot, &bank_forks);
                         }
@@ -714,7 +720,7 @@
                 // Vote on a fork
                 if let Some((ref vote_bank, ref switch_fork_decision)) = vote_bank {
                     if let Some(votable_leader) =
                         leader_schedule_cache.slot_leader_at(vote_bank.slot(), Some(vote_bank))
                     {
                         Self::log_leader_change(
                             &my_pubkey,
@@ -1186,8 +1192,8 @@
                     *duplicate_slot,
                 );
                 true
                 // TODO: Send signal to repair to repair the correct version of
                 // `duplicate_slot` with hash == `correct_hash`
             } else {
                 warn!(
                     "PoH bank for slot {} is building on duplicate slot {}",
@@ -1710,6 +1716,7 @@
         }
     }

+    #[allow(clippy::too_many_arguments)]
     fn replay_blockstore_into_bank(
         bank: &Arc<Bank>,
         blockstore: &Blockstore,
@@ -1720,27 +1727,37 @@
         verify_recyclers: &VerifyRecyclers,
         log_messages_bytes_limit: Option<usize>,
         prioritization_fee_cache: &PrioritizationFeeCache,
+        replayer_handle: &ReplayerHandle,
     ) -> result::Result<usize, BlockstoreProcessorError> {
         let mut w_replay_stats = replay_stats.write().unwrap();
         let mut w_replay_progress = replay_progress.write().unwrap();
         let tx_count_before = w_replay_progress.num_txs;
-        // All errors must lead to marking the slot as dead, otherwise,
-        // the `check_slot_agrees_with_cluster()` called by `replay_active_banks()`
-        // will break!
-        blockstore_processor::confirm_slot(
-            blockstore,
-            bank,
-            &mut w_replay_stats,
-            &mut w_replay_progress,
-            false,
-            transaction_status_sender,
-            Some(replay_vote_sender),
-            None,
-            verify_recyclers,
-            false,
-            log_messages_bytes_limit,
-            prioritization_fee_cache,
-        )?;
+
+        let mut did_process_entries = true;
+
+        // more entries may have been received while replaying this slot.
+        // looping over this ensures that slots will be processed as fast as possible with the
+        // lowest latency.
+        while did_process_entries {
+            // All errors must lead to marking the slot as dead, otherwise,
+            // the `check_slot_agrees_with_cluster()` called by `replay_active_banks()`
+            // will break!
+            did_process_entries = blockstore_processor::confirm_slot(
+                blockstore,
+                bank,
+                &mut w_replay_stats,
+                &mut w_replay_progress,
+                false,
+                transaction_status_sender,
+                Some(replay_vote_sender),
+                None,
+                verify_recyclers,
+                false,
+                log_messages_bytes_limit,
+                prioritization_fee_cache,
+                replayer_handle,
+            )?;
+        }
         let tx_count_after = w_replay_progress.num_txs;
         let tx_count = tx_count_after - tx_count_before;
         Ok(tx_count)

Review thread on the `while did_process_entries` loop:

Reviewer: Doesn't this give preference to the current bank over other banks?

buffalu: seems ideal? you need to finish the bank before you can vote and work on the next one.
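The structural change in this hunk is that `confirm_slot` now reports whether it consumed any entries, and replay loops until a pass comes back empty, picking up entries that arrived while the previous batch was executing. A minimal, self-contained sketch of that drain-until-empty pattern (the names here are invented for illustration):

```rust
// Drain-until-empty sketch of the loop above: `process_available_entries`
// plays the role of `confirm_slot`, returning Ok(true) when it consumed
// newly arrived entries and Ok(false) when nothing new was pending.
fn replay_until_drained(
    mut process_available_entries: impl FnMut() -> Result<bool, String>,
) -> Result<(), String> {
    let mut did_process_entries = true;
    // More entries may arrive while a batch is replaying; looping keeps the
    // slot progressing immediately instead of waiting for the next outer
    // replay iteration.
    while did_process_entries {
        did_process_entries = process_available_entries()?;
    }
    Ok(())
}

fn main() {
    let mut pending_batches = 3;
    let result = replay_until_drained(|| {
        // Pretend each call consumes one pending batch of entries.
        if pending_batches > 0 {
            pending_batches -= 1;
            Ok(true)
        } else {
            Ok(false) // queue momentarily empty: return control to the caller
        }
    });
    assert!(result.is_ok());
}
```

This is also what the review thread above is weighing: the loop keeps one bank busy until it finishes, which the author argues is the desired bias, since a bank must be complete before it can be voted on.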
@@ -2091,7 +2108,7 @@
         }
         if my_latest_landed_vote >= last_voted_slot
             || heaviest_bank_on_same_fork
                 .is_hash_valid_for_age(&tower.last_vote_tx_blockhash(), MAX_PROCESSING_AGE)
             // In order to avoid voting on multiple forks all past MAX_PROCESSING_AGE that don't
             // include the last voted blockhash
             || last_vote_refresh_time.last_refresh_time.elapsed().as_millis() < MAX_VOTE_REFRESH_INTERVAL_MILLIS as u128
@@ -2243,16 +2260,22 @@
         log_messages_bytes_limit: Option<usize>,
         active_bank_slots: &[Slot],
         prioritization_fee_cache: &PrioritizationFeeCache,
+        replayer: &Replayer,
     ) -> Vec<ReplaySlotFromBlockstore> {
         // Make mutable shared structures thread safe.
         let progress = RwLock::new(progress);
         let longest_replay_time_us = AtomicU64::new(0);

+        let bank_slots_replayers: Vec<_> = active_bank_slots
+            .iter()
+            .map(|s| (s, replayer.handle()))
+            .collect();
+
         // Allow for concurrent replaying of slots from different forks.
         let replay_result_vec: Vec<ReplaySlotFromBlockstore> = PAR_THREAD_POOL.install(|| {
-            active_bank_slots
+            bank_slots_replayers
                 .into_par_iter()
-                .map(|bank_slot| {
+                .map(|(bank_slot, replayer_handle)| {
                     let bank_slot = *bank_slot;
                     let mut replay_result = ReplaySlotFromBlockstore {
                         is_slot_dead: false,

Review comment on `bank_slots_replayers`: naming nit: makes it seem like there are multiple replayers, but these are actually just different handles.
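Pairing each slot with its own handle before entering `PAR_THREAD_POOL` lets every parallel task own a `(slot, handle)` pair outright instead of borrowing the shared `Replayer` across threads. A toy version of that pair-then-fan-out shape (the slot values and the doubling closure are placeholders):

```rust
use rayon::prelude::*;

// Stand-in for a cloneable per-slot handle into one shared worker pool.
#[derive(Clone)]
struct Handle;

fn main() {
    let active_bank_slots: Vec<u64> = vec![10, 11, 12];
    let handle = Handle;

    // Pair every slot with its own handle up front, mirroring
    // `bank_slots_replayers` in the hunk above...
    let slots_with_handles: Vec<(u64, Handle)> = active_bank_slots
        .iter()
        .map(|slot| (*slot, handle.clone()))
        .collect();

    // ...then fan out across the rayon pool; each task owns its pair.
    let results: Vec<u64> = slots_with_handles
        .into_par_iter()
        .map(|(slot, _handle)| slot * 2) // placeholder for replaying the slot
        .collect();

    assert_eq!(results, vec![20, 22, 24]);
}
```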
@@ -2319,6 +2342,7 @@
                         &verify_recyclers.clone(),
                         log_messages_bytes_limit,
                         prioritization_fee_cache,
+                        &replayer_handle,
                     );
                     replay_blockstore_time.stop();
                     replay_result.replay_result = Some(blockstore_result);
@@ -2350,6 +2374,7 @@
         log_messages_bytes_limit: Option<usize>,
         bank_slot: Slot,
         prioritization_fee_cache: &PrioritizationFeeCache,
+        replayer: &Replayer,
     ) -> ReplaySlotFromBlockstore {
         let mut replay_result = ReplaySlotFromBlockstore {
             is_slot_dead: false,
@@ -2389,6 +2414,7 @@
         });

         if bank.collector_id() != my_pubkey {
+            let replayer_handle = replayer.handle();
             let mut replay_blockstore_time = Measure::start("replay_blockstore_into_bank");
             let blockstore_result = Self::replay_blockstore_into_bank(
                 bank,
@@ -2400,6 +2426,7 @@
                 &verify_recyclers.clone(),
                 log_messages_bytes_limit,
                 prioritization_fee_cache,
+                &replayer_handle,
             );
             replay_blockstore_time.stop();
             replay_result.replay_result = Some(blockstore_result);
@@ -2621,6 +2648,7 @@
         log_messages_bytes_limit: Option<usize>,
         replay_slots_concurrently: bool,
         prioritization_fee_cache: &PrioritizationFeeCache,
+        replayer: &Replayer,
     ) -> bool /* completed a bank */ {
         let active_bank_slots = bank_forks.read().unwrap().active_bank_slots();
         let num_active_banks = active_bank_slots.len();
@@ -2644,6 +2672,7 @@
                 log_messages_bytes_limit,
                 &active_bank_slots,
                 prioritization_fee_cache,
+                replayer,
             )
         } else {
             active_bank_slots
@@ -2662,6 +2691,7 @@
                         log_messages_bytes_limit,
                         *bank_slot,
                         prioritization_fee_cache,
+                        replayer,
                     )
                 })
                 .collect()
@@ -3119,7 +3149,7 @@
                 // newly achieved threshold, then there's no further
                 // information to propagate backwards to past leader blocks
                 (newly_voted_pubkeys.is_empty() && cluster_slot_pubkeys.is_empty() &&
                     !did_newly_reach_threshold)
                 {
                     break;
                 }
@@ -4153,6 +4183,8 @@ pub(crate) mod tests {
             validator_keypairs,
             ..
         } = vote_simulator;
+        let replayer = Replayer::new(get_thread_count());
+        let replayer_handle = replayer.handle();

         let bank0 = bank_forks.read().unwrap().get(0).unwrap();
         assert!(bank0.is_frozen());
@@ -4180,6 +4212,7 @@
             &VerifyRecyclers::default(),
             None,
             &PrioritizationFeeCache::new(0u64),
+            &replayer_handle,
         );
         let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
         let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
Reviewer:
think I made a similar comment on the previous PR, but it'd be good if we could join the replayer threads here.
buffalu:
sg! will def do before cleaning up; would like to get high-level comments out of the way before going in and fixing/adding tests, just to make sure I'm on the right path.
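If the pool looks anything like the sketch earlier, honoring the reviewer's request could amount to keeping the worker `JoinHandle`s, closing the submission side, and waiting for the workers to drain. Purely illustrative, with the caveat that any live `ReplayerHandle` clones would also have to be dropped first:

```rust
use crossbeam_channel::{unbounded, Sender};
use std::thread::{self, JoinHandle};

type Job = Box<dyn FnOnce() + Send + 'static>;

pub struct Replayer {
    sender: Option<Sender<Job>>,
    workers: Vec<JoinHandle<()>>,
}

impl Replayer {
    pub fn new(num_threads: usize) -> Self {
        let (sender, receiver) = unbounded::<Job>();
        let workers = (0..num_threads)
            .map(|_| {
                let receiver = receiver.clone();
                thread::spawn(move || {
                    // Exits once the channel is closed and drained.
                    while let Ok(job) = receiver.recv() {
                        job();
                    }
                })
            })
            .collect();
        Self {
            sender: Some(sender),
            workers,
        }
    }

    // Shutdown path: dropping the last sender makes `recv()` fail, ending
    // every worker loop, after which the threads can be joined. Outstanding
    // handle clones keep the channel open, so they must be dropped before
    // this returns.
    pub fn join(mut self) -> thread::Result<()> {
        drop(self.sender.take());
        for worker in self.workers.drain(..) {
            worker.join()?;
        }
        Ok(())
    }
}
```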