diff --git a/narwhal/worker/src/batch_maker.rs b/narwhal/worker/src/batch_maker.rs index fc45ab33b488c..6f5bc04bd0c2c 100644 --- a/narwhal/worker/src/batch_maker.rs +++ b/narwhal/worker/src/batch_maker.rs @@ -1,9 +1,11 @@ // Copyright (c) 2021, Facebook, Inc. and its affiliates // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use crate::metrics::WorkerMetrics; use config::Committee; #[cfg(feature = "benchmark")] use std::convert::TryInto; +use std::sync::Arc; use tokio::{ sync::watch, task::JoinHandle, @@ -37,6 +39,8 @@ pub struct BatchMaker { current_batch: Batch, /// Holds the size of the current batch (in bytes). current_batch_size: usize, + /// Metrics handler + node_metrics: Arc, } impl BatchMaker { @@ -48,6 +52,7 @@ impl BatchMaker { rx_reconfigure: watch::Receiver, rx_transaction: Receiver, tx_message: Sender, + node_metrics: Arc, ) -> JoinHandle<()> { tokio::spawn(async move { Self { @@ -59,6 +64,7 @@ impl BatchMaker { tx_message, current_batch: Batch(Vec::with_capacity(batch_size * 2)), current_batch_size: 0, + node_metrics, } .run() .await; @@ -77,7 +83,7 @@ impl BatchMaker { self.current_batch_size += transaction.len(); self.current_batch.0.push(transaction); if self.current_batch_size >= self.batch_size { - self.seal().await; + self.seal(false).await; timer.as_mut().reset(Instant::now() + self.max_batch_delay); } }, @@ -85,7 +91,7 @@ impl BatchMaker { // If the timer triggers, seal the batch even if it contains few transactions. () = &mut timer => { if !self.current_batch.0.is_empty() { - self.seal().await; + self.seal(true).await; } timer.as_mut().reset(Instant::now() + self.max_batch_delay); } @@ -114,8 +120,7 @@ impl BatchMaker { } /// Seal and broadcast the current batch. - async fn seal(&mut self) { - #[cfg(feature = "benchmark")] + async fn seal(&mut self, timeout: bool) { let size = self.current_batch_size; // Look for sample txs (they all start with 0) and gather their txs id (the next 8 bytes). 
@@ -154,6 +159,13 @@ impl BatchMaker { } } + let reason = if timeout { "timeout" } else { "size_reached" }; + + self.node_metrics + .created_batch_size + .with_label_values(&[self.committee.epoch.to_string().as_str(), reason]) + .observe(size as f64); + // Send the batch through the deliver channel for further processing. if self.tx_message.send(batch).await.is_err() { tracing::debug!("{}", DagError::ShuttingDown); diff --git a/narwhal/worker/src/metrics.rs b/narwhal/worker/src/metrics.rs index 079e7d0935035..e6fe454db4e10 100644 --- a/narwhal/worker/src/metrics.rs +++ b/narwhal/worker/src/metrics.rs @@ -44,6 +44,8 @@ pub fn initialise_metrics(metrics_registry: &Registry) -> Metrics { pub struct WorkerMetrics { /// Number of elements pending elements in the worker synchronizer pub pending_elements_worker_synchronizer: IntGaugeVec, + /// Size in bytes of each batch created by the batch_maker + pub created_batch_size: HistogramVec, } impl WorkerMetrics { @@ -56,6 +58,13 @@ impl WorkerMetrics { registry ) .unwrap(), + created_batch_size: register_histogram_vec_with_registry!( + "created_batch_size", + "Size in bytes of the created batches", + &["epoch", "reason"], + registry + ) + .unwrap(), } } } diff --git a/narwhal/worker/src/tests/batch_maker_tests.rs b/narwhal/worker/src/tests/batch_maker_tests.rs index 50597aabc0660..d8816ca4346df 100644 --- a/narwhal/worker/src/tests/batch_maker_tests.rs +++ b/narwhal/worker/src/tests/batch_maker_tests.rs @@ -2,6 +2,7 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use super::*; +use prometheus::Registry; use test_utils::{committee, transaction}; #[tokio::test] @@ -11,6 +12,7 @@ async fn make_batch() { watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_transaction, rx_transaction) = test_utils::test_channel!(1); let (tx_message, mut rx_message) = test_utils::test_channel!(1); + let node_metrics = WorkerMetrics::new(&Registry::new()); // Spawn a `BatchMaker` instance. 
let _batch_maker_handle = BatchMaker::spawn( @@ -21,6 +23,7 @@ async fn make_batch() { rx_reconfiguration, rx_transaction, tx_message, + Arc::new(node_metrics), ); // Send enough transactions to seal a batch. @@ -41,6 +44,7 @@ async fn batch_timeout() { watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_transaction, rx_transaction) = test_utils::test_channel!(1); let (tx_message, mut rx_message) = test_utils::test_channel!(1); + let node_metrics = WorkerMetrics::new(&Registry::new()); // Spawn a `BatchMaker` instance. let _batch_maker_handle = BatchMaker::spawn( @@ -51,6 +55,7 @@ async fn batch_timeout() { rx_reconfiguration, rx_transaction, tx_message, + Arc::new(node_metrics), ); // Do not send enough transactions to seal a batch. diff --git a/narwhal/worker/src/worker.rs b/narwhal/worker/src/worker.rs index d87bd9c306f67..538cdf87f2ab1 100644 --- a/narwhal/worker/src/worker.rs +++ b/narwhal/worker/src/worker.rs @@ -89,6 +89,7 @@ impl Worker { let client_flow_handles = worker.handle_clients_transactions( &tx_reconfigure, tx_primary.clone(), + node_metrics.clone(), channel_metrics.clone(), endpoint_metrics, network_metrics.clone(), @@ -188,6 +189,7 @@ impl Worker { &self, tx_reconfigure: &watch::Sender, tx_primary: Sender, + node_metrics: Arc, channel_metrics: Arc, endpoint_metrics: WorkerEndpointMetrics, network_metrics: Arc, @@ -225,6 +227,7 @@ impl Worker { tx_reconfigure.subscribe(), /* rx_transaction */ rx_batch_maker, /* tx_message */ tx_quorum_waiter, + node_metrics, ); // The `QuorumWaiter` waits for 2f authorities to acknowledge reception of the batch. It then forwards