Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Condense Blockstore RPC API datapoints (#34045)
Browse files Browse the repository at this point in the history
Currently, the RPC API that touch the Blockstore emit a datapoint for
each call. For an RPC node serving many requests, these datapoints
could get quite noisy, both in logs as well as traffic to the metrics
agent.

So, instead of submitting a datapoint for every call, accumulate the
number of calls in a struct and report that entire struct periodically.
  • Loading branch information
steviez authored Nov 14, 2023
1 parent 0e91e96 commit d1d4c1c
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 34 deletions.
79 changes: 45 additions & 34 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use {
IteratorMode, LedgerColumn, Result, WriteBatch,
},
blockstore_meta::*,
blockstore_metrics::BlockstoreRpcApiMetrics,
blockstore_options::{
AccessType, BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_FIFO,
BLOCKSTORE_DIRECTORY_ROCKS_LEVEL,
Expand Down Expand Up @@ -222,6 +223,7 @@ pub struct Blockstore {
pub shred_timing_point_sender: Option<PohTimingSender>,
pub lowest_cleanup_slot: RwLock<Slot>,
pub slots_stats: SlotsStats,
rpc_api_metrics: BlockstoreRpcApiMetrics,
}

pub struct IndexMetaWorkingSetEntry {
Expand Down Expand Up @@ -362,6 +364,7 @@ impl Blockstore {
max_root,
lowest_cleanup_slot: RwLock::<Slot>::default(),
slots_stats: SlotsStats::default(),
rpc_api_metrics: BlockstoreRpcApiMetrics::default(),
};
blockstore.cleanup_old_entries()?;
blockstore.update_highest_primary_index_slot()?;
Expand Down Expand Up @@ -717,6 +720,11 @@ impl Blockstore {
self.merkle_root_meta_cf.submit_rocksdb_cf_metrics();
}

/// Report the accumulated RPC API metrics
pub(crate) fn report_rpc_api_metrics(&self) {
self.rpc_api_metrics.report();
}

fn try_shred_recovery(
&self,
erasure_metas: &HashMap<ErasureSetId, ErasureMeta>,
Expand Down Expand Up @@ -1961,10 +1969,9 @@ impl Blockstore {
}

pub fn get_rooted_block_time(&self, slot: Slot) -> Result<UnixTimestamp> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_rooted_block_time", String)
);
self.rpc_api_metrics
.num_get_rooted_block_time
.fetch_add(1, Ordering::Relaxed);
let _lock = self.check_lowest_cleanup_slot(slot)?;

if self.is_root(slot) {
Expand All @@ -1981,8 +1988,11 @@ impl Blockstore {
}

pub fn get_block_height(&self, slot: Slot) -> Result<Option<u64>> {
datapoint_info!("blockstore-rpc-api", ("method", "get_block_height", String));
self.rpc_api_metrics
.num_get_block_height
.fetch_add(1, Ordering::Relaxed);
let _lock = self.check_lowest_cleanup_slot(slot)?;

self.block_height_cf.get(slot)
}

Expand Down Expand Up @@ -2010,7 +2020,9 @@ impl Blockstore {
slot: Slot,
require_previous_blockhash: bool,
) -> Result<VersionedConfirmedBlock> {
datapoint_info!("blockstore-rpc-api", ("method", "get_rooted_block", String));
self.rpc_api_metrics
.num_get_rooted_block
.fetch_add(1, Ordering::Relaxed);
let _lock = self.check_lowest_cleanup_slot(slot)?;

if self.is_root(slot) {
Expand All @@ -2033,10 +2045,9 @@ impl Blockstore {
slot: Slot,
require_previous_blockhash: bool,
) -> Result<VersionedConfirmedBlockWithEntries> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_rooted_block_with_entries", String)
);
self.rpc_api_metrics
.num_get_rooted_block_with_entries
.fetch_add(1, Ordering::Relaxed);
let _lock = self.check_lowest_cleanup_slot(slot)?;

if self.is_root(slot) {
Expand Down Expand Up @@ -2441,10 +2452,10 @@ impl Blockstore {
&self,
signature: Signature,
) -> Result<Option<(Slot, TransactionStatusMeta)>> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_rooted_transaction_status", String)
);
self.rpc_api_metrics
.num_get_rooted_transaction_status
.fetch_add(1, Ordering::Relaxed);

self.get_transaction_status(signature, &HashSet::default())
}

Expand All @@ -2454,10 +2465,10 @@ impl Blockstore {
signature: Signature,
confirmed_unrooted_slots: &HashSet<Slot>,
) -> Result<Option<(Slot, TransactionStatusMeta)>> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_transaction_status", String)
);
self.rpc_api_metrics
.num_get_transaction_status
.fetch_add(1, Ordering::Relaxed);

self.get_transaction_status_with_counter(signature, confirmed_unrooted_slots)
.map(|(status, _)| status)
}
Expand All @@ -2467,10 +2478,10 @@ impl Blockstore {
&self,
signature: Signature,
) -> Result<Option<ConfirmedTransactionWithStatusMeta>> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_rooted_transaction", String)
);
self.rpc_api_metrics
.num_get_rooted_transaction
.fetch_add(1, Ordering::Relaxed);

self.get_transaction_with_status(signature, &HashSet::default())
}

Expand All @@ -2480,10 +2491,10 @@ impl Blockstore {
signature: Signature,
highest_confirmed_slot: Slot,
) -> Result<Option<ConfirmedTransactionWithStatusMeta>> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_complete_transaction", String)
);
self.rpc_api_metrics
.num_get_complete_transaction
.fetch_add(1, Ordering::Relaxed);

let max_root = self.max_root();
let confirmed_unrooted_slots: HashSet<_> =
AncestorIterator::new_inclusive(highest_confirmed_slot, self)
Expand Down Expand Up @@ -2593,10 +2604,10 @@ impl Blockstore {
start_slot: Slot,
end_slot: Slot,
) -> Result<Vec<Signature>> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_confirmed_signatures_for_address", String)
);
self.rpc_api_metrics
.num_get_confirmed_signatures_for_address
.fetch_add(1, Ordering::Relaxed);

self.find_address_signatures(pubkey, start_slot, end_slot)
.map(|signatures| signatures.iter().map(|(_, signature)| *signature).collect())
}
Expand Down Expand Up @@ -2631,10 +2642,10 @@ impl Blockstore {
until: Option<Signature>,
limit: usize,
) -> Result<SignatureInfosForAddress> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_confirmed_signatures_for_address2", String)
);
self.rpc_api_metrics
.num_get_confirmed_signatures_for_address2
.fetch_add(1, Ordering::Relaxed);

let max_root = self.max_root();
let confirmed_unrooted_slots: HashSet<_> =
AncestorIterator::new_inclusive(highest_slot, self)
Expand Down
1 change: 1 addition & 0 deletions ledger/src/blockstore_metric_report_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl BlockstoreMetricReportService {
BLOCKSTORE_METRICS_REPORT_PERIOD_MILLIS,
));
blockstore.submit_rocksdb_cf_metrics_for_all_cfs();
blockstore.report_rpc_api_metrics();
})
.unwrap();
Self { t_cf_metric }
Expand Down
98 changes: 98 additions & 0 deletions ledger/src/blockstore_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,104 @@ impl BlockstoreInsertionMetrics {
}
}

/// A metrics struct to track the number of times Blockstore RPC function are called.
#[derive(Default)]
pub(crate) struct BlockstoreRpcApiMetrics {
pub num_get_block_height: AtomicU64,
pub num_get_complete_transaction: AtomicU64,
pub num_get_confirmed_signatures_for_address: AtomicU64,
pub num_get_confirmed_signatures_for_address2: AtomicU64,
pub num_get_rooted_block: AtomicU64,
pub num_get_rooted_block_time: AtomicU64,
pub num_get_rooted_transaction: AtomicU64,
pub num_get_rooted_transaction_status: AtomicU64,
pub num_get_rooted_block_with_entries: AtomicU64,
pub num_get_transaction_status: AtomicU64,
}

impl BlockstoreRpcApiMetrics {
pub fn report(&self) {
let num_get_block_height = self.num_get_block_height.swap(0, Ordering::Relaxed);
let num_get_complete_transaction =
self.num_get_complete_transaction.swap(0, Ordering::Relaxed);
let num_get_confirmed_signatures_for_address = self
.num_get_confirmed_signatures_for_address
.swap(0, Ordering::Relaxed);
let num_get_confirmed_signatures_for_address2 = self
.num_get_confirmed_signatures_for_address2
.swap(0, Ordering::Relaxed);
let num_get_rooted_block = self.num_get_rooted_block.swap(0, Ordering::Relaxed);
let num_get_rooted_block_time = self.num_get_rooted_block_time.swap(0, Ordering::Relaxed);
let num_get_rooted_transaction = self.num_get_rooted_transaction.swap(0, Ordering::Relaxed);
let num_get_rooted_transaction_status = self
.num_get_rooted_transaction_status
.swap(0, Ordering::Relaxed);
let num_get_rooted_block_with_entries = self
.num_get_rooted_block_with_entries
.swap(0, Ordering::Relaxed);
let num_get_transaction_status = self.num_get_transaction_status.swap(0, Ordering::Relaxed);

let total_num_queries = num_get_block_height
.saturating_add(num_get_complete_transaction)
.saturating_add(num_get_confirmed_signatures_for_address)
.saturating_add(num_get_confirmed_signatures_for_address2)
.saturating_add(num_get_rooted_block)
.saturating_add(num_get_rooted_block_time)
.saturating_add(num_get_rooted_transaction)
.saturating_add(num_get_rooted_transaction_status)
.saturating_add(num_get_rooted_block_with_entries)
.saturating_add(num_get_transaction_status);

if total_num_queries > 0 {
datapoint_info!(
"blockstore-rpc-api",
("num_get_block_height", num_get_block_height as i64, i64),
(
"num_get_complete_transaction",
num_get_complete_transaction as i64,
i64
),
(
"num_get_confirmed_signatures_for_address",
num_get_confirmed_signatures_for_address as i64,
i64
),
(
"num_get_confirmed_signatures_for_address2",
num_get_confirmed_signatures_for_address2 as i64,
i64
),
("num_get_rooted_block", num_get_rooted_block as i64, i64),
(
"num_get_rooted_block_time",
num_get_rooted_block_time as i64,
i64
),
(
"num_get_rooted_transaction",
num_get_rooted_transaction as i64,
i64
),
(
"num_get_rooted_transaction_status",
num_get_rooted_transaction_status as i64,
i64
),
(
"num_get_rooted_block_with_entries",
num_get_rooted_block_with_entries as i64,
i64
),
(
"num_get_transaction_status",
num_get_transaction_status as i64,
i64
),
);
}
}
}

/// A metrics struct that exposes RocksDB's column family properties.
///
/// Here we only expose a subset of all the internal properties which are
Expand Down

0 comments on commit d1d4c1c

Please sign in to comment.