Add comments and clean-up the test
ryoqun committed Apr 22, 2024
1 parent 44a5ea5 commit 07959f2
Showing 2 changed files with 74 additions and 55 deletions.
125 changes: 72 additions & 53 deletions core/src/drop_bank_service.rs
@@ -17,7 +17,11 @@ impl DropBankService {
for banks in bank_receiver.iter() {
let len = banks.len();
let mut dropped_banks_time = Measure::start("drop_banks");
drop(banks);
// Drop BankWithScheduler while holding no locks to avoid deadlocks. That's because
// BankWithScheduler::drop() could block on transaction execution if the unified
// scheduler is installed. As historical context, banks used to be dropped early inside
// the replay stage, not here, and that caused a deadlock for BankForks.
drop::<Vec<BankWithScheduler>>(banks);
dropped_banks_time.stop();
if dropped_banks_time.as_ms() > 10 {
datapoint_info!(
@@ -109,17 +113,15 @@ pub mod tests {
}
}

let (drop_bank_sender1, drop_bank_receiver1) = unbounded();
let (drop_bank_sender2, drop_bank_receiver2) = unbounded();

let drop_bank_service = DropBankService::new(drop_bank_receiver2);
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);
let bank0 = Bank::new_for_tests(&genesis_config);
let bank_forks = BankForks::new_rw_arc(bank0);

// Set up BankForks with the unified scheduler enabled
let genesis_bank = Bank::new_for_tests(&genesis_config);
let bank_forks = BankForks::new_rw_arc(genesis_bank);
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool_raw = SchedulerPool::<PooledScheduler<StallingHandler>, _>::new(
None,
@@ -134,72 +136,88 @@
let genesis_bank = &bank_forks.read().unwrap().get(genesis).unwrap();
genesis_bank.set_fork_graph_in_program_cache(bank_forks.clone());

// Create a bank, which is pruned later
let pruned = 2;
let pruned_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), pruned);
let pruned_bank = bank_forks.write().unwrap().insert(pruned_bank);

// Create new root bank
let root = 3;
let root_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), root);
root_bank.freeze();
let root_hash = root_bank.hash();
bank_forks.write().unwrap().insert(root_bank);

let mut heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new((root, root_hash));

let mut progress = ProgressMap::default();
for i in genesis..=root {
progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0));
}

let mut duplicate_slots_tracker: DuplicateSlotsTracker =
vec![root - 1, root, root + 1].into_iter().collect();
let mut duplicate_confirmed_slots: DuplicateConfirmedSlots = vec![root - 1, root, root + 1]
.into_iter()
.map(|s| (s, Hash::default()))
.collect();
let mut unfrozen_gossip_verified_vote_hashes: UnfrozenGossipVerifiedVoteHashes =
UnfrozenGossipVerifiedVoteHashes {
votes_per_slot: vec![root - 1, root, root + 1]
.into_iter()
.map(|s| (s, HashMap::new()))
.collect(),
};
let mut epoch_slots_frozen_slots: EpochSlotsFrozenSlots = vec![root - 1, root, root + 1]
.into_iter()
.map(|slot| (slot, Hash::default()))
.collect();

let tx = SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&solana_sdk::pubkey::new_rand(),
2,
genesis_config.hash(),
));

// Delay transaction execution to ensure it happens after termination has
// been started
let lock_to_stall = LOCK_TO_STALL.lock().unwrap();
pruned_bank.schedule_transaction_executions([(&tx, &0)].into_iter());
drop(pruned_bank);
assert_eq!(pool_raw.pooled_scheduler_count(), 0);
drop(lock_to_stall);
//std::thread::sleep(std::time::Duration::from_secs(6));

// Create 2 channels to check actual pruned banks
let (drop_bank_sender1, drop_bank_receiver1) = unbounded();
let (drop_bank_sender2, drop_bank_receiver2) = unbounded();
let drop_bank_service = DropBankService::new(drop_bank_receiver2);

info!("calling handle_new_root()...");
ReplayStage::handle_new_root(
root,
&bank_forks,
&mut progress,
&AbsRequestSender::default(),
None,
&mut heaviest_subtree_fork_choice,
&mut duplicate_slots_tracker,
&mut duplicate_confirmed_slots,
&mut unfrozen_gossip_verified_vote_hashes,
&mut true,
&mut Vec::new(),
&mut epoch_slots_frozen_slots,
&drop_bank_sender1,
)
.unwrap();
// Mostly copied from: test_handle_new_root()
{
let mut heaviest_subtree_fork_choice =
HeaviestSubtreeForkChoice::new((root, root_hash));

let mut progress = ProgressMap::default();
for i in genesis..=root {
progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0));
}

let mut duplicate_slots_tracker: DuplicateSlotsTracker =
vec![root - 1, root, root + 1].into_iter().collect();
let mut duplicate_confirmed_slots: DuplicateConfirmedSlots =
vec![root - 1, root, root + 1]
.into_iter()
.map(|s| (s, Hash::default()))
.collect();
let mut unfrozen_gossip_verified_vote_hashes: UnfrozenGossipVerifiedVoteHashes =
UnfrozenGossipVerifiedVoteHashes {
votes_per_slot: vec![root - 1, root, root + 1]
.into_iter()
.map(|s| (s, HashMap::new()))
.collect(),
};
let mut epoch_slots_frozen_slots: EpochSlotsFrozenSlots =
vec![root - 1, root, root + 1]
.into_iter()
.map(|slot| (slot, Hash::default()))
.collect();
ReplayStage::handle_new_root(
root,
&bank_forks,
&mut progress,
&AbsRequestSender::default(),
None,
&mut heaviest_subtree_fork_choice,
&mut duplicate_slots_tracker,
&mut duplicate_confirmed_slots,
&mut unfrozen_gossip_verified_vote_hashes,
&mut true,
&mut Vec::new(),
&mut epoch_slots_frozen_slots,
&drop_bank_sender1,
)
.unwrap();
}

// Receive pruned banks from the above handle_new_root
let pruned_banks = drop_bank_receiver1.recv().unwrap();
//assert_eq!(pruned_banks.iter().map(|b| Arc::strong_count(b)).sorted().collect::<Vec<_>>(), vec![1, 1]);
assert_eq!(
pruned_banks
.iter()
@@ -208,17 +226,18 @@
.collect::<Vec<_>>(),
vec![genesis, pruned]
);
info!("sending pruned banks...");
info!("sending pruned banks to DropBankService...");
drop_bank_sender2.send(pruned_banks).unwrap();

info!("joining the drop bank service...");
drop((
(drop_bank_sender1, drop_bank_receiver1),
(drop_bank_sender2,),
));
info!("joining the drop bank service...");
drop_bank_service.join().unwrap();
info!("finally joined the drop bank service!");
// the scheduler used for pruned_bank have been returned now

// the scheduler used by the pruned_bank has been returned by now.
assert_eq!(pool_raw.pooled_scheduler_count(), 1);
}
}
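For context, here is a minimal, self-contained sketch of the pattern the comment in the first hunk describes: values whose Drop may block (as BankWithScheduler::drop() can when the unified scheduler is installed) are sent over a channel and dropped on a dedicated service thread, off the caller's critical path. The names SlowDropBank and spawn_drop_service are hypothetical stand-ins rather than the actual agave types, and std::sync::mpsc is used in place of the crossbeam channel the real code uses.

```rust
use std::{
    sync::mpsc,
    thread,
    time::{Duration, Instant},
};

// Hypothetical stand-in for BankWithScheduler: dropping it can block until its
// scheduler terminates, which is why the drop is offloaded to a service thread.
struct SlowDropBank(u64);

impl Drop for SlowDropBank {
    fn drop(&mut self) {
        // Simulate waiting for scheduler termination.
        thread::sleep(Duration::from_millis(50));
        eprintln!("dropped bank {}", self.0);
    }
}

fn spawn_drop_service(receiver: mpsc::Receiver<Vec<SlowDropBank>>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // The loop ends once every sender has been dropped and the channel is drained.
        for banks in receiver.iter() {
            let started = Instant::now();
            drop(banks); // the potentially slow drops run off the caller's critical path
            eprintln!("dropped banks in {:?}", started.elapsed());
        }
    })
}

fn main() {
    let (sender, receiver) = mpsc::channel();
    let service = spawn_drop_service(receiver);
    sender.send(vec![SlowDropBank(1), SlowDropBank(2)]).unwrap();
    drop(sender); // closing the channel lets the service thread exit
    service.join().unwrap();
}
```

The real service additionally times the drop and reports a datapoint when it takes more than 10 ms, as shown in the hunk above.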
4 changes: 2 additions & 2 deletions runtime/src/installed_scheduler_pool.rs
@@ -216,8 +216,8 @@ impl WaitReason {
///
/// It brings type-safety against accidental mixing of bank and scheduler with different slots,
/// which is a pretty dangerous condition. Also, it guarantees to call wait_for_termination() via
/// ::drop() inside BankForks::set_root()'s pruning, perfectly matching to Arc<Bank>'s lifetime by
/// piggybacking on the pruning.
/// ::drop() by DropBankService, which receives Vec<BankWithScheduler> from BankForks::set_root()'s
/// pruning, mostly matching Arc<Bank>'s lifetime by piggybacking on the pruning.
///
/// Semantically, a scheduler is tightly coupled with a particular bank. But scheduler wasn't put
/// into Bank fields to avoid circular-references (a scheduler needs to refer to its accompanied
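To make the ownership story above concrete, here is a minimal sketch of the RAII guarantee the doc comment describes, assuming a hypothetical Scheduler trait in place of the real InstalledScheduler API: the wrapper owns its scheduler and calls wait_for_termination() exactly once from Drop, so dropping the wrapper (for example, when DropBankService drops the pruned banks) is what ultimately shuts the scheduler down.

```rust
// Hypothetical stand-in for the InstalledScheduler interface.
trait Scheduler {
    fn wait_for_termination(&mut self);
}

struct NoopScheduler;

impl Scheduler for NoopScheduler {
    fn wait_for_termination(&mut self) {
        println!("scheduler terminated");
    }
}

// Sketch of the coupling: the wrapper owns an optional scheduler and guarantees
// wait_for_termination() runs exactly once, when the wrapper itself is dropped.
struct BankWithSchedulerSketch<S: Scheduler> {
    scheduler: Option<S>,
}

impl<S: Scheduler> Drop for BankWithSchedulerSketch<S> {
    fn drop(&mut self) {
        if let Some(mut scheduler) = self.scheduler.take() {
            scheduler.wait_for_termination();
        }
    }
}

fn main() {
    let bank = BankWithSchedulerSketch { scheduler: Some(NoopScheduler) };
    drop(bank); // prints "scheduler terminated" exactly once, via Drop
}
```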
