Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Blockstore special columns: minimize deletes in PurgeType::Exact #33498

Merged
merged 3 commits into from
Oct 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 76 additions & 8 deletions ledger/src/blockstore/blockstore_purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,15 +354,32 @@ impl Blockstore {
) -> Result<()> {
let mut index0 = self.transaction_status_index_cf.get(0)?.unwrap_or_default();
let mut index1 = self.transaction_status_index_cf.get(1)?.unwrap_or_default();
let slot_indexes = |slot: Slot| -> Vec<u64> {
let mut indexes = vec![];
if slot <= index0.max_slot && (index0.frozen || slot >= index1.max_slot) {
indexes.push(0);
}
if slot <= index1.max_slot && (index1.frozen || slot >= index0.max_slot) {
indexes.push(1);
}
indexes
};

for slot in from_slot..=to_slot {
let primary_indexes = slot_indexes(slot);
if primary_indexes.is_empty() {
continue;
}

let slot_entries = self.get_any_valid_slot_entries(slot, 0);
let transactions = slot_entries
.into_iter()
.flat_map(|entry| entry.transactions);
for transaction in transactions {
if let Some(&signature) = transaction.signatures.get(0) {
batch.delete::<cf::TransactionStatus>((0, signature, slot))?;
batch.delete::<cf::TransactionStatus>((1, signature, slot))?;
for primary_index in &primary_indexes {
batch.delete::<cf::TransactionStatus>((*primary_index, signature, slot))?;
}

let meta = self.read_transaction_status((signature, slot))?;
let loaded_addresses = meta.map(|meta| meta.loaded_addresses);
Expand All @@ -372,8 +389,14 @@ impl Blockstore {
);

for pubkey in account_keys.iter() {
batch.delete::<cf::AddressSignatures>((0, *pubkey, slot, signature))?;
batch.delete::<cf::AddressSignatures>((1, *pubkey, slot, signature))?;
for primary_index in &primary_indexes {
batch.delete::<cf::AddressSignatures>((
*primary_index,
*pubkey,
slot,
signature,
))?;
}
}
}
}
Expand Down Expand Up @@ -697,7 +720,7 @@ pub mod tests {
blockstore.initialize_transaction_status_index().unwrap();
*blockstore.active_transaction_status_index.write().unwrap() = 0;

for x in 0..index0_max_slot + 1 {
for x in 0..index0_max_slot {
let entries = make_slot_entries_with_transactions(1);
let shreds = entries_to_test_shreds(
&entries,
Expand Down Expand Up @@ -726,6 +749,36 @@ pub mod tests {
)
.unwrap();
}

// Add slot that crosses primary indexes
let entries = make_slot_entries_with_transactions(2);
let shreds = entries_to_test_shreds(
&entries,
index0_max_slot, // slot
index0_max_slot.saturating_sub(1), // parent_slot
true, // is_full_slot
0, // version
true, // merkle_variant
);
blockstore.insert_shreds(shreds, None, false).unwrap();
let signatures = entries
.iter()
.filter(|entry| !entry.is_tick())
.cloned()
.flat_map(|entry| entry.transactions)
.map(|transaction| transaction.signatures[0])
.collect::<Vec<Signature>>();
let random_bytes: Vec<u8> = (0..64).map(|_| rand::random::<u8>()).collect();
blockstore
.write_transaction_status(
index0_max_slot,
signatures[0],
vec![&Pubkey::try_from(&random_bytes[..32]).unwrap()],
vec![&Pubkey::try_from(&random_bytes[32..]).unwrap()],
TransactionStatusMeta::default(),
)
.unwrap();

// Freeze index 0
let mut write_batch = blockstore.db.batch().unwrap();
let mut w_active_transaction_status_index =
Expand All @@ -740,6 +793,19 @@ pub mod tests {
drop(w_active_transaction_status_index);
blockstore.db.write(write_batch).unwrap();

let random_bytes: Vec<u8> = (0..64).map(|_| rand::random::<u8>()).collect();
blockstore
.write_transaction_status(
index0_max_slot,
signatures[1],
vec![&Pubkey::try_from(&random_bytes[..32]).unwrap()],
vec![&Pubkey::try_from(&random_bytes[32..]).unwrap()],
TransactionStatusMeta::default(),
)
.unwrap();

// Note: index0_max_slot exists in both indexes

for x in index0_max_slot + 1..index1_max_slot + 1 {
let entries = make_slot_entries_with_transactions(1);
let shreds = entries_to_test_shreds(
Expand Down Expand Up @@ -993,7 +1059,7 @@ pub mod tests {
for _ in 13..index1_max_slot + 1 {
let entry = status_entry_iterator.next().unwrap().0;
assert_eq!(entry.0, 1);
assert!(entry.2 > 12);
assert!(entry.2 == index0_max_slot || entry.2 > 12);
}
drop(status_entry_iterator);

Expand Down Expand Up @@ -1190,6 +1256,8 @@ pub mod tests {
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let index0_max_slot = 9;
let index1_max_slot = 19;
// includes slot 0, and slot 9 has 2 transactions
let num_total_transactions = index1_max_slot + 2;

clear_and_repopulate_transaction_statuses_for_test(
&blockstore,
Expand Down Expand Up @@ -1227,7 +1295,7 @@ pub mod tests {
assert!(slot >= oldest_slot);
count += 1;
}
assert_eq!(count, index1_max_slot - (oldest_slot - 1));
assert_eq!(count, num_total_transactions - oldest_slot);

clear_and_repopulate_transaction_statuses_for_test(
&blockstore,
Expand Down Expand Up @@ -1265,6 +1333,6 @@ pub mod tests {
assert!(slot >= oldest_slot);
count += 1;
}
assert_eq!(count, index1_max_slot - (oldest_slot - 1));
assert_eq!(count, num_total_transactions - oldest_slot - 1); // Extra transaction in slot 9
}
}