Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add _total suffix to counter metrics #8263

Merged
merged 2 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/consensus/beacon/src/engine/hooks/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl<DB: Database + 'static> PruneHook<DB> {
let _ = tx.send((pruner, result));
}),
);
self.metrics.runs.increment(1);
self.metrics.runs_total.increment(1);
self.pruner_state = PrunerState::Running(rx);

Some(EngineHookEvent::Started)
Expand Down Expand Up @@ -160,7 +160,7 @@ enum PrunerState<DB> {
#[metrics(scope = "consensus.engine.prune")]
struct Metrics {
/// The number of times the pruner was run.
runs: Counter,
runs_total: Counter,
}

impl From<PrunerError> for EngineHookError {
Expand Down
38 changes: 19 additions & 19 deletions crates/metrics/src/common/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ impl<T> UnboundedMeteredSender<T> {
pub fn send(&self, message: T) -> Result<(), SendError<T>> {
match self.sender.send(message) {
Ok(()) => {
self.metrics.messages_sent.increment(1);
self.metrics.messages_sent_total.increment(1);
Ok(())
}
Err(error) => {
self.metrics.send_errors.increment(1);
self.metrics.send_errors_total.increment(1);
Err(error)
}
}
Expand Down Expand Up @@ -94,15 +94,15 @@ impl<T> UnboundedMeteredReceiver<T> {
pub async fn recv(&mut self) -> Option<T> {
let msg = self.receiver.recv().await;
if msg.is_some() {
self.metrics.messages_received.increment(1);
self.metrics.messages_received_total.increment(1);
}
msg
}

/// Tries to receive the next value for this receiver.
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
let msg = self.receiver.try_recv()?;
self.metrics.messages_received.increment(1);
self.metrics.messages_received_total.increment(1);
Ok(msg)
}

Expand All @@ -115,7 +115,7 @@ impl<T> UnboundedMeteredReceiver<T> {
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
let msg = ready!(self.receiver.poll_recv(cx));
if msg.is_some() {
self.metrics.messages_received.increment(1);
self.metrics.messages_received_total.increment(1);
}
Poll::Ready(msg)
}
Expand Down Expand Up @@ -161,11 +161,11 @@ impl<T> MeteredSender<T> {
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
match self.sender.try_send(message) {
Ok(()) => {
self.metrics.messages_sent.increment(1);
self.metrics.messages_sent_total.increment(1);
Ok(())
}
Err(error) => {
self.metrics.send_errors.increment(1);
self.metrics.send_errors_total.increment(1);
Err(error)
}
}
Expand All @@ -176,11 +176,11 @@ impl<T> MeteredSender<T> {
pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
match self.sender.send(value).await {
Ok(()) => {
self.metrics.messages_sent.increment(1);
self.metrics.messages_sent_total.increment(1);
Ok(())
}
Err(error) => {
self.metrics.send_errors.increment(1);
self.metrics.send_errors_total.increment(1);
Err(error)
}
}
Expand Down Expand Up @@ -214,15 +214,15 @@ impl<T> MeteredReceiver<T> {
pub async fn recv(&mut self) -> Option<T> {
let msg = self.receiver.recv().await;
if msg.is_some() {
self.metrics.messages_received.increment(1);
self.metrics.messages_received_total.increment(1);
}
msg
}

/// Tries to receive the next value for this receiver.
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
let msg = self.receiver.try_recv()?;
self.metrics.messages_received.increment(1);
self.metrics.messages_received_total.increment(1);
Ok(msg)
}

Expand All @@ -235,7 +235,7 @@ impl<T> MeteredReceiver<T> {
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
let msg = ready!(self.receiver.poll_recv(cx));
if msg.is_some() {
self.metrics.messages_received.increment(1);
self.metrics.messages_received_total.increment(1);
}
Poll::Ready(msg)
}
Expand All @@ -254,17 +254,17 @@ impl<T> Stream for MeteredReceiver<T> {
#[metrics(dynamic = true)]
struct MeteredSenderMetrics {
/// Number of messages sent
messages_sent: Counter,
messages_sent_total: Counter,
/// Number of failed message deliveries
send_errors: Counter,
send_errors_total: Counter,
}

/// Throughput metrics for [MeteredReceiver]
#[derive(Clone, Metrics)]
#[metrics(dynamic = true)]
struct MeteredReceiverMetrics {
/// Number of messages received
messages_received: Counter,
messages_received_total: Counter,
}

/// A wrapper type around [PollSender] that updates metrics on send.
Expand Down Expand Up @@ -294,7 +294,7 @@ impl<T: Send + 'static> MeteredPollSender<T> {
Poll::Ready(Ok(permit)) => Poll::Ready(Ok(permit)),
Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
Poll::Pending => {
self.metrics.back_pressure.increment(1);
self.metrics.back_pressure_total.increment(1);
Poll::Pending
}
}
Expand All @@ -305,7 +305,7 @@ impl<T: Send + 'static> MeteredPollSender<T> {
pub fn send_item(&mut self, item: T) -> Result<(), PollSendError<T>> {
match self.sender.send_item(item) {
Ok(()) => {
self.metrics.messages_sent.increment(1);
self.metrics.messages_sent_total.increment(1);
Ok(())
}
Err(error) => Err(error),
Expand All @@ -324,7 +324,7 @@ impl<T> Clone for MeteredPollSender<T> {
#[metrics(dynamic = true)]
struct MeteredPollSenderMetrics {
/// Number of messages sent
messages_sent: Counter,
messages_sent_total: Counter,
/// Number of delayed message deliveries caused by a full channel
back_pressure: Counter,
back_pressure_total: Counter,
}
28 changes: 14 additions & 14 deletions crates/rpc/rpc-builder/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub(crate) struct RpcRequestMetricsService<S> {
impl<S> RpcRequestMetricsService<S> {
pub(crate) fn new(service: S, metrics: RpcRequestMetrics) -> Self {
// this instance is kept alive for the duration of the connection
metrics.inner.connection_metrics.connections_opened.increment(1);
metrics.inner.connection_metrics.connections_opened_total.increment(1);
Self { inner: service, metrics }
}
}
Expand All @@ -102,10 +102,10 @@ where
type Future = MeteredRequestFuture<S::Future>;

fn call(&self, req: Request<'a>) -> Self::Future {
self.metrics.inner.connection_metrics.requests_started.increment(1);
self.metrics.inner.connection_metrics.requests_started_total.increment(1);
let call_metrics = self.metrics.inner.call_metrics.get_key_value(req.method.as_ref());
if let Some((_, call_metrics)) = &call_metrics {
call_metrics.started.increment(1);
call_metrics.started_total.increment(1);
}
MeteredRequestFuture {
fut: self.inner.call(req),
Expand All @@ -119,7 +119,7 @@ where
impl<S> Drop for RpcRequestMetricsService<S> {
fn drop(&mut self) {
// update connection metrics, connection closed
self.metrics.inner.connection_metrics.connections_closed.increment(1);
self.metrics.inner.connection_metrics.connections_closed_total.increment(1);
}
}

Expand Down Expand Up @@ -153,7 +153,7 @@ impl<F: Future<Output = MethodResponse>> Future for MeteredRequestFuture<F> {
let elapsed = this.started_at.elapsed().as_secs_f64();

// update transport metrics
this.metrics.inner.connection_metrics.requests_finished.increment(1);
this.metrics.inner.connection_metrics.requests_finished_total.increment(1);
this.metrics.inner.connection_metrics.request_time_seconds.record(elapsed);

// update call metrics
Expand All @@ -162,9 +162,9 @@ impl<F: Future<Output = MethodResponse>> Future for MeteredRequestFuture<F> {
{
call_metrics.time_seconds.record(elapsed);
if resp.is_success() {
call_metrics.successful.increment(1);
call_metrics.successful_total.increment(1);
} else {
call_metrics.failed.increment(1);
call_metrics.failed_total.increment(1);
}
}
}
Expand Down Expand Up @@ -202,13 +202,13 @@ impl RpcTransport {
#[metrics(scope = "rpc_server.connections")]
struct RpcServerConnectionMetrics {
/// The number of connections opened
connections_opened: Counter,
connections_opened_total: Counter,
/// The number of connections closed
connections_closed: Counter,
connections_closed_total: Counter,
/// The number of requests started
requests_started: Counter,
requests_started_total: Counter,
/// The number of requests finished
requests_finished: Counter,
requests_finished_total: Counter,
/// Response for a single request/response pair
request_time_seconds: Histogram,
}
Expand All @@ -218,11 +218,11 @@ struct RpcServerConnectionMetrics {
#[metrics(scope = "rpc_server.calls")]
struct RpcServerCallMetrics {
/// The number of calls started
started: Counter,
started_total: Counter,
/// The number of successful calls
successful: Counter,
successful_total: Counter,
/// The number of failed calls
failed: Counter,
failed_total: Counter,
/// Response for a single call
time_seconds: Histogram,
}
4 changes: 2 additions & 2 deletions crates/rpc/rpc/src/eth/cache/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub(crate) struct CacheMetrics {
/// The number of queued consumers.
pub(crate) queued_consumers_count: Gauge,
/// The number of cache hits.
pub(crate) hits: Counter,
pub(crate) hits_total: Counter,
/// The number of cache misses.
pub(crate) misses: Counter,
pub(crate) misses_total: Counter,
}
4 changes: 2 additions & 2 deletions crates/rpc/rpc/src/eth/cache/multi_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ where
pub fn get(&mut self, key: &K) -> Option<&mut V> {
let entry = self.cache.get(key);
if entry.is_some() {
self.metrics.hits.increment(1);
self.metrics.hits_total.increment(1);
} else {
self.metrics.misses.increment(1);
self.metrics.misses_total.increment(1);
}
entry
}
Expand Down
11 changes: 7 additions & 4 deletions crates/tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,14 @@ impl TaskExecutor {
let on_shutdown = self.on_shutdown.clone();

// Clone only the specific counter that we need.
let finished_regular_tasks_metrics = self.metrics.finished_regular_tasks.clone();
let finished_regular_tasks_total_metrics =
self.metrics.finished_regular_tasks_total.clone();
// Wrap the original future to increment the finished tasks counter upon completion
let task = {
async move {
// Create an instance of IncCounterOnDrop with the counter to increment
let _inc_counter_on_drop = IncCounterOnDrop::new(finished_regular_tasks_metrics);
let _inc_counter_on_drop =
IncCounterOnDrop::new(finished_regular_tasks_total_metrics);
let fut = pin!(fut);
let _ = select(on_shutdown, fut).await;
}
Expand Down Expand Up @@ -405,10 +407,11 @@ impl TaskExecutor {
.in_current_span();

// Clone only the specific counter that we need.
let finished_critical_tasks_metrics = self.metrics.finished_critical_tasks.clone();
let finished_critical_tasks_total_metrics =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the var names here map to the metrics 1-1? they do tend to become long

self.metrics.finished_critical_tasks_total.clone();
let task = async move {
// Create an instance of IncCounterOnDrop with the counter to increment
let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_metrics);
let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
let task = pin!(task);
let _ = select(on_shutdown, task).await;
};
Expand Down
12 changes: 6 additions & 6 deletions crates/tasks/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,24 @@ use reth_metrics::{metrics::Counter, Metrics};
#[metrics(scope = "executor.spawn")]
pub struct TaskExecutorMetrics {
/// Number of spawned critical tasks
pub(crate) critical_tasks: Counter,
pub(crate) critical_tasks_total: Counter,
/// Number of finished spawned critical tasks
pub(crate) finished_critical_tasks: Counter,
pub(crate) finished_critical_tasks_total: Counter,
/// Number of spawned regular tasks
pub(crate) regular_tasks: Counter,
pub(crate) regular_tasks_total: Counter,
/// Number of finished spawned regular tasks
pub(crate) finished_regular_tasks: Counter,
pub(crate) finished_regular_tasks_total: Counter,
}

impl TaskExecutorMetrics {
/// Increments the counter for spawned critical tasks.
pub(crate) fn inc_critical_tasks(&self) {
self.critical_tasks.increment(1);
self.critical_tasks_total.increment(1);
}

/// Increments the counter for spawned regular tasks.
pub(crate) fn inc_regular_tasks(&self) {
self.regular_tasks.increment(1);
self.regular_tasks_total.increment(1);
}
}

Expand Down
4 changes: 2 additions & 2 deletions etc/grafana/dashboards/overview.json
Original file line number Diff line number Diff line change
Expand Up @@ -7342,7 +7342,7 @@
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "sum(reth_rpc_server_connections_connections_opened{instance=~\"$instance\"} - reth_rpc_server_connections_connections_closed{instance=~\"$instance\"}) by (transport)",
"expr": "sum(reth_rpc_server_connections_connections_opened_total{instance=~\"$instance\"} - reth_rpc_server_connections_connections_closed_total{instance=~\"$instance\"}) by (transport)",
"format": "time_series",
"fullMetaSearch": false,
"includeNullMetadata": true,
Expand Down Expand Up @@ -7933,7 +7933,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "sum(rate(reth_rpc_server_calls_successful{instance =~ \"$instance\"}[$__rate_interval])) by (method) > 0",
"expr": "sum(rate(reth_rpc_server_calls_successful_total{instance =~ \"$instance\"}[$__rate_interval])) by (method) > 0",
"instant": false,
"legendFormat": "{{method}}",
"range": true,
Expand Down
8 changes: 4 additions & 4 deletions etc/grafana/dashboards/reth-mempool.json
Original file line number Diff line number Diff line change
Expand Up @@ -1458,7 +1458,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "builder",
"expr": "rate(reth_network_pool_transactions_messages_sent{instance=~\"$instance\"}[$__rate_interval])",
"expr": "rate(reth_network_pool_transactions_messages_sent_total{instance=~\"$instance\"}[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "Tx",
Expand All @@ -1471,7 +1471,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "builder",
"expr": "rate(reth_network_pool_transactions_messages_received{instance=~\"$instance\"}[$__rate_interval])",
"expr": "rate(reth_network_pool_transactions_messages_received_total{instance=~\"$instance\"}[$__rate_interval])",
"hide": false,
"legendFormat": "Rx",
"range": true,
Expand All @@ -1483,7 +1483,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "reth_network_pool_transactions_messages_sent{instance=~\"$instance\"} - reth_network_pool_transactions_messages_received{instance=~\"$instance\"}",
"expr": "reth_network_pool_transactions_messages_sent_total{instance=~\"$instance\"} - reth_network_pool_transactions_messages_received_total{instance=~\"$instance\"}",
"hide": false,
"legendFormat": "Messages in Channel",
"range": true,
Expand Down Expand Up @@ -3087,7 +3087,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "builder",
"expr": "rate(reth_network_invalid_messages_received{instance=~\"$instance\"}[$__rate_interval])",
"expr": "rate(reth_network_invalid_messages_received_total{instance=~\"$instance\"}[$__rate_interval])",
"hide": false,
"legendFormat": "Invalid Messages",
"range": true,
Expand Down
Loading