From 07c78a8b0084d259b7f59f56f2aafece6118230f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Anda=20Estensen?= Date: Tue, 14 May 2024 21:18:40 +0200 Subject: [PATCH 1/2] chore: add _total suffix to counter metrics --- .../beacon/src/engine/hooks/prune.rs | 4 +- crates/metrics/src/common/mpsc.rs | 38 +++++++++---------- crates/rpc/rpc-builder/src/metrics.rs | 28 +++++++------- crates/rpc/rpc/src/eth/cache/metrics.rs | 4 +- .../rpc/rpc/src/eth/cache/multi_consumer.rs | 4 +- crates/tasks/src/lib.rs | 11 ++++-- crates/tasks/src/metrics.rs | 12 +++--- 7 files changed, 52 insertions(+), 49 deletions(-) diff --git a/crates/consensus/beacon/src/engine/hooks/prune.rs b/crates/consensus/beacon/src/engine/hooks/prune.rs index a9bb4f05bd42..9ab4e893dc6a 100644 --- a/crates/consensus/beacon/src/engine/hooks/prune.rs +++ b/crates/consensus/beacon/src/engine/hooks/prune.rs @@ -99,7 +99,7 @@ impl PruneHook { let _ = tx.send((pruner, result)); }), ); - self.metrics.runs.increment(1); + self.metrics.runs_total.increment(1); self.pruner_state = PrunerState::Running(rx); Some(EngineHookEvent::Started) @@ -160,7 +160,7 @@ enum PrunerState { #[metrics(scope = "consensus.engine.prune")] struct Metrics { /// The number of times the pruner was run. - runs: Counter, + runs_total: Counter, } impl From for EngineHookError { diff --git a/crates/metrics/src/common/mpsc.rs b/crates/metrics/src/common/mpsc.rs index 98c670ef7990..40c6c2359282 100644 --- a/crates/metrics/src/common/mpsc.rs +++ b/crates/metrics/src/common/mpsc.rs @@ -55,11 +55,11 @@ impl UnboundedMeteredSender { pub fn send(&self, message: T) -> Result<(), SendError> { match self.sender.send(message) { Ok(()) => { - self.metrics.messages_sent.increment(1); + self.metrics.messages_sent_total.increment(1); Ok(()) } Err(error) => { - self.metrics.send_errors.increment(1); + self.metrics.send_errors_total.increment(1); Err(error) } } @@ -94,7 +94,7 @@ impl UnboundedMeteredReceiver { pub async fn recv(&mut self) -> Option { let msg = self.receiver.recv().await; if msg.is_some() { - self.metrics.messages_received.increment(1); + self.metrics.messages_received_total.increment(1); } msg } @@ -102,7 +102,7 @@ impl UnboundedMeteredReceiver { /// Tries to receive the next value for this receiver. pub fn try_recv(&mut self) -> Result { let msg = self.receiver.try_recv()?; - self.metrics.messages_received.increment(1); + self.metrics.messages_received_total.increment(1); Ok(msg) } @@ -115,7 +115,7 @@ impl UnboundedMeteredReceiver { pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { let msg = ready!(self.receiver.poll_recv(cx)); if msg.is_some() { - self.metrics.messages_received.increment(1); + self.metrics.messages_received_total.increment(1); } Poll::Ready(msg) } @@ -161,11 +161,11 @@ impl MeteredSender { pub fn try_send(&self, message: T) -> Result<(), TrySendError> { match self.sender.try_send(message) { Ok(()) => { - self.metrics.messages_sent.increment(1); + self.metrics.messages_sent_total.increment(1); Ok(()) } Err(error) => { - self.metrics.send_errors.increment(1); + self.metrics.send_errors_total.increment(1); Err(error) } } @@ -176,11 +176,11 @@ impl MeteredSender { pub async fn send(&self, value: T) -> Result<(), SendError> { match self.sender.send(value).await { Ok(()) => { - self.metrics.messages_sent.increment(1); + self.metrics.messages_sent_total.increment(1); Ok(()) } Err(error) => { - self.metrics.send_errors.increment(1); + self.metrics.send_errors_total.increment(1); Err(error) } } @@ -214,7 +214,7 @@ impl MeteredReceiver { pub async fn recv(&mut self) -> Option { let msg = self.receiver.recv().await; if msg.is_some() { - self.metrics.messages_received.increment(1); + self.metrics.messages_received_total.increment(1); } msg } @@ -222,7 +222,7 @@ impl MeteredReceiver { /// Tries to receive the next value for this receiver. pub fn try_recv(&mut self) -> Result { let msg = self.receiver.try_recv()?; - self.metrics.messages_received.increment(1); + self.metrics.messages_received_total.increment(1); Ok(msg) } @@ -235,7 +235,7 @@ impl MeteredReceiver { pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { let msg = ready!(self.receiver.poll_recv(cx)); if msg.is_some() { - self.metrics.messages_received.increment(1); + self.metrics.messages_received_total.increment(1); } Poll::Ready(msg) } @@ -254,9 +254,9 @@ impl Stream for MeteredReceiver { #[metrics(dynamic = true)] struct MeteredSenderMetrics { /// Number of messages sent - messages_sent: Counter, + messages_sent_total: Counter, /// Number of failed message deliveries - send_errors: Counter, + send_errors_total: Counter, } /// Throughput metrics for [MeteredReceiver] @@ -264,7 +264,7 @@ struct MeteredSenderMetrics { #[metrics(dynamic = true)] struct MeteredReceiverMetrics { /// Number of messages received - messages_received: Counter, + messages_received_total: Counter, } /// A wrapper type around [PollSender] that updates metrics on send. @@ -294,7 +294,7 @@ impl MeteredPollSender { Poll::Ready(Ok(permit)) => Poll::Ready(Ok(permit)), Poll::Ready(Err(error)) => Poll::Ready(Err(error)), Poll::Pending => { - self.metrics.back_pressure.increment(1); + self.metrics.back_pressure_total.increment(1); Poll::Pending } } @@ -305,7 +305,7 @@ impl MeteredPollSender { pub fn send_item(&mut self, item: T) -> Result<(), PollSendError> { match self.sender.send_item(item) { Ok(()) => { - self.metrics.messages_sent.increment(1); + self.metrics.messages_sent_total.increment(1); Ok(()) } Err(error) => Err(error), @@ -324,7 +324,7 @@ impl Clone for MeteredPollSender { #[metrics(dynamic = true)] struct MeteredPollSenderMetrics { /// Number of messages sent - messages_sent: Counter, + messages_sent_total: Counter, /// Number of delayed message deliveries caused by a full channel - back_pressure: Counter, + back_pressure_total: Counter, } diff --git a/crates/rpc/rpc-builder/src/metrics.rs b/crates/rpc/rpc-builder/src/metrics.rs index a0285622beed..dad031d8f5bc 100644 --- a/crates/rpc/rpc-builder/src/metrics.rs +++ b/crates/rpc/rpc-builder/src/metrics.rs @@ -90,7 +90,7 @@ pub(crate) struct RpcRequestMetricsService { impl RpcRequestMetricsService { pub(crate) fn new(service: S, metrics: RpcRequestMetrics) -> Self { // this instance is kept alive for the duration of the connection - metrics.inner.connection_metrics.connections_opened.increment(1); + metrics.inner.connection_metrics.connections_opened_total.increment(1); Self { inner: service, metrics } } } @@ -102,10 +102,10 @@ where type Future = MeteredRequestFuture; fn call(&self, req: Request<'a>) -> Self::Future { - self.metrics.inner.connection_metrics.requests_started.increment(1); + self.metrics.inner.connection_metrics.requests_started_total.increment(1); let call_metrics = self.metrics.inner.call_metrics.get_key_value(req.method.as_ref()); if let Some((_, call_metrics)) = &call_metrics { - call_metrics.started.increment(1); + call_metrics.started_total.increment(1); } MeteredRequestFuture { fut: self.inner.call(req), @@ -119,7 +119,7 @@ where impl Drop for RpcRequestMetricsService { fn drop(&mut self) { // update connection metrics, connection closed - self.metrics.inner.connection_metrics.connections_closed.increment(1); + self.metrics.inner.connection_metrics.connections_closed_total.increment(1); } } @@ -153,7 +153,7 @@ impl> Future for MeteredRequestFuture { let elapsed = this.started_at.elapsed().as_secs_f64(); // update transport metrics - this.metrics.inner.connection_metrics.requests_finished.increment(1); + this.metrics.inner.connection_metrics.requests_finished_total.increment(1); this.metrics.inner.connection_metrics.request_time_seconds.record(elapsed); // update call metrics @@ -162,9 +162,9 @@ impl> Future for MeteredRequestFuture { { call_metrics.time_seconds.record(elapsed); if resp.is_success() { - call_metrics.successful.increment(1); + call_metrics.successful_total.increment(1); } else { - call_metrics.failed.increment(1); + call_metrics.failed_total.increment(1); } } } @@ -202,13 +202,13 @@ impl RpcTransport { #[metrics(scope = "rpc_server.connections")] struct RpcServerConnectionMetrics { /// The number of connections opened - connections_opened: Counter, + connections_opened_total: Counter, /// The number of connections closed - connections_closed: Counter, + connections_closed_total: Counter, /// The number of requests started - requests_started: Counter, + requests_started_total: Counter, /// The number of requests finished - requests_finished: Counter, + requests_finished_total: Counter, /// Response for a single request/response pair request_time_seconds: Histogram, } @@ -218,11 +218,11 @@ struct RpcServerConnectionMetrics { #[metrics(scope = "rpc_server.calls")] struct RpcServerCallMetrics { /// The number of calls started - started: Counter, + started_total: Counter, /// The number of successful calls - successful: Counter, + successful_total: Counter, /// The number of failed calls - failed: Counter, + failed_total: Counter, /// Response for a single call time_seconds: Histogram, } diff --git a/crates/rpc/rpc/src/eth/cache/metrics.rs b/crates/rpc/rpc/src/eth/cache/metrics.rs index eb8058f3969f..c9b18a299da3 100644 --- a/crates/rpc/rpc/src/eth/cache/metrics.rs +++ b/crates/rpc/rpc/src/eth/cache/metrics.rs @@ -9,7 +9,7 @@ pub(crate) struct CacheMetrics { /// The number of queued consumers. pub(crate) queued_consumers_count: Gauge, /// The number of cache hits. - pub(crate) hits: Counter, + pub(crate) hits_total: Counter, /// The number of cache misses. - pub(crate) misses: Counter, + pub(crate) misses_total: Counter, } diff --git a/crates/rpc/rpc/src/eth/cache/multi_consumer.rs b/crates/rpc/rpc/src/eth/cache/multi_consumer.rs index d64eeb0a0cf1..0293840f24a0 100644 --- a/crates/rpc/rpc/src/eth/cache/multi_consumer.rs +++ b/crates/rpc/rpc/src/eth/cache/multi_consumer.rs @@ -72,9 +72,9 @@ where pub fn get(&mut self, key: &K) -> Option<&mut V> { let entry = self.cache.get(key); if entry.is_some() { - self.metrics.hits.increment(1); + self.metrics.hits_total.increment(1); } else { - self.metrics.misses.increment(1); + self.metrics.misses_total.increment(1); } entry } diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 3e526a344f8c..589d0ae5d5c3 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -328,12 +328,14 @@ impl TaskExecutor { let on_shutdown = self.on_shutdown.clone(); // Clone only the specific counter that we need. - let finished_regular_tasks_metrics = self.metrics.finished_regular_tasks.clone(); + let finished_regular_tasks_total_metrics = + self.metrics.finished_regular_tasks_total.clone(); // Wrap the original future to increment the finished tasks counter upon completion let task = { async move { // Create an instance of IncCounterOnDrop with the counter to increment - let _inc_counter_on_drop = IncCounterOnDrop::new(finished_regular_tasks_metrics); + let _inc_counter_on_drop = + IncCounterOnDrop::new(finished_regular_tasks_total_metrics); let fut = pin!(fut); let _ = select(on_shutdown, fut).await; } @@ -405,10 +407,11 @@ impl TaskExecutor { .in_current_span(); // Clone only the specific counter that we need. - let finished_critical_tasks_metrics = self.metrics.finished_critical_tasks.clone(); + let finished_critical_tasks_total_metrics = + self.metrics.finished_critical_tasks_total.clone(); let task = async move { // Create an instance of IncCounterOnDrop with the counter to increment - let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_metrics); + let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics); let task = pin!(task); let _ = select(on_shutdown, task).await; }; diff --git a/crates/tasks/src/metrics.rs b/crates/tasks/src/metrics.rs index 127783cf0bd2..1dc42f517a3a 100644 --- a/crates/tasks/src/metrics.rs +++ b/crates/tasks/src/metrics.rs @@ -9,24 +9,24 @@ use reth_metrics::{metrics::Counter, Metrics}; #[metrics(scope = "executor.spawn")] pub struct TaskExecutorMetrics { /// Number of spawned critical tasks - pub(crate) critical_tasks: Counter, + pub(crate) critical_tasks_total: Counter, /// Number of finished spawned critical tasks - pub(crate) finished_critical_tasks: Counter, + pub(crate) finished_critical_tasks_total: Counter, /// Number of spawned regular tasks - pub(crate) regular_tasks: Counter, + pub(crate) regular_tasks_total: Counter, /// Number of finished spawned regular tasks - pub(crate) finished_regular_tasks: Counter, + pub(crate) finished_regular_tasks_total: Counter, } impl TaskExecutorMetrics { /// Increments the counter for spawned critical tasks. pub(crate) fn inc_critical_tasks(&self) { - self.critical_tasks.increment(1); + self.critical_tasks_total.increment(1); } /// Increments the counter for spawned regular tasks. pub(crate) fn inc_regular_tasks(&self) { - self.regular_tasks.increment(1); + self.regular_tasks_total.increment(1); } } From 1b736b3c3b9c9a0209ca2146154b2a91263fb6e8 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 21 May 2024 12:05:10 +0200 Subject: [PATCH 2/2] update grafana --- etc/grafana/dashboards/overview.json | 4 ++-- etc/grafana/dashboards/reth-mempool.json | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/etc/grafana/dashboards/overview.json b/etc/grafana/dashboards/overview.json index eacc3a25c8e5..e1bc6ddbc923 100644 --- a/etc/grafana/dashboards/overview.json +++ b/etc/grafana/dashboards/overview.json @@ -7342,7 +7342,7 @@ }, "disableTextWrap": false, "editorMode": "code", - "expr": "sum(reth_rpc_server_connections_connections_opened{instance=~\"$instance\"} - reth_rpc_server_connections_connections_closed{instance=~\"$instance\"}) by (transport)", + "expr": "sum(reth_rpc_server_connections_connections_opened_total{instance=~\"$instance\"} - reth_rpc_server_connections_connections_closed_total{instance=~\"$instance\"}) by (transport)", "format": "time_series", "fullMetaSearch": false, "includeNullMetadata": true, @@ -7933,7 +7933,7 @@ "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": "sum(rate(reth_rpc_server_calls_successful{instance =~ \"$instance\"}[$__rate_interval])) by (method) > 0", + "expr": "sum(rate(reth_rpc_server_calls_successful_total{instance =~ \"$instance\"}[$__rate_interval])) by (method) > 0", "instant": false, "legendFormat": "{{method}}", "range": true, diff --git a/etc/grafana/dashboards/reth-mempool.json b/etc/grafana/dashboards/reth-mempool.json index 3ba499a9a4fb..ff45a5bf03a8 100644 --- a/etc/grafana/dashboards/reth-mempool.json +++ b/etc/grafana/dashboards/reth-mempool.json @@ -1458,7 +1458,7 @@ "uid": "${DS_PROMETHEUS}" }, "editorMode": "builder", - "expr": "rate(reth_network_pool_transactions_messages_sent{instance=~\"$instance\"}[$__rate_interval])", + "expr": "rate(reth_network_pool_transactions_messages_sent_total{instance=~\"$instance\"}[$__rate_interval])", "hide": false, "instant": false, "legendFormat": "Tx", @@ -1471,7 +1471,7 @@ "uid": "${DS_PROMETHEUS}" }, "editorMode": "builder", - "expr": "rate(reth_network_pool_transactions_messages_received{instance=~\"$instance\"}[$__rate_interval])", + "expr": "rate(reth_network_pool_transactions_messages_received_total{instance=~\"$instance\"}[$__rate_interval])", "hide": false, "legendFormat": "Rx", "range": true, @@ -1483,7 +1483,7 @@ "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": "reth_network_pool_transactions_messages_sent{instance=~\"$instance\"} - reth_network_pool_transactions_messages_received{instance=~\"$instance\"}", + "expr": "reth_network_pool_transactions_messages_sent_total{instance=~\"$instance\"} - reth_network_pool_transactions_messages_received_total{instance=~\"$instance\"}", "hide": false, "legendFormat": "Messages in Channel", "range": true, @@ -3087,7 +3087,7 @@ "uid": "${DS_PROMETHEUS}" }, "editorMode": "builder", - "expr": "rate(reth_network_invalid_messages_received{instance=~\"$instance\"}[$__rate_interval])", + "expr": "rate(reth_network_invalid_messages_received_total{instance=~\"$instance\"}[$__rate_interval])", "hide": false, "legendFormat": "Invalid Messages", "range": true,