Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to receive N previously-processed consensus commits at startup #21310

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions crates/sui-core/src/consensus_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{

use arc_swap::ArcSwap;
use consensus_config::Committee as ConsensusCommittee;
use consensus_core::{CommitConsumerMonitor, TransactionIndex, VerifiedBlock};
use consensus_core::{CommitConsumerMonitor, CommitIndex, TransactionIndex, VerifiedBlock};
use lru::LruCache;
use mysten_common::debug_fatal;
use mysten_metrics::{
Expand Down Expand Up @@ -212,6 +212,15 @@ impl<C> ConsensusHandler<C> {
}

impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
/// Called during startup to allow us to observe commits we previously processed, for crash recovery.
/// Any state computed here must be a pure function of the commits observed, it cannot depend on any
/// state recorded in the epoch db.
fn handle_prior_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
// TODO: this will be used to recover state computed from previous commits at startup.
let round = consensus_commit.leader_round();
info!("Ignoring prior consensus commit for round {:?}", round);
}

#[instrument(level = "debug", skip_all)]
async fn handle_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
// This may block until one of two conditions happens:
Expand Down Expand Up @@ -507,6 +516,7 @@ pub(crate) struct MysticetiConsensusHandler {

impl MysticetiConsensusHandler {
pub(crate) fn new(
last_processed_commit_at_startup: CommitIndex,
mut consensus_handler: ConsensusHandler<CheckpointService>,
consensus_transaction_handler: ConsensusTransactionHandler,
mut commit_receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
Expand All @@ -518,10 +528,14 @@ impl MysticetiConsensusHandler {
// TODO: pause when execution is overloaded, so consensus can detect the backpressure.
while let Some(consensus_commit) = commit_receiver.recv().await {
let commit_index = consensus_commit.commit_ref.index;
consensus_handler
.handle_consensus_commit(consensus_commit)
.await;
commit_consumer_monitor.set_highest_handled_commit(commit_index);
if commit_index <= last_processed_commit_at_startup {
consensus_handler.handle_prior_consensus_commit(consensus_commit);
} else {
consensus_handler
.handle_consensus_commit(consensus_commit)
.await;
commit_consumer_monitor.set_highest_handled_commit(commit_index);
}
}
}));
if consensus_transaction_handler.enabled() {
Expand Down
8 changes: 7 additions & 1 deletion crates/sui-core/src/consensus_manager/mysticeti_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,13 @@ impl ConsensusManagerTrait for MysticetiManager {
let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap();

let consensus_handler = consensus_handler_initializer.new_consensus_handler();

let num_prior_commits = protocol_config.consensus_num_requested_prior_commits_at_startup();
let last_processed_commit = consensus_handler.last_processed_subdag_index() as CommitIndex;
let starting_commit = last_processed_commit.saturating_sub(num_prior_commits);

let (commit_consumer, commit_receiver, transaction_receiver) =
CommitConsumer::new(consensus_handler.last_processed_subdag_index() as CommitIndex);
CommitConsumer::new(starting_commit);
let monitor = commit_consumer.monitor();

// If there is a previous consumer monitor, it indicates that the consensus engine has been restarted, due to an epoch change. However, that on its
Expand Down Expand Up @@ -204,6 +209,7 @@ impl ConsensusManagerTrait for MysticetiManager {
consensus_handler_initializer.metrics().clone(),
);
let handler = MysticetiConsensusHandler::new(
last_processed_commit,
consensus_handler,
consensus_transaction_handler,
commit_receiver,
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-protocol-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1805,6 +1805,12 @@ impl ProtocolConfig {
pub fn enable_nitro_attestation(&self) -> bool {
self.feature_flags.enable_nitro_attestation
}

pub fn consensus_num_requested_prior_commits_at_startup(&self) -> u32 {
// TODO: this will eventually be the max of some number of other
// parameters.
0
}
}

#[cfg(not(msim))]
Expand Down
Loading