From 07959f20c69bb27ae9d5f22ecd228fcb754ebcf0 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Mon, 22 Apr 2024 12:14:37 +0900 Subject: [PATCH] Add comments and clean-up the test --- core/src/drop_bank_service.rs | 125 ++++++++++++++---------- runtime/src/installed_scheduler_pool.rs | 4 +- 2 files changed, 74 insertions(+), 55 deletions(-) diff --git a/core/src/drop_bank_service.rs b/core/src/drop_bank_service.rs index 2fdbcbd52c78e9..0a7034f670adfe 100644 --- a/core/src/drop_bank_service.rs +++ b/core/src/drop_bank_service.rs @@ -17,7 +17,11 @@ impl DropBankService { for banks in bank_receiver.iter() { let len = banks.len(); let mut dropped_banks_time = Measure::start("drop_banks"); - drop(banks); + // Drop BankWithScheduler with no alive lock to avoid deadlocks. That's because + // BankWithScheduler::drop() could block on transaction execution if unified + // scheduler is installed. As a historical context, it's dropped early inside + // the replaying stage not here and that caused a deadlock for BankForks. + drop::>(banks); dropped_banks_time.stop(); if dropped_banks_time.as_ms() > 10 { datapoint_info!( @@ -109,17 +113,15 @@ pub mod tests { } } - let (drop_bank_sender1, drop_bank_receiver1) = unbounded(); - let (drop_bank_sender2, drop_bank_receiver2) = unbounded(); - - let drop_bank_service = DropBankService::new(drop_bank_receiver2); let GenesisConfigInfo { genesis_config, mint_keypair, .. } = create_genesis_config(10_000); - let bank0 = Bank::new_for_tests(&genesis_config); - let bank_forks = BankForks::new_rw_arc(bank0); + + // Setup bankforks with unified scheduler enabled + let genesis_bank = Bank::new_for_tests(&genesis_config); + let bank_forks = BankForks::new_rw_arc(genesis_bank); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool_raw = SchedulerPool::, _>::new( None, @@ -134,72 +136,88 @@ pub mod tests { let genesis_bank = &bank_forks.read().unwrap().get(genesis).unwrap(); genesis_bank.set_fork_graph_in_program_cache(bank_forks.clone()); + // Create bank, which is pruned later let pruned = 2; let pruned_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), pruned); let pruned_bank = bank_forks.write().unwrap().insert(pruned_bank); + + // Create new root bank let root = 3; let root_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), root); root_bank.freeze(); let root_hash = root_bank.hash(); bank_forks.write().unwrap().insert(root_bank); - let mut heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new((root, root_hash)); - - let mut progress = ProgressMap::default(); - for i in genesis..=root { - progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0)); - } - - let mut duplicate_slots_tracker: DuplicateSlotsTracker = - vec![root - 1, root, root + 1].into_iter().collect(); - let mut duplicate_confirmed_slots: DuplicateConfirmedSlots = vec![root - 1, root, root + 1] - .into_iter() - .map(|s| (s, Hash::default())) - .collect(); - let mut unfrozen_gossip_verified_vote_hashes: UnfrozenGossipVerifiedVoteHashes = - UnfrozenGossipVerifiedVoteHashes { - votes_per_slot: vec![root - 1, root, root + 1] - .into_iter() - .map(|s| (s, HashMap::new())) - .collect(), - }; - let mut epoch_slots_frozen_slots: EpochSlotsFrozenSlots = vec![root - 1, root, root + 1] - .into_iter() - .map(|slot| (slot, Hash::default())) - .collect(); - let tx = SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &mint_keypair, &solana_sdk::pubkey::new_rand(), 2, genesis_config.hash(), )); + + // Delay transaction execution to ensure transaction execution happens after termintion has + // been started let lock_to_stall = LOCK_TO_STALL.lock().unwrap(); pruned_bank.schedule_transaction_executions([(&tx, &0)].into_iter()); drop(pruned_bank); assert_eq!(pool_raw.pooled_scheduler_count(), 0); drop(lock_to_stall); - //std::thread::sleep(std::time::Duration::from_secs(6)); + + // Create 2 channels to check actual pruned banks + let (drop_bank_sender1, drop_bank_receiver1) = unbounded(); + let (drop_bank_sender2, drop_bank_receiver2) = unbounded(); + let drop_bank_service = DropBankService::new(drop_bank_receiver2); info!("calling handle_new_root()..."); - ReplayStage::handle_new_root( - root, - &bank_forks, - &mut progress, - &AbsRequestSender::default(), - None, - &mut heaviest_subtree_fork_choice, - &mut duplicate_slots_tracker, - &mut duplicate_confirmed_slots, - &mut unfrozen_gossip_verified_vote_hashes, - &mut true, - &mut Vec::new(), - &mut epoch_slots_frozen_slots, - &drop_bank_sender1, - ) - .unwrap(); + // Mostly copied from: test_handle_new_root() + { + let mut heaviest_subtree_fork_choice = + HeaviestSubtreeForkChoice::new((root, root_hash)); + + let mut progress = ProgressMap::default(); + for i in genesis..=root { + progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0)); + } + + let mut duplicate_slots_tracker: DuplicateSlotsTracker = + vec![root - 1, root, root + 1].into_iter().collect(); + let mut duplicate_confirmed_slots: DuplicateConfirmedSlots = + vec![root - 1, root, root + 1] + .into_iter() + .map(|s| (s, Hash::default())) + .collect(); + let mut unfrozen_gossip_verified_vote_hashes: UnfrozenGossipVerifiedVoteHashes = + UnfrozenGossipVerifiedVoteHashes { + votes_per_slot: vec![root - 1, root, root + 1] + .into_iter() + .map(|s| (s, HashMap::new())) + .collect(), + }; + let mut epoch_slots_frozen_slots: EpochSlotsFrozenSlots = + vec![root - 1, root, root + 1] + .into_iter() + .map(|slot| (slot, Hash::default())) + .collect(); + ReplayStage::handle_new_root( + root, + &bank_forks, + &mut progress, + &AbsRequestSender::default(), + None, + &mut heaviest_subtree_fork_choice, + &mut duplicate_slots_tracker, + &mut duplicate_confirmed_slots, + &mut unfrozen_gossip_verified_vote_hashes, + &mut true, + &mut Vec::new(), + &mut epoch_slots_frozen_slots, + &drop_bank_sender1, + ) + .unwrap(); + } + + // Receive pruned banks from the above handle_new_root let pruned_banks = drop_bank_receiver1.recv().unwrap(); - //assert_eq!(pruned_banks.iter().map(|b| Arc::strong_count(b)).sorted().collect::>(), vec![1, 1]); assert_eq!( pruned_banks .iter() @@ -208,17 +226,18 @@ pub mod tests { .collect::>(), vec![genesis, pruned] ); - info!("sending pruned banks..."); + info!("sending pruned banks to DropBankService..."); drop_bank_sender2.send(pruned_banks).unwrap(); + info!("joining the drop bank service..."); drop(( (drop_bank_sender1, drop_bank_receiver1), (drop_bank_sender2,), )); - info!("joining the drop bank service..."); drop_bank_service.join().unwrap(); info!("finally joined the drop bank service!"); - // the scheduler used for pruned_bank have been returned now + + // the scheduler used by the pruned_bank have been returned now. assert_eq!(pool_raw.pooled_scheduler_count(), 1); } } diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index 77fc14372dd447..aaf3ea98f1b9aa 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -216,8 +216,8 @@ impl WaitReason { /// /// It brings type-safety against accidental mixing of bank and scheduler with different slots, /// which is a pretty dangerous condition. Also, it guarantees to call wait_for_termination() via -/// ::drop() inside BankForks::set_root()'s pruning, perfectly matching to Arc's lifetime by -/// piggybacking on the pruning. +/// ::drop() by DropBankService, which receives Vec from BankForks::set_root()'s +/// pruning, mostly matching to Arc's lifetime by piggybacking on the pruning. /// /// Semantically, a scheduler is tightly coupled with a particular bank. But scheduler wasn't put /// into Bank fields to avoid circular-references (a scheduler needs to refer to its accompanied