DAG-based replay (#5)
buffalu committed Mar 27, 2024
1 parent 7d1c7c7 commit b659e0f
Showing 9 changed files with 587 additions and 334 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -253,6 +253,7 @@ merlin = "3"
min-max-heap = "1.3.0"
modular-bitfield = "0.11.2"
nix = "0.26.4"
nohash-hasher = "0.2.0"
num-bigint = "0.4.4"
num_cpus = "1.16.0"
num_enum = "0.6.1"
1 change: 1 addition & 0 deletions core/Cargo.toml
@@ -34,6 +34,7 @@ lazy_static = { workspace = true }
log = { workspace = true }
lru = { workspace = true }
min-max-heap = { workspace = true }
num_cpus = { workspace = true }
num_enum = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
94 changes: 60 additions & 34 deletions core/src/replay_stage.rs
@@ -43,7 +43,8 @@ use {
block_error::BlockError,
blockstore::Blockstore,
blockstore_processor::{
self, BlockstoreProcessorError, ConfirmationProgress, TransactionStatusSender,
self, BankTransactionExecutor, BankTransactionExecutorHandle, BlockstoreProcessorError,
ConfirmationProgress, TransactionStatusSender,
},
entry_notifier_service::EntryNotifierSender,
leader_schedule_cache::LeaderScheduleCache,
@@ -52,6 +53,7 @@ use {
solana_measure::measure::Measure,
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
solana_program_runtime::timings::ExecuteTimings,
solana_rayon_threadlimit::get_thread_count,
solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
rpc_subscriptions::RpcSubscriptions,
@@ -284,6 +286,7 @@ pub struct ReplayTiming {
generate_new_bank_forks_write_lock_us: u64,
replay_blockstore_us: u64, //< When processing forks concurrently, only captures the longest fork
}

impl ReplayTiming {
#[allow(clippy::too_many_arguments)]
fn update(
@@ -472,6 +475,7 @@ impl ReplayTiming {
pub struct ReplayStage {
t_replay: JoinHandle<()>,
commitment_service: AggregateCommitmentService,
bank_executor: BankTransactionExecutor,
}

impl ReplayStage {
@@ -533,6 +537,11 @@ impl ReplayStage {
replay_slots_concurrently,
} = config;

let bank_executor =
BankTransactionExecutor::new(get_thread_count(), &prioritization_fee_cache);

let executor_handle = bank_executor.handle();

trace!("replay stage");
// Start the replay stage loop
let (lockouts_sender, commitment_service) = AggregateCommitmentService::new(
@@ -658,6 +667,7 @@ impl ReplayStage {
replay_slots_concurrently,
&prioritization_fee_cache,
&mut purge_repair_slot_counter,
&executor_handle,
);
replay_active_banks_time.stop();

@@ -1114,6 +1124,7 @@ impl ReplayStage {
Ok(Self {
t_replay,
commitment_service,
bank_executor,
})
}

@@ -1381,8 +1392,8 @@ impl ReplayStage {
return false;
} else if frozen_hash == Hash::default()
&& !progress.is_dead(*duplicate_slot).expect(
"If slot exists in BankForks must exist in the progress map",
)
"If slot exists in BankForks must exist in the progress map",
)
{
warn!(
"Trying to dump unfrozen slot {} that is not dead",
@@ -2057,6 +2068,7 @@ impl ReplayStage {
verify_recyclers: &VerifyRecyclers,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
executor_handle: &BankTransactionExecutorHandle,
) -> result::Result<usize, BlockstoreProcessorError> {
let mut w_replay_stats = replay_stats.write().unwrap();
let mut w_replay_progress = replay_progress.write().unwrap();
@@ -2067,22 +2079,21 @@ impl ReplayStage {
let start = Instant::now();
while start.elapsed() < Duration::from_millis(50)
&& blockstore_processor::confirm_slot(
blockstore,
bank,
&mut w_replay_stats,
&mut w_replay_progress,
false,
transaction_status_sender,
entry_notification_sender,
Some(replay_vote_sender),
verify_recyclers,
false,
log_messages_bytes_limit,
prioritization_fee_cache,
)?
blockstore,
bank,
&mut w_replay_stats,
&mut w_replay_progress,
false,
transaction_status_sender,
entry_notification_sender,
Some(replay_vote_sender),
verify_recyclers,
false,
log_messages_bytes_limit,
prioritization_fee_cache,
executor_handle,
)?
{}


let tx_count_after = w_replay_progress.num_txs;
let tx_count = tx_count_after - tx_count_before;
Ok(tx_count)
@@ -2633,6 +2644,7 @@ impl ReplayStage {
log_messages_bytes_limit: Option<usize>,
active_bank_slots: &[Slot],
prioritization_fee_cache: &PrioritizationFeeCache,
executor_handle: &BankTransactionExecutorHandle,
) -> Vec<ReplaySlotFromBlockstore> {
// Make mutable shared structures thread safe.
let progress = RwLock::new(progress);
@@ -2710,6 +2722,7 @@ impl ReplayStage {
&verify_recyclers.clone(),
log_messages_bytes_limit,
prioritization_fee_cache,
executor_handle,
);
replay_blockstore_time.stop();
replay_result.replay_result = Some(blockstore_result);
@@ -2742,12 +2755,14 @@ impl ReplayStage {
log_messages_bytes_limit: Option<usize>,
bank_slot: Slot,
prioritization_fee_cache: &PrioritizationFeeCache,
executor_handle: &BankTransactionExecutorHandle,
) -> ReplaySlotFromBlockstore {
let mut replay_result = ReplaySlotFromBlockstore {
is_slot_dead: false,
bank_slot,
replay_result: None,
};

let my_pubkey = &my_pubkey.clone();
trace!("Replay active bank: slot {}", bank_slot);
if progress.get(&bank_slot).map(|p| p.is_dead).unwrap_or(false) {
Expand Down Expand Up @@ -2793,6 +2808,7 @@ impl ReplayStage {
&verify_recyclers.clone(),
log_messages_bytes_limit,
prioritization_fee_cache,
executor_handle,
);
replay_blockstore_time.stop();
replay_result.replay_result = Some(blockstore_result);
@@ -3049,6 +3065,7 @@ impl ReplayStage {
replay_slots_concurrently: bool,
prioritization_fee_cache: &PrioritizationFeeCache,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
executor_handle: &BankTransactionExecutorHandle,
) -> bool /* completed a bank */ {
let active_bank_slots = bank_forks.read().unwrap().active_bank_slots();
let num_active_banks = active_bank_slots.len();
@@ -3073,6 +3090,7 @@ impl ReplayStage {
log_messages_bytes_limit,
&active_bank_slots,
prioritization_fee_cache,
executor_handle,
)
} else {
active_bank_slots
Expand All @@ -3092,6 +3110,7 @@ impl ReplayStage {
log_messages_bytes_limit,
*bank_slot,
prioritization_fee_cache,
executor_handle,
)
})
.collect()
@@ -3398,10 +3417,10 @@ impl ReplayStage {
// Last vote did not land
{
my_latest_landed_vote < last_voted_slot
// If we are already voting at the tip, there is nothing we can do.
&& last_voted_slot < heaviest_bank_on_same_voted_fork.slot()
// Last vote outside slot hashes of the tip of fork
&& !heaviest_bank_on_same_voted_fork
// If we are already voting at the tip, there is nothing we can do.
&& last_voted_slot < heaviest_bank_on_same_voted_fork.slot()
// Last vote outside slot hashes of the tip of fork
&& !heaviest_bank_on_same_voted_fork
.is_in_slot_hashes_history(&last_voted_slot)
}
None => false,
@@ -4112,6 +4131,7 @@ impl ReplayStage {

pub fn join(self) -> thread::Result<()> {
self.commitment_service.join()?;
self.bank_executor.join()?;
self.t_replay.join().map(|_| ())
}
}
@@ -4834,6 +4854,9 @@ pub(crate) mod tests {
blockstore.insert_shreds(shreds, None, false).unwrap();
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let exit = Arc::new(AtomicBool::new(false));
let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let bank_executor = BankTransactionExecutor::new(1, &prioritization_fee_cache);

let res = ReplayStage::replay_blockstore_into_bank(
&bank1,
&blockstore,
@@ -4844,8 +4867,11 @@ pub(crate) mod tests {
&replay_vote_sender,
&VerifyRecyclers::default(),
None,
&PrioritizationFeeCache::new(0u64),
&prioritization_fee_cache,
&bank_executor.handle(),
);
bank_executor.join().unwrap();

let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
@@ -5294,7 +5320,7 @@ pub(crate) mod tests {
// Fill banks with votes
for vote in votes {
assert!(vote_simulator
.simulate_vote(vote, &my_node_pubkey, &mut tower,)
.simulate_vote(vote, &my_node_pubkey, &mut tower)
.is_empty());
}

@@ -5356,14 +5382,14 @@ pub(crate) mod tests {
// >= 4 + NUM_CONSECUTIVE_LEADER_SLOTS, or if we reset to < 4
assert!(!ReplayStage::should_retransmit(
poh_slot,
&mut last_retransmit_slot
&mut last_retransmit_slot,
));
assert_eq!(last_retransmit_slot, 4);

for poh_slot in 4..4 + NUM_CONSECUTIVE_LEADER_SLOTS {
assert!(!ReplayStage::should_retransmit(
poh_slot,
&mut last_retransmit_slot
&mut last_retransmit_slot,
));
assert_eq!(last_retransmit_slot, 4);
}
@@ -5372,15 +5398,15 @@ pub(crate) mod tests {
last_retransmit_slot = 4;
assert!(ReplayStage::should_retransmit(
poh_slot,
&mut last_retransmit_slot
&mut last_retransmit_slot,
));
assert_eq!(last_retransmit_slot, poh_slot);

let poh_slot = 3;
last_retransmit_slot = 4;
assert!(ReplayStage::should_retransmit(
poh_slot,
&mut last_retransmit_slot
&mut last_retransmit_slot,
));
assert_eq!(last_retransmit_slot, poh_slot);
}
@@ -6348,11 +6374,11 @@ pub(crate) mod tests {
bank_forks.write().unwrap().remove(2);
assert!(check_map_eq(
&ancestors,
&bank_forks.read().unwrap().ancestors()
&bank_forks.read().unwrap().ancestors(),
));
assert!(check_map_eq(
&descendants,
&bank_forks.read().unwrap().descendants()
&bank_forks.read().unwrap().descendants(),
));

// Try to purge the root
Expand Down Expand Up @@ -8271,7 +8297,7 @@ pub(crate) mod tests {
failures,
vec![
HeaviestForkFailures::FailedSwitchThreshold(4, 0, 30000),
HeaviestForkFailures::LockedOut(4)
HeaviestForkFailures::LockedOut(4),
]
);

@@ -8290,7 +8316,7 @@ pub(crate) mod tests {
failures,
vec![
HeaviestForkFailures::FailedSwitchThreshold(4, 0, 30000),
HeaviestForkFailures::LockedOut(4)
HeaviestForkFailures::LockedOut(4),
]
);
}
@@ -8350,7 +8376,7 @@ pub(crate) mod tests {

assert_eq!(vote_fork, None);
assert_eq!(reset_fork, Some(4));
assert_eq!(failures, vec![HeaviestForkFailures::LockedOut(4),]);
assert_eq!(failures, vec![HeaviestForkFailures::LockedOut(4)]);

let (vote_fork, reset_fork, failures) = run_compute_and_select_forks(
&bank_forks,
Expand All @@ -8363,7 +8389,7 @@ pub(crate) mod tests {

assert_eq!(vote_fork, None);
assert_eq!(reset_fork, Some(4));
assert_eq!(failures, vec![HeaviestForkFailures::LockedOut(4),]);
assert_eq!(failures, vec![HeaviestForkFailures::LockedOut(4)]);
}

#[test]
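
The pattern this diff threads through ReplayStage is: construct a BankTransactionExecutor once at startup, hand a BankTransactionExecutorHandle down into confirm_slot and replay_blockstore_into_bank, and join the executor in ReplayStage::join(). The sketch below illustrates that lifecycle with a generic thread pool. It is a minimal illustration, not the actual implementation: the real types are defined in blockstore_processor (one of the files not rendered on this page), the real constructor also takes &prioritization_fee_cache, and the real handle dispatches bank transaction batches rather than arbitrary closures.

// Minimal, self-contained sketch of the executor/handle lifecycle added by this
// commit. The channel layout, the Task alias, and the closure-based API are
// illustrative assumptions only.
use std::{
    sync::{
        mpsc::{channel, Sender},
        Arc, Mutex,
    },
    thread::{self, JoinHandle},
};

type Task = Box<dyn FnOnce() + Send + 'static>;

// Owned by ReplayStage, mirroring the new `bank_executor` field.
struct BankTransactionExecutor {
    sender: Sender<Task>,
    workers: Vec<JoinHandle<()>>,
}

// Cloneable handle passed into the replay paths, mirroring `executor_handle`.
#[derive(Clone)]
struct BankTransactionExecutorHandle {
    sender: Sender<Task>,
}

impl BankTransactionExecutor {
    fn new(num_threads: usize) -> Self {
        let (sender, receiver) = channel::<Task>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..num_threads)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // Workers pull tasks until every Sender (executor and handles) is dropped.
                    let task = receiver.lock().unwrap().recv();
                    match task {
                        Ok(task) => task(),
                        Err(_) => break,
                    }
                })
            })
            .collect();
        Self { sender, workers }
    }

    fn handle(&self) -> BankTransactionExecutorHandle {
        BankTransactionExecutorHandle {
            sender: self.sender.clone(),
        }
    }

    fn join(self) -> thread::Result<()> {
        // Closing the executor's own Sender lets idle workers exit recv().
        drop(self.sender);
        for worker in self.workers {
            worker.join()?;
        }
        Ok(())
    }
}

impl BankTransactionExecutorHandle {
    fn execute(&self, task: Task) {
        self.sender.send(task).expect("executor has shut down");
    }
}

fn main() {
    // ReplayStage::new: one executor, sized by get_thread_count() in the real code.
    let bank_executor = BankTransactionExecutor::new(4);
    let executor_handle = bank_executor.handle();

    // confirm_slot / replay_blockstore_into_bank receive &executor_handle and
    // submit work through it while replaying a slot.
    executor_handle.execute(Box::new(|| println!("replay a batch of transactions")));

    // ReplayStage::join: drop outstanding handles, then join the executor.
    drop(executor_handle);
    bank_executor.join().unwrap();
}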