diff --git a/prdoc/pr_3435.prdoc b/prdoc/pr_3435.prdoc new file mode 100644 index 0000000000000..7d7896bff7d40 --- /dev/null +++ b/prdoc/pr_3435.prdoc @@ -0,0 +1,16 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Fix BEEFY-related gossip messages error logs + +doc: + - audience: Node Operator + description: | + Added logic to pump the gossip engine while waiting for other things + to make sure gossiped messages get consumed (practically discarded + until worker is fully initialized). + This fixes an issue where node operators saw error logs, and fixes + potential RAM bloat when BEEFY initialization takes a long time + (i.e. during clean sync). +crates: + - name: sc-consensus-beefy diff --git a/substrate/client/consensus/beefy/src/communication/gossip.rs b/substrate/client/consensus/beefy/src/communication/gossip.rs index 8a0b0a74308f1..eb43c9173d751 100644 --- a/substrate/client/consensus/beefy/src/communication/gossip.rs +++ b/substrate/client/consensus/beefy/src/communication/gossip.rs @@ -56,6 +56,8 @@ pub(super) enum Action { Keep(H, ReputationChange), // discard, applying cost/benefit to originator. Discard(ReputationChange), + // ignore, no cost/benefit applied to originator. + DiscardNoReport, } /// An outcome of examining a message. @@ -68,7 +70,7 @@ enum Consider { /// Message is from the future. Reject. RejectFuture, /// Message cannot be evaluated. Reject. - RejectOutOfScope, + CannotEvaluate, } /// BEEFY gossip message type that gets encoded and sent on the network. @@ -168,18 +170,14 @@ impl Filter { .as_ref() .map(|f| // only from current set and only [filter.start, filter.end] - if set_id < f.validator_set.id() { + if set_id < f.validator_set.id() || round < f.start { Consider::RejectPast - } else if set_id > f.validator_set.id() { - Consider::RejectFuture - } else if round < f.start { - Consider::RejectPast - } else if round > f.end { + } else if set_id > f.validator_set.id() || round > f.end { Consider::RejectFuture } else { Consider::Accept }) - .unwrap_or(Consider::RejectOutOfScope) + .unwrap_or(Consider::CannotEvaluate) } /// Return true if `round` is >= than `max(session_start, best_beefy)`, @@ -199,7 +197,7 @@ impl Filter { Consider::Accept } ) - .unwrap_or(Consider::RejectOutOfScope) + .unwrap_or(Consider::CannotEvaluate) } /// Add new _known_ `round` to the set of seen valid justifications. @@ -244,7 +242,7 @@ where pub(crate) fn new( known_peers: Arc>>, ) -> (GossipValidator, TracingUnboundedReceiver) { - let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 10_000); + let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 100_000); let val = GossipValidator { votes_topic: votes_topic::(), justifs_topic: proofs_topic::(), @@ -289,7 +287,9 @@ where match filter.consider_vote(round, set_id) { Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE), Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE), - Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE), + // When we can't evaluate, it's our fault (e.g. filter not initialized yet), we + // discard the vote without punishing or rewarding the sending peer. + Consider::CannotEvaluate => return Action::DiscardNoReport, Consider::Accept => {}, } @@ -330,7 +330,9 @@ where match guard.consider_finality_proof(round, set_id) { Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE), Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE), - Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE), + // When we can't evaluate, it's our fault (e.g. filter not initialized yet), we + // discard the proof without punishing or rewarding the sending peer. + Consider::CannotEvaluate => return Action::DiscardNoReport, Consider::Accept => {}, } @@ -357,7 +359,9 @@ where Action::Keep(self.justifs_topic, benefit::VALIDATED_PROOF) } }) - .unwrap_or(Action::Discard(cost::OUT_OF_SCOPE_MESSAGE)) + // When we can't evaluate, it's our fault (e.g. filter not initialized yet), we + // discard the proof without punishing or rewarding the sending peer. + .unwrap_or(Action::DiscardNoReport) }; if matches!(action, Action::Keep(_, _)) { self.gossip_filter.write().mark_round_as_proven(round); @@ -404,6 +408,7 @@ where self.report(*sender, cb); ValidationResult::Discard }, + Action::DiscardNoReport => ValidationResult::Discard, } } @@ -579,8 +584,8 @@ pub(crate) mod tests { // filter not initialized let res = gv.validate(&mut context, &sender, &encoded); assert!(matches!(res, ValidationResult::Discard)); - expected_report.cost_benefit = cost::OUT_OF_SCOPE_MESSAGE; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + // nothing reported + assert!(report_stream.try_recv().is_err()); gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set }); // nothing in cache first time diff --git a/substrate/client/consensus/beefy/src/communication/mod.rs b/substrate/client/consensus/beefy/src/communication/mod.rs index 3827559057dde..6fda63688e695 100644 --- a/substrate/client/consensus/beefy/src/communication/mod.rs +++ b/substrate/client/consensus/beefy/src/communication/mod.rs @@ -90,8 +90,6 @@ mod cost { pub(super) const BAD_SIGNATURE: Rep = Rep::new(-100, "BEEFY: Bad signature"); // Message received with vote from voter not in validator set. pub(super) const UNKNOWN_VOTER: Rep = Rep::new(-150, "BEEFY: Unknown voter"); - // A message received that cannot be evaluated relative to our current state. - pub(super) const OUT_OF_SCOPE_MESSAGE: Rep = Rep::new(-500, "BEEFY: Out-of-scope message"); // Message containing invalid proof. pub(super) const INVALID_PROOF: Rep = Rep::new(-5000, "BEEFY: Invalid commit"); // Reputation cost per signature checked for invalid proof. diff --git a/substrate/client/consensus/beefy/src/import.rs b/substrate/client/consensus/beefy/src/import.rs index fc19ecc301422..ed8ed68c4e8d0 100644 --- a/substrate/client/consensus/beefy/src/import.rs +++ b/substrate/client/consensus/beefy/src/import.rs @@ -159,7 +159,7 @@ where // The proof is valid and the block is imported and final, we can import. debug!( target: LOG_TARGET, - "🥩 import justif {:?} for block number {:?}.", proof, number + "🥩 import justif {} for block number {:?}.", proof, number ); // Send the justification to the BEEFY voter for processing. self.justification_sender diff --git a/substrate/client/consensus/beefy/src/lib.rs b/substrate/client/consensus/beefy/src/lib.rs index 11a105561e47f..323af1bc8305c 100644 --- a/substrate/client/consensus/beefy/src/lib.rs +++ b/substrate/client/consensus/beefy/src/lib.rs @@ -31,7 +31,7 @@ use crate::{ import::BeefyBlockImport, metrics::register_metrics, }; -use futures::{stream::Fuse, StreamExt}; +use futures::{stream::Fuse, FutureExt, StreamExt}; use log::{debug, error, info, warn}; use parking_lot::Mutex; use prometheus::Registry; @@ -40,17 +40,21 @@ use sc_consensus::BlockImport; use sc_network::{NetworkRequest, NotificationService, ProtocolName}; use sc_network_gossip::{GossipEngine, Network as GossipNetwork, Syncing as GossipSyncing}; use sp_api::ProvideRuntimeApi; -use sp_blockchain::{ - Backend as BlockchainBackend, Error as ClientError, HeaderBackend, Result as ClientResult, -}; +use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend}; use sp_consensus::{Error as ConsensusError, SyncOracle}; use sp_consensus_beefy::{ - ecdsa_crypto::AuthorityId, BeefyApi, MmrRootHash, PayloadProvider, ValidatorSet, + ecdsa_crypto::AuthorityId, BeefyApi, ConsensusLog, MmrRootHash, PayloadProvider, ValidatorSet, + BEEFY_ENGINE_ID, }; use sp_keystore::KeystorePtr; use sp_mmr_primitives::MmrApi; use sp_runtime::traits::{Block, Header as HeaderT, NumberFor, Zero}; -use std::{collections::BTreeMap, marker::PhantomData, sync::Arc, time::Duration}; +use std::{ + collections::{BTreeMap, VecDeque}, + marker::PhantomData, + sync::Arc, + time::Duration, +}; mod aux_schema; mod error; @@ -63,9 +67,19 @@ pub mod communication; pub mod import; pub mod justification; +use crate::{ + communication::{gossip::GossipValidator, peers::PeerReport}, + justification::BeefyVersionedFinalityProof, + keystore::BeefyKeystore, + metrics::VoterMetrics, + round::Rounds, + worker::{BeefyWorker, PersistedState}, +}; pub use communication::beefy_protocol_name::{ gossip_protocol_name, justifications_protocol_name as justifs_protocol_name, }; +use sc_utils::mpsc::TracingUnboundedReceiver; +use sp_runtime::generic::OpaqueDigestItemId; #[cfg(test)] mod tests; @@ -209,6 +223,247 @@ pub struct BeefyParams { /// Handler for incoming BEEFY justifications requests from a remote peer. pub on_demand_justifications_handler: BeefyJustifsRequestHandler, } +/// Helper object holding BEEFY worker communication/gossip components. +/// +/// These are created once, but will be reused if worker is restarted/reinitialized. +pub(crate) struct BeefyComms { + pub gossip_engine: GossipEngine, + pub gossip_validator: Arc>, + pub gossip_report_stream: TracingUnboundedReceiver, + pub on_demand_justifications: OnDemandJustificationsEngine, +} + +/// Helper builder object for building [worker::BeefyWorker]. +/// +/// It has to do it in two steps: initialization and build, because the first step can sleep waiting +/// for certain chain and backend conditions, and while sleeping we still need to pump the +/// GossipEngine. Once initialization is done, the GossipEngine (and other pieces) are added to get +/// the complete [worker::BeefyWorker] object. +pub(crate) struct BeefyWorkerBuilder { + // utilities + backend: Arc, + runtime: Arc, + key_store: BeefyKeystore, + // voter metrics + metrics: Option, + persisted_state: PersistedState, +} + +impl BeefyWorkerBuilder +where + B: Block + codec::Codec, + BE: Backend, + R: ProvideRuntimeApi, + R::Api: BeefyApi, +{ + /// This will wait for the chain to enable BEEFY (if not yet enabled) and also wait for the + /// backend to sync all headers required by the voter to build a contiguous chain of mandatory + /// justifications. Then it builds the initial voter state using a combination of previously + /// persisted state in AUX DB and latest chain information/progress. + /// + /// Returns a sane `BeefyWorkerBuilder` that can build the `BeefyWorker`. + pub async fn async_initialize( + backend: Arc, + runtime: Arc, + key_store: BeefyKeystore, + metrics: Option, + min_block_delta: u32, + gossip_validator: Arc>, + finality_notifications: &mut Fuse>, + ) -> Result { + // Wait for BEEFY pallet to be active before starting voter. + let (beefy_genesis, best_grandpa) = + wait_for_runtime_pallet(&*runtime, finality_notifications).await?; + + let persisted_state = Self::load_or_init_state( + beefy_genesis, + best_grandpa, + min_block_delta, + backend.clone(), + runtime.clone(), + &key_store, + &metrics, + ) + .await?; + // Update the gossip validator with the right starting round and set id. + persisted_state + .gossip_filter_config() + .map(|f| gossip_validator.update_filter(f))?; + + Ok(BeefyWorkerBuilder { backend, runtime, key_store, metrics, persisted_state }) + } + + /// Takes rest of missing pieces as params and builds the `BeefyWorker`. + pub fn build( + self, + payload_provider: P, + sync: Arc, + comms: BeefyComms, + links: BeefyVoterLinks, + pending_justifications: BTreeMap, BeefyVersionedFinalityProof>, + ) -> BeefyWorker { + BeefyWorker { + backend: self.backend, + runtime: self.runtime, + key_store: self.key_store, + metrics: self.metrics, + persisted_state: self.persisted_state, + payload_provider, + sync, + comms, + links, + pending_justifications, + } + } + + // If no persisted state present, walk back the chain from first GRANDPA notification to either: + // - latest BEEFY finalized block, or if none found on the way, + // - BEEFY pallet genesis; + // Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to + // finalize. + async fn init_state( + beefy_genesis: NumberFor, + best_grandpa: ::Header, + min_block_delta: u32, + backend: Arc, + runtime: Arc, + ) -> Result, Error> { + let blockchain = backend.blockchain(); + + let beefy_genesis = runtime + .runtime_api() + .beefy_genesis(best_grandpa.hash()) + .ok() + .flatten() + .filter(|genesis| *genesis == beefy_genesis) + .ok_or_else(|| Error::Backend("BEEFY pallet expected to be active.".into()))?; + // Walk back the imported blocks and initialize voter either, at the last block with + // a BEEFY justification, or at pallet genesis block; voter will resume from there. + let mut sessions = VecDeque::new(); + let mut header = best_grandpa.clone(); + let state = loop { + if let Some(true) = blockchain + .justifications(header.hash()) + .ok() + .flatten() + .map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some()) + { + debug!( + target: LOG_TARGET, + "🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.", + *header.number() + ); + let best_beefy = *header.number(); + // If no session boundaries detected so far, just initialize new rounds here. + if sessions.is_empty() { + let active_set = + expect_validator_set(runtime.as_ref(), backend.as_ref(), &header).await?; + let mut rounds = Rounds::new(best_beefy, active_set); + // Mark the round as already finalized. + rounds.conclude(best_beefy); + sessions.push_front(rounds); + } + let state = PersistedState::checked_new( + best_grandpa, + best_beefy, + sessions, + min_block_delta, + beefy_genesis, + ) + .ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?; + break state + } + + if *header.number() == beefy_genesis { + // We've reached BEEFY genesis, initialize voter here. + let genesis_set = + expect_validator_set(runtime.as_ref(), backend.as_ref(), &header).await?; + info!( + target: LOG_TARGET, + "🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \ + Starting voting rounds at block {:?}, genesis validator set {:?}.", + beefy_genesis, + genesis_set, + ); + + sessions.push_front(Rounds::new(beefy_genesis, genesis_set)); + break PersistedState::checked_new( + best_grandpa, + Zero::zero(), + sessions, + min_block_delta, + beefy_genesis, + ) + .ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))? + } + + if let Some(active) = find_authorities_change::(&header) { + debug!( + target: LOG_TARGET, + "🥩 Marking block {:?} as BEEFY Mandatory.", + *header.number() + ); + sessions.push_front(Rounds::new(*header.number(), active)); + } + + // Move up the chain. + header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?; + }; + + aux_schema::write_current_version(backend.as_ref())?; + aux_schema::write_voter_state(backend.as_ref(), &state)?; + Ok(state) + } + + async fn load_or_init_state( + beefy_genesis: NumberFor, + best_grandpa: ::Header, + min_block_delta: u32, + backend: Arc, + runtime: Arc, + key_store: &BeefyKeystore, + metrics: &Option, + ) -> Result, Error> { + // Initialize voter state from AUX DB if compatible. + if let Some(mut state) = crate::aux_schema::load_persistent(backend.as_ref())? + // Verify state pallet genesis matches runtime. + .filter(|state| state.pallet_genesis() == beefy_genesis) + { + // Overwrite persisted state with current best GRANDPA block. + state.set_best_grandpa(best_grandpa.clone()); + // Overwrite persisted data with newly provided `min_block_delta`. + state.set_min_block_delta(min_block_delta); + debug!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state); + + // Make sure that all the headers that we need have been synced. + let mut new_sessions = vec![]; + let mut header = best_grandpa.clone(); + while *header.number() > state.best_beefy() { + if state.voting_oracle().can_add_session(*header.number()) { + if let Some(active) = find_authorities_change::(&header) { + new_sessions.push((active, *header.number())); + } + } + header = + wait_for_parent_header(backend.blockchain(), header, HEADER_SYNC_DELAY).await?; + } + + // Make sure we didn't miss any sessions during node restart. + for (validator_set, new_session_start) in new_sessions.drain(..).rev() { + debug!( + target: LOG_TARGET, + "🥩 Handling missed BEEFY session after node restart: {:?}.", + new_session_start + ); + state.init_session_at(new_session_start, validator_set, key_store, metrics); + } + return Ok(state) + } + + // No valid voter-state persisted, re-initialize from pallet genesis. + Self::init_state(beefy_genesis, best_grandpa, min_block_delta, backend, runtime).await + } +} /// Start the BEEFY gadget. /// @@ -277,7 +532,7 @@ pub async fn start_beefy_gadget( known_peers, prometheus_registry.clone(), ); - let mut beefy_comms = worker::BeefyComms { + let mut beefy_comms = BeefyComms { gossip_engine, gossip_validator, gossip_report_stream, @@ -287,57 +542,45 @@ pub async fn start_beefy_gadget( // We re-create and re-run the worker in this loop in order to quickly reinit and resume after // select recoverable errors. loop { - // Wait for BEEFY pallet to be active before starting voter. - let (beefy_genesis, best_grandpa) = match wait_for_runtime_pallet( - &*runtime, - &mut beefy_comms.gossip_engine, - &mut finality_notifications, - ) - .await - { - Ok(res) => res, - Err(e) => { - error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e); - return - }, - }; - - let mut worker_base = worker::BeefyWorkerBase { - backend: backend.clone(), - runtime: runtime.clone(), - key_store: key_store.clone().into(), - metrics: metrics.clone(), - _phantom: Default::default(), - }; - - let persisted_state = match worker_base - .load_or_init_state(beefy_genesis, best_grandpa, min_block_delta) - .await - { - Ok(state) => state, - Err(e) => { - error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e); - return - }, + // Make sure to pump gossip engine while waiting for initialization conditions. + let worker_builder = loop { + futures::select! { + builder_init_result = BeefyWorkerBuilder::async_initialize( + backend.clone(), + runtime.clone(), + key_store.clone().into(), + metrics.clone(), + min_block_delta, + beefy_comms.gossip_validator.clone(), + &mut finality_notifications, + ).fuse() => { + match builder_init_result { + Ok(builder) => break builder, + Err(e) => { + error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", e); + return + }, + } + }, + // Pump peer reports + _ = &mut beefy_comms.gossip_report_stream.next() => { + continue + }, + // Pump gossip engine. + _ = &mut beefy_comms.gossip_engine => { + error!(target: LOG_TARGET, "🥩 Gossip engine has unexpectedly terminated."); + return + } + } }; - // Update the gossip validator with the right starting round and set id. - if let Err(e) = persisted_state - .gossip_filter_config() - .map(|f| beefy_comms.gossip_validator.update_filter(f)) - { - error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e); - return - } - let worker = worker::BeefyWorker { - base: worker_base, - payload_provider: payload_provider.clone(), - sync: sync.clone(), - comms: beefy_comms, - links: links.clone(), - pending_justifications: BTreeMap::new(), - persisted_state, - }; + let worker = worker_builder.build( + payload_provider.clone(), + sync.clone(), + beefy_comms, + links.clone(), + BTreeMap::new(), + ); match futures::future::select( Box::pin(worker.run(&mut block_import_justif, &mut finality_notifications)), @@ -404,9 +647,8 @@ where /// Should be called only once during worker initialization. async fn wait_for_runtime_pallet( runtime: &R, - mut gossip_engine: &mut GossipEngine, finality: &mut Fuse>, -) -> ClientResult<(NumberFor, ::Header)> +) -> Result<(NumberFor, ::Header), Error> where B: Block, R: ProvideRuntimeApi, @@ -414,33 +656,24 @@ where { info!(target: LOG_TARGET, "🥩 BEEFY gadget waiting for BEEFY pallet to become available..."); loop { - futures::select! { - notif = finality.next() => { - let notif = match notif { - Some(notif) => notif, - None => break - }; - let at = notif.header.hash(); - if let Some(start) = runtime.runtime_api().beefy_genesis(at).ok().flatten() { - if *notif.header.number() >= start { - // Beefy pallet available, return header for best grandpa at the time. - info!( - target: LOG_TARGET, - "🥩 BEEFY pallet available: block {:?} beefy genesis {:?}", - notif.header.number(), start - ); - return Ok((start, notif.header)) - } - } - }, - _ = gossip_engine => { - break + let notif = finality.next().await.ok_or_else(|| { + let err_msg = "🥩 Finality stream has unexpectedly terminated.".into(); + error!(target: LOG_TARGET, "{}", err_msg); + Error::Backend(err_msg) + })?; + let at = notif.header.hash(); + if let Some(start) = runtime.runtime_api().beefy_genesis(at).ok().flatten() { + if *notif.header.number() >= start { + // Beefy pallet available, return header for best grandpa at the time. + info!( + target: LOG_TARGET, + "🥩 BEEFY pallet available: block {:?} beefy genesis {:?}", + notif.header.number(), start + ); + return Ok((start, notif.header)) } } } - let err_msg = "🥩 Gossip engine has unexpectedly terminated.".into(); - error!(target: LOG_TARGET, "{}", err_msg); - Err(ClientError::Backend(err_msg)) } /// Provides validator set active `at_header`. It tries to get it from state, otherwise falls @@ -474,7 +707,7 @@ where if let Ok(Some(active)) = runtime.runtime_api().validator_set(header.hash()) { return Ok(active) } else { - match worker::find_authorities_change::(&header) { + match find_authorities_change::(&header) { Some(active) => return Ok(active), // Move up the chain. Ultimately we'll get it from chain genesis state, or error out // there. @@ -486,3 +719,18 @@ where } } } + +/// Scan the `header` digest log for a BEEFY validator set change. Return either the new +/// validator set or `None` in case no validator set change has been signaled. +pub(crate) fn find_authorities_change(header: &B::Header) -> Option> +where + B: Block, +{ + let id = OpaqueDigestItemId::Consensus(&BEEFY_ENGINE_ID); + + let filter = |log: ConsensusLog| match log { + ConsensusLog::AuthoritiesChange(validator_set) => Some(validator_set), + _ => None, + }; + header.digest().convert_first(|l| l.try_to(id).and_then(filter)) +} diff --git a/substrate/client/consensus/beefy/src/tests.rs b/substrate/client/consensus/beefy/src/tests.rs index 0e6ff210b158c..d106c9dcd8816 100644 --- a/substrate/client/consensus/beefy/src/tests.rs +++ b/substrate/client/consensus/beefy/src/tests.rs @@ -32,8 +32,8 @@ use crate::{ gossip_protocol_name, justification::*, wait_for_runtime_pallet, - worker::{BeefyWorkerBase, PersistedState}, - BeefyRPCLinks, BeefyVoterLinks, KnownPeers, + worker::PersistedState, + BeefyRPCLinks, BeefyVoterLinks, BeefyWorkerBuilder, KnownPeers, }; use futures::{future, stream::FuturesUnordered, Future, FutureExt, StreamExt}; use parking_lot::Mutex; @@ -368,27 +368,19 @@ async fn voter_init_setup( api: &TestApi, ) -> Result, Error> { let backend = net.peer(0).client().as_backend(); - let known_peers = Arc::new(Mutex::new(KnownPeers::new())); - let (gossip_validator, _) = GossipValidator::new(known_peers); - let gossip_validator = Arc::new(gossip_validator); - let mut gossip_engine = sc_network_gossip::GossipEngine::new( - net.peer(0).network_service().clone(), - net.peer(0).sync_service().clone(), - net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(), - "/beefy/whatever", - gossip_validator, - None, - ); - let (beefy_genesis, best_grandpa) = - wait_for_runtime_pallet(api, &mut gossip_engine, finality).await.unwrap(); - let mut worker_base = BeefyWorkerBase { + let (beefy_genesis, best_grandpa) = wait_for_runtime_pallet(api, finality).await.unwrap(); + let key_store = None.into(); + let metrics = None; + BeefyWorkerBuilder::load_or_init_state( + beefy_genesis, + best_grandpa, + 1, backend, - runtime: Arc::new(api.clone()), - key_store: None.into(), - metrics: None, - _phantom: Default::default(), - }; - worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await + Arc::new(api.clone()), + &key_store, + &metrics, + ) + .await } // Spawns beefy voters. Returns a future to spawn on the runtime. @@ -1065,32 +1057,7 @@ async fn should_initialize_voter_at_custom_genesis() { net.peer(0).client().as_client().finalize_block(hashes[8], None).unwrap(); // load persistent state - nothing in DB, should init at genesis - // - // NOTE: code from `voter_init_setup()` is moved here because the new network event system - // doesn't allow creating a new `GossipEngine` as the notification handle is consumed by the - // first `GossipEngine` - let known_peers = Arc::new(Mutex::new(KnownPeers::new())); - let (gossip_validator, _) = GossipValidator::new(known_peers); - let gossip_validator = Arc::new(gossip_validator); - let mut gossip_engine = sc_network_gossip::GossipEngine::new( - net.peer(0).network_service().clone(), - net.peer(0).sync_service().clone(), - net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(), - "/beefy/whatever", - gossip_validator, - None, - ); - let (beefy_genesis, best_grandpa) = - wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap(); - let mut worker_base = BeefyWorkerBase { - backend: backend.clone(), - runtime: Arc::new(api), - key_store: None.into(), - metrics: None, - _phantom: Default::default(), - }; - let persisted_state = - worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap(); + let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); // Test initialization at session boundary. // verify voter initialized with single session starting at block `custom_pallet_genesis` (7) @@ -1120,18 +1087,7 @@ async fn should_initialize_voter_at_custom_genesis() { net.peer(0).client().as_client().finalize_block(hashes[10], None).unwrap(); // load persistent state - state preset in DB, but with different pallet genesis - // the network state persists and uses the old `GossipEngine` initialized for `peer(0)` - let (beefy_genesis, best_grandpa) = - wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap(); - let mut worker_base = BeefyWorkerBase { - backend: backend.clone(), - runtime: Arc::new(api), - key_store: None.into(), - metrics: None, - _phantom: Default::default(), - }; - let new_persisted_state = - worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap(); + let new_persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); // verify voter initialized with single session starting at block `new_pallet_genesis` (10) let sessions = new_persisted_state.voting_oracle().sessions(); @@ -1322,32 +1278,7 @@ async fn should_catch_up_when_loading_saved_voter_state() { let api = TestApi::with_validator_set(&validator_set); // load persistent state - nothing in DB, should init at genesis - // - // NOTE: code from `voter_init_setup()` is moved here because the new network event system - // doesn't allow creating a new `GossipEngine` as the notification handle is consumed by the - // first `GossipEngine` - let known_peers = Arc::new(Mutex::new(KnownPeers::new())); - let (gossip_validator, _) = GossipValidator::new(known_peers); - let gossip_validator = Arc::new(gossip_validator); - let mut gossip_engine = sc_network_gossip::GossipEngine::new( - net.peer(0).network_service().clone(), - net.peer(0).sync_service().clone(), - net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(), - "/beefy/whatever", - gossip_validator, - None, - ); - let (beefy_genesis, best_grandpa) = - wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap(); - let mut worker_base = BeefyWorkerBase { - backend: backend.clone(), - runtime: Arc::new(api.clone()), - key_store: None.into(), - metrics: None, - _phantom: Default::default(), - }; - let persisted_state = - worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap(); + let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); // Test initialization at session boundary. // verify voter initialized with two sessions starting at blocks 1 and 10 @@ -1374,18 +1305,7 @@ async fn should_catch_up_when_loading_saved_voter_state() { // finalize 25 without justifications net.peer(0).client().as_client().finalize_block(hashes[25], None).unwrap(); // load persistent state - state preset in DB - // the network state persists and uses the old `GossipEngine` initialized for `peer(0)` - let (beefy_genesis, best_grandpa) = - wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap(); - let mut worker_base = BeefyWorkerBase { - backend: backend.clone(), - runtime: Arc::new(api), - key_store: None.into(), - metrics: None, - _phantom: Default::default(), - }; - let persisted_state = - worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap(); + let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); // Verify voter initialized with old sessions plus a new one starting at block 20. // There shouldn't be any duplicates. diff --git a/substrate/client/consensus/beefy/src/worker.rs b/substrate/client/consensus/beefy/src/worker.rs index d7a229003cc46..c8eb19621ba5a 100644 --- a/substrate/client/consensus/beefy/src/worker.rs +++ b/substrate/client/consensus/beefy/src/worker.rs @@ -17,46 +17,42 @@ // along with this program. If not, see . use crate::{ - aux_schema, communication::{ - gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage, GossipValidator}, + gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage}, peers::PeerReport, - request_response::outgoing_requests_engine::{OnDemandJustificationsEngine, ResponseInfo}, + request_response::outgoing_requests_engine::ResponseInfo, }, error::Error, - expect_validator_set, + find_authorities_change, justification::BeefyVersionedFinalityProof, keystore::BeefyKeystore, metric_inc, metric_set, metrics::VoterMetrics, round::{Rounds, VoteImportResult}, - wait_for_parent_header, BeefyVoterLinks, HEADER_SYNC_DELAY, LOG_TARGET, + BeefyComms, BeefyVoterLinks, LOG_TARGET, }; use codec::{Codec, Decode, DecodeAll, Encode}; use futures::{stream::Fuse, FutureExt, StreamExt}; use log::{debug, error, info, log_enabled, trace, warn}; use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend}; -use sc_network_gossip::GossipEngine; -use sc_utils::{mpsc::TracingUnboundedReceiver, notification::NotificationReceiver}; +use sc_utils::notification::NotificationReceiver; use sp_api::ProvideRuntimeApi; use sp_arithmetic::traits::{AtLeast32Bit, Saturating}; -use sp_blockchain::Backend as BlockchainBackend; use sp_consensus::SyncOracle; use sp_consensus_beefy::{ check_equivocation_proof, ecdsa_crypto::{AuthorityId, Signature}, - BeefyApi, BeefySignatureHasher, Commitment, ConsensusLog, EquivocationProof, PayloadProvider, - ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, + BeefyApi, BeefySignatureHasher, Commitment, EquivocationProof, PayloadProvider, ValidatorSet, + VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, }; use sp_runtime::{ - generic::{BlockId, OpaqueDigestItemId}, + generic::BlockId, traits::{Block, Header, NumberFor, Zero}, SaturatedConversion, }; use std::{ collections::{BTreeMap, BTreeSet, VecDeque}, fmt::Debug, - marker::PhantomData, sync::Arc, }; @@ -180,8 +176,8 @@ impl VoterOracle { } } - // Check if an observed session can be added to the Oracle. - fn can_add_session(&self, session_start: NumberFor) -> bool { + /// Check if an observed session can be added to the Oracle. + pub fn can_add_session(&self, session_start: NumberFor) -> bool { let latest_known_session_start = self.sessions.back().map(|session| session.session_start()); Some(session_start) > latest_known_session_start @@ -319,229 +315,28 @@ impl PersistedState { self.voting_oracle.best_grandpa_block_header = best_grandpa; } + pub fn voting_oracle(&self) -> &VoterOracle { + &self.voting_oracle + } + pub(crate) fn gossip_filter_config(&self) -> Result, Error> { let (start, end) = self.voting_oracle.accepted_interval()?; let validator_set = self.voting_oracle.current_validator_set()?; Ok(GossipFilterCfg { start, end, validator_set }) } -} - -/// Helper object holding BEEFY worker communication/gossip components. -/// -/// These are created once, but will be reused if worker is restarted/reinitialized. -pub(crate) struct BeefyComms { - pub gossip_engine: GossipEngine, - pub gossip_validator: Arc>, - pub gossip_report_stream: TracingUnboundedReceiver, - pub on_demand_justifications: OnDemandJustificationsEngine, -} - -pub(crate) struct BeefyWorkerBase { - // utilities - pub backend: Arc, - pub runtime: Arc, - pub key_store: BeefyKeystore, - - /// BEEFY client metrics. - pub metrics: Option, - - pub _phantom: PhantomData, -} - -impl BeefyWorkerBase -where - B: Block + Codec, - BE: Backend, - R: ProvideRuntimeApi, - R::Api: BeefyApi, -{ - // If no persisted state present, walk back the chain from first GRANDPA notification to either: - // - latest BEEFY finalized block, or if none found on the way, - // - BEEFY pallet genesis; - // Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to - // finalize. - async fn init_state( - &self, - beefy_genesis: NumberFor, - best_grandpa: ::Header, - min_block_delta: u32, - ) -> Result, Error> { - let blockchain = self.backend.blockchain(); - - let beefy_genesis = self - .runtime - .runtime_api() - .beefy_genesis(best_grandpa.hash()) - .ok() - .flatten() - .filter(|genesis| *genesis == beefy_genesis) - .ok_or_else(|| Error::Backend("BEEFY pallet expected to be active.".into()))?; - // Walk back the imported blocks and initialize voter either, at the last block with - // a BEEFY justification, or at pallet genesis block; voter will resume from there. - let mut sessions = VecDeque::new(); - let mut header = best_grandpa.clone(); - let state = loop { - if let Some(true) = blockchain - .justifications(header.hash()) - .ok() - .flatten() - .map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some()) - { - debug!( - target: LOG_TARGET, - "🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.", - *header.number() - ); - let best_beefy = *header.number(); - // If no session boundaries detected so far, just initialize new rounds here. - if sessions.is_empty() { - let active_set = - expect_validator_set(self.runtime.as_ref(), self.backend.as_ref(), &header) - .await?; - let mut rounds = Rounds::new(best_beefy, active_set); - // Mark the round as already finalized. - rounds.conclude(best_beefy); - sessions.push_front(rounds); - } - let state = PersistedState::checked_new( - best_grandpa, - best_beefy, - sessions, - min_block_delta, - beefy_genesis, - ) - .ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?; - break state - } - - if *header.number() == beefy_genesis { - // We've reached BEEFY genesis, initialize voter here. - let genesis_set = - expect_validator_set(self.runtime.as_ref(), self.backend.as_ref(), &header) - .await?; - info!( - target: LOG_TARGET, - "🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \ - Starting voting rounds at block {:?}, genesis validator set {:?}.", - beefy_genesis, - genesis_set, - ); - - sessions.push_front(Rounds::new(beefy_genesis, genesis_set)); - break PersistedState::checked_new( - best_grandpa, - Zero::zero(), - sessions, - min_block_delta, - beefy_genesis, - ) - .ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))? - } - - if let Some(active) = find_authorities_change::(&header) { - debug!( - target: LOG_TARGET, - "🥩 Marking block {:?} as BEEFY Mandatory.", - *header.number() - ); - sessions.push_front(Rounds::new(*header.number(), active)); - } - - // Move up the chain. - header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?; - }; - - aux_schema::write_current_version(self.backend.as_ref())?; - aux_schema::write_voter_state(self.backend.as_ref(), &state)?; - Ok(state) - } - - pub async fn load_or_init_state( - &mut self, - beefy_genesis: NumberFor, - best_grandpa: ::Header, - min_block_delta: u32, - ) -> Result, Error> { - // Initialize voter state from AUX DB if compatible. - if let Some(mut state) = crate::aux_schema::load_persistent(self.backend.as_ref())? - // Verify state pallet genesis matches runtime. - .filter(|state| state.pallet_genesis() == beefy_genesis) - { - // Overwrite persisted state with current best GRANDPA block. - state.set_best_grandpa(best_grandpa.clone()); - // Overwrite persisted data with newly provided `min_block_delta`. - state.set_min_block_delta(min_block_delta); - debug!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state); - - // Make sure that all the headers that we need have been synced. - let mut new_sessions = vec![]; - let mut header = best_grandpa.clone(); - while *header.number() > state.best_beefy() { - if state.voting_oracle.can_add_session(*header.number()) { - if let Some(active) = find_authorities_change::(&header) { - new_sessions.push((active, *header.number())); - } - } - header = - wait_for_parent_header(self.backend.blockchain(), header, HEADER_SYNC_DELAY) - .await?; - } - - // Make sure we didn't miss any sessions during node restart. - for (validator_set, new_session_start) in new_sessions.drain(..).rev() { - debug!( - target: LOG_TARGET, - "🥩 Handling missed BEEFY session after node restart: {:?}.", - new_session_start - ); - self.init_session_at(&mut state, validator_set, new_session_start); - } - return Ok(state) - } - - // No valid voter-state persisted, re-initialize from pallet genesis. - self.init_state(beefy_genesis, best_grandpa, min_block_delta).await - } - - /// Verify `active` validator set for `block` against the key store - /// - /// We want to make sure that we have _at least one_ key in our keystore that - /// is part of the validator set, that's because if there are no local keys - /// then we can't perform our job as a validator. - /// - /// Note that for a non-authority node there will be no keystore, and we will - /// return an error and don't check. The error can usually be ignored. - fn verify_validator_set( - &self, - block: &NumberFor, - active: &ValidatorSet, - ) -> Result<(), Error> { - let active: BTreeSet<&AuthorityId> = active.validators().iter().collect(); - - let public_keys = self.key_store.public_keys()?; - let store: BTreeSet<&AuthorityId> = public_keys.iter().collect(); - - if store.intersection(&active).count() == 0 { - let msg = "no authority public key found in store".to_string(); - debug!(target: LOG_TARGET, "🥩 for block {:?} {}", block, msg); - metric_inc!(self.metrics, beefy_no_authority_found_in_store); - Err(Error::Keystore(msg)) - } else { - Ok(()) - } - } /// Handle session changes by starting new voting round for mandatory blocks. - fn init_session_at( + pub fn init_session_at( &mut self, - persisted_state: &mut PersistedState, - validator_set: ValidatorSet, new_session_start: NumberFor, + validator_set: ValidatorSet, + key_store: &BeefyKeystore, + metrics: &Option, ) { debug!(target: LOG_TARGET, "🥩 New active validator set: {:?}", validator_set); // BEEFY should finalize a mandatory block during each session. - if let Ok(active_session) = persisted_state.voting_oracle.active_rounds() { + if let Ok(active_session) = self.voting_oracle.active_rounds() { if !active_session.mandatory_done() { debug!( target: LOG_TARGET, @@ -549,20 +344,20 @@ where validator_set.id(), active_session.validator_set_id(), ); - metric_inc!(self.metrics, beefy_lagging_sessions); + metric_inc!(metrics, beefy_lagging_sessions); } } if log_enabled!(target: LOG_TARGET, log::Level::Debug) { // verify the new validator set - only do it if we're also logging the warning - let _ = self.verify_validator_set(&new_session_start, &validator_set); + if verify_validator_set::(&new_session_start, &validator_set, key_store).is_err() { + metric_inc!(metrics, beefy_no_authority_found_in_store); + } } let id = validator_set.id(); - persisted_state - .voting_oracle - .add_session(Rounds::new(new_session_start, validator_set)); - metric_set!(self.metrics, beefy_validator_set_id, id); + self.voting_oracle.add_session(Rounds::new(new_session_start, validator_set)); + metric_set!(metrics, beefy_validator_set_id, id); info!( target: LOG_TARGET, "🥩 New Rounds for validator set id: {:?} with session_start {:?}", @@ -574,9 +369,10 @@ where /// A BEEFY worker/voter that follows the BEEFY protocol pub(crate) struct BeefyWorker { - pub base: BeefyWorkerBase, - - // utils + // utilities + pub backend: Arc, + pub runtime: Arc, + pub key_store: BeefyKeystore, pub payload_provider: P, pub sync: Arc, @@ -592,6 +388,8 @@ pub(crate) struct BeefyWorker { pub pending_justifications: BTreeMap, BeefyVersionedFinalityProof>, /// Persisted voter state. pub persisted_state: PersistedState, + /// BEEFY voter metrics + pub metrics: Option, } impl BeefyWorker @@ -622,8 +420,12 @@ where validator_set: ValidatorSet, new_session_start: NumberFor, ) { - self.base - .init_session_at(&mut self.persisted_state, validator_set, new_session_start); + self.persisted_state.init_session_at( + new_session_start, + validator_set, + &self.key_store, + &self.metrics, + ); } fn handle_finality_notification( @@ -639,8 +441,7 @@ where notification.tree_route, ); - self.base - .runtime + self.runtime .runtime_api() .beefy_genesis(header.hash()) .ok() @@ -654,7 +455,7 @@ where self.persisted_state.set_best_grandpa(header.clone()); // Check all (newly) finalized blocks for new session(s). - let backend = self.base.backend.clone(); + let backend = self.backend.clone(); for header in notification .tree_route .iter() @@ -673,7 +474,7 @@ where } if new_session_added { - crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state) + crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) .map_err(|e| Error::Backend(e.to_string()))?; } @@ -707,7 +508,7 @@ where true, ); }, - RoundAction::Drop => metric_inc!(self.base.metrics, beefy_stale_votes), + RoundAction::Drop => metric_inc!(self.metrics, beefy_stale_votes), RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote), }; Ok(()) @@ -727,23 +528,23 @@ where match self.voting_oracle().triage_round(block_num)? { RoundAction::Process => { debug!(target: LOG_TARGET, "🥩 Process justification for round: {:?}.", block_num); - metric_inc!(self.base.metrics, beefy_imported_justifications); + metric_inc!(self.metrics, beefy_imported_justifications); self.finalize(justification)? }, RoundAction::Enqueue => { debug!(target: LOG_TARGET, "🥩 Buffer justification for round: {:?}.", block_num); if self.pending_justifications.len() < MAX_BUFFERED_JUSTIFICATIONS { self.pending_justifications.entry(block_num).or_insert(justification); - metric_inc!(self.base.metrics, beefy_buffered_justifications); + metric_inc!(self.metrics, beefy_buffered_justifications); } else { - metric_inc!(self.base.metrics, beefy_buffered_justifications_dropped); + metric_inc!(self.metrics, beefy_buffered_justifications_dropped); warn!( target: LOG_TARGET, "🥩 Buffer justification dropped for round: {:?}.", block_num ); } }, - RoundAction::Drop => metric_inc!(self.base.metrics, beefy_stale_justifications), + RoundAction::Drop => metric_inc!(self.metrics, beefy_stale_justifications), }; Ok(()) } @@ -765,7 +566,7 @@ where // We created the `finality_proof` and know to be valid. // New state is persisted after finalization. self.finalize(finality_proof.clone())?; - metric_inc!(self.base.metrics, beefy_good_votes_processed); + metric_inc!(self.metrics, beefy_good_votes_processed); return Ok(Some(finality_proof)) }, VoteImportResult::Ok => { @@ -776,20 +577,17 @@ where .map(|(mandatory_num, _)| mandatory_num == block_number) .unwrap_or(false) { - crate::aux_schema::write_voter_state( - &*self.base.backend, - &self.persisted_state, - ) - .map_err(|e| Error::Backend(e.to_string()))?; + crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) + .map_err(|e| Error::Backend(e.to_string()))?; } - metric_inc!(self.base.metrics, beefy_good_votes_processed); + metric_inc!(self.metrics, beefy_good_votes_processed); }, VoteImportResult::Equivocation(proof) => { - metric_inc!(self.base.metrics, beefy_equivocation_votes); + metric_inc!(self.metrics, beefy_equivocation_votes); self.report_equivocation(proof)?; }, - VoteImportResult::Invalid => metric_inc!(self.base.metrics, beefy_invalid_votes), - VoteImportResult::Stale => metric_inc!(self.base.metrics, beefy_stale_votes), + VoteImportResult::Invalid => metric_inc!(self.metrics, beefy_invalid_votes), + VoteImportResult::Stale => metric_inc!(self.metrics, beefy_stale_votes), }; Ok(None) } @@ -816,15 +614,14 @@ where // Set new best BEEFY block number. self.persisted_state.set_best_beefy(block_num); - crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state) + crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) .map_err(|e| Error::Backend(e.to_string()))?; - metric_set!(self.base.metrics, beefy_best_block, block_num); + metric_set!(self.metrics, beefy_best_block, block_num); self.comms.on_demand_justifications.cancel_requests_older_than(block_num); if let Err(e) = self - .base .backend .blockchain() .expect_block_hash_from_id(&BlockId::Number(block_num)) @@ -834,8 +631,7 @@ where .notify(|| Ok::<_, ()>(hash)) .expect("forwards closure result; the closure always returns Ok; qed."); - self.base - .backend + self.backend .append_justification(hash, (BEEFY_ENGINE_ID, finality_proof.encode())) }) { debug!( @@ -872,13 +668,13 @@ where for (num, justification) in justifs_to_process.into_iter() { debug!(target: LOG_TARGET, "🥩 Handle buffered justification for: {:?}.", num); - metric_inc!(self.base.metrics, beefy_imported_justifications); + metric_inc!(self.metrics, beefy_imported_justifications); if let Err(err) = self.finalize(justification) { error!(target: LOG_TARGET, "🥩 Error finalizing block: {}", err); } } metric_set!( - self.base.metrics, + self.metrics, beefy_buffered_justifications, self.pending_justifications.len() ); @@ -890,7 +686,7 @@ where fn try_to_vote(&mut self) -> Result<(), Error> { // Vote if there's now a new vote target. if let Some(target) = self.voting_oracle().voting_target() { - metric_set!(self.base.metrics, beefy_should_vote_on, target); + metric_set!(self.metrics, beefy_should_vote_on, target); if target > self.persisted_state.best_voted { self.do_vote(target)?; } @@ -910,7 +706,6 @@ where self.persisted_state.voting_oracle.best_grandpa_block_header.clone() } else { let hash = self - .base .backend .blockchain() .expect_block_hash_from_id(&BlockId::Number(target_number)) @@ -922,7 +717,7 @@ where Error::Backend(err_msg) })?; - self.base.backend.blockchain().expect_header(hash).map_err(|err| { + self.backend.blockchain().expect_header(hash).map_err(|err| { let err_msg = format!( "Couldn't get header for block #{:?} ({:?}) (error: {:?}), skipping vote..", target_number, hash, err @@ -942,7 +737,7 @@ where let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?; let (validators, validator_set_id) = (rounds.validators(), rounds.validator_set_id()); - let authority_id = if let Some(id) = self.base.key_store.authority_id(validators) { + let authority_id = if let Some(id) = self.key_store.authority_id(validators) { debug!(target: LOG_TARGET, "🥩 Local authority id: {:?}", id); id } else { @@ -956,7 +751,7 @@ where let commitment = Commitment { payload, block_number: target_number, validator_set_id }; let encoded_commitment = commitment.encode(); - let signature = match self.base.key_store.sign(&authority_id, &encoded_commitment) { + let signature = match self.key_store.sign(&authority_id, &encoded_commitment) { Ok(sig) => sig, Err(err) => { warn!(target: LOG_TARGET, "🥩 Error signing commitment: {:?}", err); @@ -981,7 +776,7 @@ where .gossip_engine .gossip_message(proofs_topic::(), encoded_proof, true); } else { - metric_inc!(self.base.metrics, beefy_votes_sent); + metric_inc!(self.metrics, beefy_votes_sent); debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote); let encoded_vote = GossipMessage::::Vote(vote).encode(); self.comms.gossip_engine.gossip_message(votes_topic::(), encoded_vote, false); @@ -989,8 +784,8 @@ where // Persist state after vote to avoid double voting in case of voter restarts. self.persisted_state.best_voted = target_number; - metric_set!(self.base.metrics, beefy_best_voted, target_number); - crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state) + metric_set!(self.metrics, beefy_best_voted, target_number); + crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) .map_err(|e| Error::Backend(e.to_string())) } @@ -1164,7 +959,7 @@ where if !check_equivocation_proof::<_, _, BeefySignatureHasher>(&proof) { debug!(target: LOG_TARGET, "🥩 Skip report for bad equivocation {:?}", proof); return Ok(()) - } else if let Some(local_id) = self.base.key_store.authority_id(validators) { + } else if let Some(local_id) = self.key_store.authority_id(validators) { if offender_id == local_id { warn!(target: LOG_TARGET, "🥩 Skip equivocation report for own equivocation"); return Ok(()) @@ -1173,7 +968,6 @@ where let number = *proof.round_number(); let hash = self - .base .backend .blockchain() .expect_block_hash_from_id(&BlockId::Number(number)) @@ -1184,7 +978,7 @@ where ); Error::Backend(err_msg) })?; - let runtime_api = self.base.runtime.runtime_api(); + let runtime_api = self.runtime.runtime_api(); // generate key ownership proof at that block let key_owner_proof = match runtime_api .generate_key_ownership_proof(hash, validator_set_id, offender_id) @@ -1201,7 +995,7 @@ where }; // submit equivocation report at **best** block - let best_block_hash = self.base.backend.blockchain().info().best_hash; + let best_block_hash = self.backend.blockchain().info().best_hash; runtime_api .submit_report_equivocation_unsigned_extrinsic(best_block_hash, proof, key_owner_proof) .map_err(Error::RuntimeApi)?; @@ -1210,21 +1004,6 @@ where } } -/// Scan the `header` digest log for a BEEFY validator set change. Return either the new -/// validator set or `None` in case no validator set change has been signaled. -pub(crate) fn find_authorities_change(header: &B::Header) -> Option> -where - B: Block, -{ - let id = OpaqueDigestItemId::Consensus(&BEEFY_ENGINE_ID); - - let filter = |log: ConsensusLog| match log { - ConsensusLog::AuthoritiesChange(validator_set) => Some(validator_set), - _ => None, - }; - header.digest().convert_first(|l| l.try_to(id).and_then(filter)) -} - /// Calculate next block number to vote on. /// /// Return `None` if there is no votable target yet. @@ -1261,11 +1040,42 @@ where } } +/// Verify `active` validator set for `block` against the key store +/// +/// We want to make sure that we have _at least one_ key in our keystore that +/// is part of the validator set, that's because if there are no local keys +/// then we can't perform our job as a validator. +/// +/// Note that for a non-authority node there will be no keystore, and we will +/// return an error and don't check. The error can usually be ignored. +fn verify_validator_set( + block: &NumberFor, + active: &ValidatorSet, + key_store: &BeefyKeystore, +) -> Result<(), Error> { + let active: BTreeSet<&AuthorityId> = active.validators().iter().collect(); + + let public_keys = key_store.public_keys()?; + let store: BTreeSet<&AuthorityId> = public_keys.iter().collect(); + + if store.intersection(&active).count() == 0 { + let msg = "no authority public key found in store".to_string(); + debug!(target: LOG_TARGET, "🥩 for block {:?} {}", block, msg); + Err(Error::Keystore(msg)) + } else { + Ok(()) + } +} + #[cfg(test)] pub(crate) mod tests { use super::*; use crate::{ - communication::notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofStream}, + communication::{ + gossip::GossipValidator, + notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofStream}, + request_response::outgoing_requests_engine::OnDemandJustificationsEngine, + }, tests::{ create_beefy_keystore, get_beefy_streams, make_beefy_ids, BeefyPeer, BeefyTestNet, TestApi, @@ -1275,6 +1085,7 @@ pub(crate) mod tests { use futures::{future::poll_fn, task::Poll}; use parking_lot::Mutex; use sc_client_api::{Backend as BackendT, HeaderBackend}; + use sc_network_gossip::GossipEngine; use sc_network_sync::SyncingService; use sc_network_test::TestNetFactory; use sp_blockchain::Backend as BlockchainBackendT; @@ -1283,7 +1094,7 @@ pub(crate) mod tests { known_payloads::MMR_ROOT_ID, mmr::MmrRootProvider, test_utils::{generate_equivocation_proof, Keyring}, - Payload, SignedCommitment, + ConsensusLog, Payload, SignedCommitment, }; use sp_runtime::traits::{Header as HeaderT, One}; use substrate_test_runtime_client::{ @@ -1292,10 +1103,6 @@ pub(crate) mod tests { }; impl PersistedState { - pub fn voting_oracle(&self) -> &VoterOracle { - &self.voting_oracle - } - pub fn active_round(&self) -> Result<&Rounds, Error> { self.voting_oracle.active_rounds() } @@ -1391,13 +1198,10 @@ pub(crate) mod tests { on_demand_justifications, }; BeefyWorker { - base: BeefyWorkerBase { - backend, - runtime: api, - key_store: Some(keystore).into(), - metrics, - _phantom: Default::default(), - }, + backend, + runtime: api, + key_store: Some(keystore).into(), + metrics, payload_provider, sync: Arc::new(sync), links, @@ -1675,19 +1479,22 @@ pub(crate) mod tests { let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone()); // keystore doesn't contain other keys than validators' - assert_eq!(worker.base.verify_validator_set(&1, &validator_set), Ok(())); + assert_eq!(verify_validator_set::(&1, &validator_set, &worker.key_store), Ok(())); // unknown `Bob` key let keys = &[Keyring::Bob]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); let err_msg = "no authority public key found in store".to_string(); let expected = Err(Error::Keystore(err_msg)); - assert_eq!(worker.base.verify_validator_set(&1, &validator_set), expected); + assert_eq!(verify_validator_set::(&1, &validator_set, &worker.key_store), expected); // worker has no keystore - worker.base.key_store = None.into(); + worker.key_store = None.into(); let expected_err = Err(Error::Keystore("no Keystore".into())); - assert_eq!(worker.base.verify_validator_set(&1, &validator_set), expected_err); + assert_eq!( + verify_validator_set::(&1, &validator_set, &worker.key_store), + expected_err + ); } #[tokio::test] @@ -1839,7 +1646,7 @@ pub(crate) mod tests { let mut net = BeefyTestNet::new(1); let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone()); - worker.base.runtime = api_alice.clone(); + worker.runtime = api_alice.clone(); // let there be a block with num = 1: let _ = net.peer(0).push_blocks(1, false); diff --git a/substrate/primitives/consensus/beefy/src/commitment.rs b/substrate/primitives/consensus/beefy/src/commitment.rs index 1f0fb34ebf10b..335c6b604f044 100644 --- a/substrate/primitives/consensus/beefy/src/commitment.rs +++ b/substrate/primitives/consensus/beefy/src/commitment.rs @@ -97,6 +97,19 @@ pub struct SignedCommitment { pub signatures: Vec>, } +impl sp_std::fmt::Display + for SignedCommitment +{ + fn fmt(&self, f: &mut sp_std::fmt::Formatter<'_>) -> sp_std::fmt::Result { + let signatures_count = self.signatures.iter().filter(|s| s.is_some()).count(); + write!( + f, + "SignedCommitment(commitment: {:?}, signatures_count: {})", + self.commitment, signatures_count + ) + } +} + impl SignedCommitment { /// Return the number of collected signatures. pub fn no_of_signatures(&self) -> usize { @@ -241,6 +254,14 @@ pub enum VersionedFinalityProof { V1(SignedCommitment), } +impl sp_std::fmt::Display for VersionedFinalityProof { + fn fmt(&self, f: &mut sp_std::fmt::Formatter<'_>) -> sp_std::fmt::Result { + match self { + VersionedFinalityProof::V1(sc) => write!(f, "VersionedFinalityProof::V1({})", sc), + } + } +} + impl From> for VersionedFinalityProof { fn from(commitment: SignedCommitment) -> Self { VersionedFinalityProof::V1(commitment)