This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

in hash calc, delete old cache files that will not be used earlier #33432

Merged 3 commits on Oct 9, 2023
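The heart of the change: the chunked storage scan is split into two explicit passes so that stale cache files can be removed in between. The first pass only classifies each chunk, reusing a pre-existing cache file when the chunk's content hash matches one; the second pass performs the expensive scan for the chunks that missed. Below is a minimal, self-contained sketch of that control flow; the helper functions (cache_file_exists, delete_unclaimed_cache_files, scan_and_write) and the simplified types are hypothetical stand-ins for the real CacheHashData API, not code from the PR.

    use std::ops::Range;

    type Slot = u64;

    // Hypothetical stand-in for the real CacheHashDataFileReference.
    struct CacheFileRef(String);

    // Mirrors the new ScanAccountStorageResult enum in the diff below.
    enum ScanResult {
        AlreadyExists(CacheFileRef),
        NeedsToBeCreated((String, Range<Slot>)),
    }

    // Stubs for illustration only; the real logic lives in CacheHashData.
    fn cache_file_exists(_name: &str) -> bool {
        false
    }
    fn delete_unclaimed_cache_files() {}
    fn scan_and_write(name: &str, _range: Range<Slot>) -> CacheFileRef {
        CacheFileRef(name.to_string())
    }

    fn scan_two_phase(chunks: Vec<Range<Slot>>) -> Vec<CacheFileRef> {
        // Phase 1: cheap classification only. Decide, per chunk, whether an
        // existing cache file can be reused or a new one must be produced.
        let classified: Vec<ScanResult> = chunks
            .into_iter()
            .map(|range| {
                let file_name = format!("{}.{}", range.start, range.end);
                if cache_file_exists(&file_name) {
                    ScanResult::AlreadyExists(CacheFileRef(file_name))
                } else {
                    ScanResult::NeedsToBeCreated((file_name, range))
                }
            })
            .collect();

        // Between the phases, every pre-existing file that phase 1 did not
        // claim is known to be unused, so it can be deleted before phase 2
        // writes new files instead of waiting for the end of the calculation.
        delete_unclaimed_cache_files();

        // Phase 2: do the expensive scan only for the chunks that missed.
        classified
            .into_iter()
            .map(|result| match result {
                ScanResult::AlreadyExists(file) => file,
                ScanResult::NeedsToBeCreated((file_name, range)) => {
                    scan_and_write(&file_name, range)
                }
            })
            .collect()
    }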
179 changes: 105 additions & 74 deletions accounts-db/src/accounts_db.rs
@@ -181,6 +181,13 @@ impl<'a> StoreTo<'a> {
     }
 }
 
+enum ScanAccountStorageResult {
+    /// this data has already been scanned and cached
+    CacheFileAlreadyExists(CacheHashDataFileReference),
+    /// this data needs to be scanned and cached
+    CacheFileNeedsToBeCreated((String, Range<Slot>)),
+}
+
 #[derive(Default, Debug)]
 /// hold alive accounts
 /// alive means in the accounts index
@@ -7223,90 +7230,114 @@ impl AccountsDb {
             .saturating_sub(slots_per_epoch);
 
         stats.scan_chunks = splitter.chunk_count;
-        (0..splitter.chunk_count)
-            .into_par_iter()
-            .map(|chunk| {
-                let mut scanner = scanner.clone();
-
+
+        let cache_files = (0..splitter.chunk_count)
+            .into_par_iter()
+            .filter_map(|chunk| {
                 let range_this_chunk = splitter.get_slot_range(chunk)?;
 
-                let file_name = {
-                    let mut load_from_cache = true;
-                    let mut hasher = hash_map::DefaultHasher::new();
-                    bin_range.start.hash(&mut hasher);
-                    bin_range.end.hash(&mut hasher);
-                    let is_first_scan_pass = bin_range.start == 0;
-
-                    // calculate hash representing all storages in this chunk
-                    for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) {
-                        if is_first_scan_pass && slot < one_epoch_old {
-                            self.update_old_slot_stats(stats, storage);
-                        }
-                        if !Self::hash_storage_info(&mut hasher, storage, slot) {
-                            load_from_cache = false;
-                            break;
-                        }
-                    }
-
-                    // we have a hash value for the storages in this chunk
-                    // so, build a file name:
-                    let hash = hasher.finish();
-                    let file_name = format!(
-                        "{}.{}.{}.{}.{:016x}",
-                        range_this_chunk.start,
-                        range_this_chunk.end,
-                        bin_range.start,
-                        bin_range.end,
-                        hash
-                    );
-                    if load_from_cache {
-                        if let Ok(mapped_file) =
-                            cache_hash_data.get_file_reference_to_map_later(&file_name)
-                        {
-                            return Some(mapped_file);
-                        }
-                    }
+                let mut load_from_cache = true;
+                let mut hasher = hash_map::DefaultHasher::new();
+                bin_range.start.hash(&mut hasher);
+                bin_range.end.hash(&mut hasher);
+                let is_first_scan_pass = bin_range.start == 0;
+
+                // calculate hash representing all storages in this chunk
+                let mut empty = true;
+                for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) {
+                    empty = false;
+                    if is_first_scan_pass && slot < one_epoch_old {
+                        self.update_old_slot_stats(stats, storage);
+                    }
+                    if !Self::hash_storage_info(&mut hasher, storage, slot) {
+                        load_from_cache = false;
+                        break;
+                    }
+                }
+                if empty {
+                    return None;
+                }
+                // we have a hash value for the storages in this chunk
+                // so, build a file name:
+                let hash = hasher.finish();
+                let file_name = format!(
+                    "{}.{}.{}.{}.{:016x}",
+                    range_this_chunk.start,
+                    range_this_chunk.end,
+                    bin_range.start,
+                    bin_range.end,
+                    hash
+                );
+                if load_from_cache {
+                    if let Ok(mapped_file) =
+                        cache_hash_data.get_file_reference_to_map_later(&file_name)
+                    {
+                        return Some(ScanAccountStorageResult::CacheFileAlreadyExists(
+                            mapped_file,
+                        ));
+                    }
+                }
 
-                    // fall through and load normally - we failed to load from a cache file
-                    file_name
-                };
+                // fall through and load normally - we failed to load from a cache file but there are storages present
+                Some(ScanAccountStorageResult::CacheFileNeedsToBeCreated((
+                    file_name,
+                    range_this_chunk,
+                )))
+            })
+            .collect::<Vec<_>>();
 
-                let mut init_accum = true;
-                // load from cache failed, so create the cache file for this chunk
-                for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) {
-                    let ancient = slot < oldest_non_ancient_slot;
-                    let (_, scan_us) = measure_us!(if let Some(storage) = storage {
-                        if init_accum {
-                            let range = bin_range.end - bin_range.start;
-                            scanner.init_accum(range);
-                            init_accum = false;
-                        }
-                        scanner.set_slot(slot);
-
-                        Self::scan_single_account_storage(storage, &mut scanner);
-                    });
-                    if ancient {
-                        stats
-                            .sum_ancient_scans_us
-                            .fetch_add(scan_us, Ordering::Relaxed);
-                        stats.count_ancient_scans.fetch_add(1, Ordering::Relaxed);
-                        stats
-                            .longest_ancient_scan_us
-                            .fetch_max(scan_us, Ordering::Relaxed);
-                    }
-                }
-                (!init_accum)
-                    .then(|| {
-                        let r = scanner.scanning_complete();
-                        assert!(!file_name.is_empty());
-                        (!r.is_empty() && r.iter().any(|b| !b.is_empty())).then(|| {
-                            // error if we can't write this
-                            cache_hash_data.save(&file_name, &r).unwrap();
-                            cache_hash_data
-                                .get_file_reference_to_map_later(&file_name)
-                                .unwrap()
-                        })
-                    })
-                    .flatten()
+        // deletes the old files that will not be used before creating new ones
+        cache_hash_data.delete_old_cache_files();
+
+        cache_files
+            .into_par_iter()
+            .map(|chunk| {
+                match chunk {
+                    ScanAccountStorageResult::CacheFileAlreadyExists(file) => Some(file),
+                    ScanAccountStorageResult::CacheFileNeedsToBeCreated((
+                        file_name,
+                        range_this_chunk,
+                    )) => {
+                        let mut scanner = scanner.clone();
+                        let mut init_accum = true;
+                        // load from cache failed, so create the cache file for this chunk
+                        for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) {
+                            let ancient = slot < oldest_non_ancient_slot;
+                            let (_, scan_us) = measure_us!(if let Some(storage) = storage {
+                                if init_accum {
+                                    let range = bin_range.end - bin_range.start;
+                                    scanner.init_accum(range);
+                                    init_accum = false;
+                                }
+                                scanner.set_slot(slot);
+
+                                Self::scan_single_account_storage(storage, &mut scanner);
+                            });
+                            if ancient {
+                                stats
+                                    .sum_ancient_scans_us
+                                    .fetch_add(scan_us, Ordering::Relaxed);
+                                stats.count_ancient_scans.fetch_add(1, Ordering::Relaxed);
+                                stats
+                                    .longest_ancient_scan_us
+                                    .fetch_max(scan_us, Ordering::Relaxed);
+                            }
+                        }
+                        (!init_accum)
+                            .then(|| {
+                                let r = scanner.scanning_complete();
+                                assert!(!file_name.is_empty());
+                                (!r.is_empty() && r.iter().any(|b| !b.is_empty())).then(|| {
+                                    // error if we can't write this
+                                    cache_hash_data.save(&file_name, &r).unwrap();
+                                    cache_hash_data
+                                        .get_file_reference_to_map_later(&file_name)
+                                        .unwrap()
+                                })
+                            })
+                            .flatten()
+                    }
+                }
             })
             .filter_map(|x| x)
             .collect()
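Taken together with the cache_hash_data.rs change below, the point of the restructuring is timing: previously the stale files were removed only when the CacheHashData instance was dropped, after all new cache files had already been written. Deleting them between the classification pass and the creation pass means the unused files are gone before the second pass starts writing, which should reduce the cache directory's peak disk usage during the hash calculation.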
28 changes: 16 additions & 12 deletions accounts-db/src/cache_hash_data.rs
@@ -198,9 +198,7 @@ pub(crate) struct CacheHashData {
 
 impl Drop for CacheHashData {
     fn drop(&mut self) {
-        if self.should_delete_old_cache_files_on_drop {
-            self.delete_old_cache_files();
-        }
+        self.delete_old_cache_files();
         self.stats.report();
     }
 }
@@ -224,18 +222,24 @@ impl CacheHashData {
         result.get_cache_files();
         result
     }
-    fn delete_old_cache_files(&self) {
-        let old_cache_files = std::mem::take(&mut *self.pre_existing_cache_files.lock().unwrap());
-        if !old_cache_files.is_empty() {
-            self.stats
-                .unused_cache_files
-                .fetch_add(old_cache_files.len(), Ordering::Relaxed);
-            for file_name in old_cache_files.iter() {
-                let result = self.cache_dir.join(file_name);
-                let _ = fs::remove_file(result);
+
+    /// delete all pre-existing files that will not be used
+    pub(crate) fn delete_old_cache_files(&self) {
+        if self.should_delete_old_cache_files_on_drop {
+            let old_cache_files =
+                std::mem::take(&mut *self.pre_existing_cache_files.lock().unwrap());
+            if !old_cache_files.is_empty() {
+                self.stats
+                    .unused_cache_files
+                    .fetch_add(old_cache_files.len(), Ordering::Relaxed);
+                for file_name in old_cache_files.iter() {
+                    let result = self.cache_dir.join(file_name);
+                    let _ = fs::remove_file(result);
+                }
             }
         }
     }
 
     fn get_cache_files(&self) {
         if self.cache_dir.is_dir() {
             let dir = fs::read_dir(&self.cache_dir);
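delete_old_cache_files now carries the should_delete_old_cache_files_on_drop guard itself, so it is safe to call both explicitly mid-scan and again from Drop: std::mem::take empties the tracked file set on the first call, making any later call a no-op. A minimal sketch of that idempotent-cleanup pattern, using a hypothetical Cleanup type rather than the real CacheHashData:

    use std::collections::HashSet;
    use std::sync::Mutex;

    struct Cleanup {
        // Files discovered at startup that have not been claimed for reuse.
        pre_existing: Mutex<HashSet<String>>,
    }

    impl Cleanup {
        fn delete_old_cache_files(&self) {
            // take() swaps in an empty set, so a repeated call is a no-op.
            // (The real method additionally checks a should-delete flag.)
            let old = std::mem::take(&mut *self.pre_existing.lock().unwrap());
            for name in old {
                // The real code calls fs::remove_file on cache_dir.join(name)
                // and deliberately ignores errors.
                println!("removing {name}");
            }
        }
    }

    impl Drop for Cleanup {
        fn drop(&mut self) {
            // Safe even if delete_old_cache_files() already ran explicitly.
            self.delete_old_cache_files();
        }
    }

    fn main() {
        let c = Cleanup {
            pre_existing: Mutex::new(HashSet::from(["stale.0.1".to_string()])),
        };
        c.delete_old_cache_files(); // prints: removing stale.0.1
        // c is dropped here; the second deletion pass finds nothing to do.
    }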