Skip to content

Commit

Permalink
[metrics] adding metrics for batch maker (MystenLabs#728)
Browse files Browse the repository at this point in the history
Adding metrics for the size and number of batches produced
  • Loading branch information
akichidis authored Aug 9, 2022
1 parent d46b07b commit 6eaf8ef
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 4 deletions.
20 changes: 16 additions & 4 deletions narwhal/worker/src/batch_maker.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::metrics::WorkerMetrics;
use config::Committee;
#[cfg(feature = "benchmark")]
use std::convert::TryInto;
use std::sync::Arc;
use tokio::{
sync::watch,
task::JoinHandle,
Expand Down Expand Up @@ -37,6 +39,8 @@ pub struct BatchMaker {
current_batch: Batch,
/// Holds the size of the current batch (in bytes).
current_batch_size: usize,
/// Metrics handler
node_metrics: Arc<WorkerMetrics>,
}

impl BatchMaker {
Expand All @@ -48,6 +52,7 @@ impl BatchMaker {
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
rx_transaction: Receiver<Transaction>,
tx_message: Sender<Batch>,
node_metrics: Arc<WorkerMetrics>,
) -> JoinHandle<()> {
tokio::spawn(async move {
Self {
Expand All @@ -59,6 +64,7 @@ impl BatchMaker {
tx_message,
current_batch: Batch(Vec::with_capacity(batch_size * 2)),
current_batch_size: 0,
node_metrics,
}
.run()
.await;
Expand All @@ -77,15 +83,15 @@ impl BatchMaker {
self.current_batch_size += transaction.len();
self.current_batch.0.push(transaction);
if self.current_batch_size >= self.batch_size {
self.seal().await;
self.seal(false).await;
timer.as_mut().reset(Instant::now() + self.max_batch_delay);
}
},

// If the timer triggers, seal the batch even if it contains few transactions.
() = &mut timer => {
if !self.current_batch.0.is_empty() {
self.seal().await;
self.seal(true).await;
}
timer.as_mut().reset(Instant::now() + self.max_batch_delay);
}
Expand Down Expand Up @@ -114,8 +120,7 @@ impl BatchMaker {
}

/// Seal and broadcast the current batch.
async fn seal(&mut self) {
#[cfg(feature = "benchmark")]
async fn seal(&mut self, timeout: bool) {
let size = self.current_batch_size;

// Look for sample txs (they all start with 0) and gather their txs id (the next 8 bytes).
Expand Down Expand Up @@ -154,6 +159,13 @@ impl BatchMaker {
}
}

let reason = if timeout { "timeout" } else { "size_reached" };

self.node_metrics
.created_batch_size
.with_label_values(&[self.committee.epoch.to_string().as_str(), reason])
.observe(size as f64);

// Send the batch through the deliver channel for further processing.
if self.tx_message.send(batch).await.is_err() {
tracing::debug!("{}", DagError::ShuttingDown);
Expand Down
9 changes: 9 additions & 0 deletions narwhal/worker/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub fn initialise_metrics(metrics_registry: &Registry) -> Metrics {
pub struct WorkerMetrics {
/// Number of elements pending elements in the worker synchronizer
pub pending_elements_worker_synchronizer: IntGaugeVec,
/// Number of created batches from the batch_maker
pub created_batch_size: HistogramVec,
}

impl WorkerMetrics {
Expand All @@ -56,6 +58,13 @@ impl WorkerMetrics {
registry
)
.unwrap(),
created_batch_size: register_histogram_vec_with_registry!(
"created_batch_size",
"Size in bytes of the created batches",
&["epoch", "reason"],
registry
)
.unwrap(),
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions narwhal/worker/src/tests/batch_maker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use super::*;
use prometheus::Registry;
use test_utils::{committee, transaction};

#[tokio::test]
Expand All @@ -11,6 +12,7 @@ async fn make_batch() {
watch::channel(ReconfigureNotification::NewEpoch(committee.clone()));
let (tx_transaction, rx_transaction) = test_utils::test_channel!(1);
let (tx_message, mut rx_message) = test_utils::test_channel!(1);
let node_metrics = WorkerMetrics::new(&Registry::new());

// Spawn a `BatchMaker` instance.
let _batch_maker_handle = BatchMaker::spawn(
Expand All @@ -21,6 +23,7 @@ async fn make_batch() {
rx_reconfiguration,
rx_transaction,
tx_message,
Arc::new(node_metrics),
);

// Send enough transactions to seal a batch.
Expand All @@ -41,6 +44,7 @@ async fn batch_timeout() {
watch::channel(ReconfigureNotification::NewEpoch(committee.clone()));
let (tx_transaction, rx_transaction) = test_utils::test_channel!(1);
let (tx_message, mut rx_message) = test_utils::test_channel!(1);
let node_metrics = WorkerMetrics::new(&Registry::new());

// Spawn a `BatchMaker` instance.
let _batch_maker_handle = BatchMaker::spawn(
Expand All @@ -51,6 +55,7 @@ async fn batch_timeout() {
rx_reconfiguration,
rx_transaction,
tx_message,
Arc::new(node_metrics),
);

// Do not send enough transactions to seal a batch.
Expand Down
3 changes: 3 additions & 0 deletions narwhal/worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl Worker {
let client_flow_handles = worker.handle_clients_transactions(
&tx_reconfigure,
tx_primary.clone(),
node_metrics.clone(),
channel_metrics.clone(),
endpoint_metrics,
network_metrics.clone(),
Expand Down Expand Up @@ -188,6 +189,7 @@ impl Worker {
&self,
tx_reconfigure: &watch::Sender<ReconfigureNotification>,
tx_primary: Sender<WorkerPrimaryMessage>,
node_metrics: Arc<WorkerMetrics>,
channel_metrics: Arc<WorkerChannelMetrics>,
endpoint_metrics: WorkerEndpointMetrics,
network_metrics: Arc<WorkerNetworkMetrics>,
Expand Down Expand Up @@ -225,6 +227,7 @@ impl Worker {
tx_reconfigure.subscribe(),
/* rx_transaction */ rx_batch_maker,
/* tx_message */ tx_quorum_waiter,
node_metrics,
);

// The `QuorumWaiter` waits for 2f authorities to acknowledge reception of the batch. It then forwards
Expand Down

0 comments on commit 6eaf8ef

Please sign in to comment.