Skip to content

Commit

Permalink
chore: add _total suffix to counter metrics (#8263)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
estensen and mattsse authored May 29, 2024
1 parent d7e822f commit 9441803
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 55 deletions.
4 changes: 2 additions & 2 deletions crates/consensus/beacon/src/engine/hooks/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl<DB: Database + 'static> PruneHook<DB> {
let _ = tx.send((pruner, result));
}),
);
self.metrics.runs.increment(1);
self.metrics.runs_total.increment(1);
self.pruner_state = PrunerState::Running(rx);

Some(EngineHookEvent::Started)
Expand Down Expand Up @@ -160,7 +160,7 @@ enum PrunerState<DB> {
#[metrics(scope = "consensus.engine.prune")]
struct Metrics {
/// The number of times the pruner was run.
runs: Counter,
runs_total: Counter,
}

impl From<PrunerError> for EngineHookError {
Expand Down
38 changes: 19 additions & 19 deletions crates/metrics/src/common/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ impl<T> UnboundedMeteredSender<T> {
pub fn send(&self, message: T) -> Result<(), SendError<T>> {
match self.sender.send(message) {
Ok(()) => {
self.metrics.messages_sent.increment(1);
self.metrics.messages_sent_total.increment(1);
Ok(())
}
Err(error) => {
self.metrics.send_errors.increment(1);
self.metrics.send_errors_total.increment(1);
Err(error)
}
}
Expand Down Expand Up @@ -94,15 +94,15 @@ impl<T> UnboundedMeteredReceiver<T> {
pub async fn recv(&mut self) -> Option<T> {
let msg = self.receiver.recv().await;
if msg.is_some() {
self.metrics.messages_received.increment(1);
self.metrics.messages_received_total.increment(1);
}
msg
}

/// Tries to receive the next value for this receiver.
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
let msg = self.receiver.try_recv()?;
self.metrics.messages_received.increment(1);
self.metrics.messages_received_total.increment(1);
Ok(msg)
}

Expand All @@ -115,7 +115,7 @@ impl<T> UnboundedMeteredReceiver<T> {
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
let msg = ready!(self.receiver.poll_recv(cx));
if msg.is_some() {
self.metrics.messages_received.increment(1);
self.metrics.messages_received_total.increment(1);
}
Poll::Ready(msg)
}
Expand Down Expand Up @@ -161,11 +161,11 @@ impl<T> MeteredSender<T> {
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
match self.sender.try_send(message) {
Ok(()) => {
self.metrics.messages_sent.increment(1);
self.metrics.messages_sent_total.increment(1);
Ok(())
}
Err(error) => {
self.metrics.send_errors.increment(1);
self.metrics.send_errors_total.increment(1);
Err(error)
}
}
Expand All @@ -176,11 +176,11 @@ impl<T> MeteredSender<T> {
pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
match self.sender.send(value).await {
Ok(()) => {
self.metrics.messages_sent.increment(1);
self.metrics.messages_sent_total.increment(1);
Ok(())
}
Err(error) => {
self.metrics.send_errors.increment(1);
self.metrics.send_errors_total.increment(1);
Err(error)
}
}
Expand Down Expand Up @@ -214,15 +214,15 @@ impl<T> MeteredReceiver<T> {
pub async fn recv(&mut self) -> Option<T> {
let msg = self.receiver.recv().await;
if msg.is_some() {
self.metrics.messages_received.increment(1);
self.metrics.messages_received_total.increment(1);
}
msg
}

/// Tries to receive the next value for this receiver.
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
let msg = self.receiver.try_recv()?;
self.metrics.messages_received.increment(1);
self.metrics.messages_received_total.increment(1);
Ok(msg)
}

Expand All @@ -235,7 +235,7 @@ impl<T> MeteredReceiver<T> {
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
let msg = ready!(self.receiver.poll_recv(cx));
if msg.is_some() {
self.metrics.messages_received.increment(1);
self.metrics.messages_received_total.increment(1);
}
Poll::Ready(msg)
}
Expand All @@ -254,17 +254,17 @@ impl<T> Stream for MeteredReceiver<T> {
#[metrics(dynamic = true)]
struct MeteredSenderMetrics {
/// Number of messages sent
messages_sent: Counter,
messages_sent_total: Counter,
/// Number of failed message deliveries
send_errors: Counter,
send_errors_total: Counter,
}

/// Throughput metrics for [MeteredReceiver]
#[derive(Clone, Metrics)]
#[metrics(dynamic = true)]
struct MeteredReceiverMetrics {
/// Number of messages received
messages_received: Counter,
messages_received_total: Counter,
}

/// A wrapper type around [PollSender] that updates metrics on send.
Expand Down Expand Up @@ -294,7 +294,7 @@ impl<T: Send + 'static> MeteredPollSender<T> {
Poll::Ready(Ok(permit)) => Poll::Ready(Ok(permit)),
Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
Poll::Pending => {
self.metrics.back_pressure.increment(1);
self.metrics.back_pressure_total.increment(1);
Poll::Pending
}
}
Expand All @@ -305,7 +305,7 @@ impl<T: Send + 'static> MeteredPollSender<T> {
pub fn send_item(&mut self, item: T) -> Result<(), PollSendError<T>> {
match self.sender.send_item(item) {
Ok(()) => {
self.metrics.messages_sent.increment(1);
self.metrics.messages_sent_total.increment(1);
Ok(())
}
Err(error) => Err(error),
Expand All @@ -324,7 +324,7 @@ impl<T> Clone for MeteredPollSender<T> {
#[metrics(dynamic = true)]
struct MeteredPollSenderMetrics {
/// Number of messages sent
messages_sent: Counter,
messages_sent_total: Counter,
/// Number of delayed message deliveries caused by a full channel
back_pressure: Counter,
back_pressure_total: Counter,
}
28 changes: 14 additions & 14 deletions crates/rpc/rpc-builder/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub(crate) struct RpcRequestMetricsService<S> {
impl<S> RpcRequestMetricsService<S> {
pub(crate) fn new(service: S, metrics: RpcRequestMetrics) -> Self {
// this instance is kept alive for the duration of the connection
metrics.inner.connection_metrics.connections_opened.increment(1);
metrics.inner.connection_metrics.connections_opened_total.increment(1);
Self { inner: service, metrics }
}
}
Expand All @@ -102,10 +102,10 @@ where
type Future = MeteredRequestFuture<S::Future>;

fn call(&self, req: Request<'a>) -> Self::Future {
self.metrics.inner.connection_metrics.requests_started.increment(1);
self.metrics.inner.connection_metrics.requests_started_total.increment(1);
let call_metrics = self.metrics.inner.call_metrics.get_key_value(req.method.as_ref());
if let Some((_, call_metrics)) = &call_metrics {
call_metrics.started.increment(1);
call_metrics.started_total.increment(1);
}
MeteredRequestFuture {
fut: self.inner.call(req),
Expand All @@ -119,7 +119,7 @@ where
impl<S> Drop for RpcRequestMetricsService<S> {
fn drop(&mut self) {
// update connection metrics, connection closed
self.metrics.inner.connection_metrics.connections_closed.increment(1);
self.metrics.inner.connection_metrics.connections_closed_total.increment(1);
}
}

Expand Down Expand Up @@ -153,7 +153,7 @@ impl<F: Future<Output = MethodResponse>> Future for MeteredRequestFuture<F> {
let elapsed = this.started_at.elapsed().as_secs_f64();

// update transport metrics
this.metrics.inner.connection_metrics.requests_finished.increment(1);
this.metrics.inner.connection_metrics.requests_finished_total.increment(1);
this.metrics.inner.connection_metrics.request_time_seconds.record(elapsed);

// update call metrics
Expand All @@ -162,9 +162,9 @@ impl<F: Future<Output = MethodResponse>> Future for MeteredRequestFuture<F> {
{
call_metrics.time_seconds.record(elapsed);
if resp.is_success() {
call_metrics.successful.increment(1);
call_metrics.successful_total.increment(1);
} else {
call_metrics.failed.increment(1);
call_metrics.failed_total.increment(1);
}
}
}
Expand Down Expand Up @@ -202,13 +202,13 @@ impl RpcTransport {
#[metrics(scope = "rpc_server.connections")]
struct RpcServerConnectionMetrics {
/// The number of connections opened
connections_opened: Counter,
connections_opened_total: Counter,
/// The number of connections closed
connections_closed: Counter,
connections_closed_total: Counter,
/// The number of requests started
requests_started: Counter,
requests_started_total: Counter,
/// The number of requests finished
requests_finished: Counter,
requests_finished_total: Counter,
/// Response for a single request/response pair
request_time_seconds: Histogram,
}
Expand All @@ -218,11 +218,11 @@ struct RpcServerConnectionMetrics {
#[metrics(scope = "rpc_server.calls")]
struct RpcServerCallMetrics {
/// The number of calls started
started: Counter,
started_total: Counter,
/// The number of successful calls
successful: Counter,
successful_total: Counter,
/// The number of failed calls
failed: Counter,
failed_total: Counter,
/// Response for a single call
time_seconds: Histogram,
}
4 changes: 2 additions & 2 deletions crates/rpc/rpc/src/eth/cache/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub(crate) struct CacheMetrics {
/// The number of queued consumers.
pub(crate) queued_consumers_count: Gauge,
/// The number of cache hits.
pub(crate) hits: Counter,
pub(crate) hits_total: Counter,
/// The number of cache misses.
pub(crate) misses: Counter,
pub(crate) misses_total: Counter,
}
4 changes: 2 additions & 2 deletions crates/rpc/rpc/src/eth/cache/multi_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ where
pub fn get(&mut self, key: &K) -> Option<&mut V> {
let entry = self.cache.get(key);
if entry.is_some() {
self.metrics.hits.increment(1);
self.metrics.hits_total.increment(1);
} else {
self.metrics.misses.increment(1);
self.metrics.misses_total.increment(1);
}
entry
}
Expand Down
11 changes: 7 additions & 4 deletions crates/tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,14 @@ impl TaskExecutor {
let on_shutdown = self.on_shutdown.clone();

// Clone only the specific counter that we need.
let finished_regular_tasks_metrics = self.metrics.finished_regular_tasks.clone();
let finished_regular_tasks_total_metrics =
self.metrics.finished_regular_tasks_total.clone();
// Wrap the original future to increment the finished tasks counter upon completion
let task = {
async move {
// Create an instance of IncCounterOnDrop with the counter to increment
let _inc_counter_on_drop = IncCounterOnDrop::new(finished_regular_tasks_metrics);
let _inc_counter_on_drop =
IncCounterOnDrop::new(finished_regular_tasks_total_metrics);
let fut = pin!(fut);
let _ = select(on_shutdown, fut).await;
}
Expand Down Expand Up @@ -405,10 +407,11 @@ impl TaskExecutor {
.in_current_span();

// Clone only the specific counter that we need.
let finished_critical_tasks_metrics = self.metrics.finished_critical_tasks.clone();
let finished_critical_tasks_total_metrics =
self.metrics.finished_critical_tasks_total.clone();
let task = async move {
// Create an instance of IncCounterOnDrop with the counter to increment
let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_metrics);
let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
let task = pin!(task);
let _ = select(on_shutdown, task).await;
};
Expand Down
12 changes: 6 additions & 6 deletions crates/tasks/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,24 @@ use reth_metrics::{metrics::Counter, Metrics};
#[metrics(scope = "executor.spawn")]
pub struct TaskExecutorMetrics {
/// Number of spawned critical tasks
pub(crate) critical_tasks: Counter,
pub(crate) critical_tasks_total: Counter,
/// Number of finished spawned critical tasks
pub(crate) finished_critical_tasks: Counter,
pub(crate) finished_critical_tasks_total: Counter,
/// Number of spawned regular tasks
pub(crate) regular_tasks: Counter,
pub(crate) regular_tasks_total: Counter,
/// Number of finished spawned regular tasks
pub(crate) finished_regular_tasks: Counter,
pub(crate) finished_regular_tasks_total: Counter,
}

impl TaskExecutorMetrics {
/// Increments the counter for spawned critical tasks.
pub(crate) fn inc_critical_tasks(&self) {
self.critical_tasks.increment(1);
self.critical_tasks_total.increment(1);
}

/// Increments the counter for spawned regular tasks.
pub(crate) fn inc_regular_tasks(&self) {
self.regular_tasks.increment(1);
self.regular_tasks_total.increment(1);
}
}

Expand Down
4 changes: 2 additions & 2 deletions etc/grafana/dashboards/overview.json
Original file line number Diff line number Diff line change
Expand Up @@ -7342,7 +7342,7 @@
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "sum(reth_rpc_server_connections_connections_opened{instance=~\"$instance\"} - reth_rpc_server_connections_connections_closed{instance=~\"$instance\"}) by (transport)",
"expr": "sum(reth_rpc_server_connections_connections_opened_total{instance=~\"$instance\"} - reth_rpc_server_connections_connections_closed_total{instance=~\"$instance\"}) by (transport)",
"format": "time_series",
"fullMetaSearch": false,
"includeNullMetadata": true,
Expand Down Expand Up @@ -7933,7 +7933,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "sum(rate(reth_rpc_server_calls_successful{instance =~ \"$instance\"}[$__rate_interval])) by (method) > 0",
"expr": "sum(rate(reth_rpc_server_calls_successful_total{instance =~ \"$instance\"}[$__rate_interval])) by (method) > 0",
"instant": false,
"legendFormat": "{{method}}",
"range": true,
Expand Down
8 changes: 4 additions & 4 deletions etc/grafana/dashboards/reth-mempool.json
Original file line number Diff line number Diff line change
Expand Up @@ -1458,7 +1458,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "builder",
"expr": "rate(reth_network_pool_transactions_messages_sent{instance=~\"$instance\"}[$__rate_interval])",
"expr": "rate(reth_network_pool_transactions_messages_sent_total{instance=~\"$instance\"}[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "Tx",
Expand All @@ -1471,7 +1471,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "builder",
"expr": "rate(reth_network_pool_transactions_messages_received{instance=~\"$instance\"}[$__rate_interval])",
"expr": "rate(reth_network_pool_transactions_messages_received_total{instance=~\"$instance\"}[$__rate_interval])",
"hide": false,
"legendFormat": "Rx",
"range": true,
Expand All @@ -1483,7 +1483,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "reth_network_pool_transactions_messages_sent{instance=~\"$instance\"} - reth_network_pool_transactions_messages_received{instance=~\"$instance\"}",
"expr": "reth_network_pool_transactions_messages_sent_total{instance=~\"$instance\"} - reth_network_pool_transactions_messages_received_total{instance=~\"$instance\"}",
"hide": false,
"legendFormat": "Messages in Channel",
"range": true,
Expand Down Expand Up @@ -3087,7 +3087,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "builder",
"expr": "rate(reth_network_invalid_messages_received{instance=~\"$instance\"}[$__rate_interval])",
"expr": "rate(reth_network_invalid_messages_received_total{instance=~\"$instance\"}[$__rate_interval])",
"hide": false,
"legendFormat": "Invalid Messages",
"range": true,
Expand Down

0 comments on commit 9441803

Please sign in to comment.