diff --git a/ledger/src/shred/stats.rs b/ledger/src/shred/stats.rs index 5b4a75a2489bbb..60dfa9a79859c2 100644 --- a/ledger/src/shred/stats.rs +++ b/ledger/src/shred/stats.rs @@ -23,6 +23,8 @@ pub struct ProcessShredsStats { num_data_shreds_hist: [usize; 5], // If the blockstore already has shreds for the broadcast slot. pub num_extant_slots: u64, + // When looking up chained merkle root from parent slot fails. + pub err_unknown_chained_merkle_root: u64, pub(crate) data_buffer_residual: usize, pub num_merkle_data_shreds: usize, pub num_merkle_coding_shreds: usize, @@ -89,6 +91,11 @@ impl ProcessShredsStats { ("sign_coding_time", self.sign_coding_elapsed, i64), ("coding_send_time", self.coding_send_elapsed, i64), ("num_extant_slots", self.num_extant_slots, i64), + ( + "err_unknown_chained_merkle_root", + self.err_unknown_chained_merkle_root, + i64 + ), ("data_buffer_residual", self.data_buffer_residual, i64), ("num_data_shreds_07", self.num_data_shreds_hist[0], i64), ("num_data_shreds_15", self.num_data_shreds_hist[1], i64), @@ -161,6 +168,7 @@ impl AddAssign for ProcessShredsStats { coalesce_elapsed, num_data_shreds_hist, num_extant_slots, + err_unknown_chained_merkle_root, data_buffer_residual, num_merkle_data_shreds, num_merkle_coding_shreds, @@ -175,6 +183,7 @@ impl AddAssign for ProcessShredsStats { self.get_leader_schedule_elapsed += get_leader_schedule_elapsed; self.coalesce_elapsed += coalesce_elapsed; self.num_extant_slots += num_extant_slots; + self.err_unknown_chained_merkle_root += err_unknown_chained_merkle_root; self.data_buffer_residual += data_buffer_residual; self.num_merkle_data_shreds += num_merkle_data_shreds; self.num_merkle_coding_shreds += num_merkle_coding_shreds; diff --git a/turbine/src/broadcast_stage/broadcast_utils.rs b/turbine/src/broadcast_stage/broadcast_utils.rs index 3468a86dfd64ff..be231581e7fbfe 100644 --- a/turbine/src/broadcast_stage/broadcast_utils.rs +++ b/turbine/src/broadcast_stage/broadcast_utils.rs @@ -28,6 +28,7 @@ pub(super) struct ReceiveResults { #[derive(Clone)] pub struct UnfinishedSlotInfo { + pub(super) chained_merkle_root: Hash, pub next_shred_index: u32, pub(crate) next_code_index: u32, pub slot: Slot, diff --git a/turbine/src/broadcast_stage/standard_broadcast_run.rs b/turbine/src/broadcast_stage/standard_broadcast_run.rs index e2b8871b4bc3c2..9ded28d105e57f 100644 --- a/turbine/src/broadcast_stage/standard_broadcast_run.rs +++ b/turbine/src/broadcast_stage/standard_broadcast_run.rs @@ -14,6 +14,8 @@ use { shred::{shred_code, ProcessShredsStats, ReedSolomonCache, Shred, ShredFlags, Shredder}, }, solana_sdk::{ + genesis_config::ClusterType, + hash::Hash, signature::Keypair, timing::{duration_as_us, AtomicInterval}, }, @@ -69,6 +71,7 @@ impl StandardBroadcastRun { &mut self, keypair: &Keypair, max_ticks_in_slot: u8, + cluster_type: ClusterType, stats: &mut ProcessShredsStats, ) -> Vec { const SHRED_TICK_REFERENCE_MASK: u8 = ShredFlags::SHRED_TICK_REFERENCE_MASK.bits(); @@ -85,7 +88,8 @@ impl StandardBroadcastRun { keypair, &[], // entries true, // is_last_in_slot, - None, // chained_merkle_root + should_chain_merkle_shreds(state.slot, cluster_type) + .then_some(state.chained_merkle_root), state.next_shred_index, state.next_code_index, true, // merkle_variant @@ -110,6 +114,7 @@ impl StandardBroadcastRun { blockstore: &Blockstore, reference_tick: u8, is_slot_end: bool, + cluster_type: ClusterType, process_stats: &mut ProcessShredsStats, max_data_shreds_per_slot: u32, max_code_shreds_per_slot: u32, @@ -121,8 +126,12 @@ impl StandardBroadcastRun { BroadcastError, > { let (slot, parent_slot) = self.current_slot_and_parent.unwrap(); - let (next_shred_index, next_code_index) = match &self.unfinished_slot { - Some(state) => (state.next_shred_index, state.next_code_index), + let (next_shred_index, next_code_index, chained_merkle_root) = match &self.unfinished_slot { + Some(state) => ( + state.next_shred_index, + state.next_code_index, + state.chained_merkle_root, + ), None => { // If the blockstore has shreds for the slot, it should not // recreate the slot: @@ -135,7 +144,17 @@ impl StandardBroadcastRun { return Ok((Vec::default(), Vec::default())); } } - (0u32, 0u32) + let chained_merkle_root = broadcast_utils::get_chained_merkle_root_from_parent( + slot, + parent_slot, + blockstore, + ) + .unwrap_or_else(|err| { + error!("Unknown chained Merkle root: {err}"); + process_stats.err_unknown_chained_merkle_root += 1; + Hash::default() + }); + (0u32, 0u32, chained_merkle_root) } }; let shredder = @@ -144,7 +163,7 @@ impl StandardBroadcastRun { keypair, entries, is_slot_end, - None, // chained_merkle_root + should_chain_merkle_shreds(slot, cluster_type).then_some(chained_merkle_root), next_shred_index, next_code_index, true, // merkle_variant @@ -153,6 +172,10 @@ impl StandardBroadcastRun { ); process_stats.num_merkle_data_shreds += data_shreds.len(); process_stats.num_merkle_coding_shreds += coding_shreds.len(); + let chained_merkle_root = match data_shreds.iter().max_by_key(|shred| shred.index()) { + None => chained_merkle_root, + Some(shred) => shred.merkle_root().unwrap(), + }; let next_shred_index = match data_shreds.iter().map(Shred::index).max() { Some(index) => index + 1, None => next_shred_index, @@ -169,6 +192,7 @@ impl StandardBroadcastRun { return Err(BroadcastError::TooManyShreds); } self.unfinished_slot = Some(UnfinishedSlotInfo { + chained_merkle_root, next_shred_index, next_code_index, slot, @@ -232,10 +256,15 @@ impl StandardBroadcastRun { let mut process_stats = ProcessShredsStats::default(); let mut to_shreds_time = Measure::start("broadcast_to_shreds"); + let cluster_type = bank.cluster_type(); // 1) Check if slot was interrupted - let prev_slot_shreds = - self.finish_prev_slot(keypair, bank.ticks_per_slot() as u8, &mut process_stats); + let prev_slot_shreds = self.finish_prev_slot( + keypair, + bank.ticks_per_slot() as u8, + cluster_type, + &mut process_stats, + ); // 2) Convert entries to shreds and coding shreds let is_last_in_slot = last_tick_height == bank.max_tick_height(); @@ -247,6 +276,7 @@ impl StandardBroadcastRun { blockstore, reference_tick as u8, is_last_in_slot, + cluster_type, &mut process_stats, blockstore::MAX_DATA_SHREDS_PER_SLOT as u32, shred_code::MAX_CODE_SHREDS_PER_SLOT as u32, @@ -497,10 +527,15 @@ impl BroadcastRun for StandardBroadcastRun { } } +fn should_chain_merkle_shreds(_slot: Slot, _cluster_type: ClusterType) -> bool { + false +} + #[cfg(test)] mod test { use { super::*, + rand::Rng, solana_entry::entry::create_ticks, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_ledger::{ @@ -510,6 +545,7 @@ mod test { solana_runtime::bank::Bank, solana_sdk::{ genesis_config::GenesisConfig, + hash::Hash, signature::{Keypair, Signer}, }, solana_streamer::socket::SocketAddrSpace, @@ -569,6 +605,7 @@ mod test { let slot = 1; let parent = 0; run.unfinished_slot = Some(UnfinishedSlotInfo { + chained_merkle_root: Hash::new_from_array(rand::thread_rng().gen()), next_shred_index, next_code_index: 17, slot, @@ -580,7 +617,12 @@ mod test { run.current_slot_and_parent = Some((4, 2)); // Slot 2 interrupted slot 1 - let shreds = run.finish_prev_slot(&keypair, 0, &mut ProcessShredsStats::default()); + let shreds = run.finish_prev_slot( + &keypair, + 0, // max_ticks_in_slot + ClusterType::Development, + &mut ProcessShredsStats::default(), + ); let shred = shreds .first() .expect("Expected a shred that signals an interrupt"); @@ -602,18 +644,50 @@ mod test { let (quic_endpoint_sender, _quic_endpoint_receiver) = tokio::sync::mpsc::channel(/*capacity:*/ 128); + let receive_results = { + let ticks = create_ticks( + genesis_config.ticks_per_slot, // num_ticks + 0, // hashes_per_tick + genesis_config.hash(), + ); + ReceiveResults { + entries: ticks.clone(), + time_elapsed: Duration::new(3, 0), + time_coalesced: Duration::new(2, 0), + bank: bank0.clone(), + last_tick_height: ticks.len() as u64, + } + }; + let mut standard_broadcast_run = StandardBroadcastRun::new(/*shred_version:*/ 0); + standard_broadcast_run + .test_process_receive_results( + &leader_keypair, + &cluster_info, + &socket, + &blockstore, + receive_results, + &bank_forks, + &quic_endpoint_sender, + ) + .unwrap(); + assert!(blockstore.is_full(0)); + + let bank1 = Arc::new(Bank::new_from_parent( + bank0.clone(), + &leader_keypair.pubkey(), + 1, // slot + )); // Insert 1 less than the number of ticks needed to finish the slot let ticks0 = create_ticks(genesis_config.ticks_per_slot - 1, 0, genesis_config.hash()); let receive_results = ReceiveResults { entries: ticks0.clone(), time_elapsed: Duration::new(3, 0), time_coalesced: Duration::new(2, 0), - bank: bank0.clone(), + bank: bank1.clone(), last_tick_height: (ticks0.len() - 1) as u64, }; - // Step 1: Make an incomplete transmission for slot 0 - let mut standard_broadcast_run = StandardBroadcastRun::new(0); + // Step 1: Make an incomplete transmission for slot 1 standard_broadcast_run .test_process_receive_results( &leader_keypair, @@ -627,10 +701,10 @@ mod test { .unwrap(); let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap(); assert_eq!(unfinished_slot.next_shred_index as u64, num_shreds_per_slot); - assert_eq!(unfinished_slot.slot, 0); + assert_eq!(unfinished_slot.slot, 1); assert_eq!(unfinished_slot.parent, 0); // Make sure the slot is not complete - assert!(!blockstore.is_full(0)); + assert!(!blockstore.is_full(1)); // Modify the stats, should reset later standard_broadcast_run.process_shreds_stats.receive_elapsed = 10; // Broadcast stats should exist, and 2 batches should have been sent, @@ -656,9 +730,16 @@ mod test { 2 ); // Try to fetch ticks from blockstore, nothing should break - assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), ticks0); assert_eq!( - blockstore.get_slot_entries(0, num_shreds_per_slot).unwrap(), + blockstore + .get_slot_entries(/*slot:*/ 1, /*shred_start_index:*/ 0) + .unwrap(), + ticks0 + ); + assert_eq!( + blockstore + .get_slot_entries(/*slot:*/ 1, num_shreds_per_slot) + .unwrap(), vec![], ); @@ -721,9 +802,16 @@ mod test { .is_none()); // Try to fetch the incomplete ticks from blockstore, should succeed - assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), ticks0); assert_eq!( - blockstore.get_slot_entries(0, num_shreds_per_slot).unwrap(), + blockstore + .get_slot_entries(/*slot:*/ 1, /*shred_start_index:*/ 0) + .unwrap(), + ticks0 + ); + assert_eq!( + blockstore + .get_slot_entries(/*slot:*/ 1, num_shreds_per_slot) + .unwrap(), vec![], ); } @@ -814,7 +902,7 @@ mod test { solana_logger::setup(); let keypair = Keypair::new(); let mut bs = StandardBroadcastRun::new(0); - bs.current_slot_and_parent = Some((1, 0)); + bs.current_slot_and_parent = Some((0, 0)); let entries = create_ticks(10_000, 1, solana_sdk::hash::Hash::default()); let ledger_path = get_tmp_ledger_path_auto_delete!(); @@ -831,6 +919,7 @@ mod test { &blockstore, 0, false, + ClusterType::Development, &mut stats, 1000, 1000, @@ -846,6 +935,7 @@ mod test { &blockstore, 0, false, + ClusterType::Development, &mut stats, 10, 10,