Skip to content

Commit

Permalink
Bullshark (#385)
Browse files Browse the repository at this point in the history
Implement Bullshark
  • Loading branch information
asonnino authored Jun 22, 2022
1 parent a403183 commit 2d1ffe3
Show file tree
Hide file tree
Showing 24 changed files with 1,283 additions and 359 deletions.
7 changes: 7 additions & 0 deletions narwhal/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,13 @@ impl<PublicKey: VerifyingKey> Committee<PublicKey> {
(total_votes + 2) / 3
}

/// Returns a leader node in a round-robin fashion.
pub fn leader(&self, seed: usize) -> PublicKey {
let mut keys: Vec<_> = self.authorities.load().keys().cloned().collect();
keys.sort();
keys[seed % self.size()].clone()
}

/// Returns the primary addresses of the target primary.
pub fn primary(&self, to: &PublicKey) -> Result<PrimaryAddresses, ConfigError> {
self.authorities
Expand Down
18 changes: 11 additions & 7 deletions narwhal/consensus/benches/process_certificates.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use consensus::tusk::*;
use consensus::{
bullshark::Bullshark,
consensus::{ConsensusProtocol, ConsensusState},
};
use criterion::{
criterion_group, criterion_main, BenchmarkId, Criterion, SamplingMode, Throughput,
};
Expand Down Expand Up @@ -35,25 +38,26 @@ pub fn process_certificates(c: &mut Criterion) {
let store_path = temp_dir();
let store = make_consensus_store(&store_path);

let mut state =
consensus::tusk::State::new(Certificate::genesis(&mock_committee(&keys[..])));
let mut state = ConsensusState::new(Certificate::genesis(&mock_committee(&keys[..])));

let data_size: usize = certificates
.iter()
.map(|cert| bincode::serialize(&cert).unwrap().len())
.sum();
consensus_group.throughput(Throughput::Bytes(data_size as u64));

let mut ordering_engine = Bullshark {
committee,
store,
gc_depth,
};
consensus_group.bench_with_input(
BenchmarkId::new("batched", certificates.len()),
&certificates,
|b, i| {
b.iter(|| {
for cert in i {
let _ = Consensus::process_certificate(
&committee,
&store,
gc_depth,
let _ = ordering_engine.process_certificate(
&mut state,
/* consensus_index */ 0,
cert.clone(),
Expand Down
150 changes: 150 additions & 0 deletions narwhal/consensus/src/bullshark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{
consensus::{ConsensusProtocol, ConsensusState, Dag},
utils, ConsensusOutput,
};
use config::{Committee, SharedCommittee, Stake};
use crypto::{
traits::{EncodeDecodeBase64, VerifyingKey},
Hash,
};
use std::{collections::HashMap, sync::Arc};
use tracing::debug;
use types::{Certificate, CertificateDigest, ConsensusStore, Round, SequenceNumber, StoreResult};

#[cfg(test)]
#[path = "tests/bullshark_tests.rs"]
pub mod bullshark_tests;

pub struct Bullshark<PublicKey: VerifyingKey> {
/// The committee information.
pub committee: SharedCommittee<PublicKey>,
/// Persistent storage to safe ensure crash-recovery.
pub store: Arc<ConsensusStore<PublicKey>>,
/// The depth of the garbage collector.
pub gc_depth: Round,
}

impl<PublicKey: VerifyingKey> ConsensusProtocol<PublicKey> for Bullshark<PublicKey> {
fn process_certificate(
&mut self,
state: &mut ConsensusState<PublicKey>,
consensus_index: SequenceNumber,
certificate: Certificate<PublicKey>,
) -> StoreResult<Vec<ConsensusOutput<PublicKey>>> {
debug!("Processing {:?}", certificate);
let round = certificate.round();
let mut consensus_index = consensus_index;

// Add the new certificate to the local storage.
state
.dag
.entry(round)
.or_insert_with(HashMap::new)
.insert(certificate.origin(), (certificate.digest(), certificate));

// Try to order the dag to commit. Start from the highest round for which we have at least
// f+1 certificates. This is because we need them to reveal the common coin.
let r = round - 1;

// We only elect leaders for even round numbers.
if r % 2 != 0 || r < 2 {
return Ok(Vec::new());
}

// Get the certificate's digest of the leader. If we already ordered this leader,
// there is nothing to do.
let leader_round = r;
if leader_round <= state.last_committed_round {
return Ok(Vec::new());
}
let (leader_digest, leader) = match Self::leader(&self.committee, leader_round, &state.dag)
{
Some(x) => x,
None => return Ok(Vec::new()),
};

// Check if the leader has f+1 support from its children (ie. round r-1).
let stake: Stake = state
.dag
.get(&round)
.expect("We should have the whole history by now")
.values()
.filter(|(_, x)| x.header.parents.contains(leader_digest))
.map(|(_, x)| self.committee.stake(&x.origin()))
.sum();

// If it is the case, we can commit the leader. But first, we need to recursively go back to
// the last committed leader, and commit all preceding leaders in the right order. Committing
// a leader block means committing all its dependencies.
if stake < self.committee.validity_threshold() {
debug!("Leader {:?} does not have enough support", leader);
return Ok(Vec::new());
}

// Get an ordered list of past leaders that are linked to the current leader.
debug!("Leader {:?} has enough support", leader);
let mut sequence = Vec::new();
for leader in utils::order_leaders(&self.committee, leader, state, Self::leader)
.iter()
.rev()
{
// Starting from the oldest leader, flatten the sub-dag referenced by the leader.
for x in utils::order_dag(self.gc_depth, leader, state) {
let digest = x.digest();

// Update and clean up internal state.
state.update(&x, self.gc_depth);

// Add the certificate to the sequence.
sequence.push(ConsensusOutput {
certificate: x,
consensus_index,
});

// Increase the global consensus index.
consensus_index += 1;

// Persist the update.
// TODO [issue #116]: Ensure this is not a performance bottleneck.
self.store.write_consensus_state(
&state.last_committed,
&consensus_index,
&digest,
)?;
}
}

// Log the latest committed round of every authority (for debug).
// Performance note: if tracing at the debug log level is disabled, this is cheap, see
// https://github.com/tokio-rs/tracing/pull/326
for (name, round) in &state.last_committed {
debug!("Latest commit of {}: Round {}", name.encode_base64(), round);
}

Ok(sequence)
}
}

impl<PublicKey: VerifyingKey> Bullshark<PublicKey> {
/// Returns the certificate (and the certificate's digest) originated by the leader of the
/// specified round (if any).
fn leader<'a>(
committee: &Committee<PublicKey>,
round: Round,
dag: &'a Dag<PublicKey>,
) -> Option<&'a (CertificateDigest, Certificate<PublicKey>)> {
#[cfg(test)]
let seed = 0;
#[cfg(not(test))]
let seed = round;

// Elect the leader in a round-robin fashion.
let leader = committee.leader(seed as usize);

// Return its certificate and the certificate's digest.
dag.get(&round).and_then(|x| x.get(&leader))
}
}
174 changes: 174 additions & 0 deletions narwhal/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{ConsensusOutput, SequenceNumber};
use config::SharedCommittee;
use crypto::{traits::VerifyingKey, Hash};
use std::{cmp::max, collections::HashMap, sync::Arc};
use tokio::{
sync::mpsc::{Receiver, Sender},
task::JoinHandle,
};
use types::{Certificate, CertificateDigest, ConsensusStore, Round, StoreResult};

/// The representation of the DAG in memory.
pub type Dag<PublicKey> =
HashMap<Round, HashMap<PublicKey, (CertificateDigest, Certificate<PublicKey>)>>;

/// The state that needs to be persisted for crash-recovery.
pub struct ConsensusState<PublicKey: VerifyingKey> {
/// The last committed round.
pub last_committed_round: Round,
// Keeps the last committed round for each authority. This map is used to clean up the dag and
// ensure we don't commit twice the same certificate.
pub last_committed: HashMap<PublicKey, Round>,
/// Keeps the latest committed certificate (and its parents) for every authority. Anything older
/// must be regularly cleaned up through the function `update`.
pub dag: Dag<PublicKey>,
}

impl<PublicKey: VerifyingKey> ConsensusState<PublicKey> {
pub fn new(genesis: Vec<Certificate<PublicKey>>) -> Self {
let genesis = genesis
.into_iter()
.map(|x| (x.origin(), (x.digest(), x)))
.collect::<HashMap<_, _>>();

Self {
last_committed_round: 0,
last_committed: genesis
.iter()
.map(|(x, (_, y))| (x.clone(), y.round()))
.collect(),
dag: [(0, genesis)]
.iter()
.cloned()
.collect::<HashMap<_, HashMap<_, _>>>(),
}
}

/// Update and clean up internal state base on committed certificates.
pub fn update(&mut self, certificate: &Certificate<PublicKey>, gc_depth: Round) {
self.last_committed
.entry(certificate.origin())
.and_modify(|r| *r = max(*r, certificate.round()))
.or_insert_with(|| certificate.round());

let last_committed_round = *std::iter::Iterator::max(self.last_committed.values()).unwrap();
self.last_committed_round = last_committed_round;

// We purge all certificates past the gc depth
self.dag.retain(|r, _| r + gc_depth >= last_committed_round);
for (name, round) in &self.last_committed {
self.dag.retain(|r, authorities| {
// We purge certificates for `name` prior to its latest commit
if r < round {
authorities.retain(|n, _| n != name);
}
!authorities.is_empty()
});
}
}
}

/// Describe how to sequence input certificates.
pub trait ConsensusProtocol<PublicKey: VerifyingKey> {
fn process_certificate(
&mut self,
// The state of the consensus protocol.
state: &mut ConsensusState<PublicKey>,
// The latest consensus index.
consensus_index: SequenceNumber,
// The new certificate.
certificate: Certificate<PublicKey>,
) -> StoreResult<Vec<ConsensusOutput<PublicKey>>>;
}

pub struct Consensus<PublicKey: VerifyingKey, ConsensusProtocol> {
/// Receives new certificates from the primary. The primary should send us new certificates only
/// if it already sent us its whole history.
rx_primary: Receiver<Certificate<PublicKey>>,
/// Outputs the sequence of ordered certificates to the primary (for cleanup and feedback).
tx_primary: Sender<Certificate<PublicKey>>,
/// Outputs the sequence of ordered certificates to the application layer.
tx_output: Sender<ConsensusOutput<PublicKey>>,

/// The genesis certificates.
genesis: Vec<Certificate<PublicKey>>,
/// The (global) consensus index. We assign one index to each sequenced certificate. this is
/// helpful for clients.
consensus_index: SequenceNumber,

/// The consensus protocol to run.
protocol: ConsensusProtocol,
}

impl<PublicKey, Protocol> Consensus<PublicKey, Protocol>
where
PublicKey: VerifyingKey,
Protocol: ConsensusProtocol<PublicKey> + Send + 'static,
{
pub fn spawn(
committee: SharedCommittee<PublicKey>,
store: Arc<ConsensusStore<PublicKey>>,
rx_primary: Receiver<Certificate<PublicKey>>,
tx_primary: Sender<Certificate<PublicKey>>,
tx_output: Sender<ConsensusOutput<PublicKey>>,
protocol: Protocol,
) -> JoinHandle<StoreResult<()>> {
tokio::spawn(async move {
let consensus_index = store.read_last_consensus_index()?;
let genesis = Certificate::genesis(&committee);
Self {
rx_primary,
tx_primary,
tx_output,
genesis,
consensus_index,
protocol,
}
.run()
.await
})
}

async fn run(&mut self) -> StoreResult<()> {
// The consensus state (everything else is immutable).
let mut state = ConsensusState::new(self.genesis.clone());

// Listen to incoming certificates.
while let Some(certificate) = self.rx_primary.recv().await {
let sequence =
self.protocol
.process_certificate(&mut state, self.consensus_index, certificate)?;

// Update the consensus index.
self.consensus_index += sequence.len() as u64;

// Output the sequence in the right order.
for output in sequence {
let certificate = &output.certificate;
#[cfg(not(feature = "benchmark"))]
if output.consensus_index % 5_000 == 0 {
tracing::debug!("Committed {}", certificate.header);
}

#[cfg(feature = "benchmark")]
for digest in certificate.header.payload.keys() {
// NOTE: This log entry is used to compute performance.
tracing::info!("Committed {} -> {:?}", certificate.header, digest);
}

self.tx_primary
.send(certificate.clone())
.await
.expect("Failed to send certificate to primary");

if let Err(e) = self.tx_output.send(output).await {
tracing::warn!("Failed to output certificate: {e}");
}
}
}
Ok(())
}
}
Loading

0 comments on commit 2d1ffe3

Please sign in to comment.