diff --git a/Cargo.lock b/Cargo.lock index c679cb7b38ff..284b837f1d49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6428,6 +6428,7 @@ name = "polkadot-approval-distribution" version = "0.9.37" dependencies = [ "assert_matches", + "bitvec", "env_logger 0.9.0", "futures", "itertools", @@ -7156,6 +7157,7 @@ dependencies = [ name = "polkadot-node-primitives" version = "0.9.37" dependencies = [ + "bitvec", "bounded-vec", "futures", "parity-scale-codec", @@ -7206,6 +7208,7 @@ name = "polkadot-node-subsystem-types" version = "0.9.37" dependencies = [ "async-trait", + "bitvec", "derive_more", "futures", "orchestra", diff --git a/node/core/approval-voting/src/approval_db/v1/mod.rs b/node/core/approval-voting/src/approval_db/v1/mod.rs index 58781e76ce39..41f7760608f3 100644 --- a/node/core/approval-voting/src/approval_db/v1/mod.rs +++ b/node/core/approval-voting/src/approval_db/v1/mod.rs @@ -17,7 +17,7 @@ //! Version 1 of the DB schema. use parity_scale_codec::{Decode, Encode}; -use polkadot_node_primitives::approval::{AssignmentCertV2, DelayTranche}; +use polkadot_node_primitives::approval::{v2::CoreBitfield, AssignmentCertV2, DelayTranche}; use polkadot_node_subsystem::{SubsystemError, SubsystemResult}; use polkadot_node_subsystem_util::database::{DBTransaction, Database}; use polkadot_primitives::{ @@ -161,11 +161,16 @@ pub struct Config { /// Details pertaining to our assignment on a block. #[derive(Encode, Decode, Debug, Clone, PartialEq)] pub struct OurAssignment { + /// Our assignment certificate. pub cert: AssignmentCertV2, + /// The tranche for which the assignment refers to. pub tranche: DelayTranche, + /// Our validator index for the session in which the candidates were included. pub validator_index: ValidatorIndex, - // Whether the assignment has been triggered already. + /// Whether the assignment has been triggered already. pub triggered: bool, + /// A subset of the core indices obtained from the VRF output. + pub assignment_bitfield: CoreBitfield, } /// Metadata regarding a specific tranche of assignments for a specific candidate. @@ -186,7 +191,7 @@ pub struct ApprovalEntry { pub our_assignment: Option, pub our_approval_sig: Option, // `n_validators` bits. - pub assignments: Bitfield, + pub assigned_validators: Bitfield, pub approved: bool, } diff --git a/node/core/approval-voting/src/criteria.rs b/node/core/approval-voting/src/criteria.rs index 599382acd98e..ec5da22044b7 100644 --- a/node/core/approval-voting/src/criteria.rs +++ b/node/core/approval-voting/src/criteria.rs @@ -18,8 +18,8 @@ use parity_scale_codec::{Decode, Encode}; use polkadot_node_primitives::approval::{ - self as approval_types, AssignmentCert, AssignmentCertKind, AssignmentCertKindV2, - AssignmentCertV2, DelayTranche, RelayVRFStory, + self as approval_types, v2::CoreBitfield, AssignmentCert, AssignmentCertKind, + AssignmentCertKindV2, AssignmentCertV2, DelayTranche, RelayVRFStory, }; use polkadot_primitives::{ AssignmentId, AssignmentPair, CandidateHash, CoreIndex, GroupIndex, IndexedVec, SessionInfo, @@ -44,6 +44,8 @@ pub struct OurAssignment { validator_index: ValidatorIndex, // Whether the assignment has been triggered already. triggered: bool, + // The core indices obtained from the VRF output. + assignment_bitfield: CoreBitfield, } impl OurAssignment { @@ -66,16 +68,20 @@ impl OurAssignment { pub(crate) fn mark_triggered(&mut self) { self.triggered = true; } + + pub(crate) fn assignment_bitfield(&self) -> &CoreBitfield { + &self.assignment_bitfield + } } impl From for OurAssignment { - // TODO: OurAssignment changed -> migration for parachains db approval voting column. fn from(entry: crate::approval_db::v1::OurAssignment) -> Self { OurAssignment { cert: entry.cert, tranche: entry.tranche, validator_index: entry.validator_index, triggered: entry.triggered, + assignment_bitfield: entry.assignment_bitfield, } } } @@ -87,17 +93,41 @@ impl From for crate::approval_db::v1::OurAssignment { tranche: entry.tranche, validator_index: entry.validator_index, triggered: entry.triggered, + assignment_bitfield: entry.assignment_bitfield, } } } -fn relay_vrf_modulo_transcript(relay_vrf_story: RelayVRFStory, sample: u32) -> Transcript { - // combine the relay VRF story with a sample number. - let mut t = Transcript::new(approval_types::RELAY_VRF_MODULO_CONTEXT); - t.append_message(b"RC-VRF", &relay_vrf_story.0); - sample.using_encoded(|s| t.append_message(b"sample", s)); +// Combines the relay VRF story with a sample number if any. +fn relay_vrf_modulo_transcript_inner( + mut transcript: Transcript, + relay_vrf_story: RelayVRFStory, + sample: Option, +) -> Transcript { + transcript.append_message(b"RC-VRF", &relay_vrf_story.0); - t + if let Some(sample) = sample { + sample.using_encoded(|s| transcript.append_message(b"sample", s)); + } + + transcript +} + +fn relay_vrf_modulo_transcript_v1(relay_vrf_story: RelayVRFStory, sample: u32) -> Transcript { + relay_vrf_modulo_transcript_inner( + Transcript::new(approval_types::v1::RELAY_VRF_MODULO_CONTEXT), + relay_vrf_story, + Some(sample), + ) +} + +fn relay_vrf_modulo_transcript_v2(relay_vrf_story: RelayVRFStory) -> Transcript { + // combine the relay VRF story with a sample number. + relay_vrf_modulo_transcript_inner( + Transcript::new(approval_types::v2::RELAY_VRF_MODULO_CONTEXT), + relay_vrf_story, + None, + ) } /// A hard upper bound on num_cores * target_checkers / num_validators @@ -138,7 +168,7 @@ fn relay_vrf_modulo_cores( max_cores: u32, ) -> Vec { vrf_in_out - .make_bytes::(approval_types::CORE_RANDOMNESS_CONTEXT) + .make_bytes::(approval_types::v2::CORE_RANDOMNESS_CONTEXT) .0 .chunks_exact(4) .take(num_samples as usize) @@ -148,7 +178,7 @@ fn relay_vrf_modulo_cores( } fn relay_vrf_modulo_core(vrf_in_out: &VRFInOut, n_cores: u32) -> CoreIndex { - let bytes: [u8; 4] = vrf_in_out.make_bytes(approval_types::CORE_RANDOMNESS_CONTEXT); + let bytes: [u8; 4] = vrf_in_out.make_bytes(approval_types::v1::CORE_RANDOMNESS_CONTEXT); // interpret as little-endian u32. let random_core = u32::from_le_bytes(bytes) % n_cores; @@ -156,7 +186,7 @@ fn relay_vrf_modulo_core(vrf_in_out: &VRFInOut, n_cores: u32) -> CoreIndex { } fn relay_vrf_delay_transcript(relay_vrf_story: RelayVRFStory, core_index: CoreIndex) -> Transcript { - let mut t = Transcript::new(approval_types::RELAY_VRF_DELAY_CONTEXT); + let mut t = Transcript::new(approval_types::v1::RELAY_VRF_DELAY_CONTEXT); t.append_message(b"RC-VRF", &relay_vrf_story.0); core_index.0.using_encoded(|s| t.append_message(b"core", s)); t @@ -167,7 +197,7 @@ fn relay_vrf_delay_tranche( num_delay_tranches: u32, zeroth_delay_tranche_width: u32, ) -> DelayTranche { - let bytes: [u8; 4] = vrf_in_out.make_bytes(approval_types::TRANCHE_RANDOMNESS_CONTEXT); + let bytes: [u8; 4] = vrf_in_out.make_bytes(approval_types::v1::TRANCHE_RANDOMNESS_CONTEXT); // interpret as little-endian u32 and reduce by the number of tranches. let wide_tranche = @@ -178,17 +208,11 @@ fn relay_vrf_delay_tranche( } fn assigned_core_transcript(core_index: CoreIndex) -> Transcript { - let mut t = Transcript::new(approval_types::ASSIGNED_CORE_CONTEXT); + let mut t = Transcript::new(approval_types::v1::ASSIGNED_CORE_CONTEXT); core_index.0.using_encoded(|s| t.append_message(b"core", s)); t } -fn assigned_cores_transcript(core_indices: &Vec) -> Transcript { - let mut t = Transcript::new(approval_types::ASSIGNED_CORE_CONTEXT); - core_indices.using_encoded(|s| t.append_message(b"cores", s)); - t -} - /// Information about the world assignments are being produced in. #[derive(Clone, Debug)] pub(crate) struct Config { @@ -231,14 +255,15 @@ pub(crate) trait AssignmentCriteria { fn check_assignment_cert( &self, - claimed_core_index: Vec, + claimed_core_index: Option, validator_index: ValidatorIndex, config: &Config, relay_vrf_story: RelayVRFStory, assignment: &AssignmentCertV2, - // Backing groups for each assigned core `CoreIndex`. + // Backing groups for each "leaving core". backing_groups: Vec, - ) -> Result; + // TODO: maybe define record or something else than tuple + ) -> Result<(CoreBitfield, DelayTranche), InvalidAssignment>; } pub(crate) struct RealAssignmentCriteria; @@ -251,18 +276,18 @@ impl AssignmentCriteria for RealAssignmentCriteria { config: &Config, leaving_cores: Vec<(CandidateHash, CoreIndex, GroupIndex)>, ) -> HashMap { - compute_assignments(keystore, relay_vrf_story, config, leaving_cores, true) + compute_assignments(keystore, relay_vrf_story, config, leaving_cores, false) } fn check_assignment_cert( &self, - claimed_core_index: Vec, + claimed_core_index: Option, validator_index: ValidatorIndex, config: &Config, relay_vrf_story: RelayVRFStory, assignment: &AssignmentCertV2, backing_groups: Vec, - ) -> Result { + ) -> Result<(CoreBitfield, DelayTranche), InvalidAssignment> { check_assignment_cert( claimed_core_index, validator_index, @@ -396,7 +421,7 @@ fn compute_relay_vrf_modulo_assignments( // into closure. let core = &mut core; assignments_key.vrf_sign_extra_after_check( - relay_vrf_modulo_transcript(relay_vrf_story.clone(), rvm_sample), + relay_vrf_modulo_transcript_v1(relay_vrf_story.clone(), rvm_sample), |vrf_in_out| { *core = relay_vrf_modulo_core(&vrf_in_out, config.n_cores); if let Some((candidate_hash, _)) = @@ -436,6 +461,7 @@ fn compute_relay_vrf_modulo_assignments( tranche: 0, validator_index, triggered: false, + assignment_bitfield: core.into(), }); } } @@ -450,14 +476,10 @@ fn compute_relay_vrf_modulo_assignments_v2( assignments: &mut HashMap, ) { let mut assigned_cores = Vec::new(); - // for rvm_sample in 0..config.relay_vrf_modulo_samples { let maybe_assignment = { let assigned_cores = &mut assigned_cores; - assignments_key.vrf_sign_extra_after_check( - relay_vrf_modulo_transcript( - relay_vrf_story.clone(), - config.relay_vrf_modulo_samples - 1, - ), + assignments_key.vrf_sign_after_check( + relay_vrf_modulo_transcript_v2(relay_vrf_story.clone()), |vrf_in_out| { *assigned_cores = relay_vrf_modulo_cores( &vrf_in_out, @@ -476,12 +498,12 @@ fn compute_relay_vrf_modulo_assignments_v2( ?assigned_cores, ?validator_index, tranche = 0, - "RelayVRFModuloCompact Assignment." + "Produced RelayVRFModuloCompact Assignment." ); - Some(assigned_cores_transcript(assigned_cores)) + true } else { - None + false } }, ) @@ -489,18 +511,24 @@ fn compute_relay_vrf_modulo_assignments_v2( if let Some(assignment) = maybe_assignment.map(|(vrf_in_out, vrf_proof, _)| { let cert = AssignmentCertV2 { - kind: AssignmentCertKindV2::RelayVRFModuloCompact { - sample: config.relay_vrf_modulo_samples - 1, - core_indices: assigned_cores.clone(), - }, + kind: AssignmentCertKindV2::RelayVRFModuloCompact, vrf: ( approval_types::VRFOutput(vrf_in_out.to_output()), approval_types::VRFProof(vrf_proof), ), }; - // All assignments of type RelayVRFModulo have tranche 0. - OurAssignment { cert, tranche: 0, validator_index, triggered: false } + // All assignments of type RelayVRFModuloCompact have tranche 0. + OurAssignment { + cert, + tranche: 0, + validator_index, + triggered: false, + assignment_bitfield: assigned_cores + .clone() + .try_into() + .expect("Just checked `!assigned_cores.is_empty()`; qed"), + } }) { for core_index in assigned_cores { assignments.insert(core_index, assignment.clone()); @@ -534,7 +562,13 @@ fn compute_relay_vrf_delay_assignments( ), }; - let our_assignment = OurAssignment { cert, tranche, validator_index, triggered: false }; + let our_assignment = OurAssignment { + cert, + tranche, + validator_index, + triggered: false, + assignment_bitfield: core.into(), + }; let used = match assignments.entry(core) { Entry::Vacant(e) => { @@ -588,12 +622,14 @@ pub(crate) enum InvalidAssignmentReason { VRFDelayCoreIndexMismatch, VRFDelayOutputMismatch, InvalidArguments, + /// Assignment vrf check resulted in 0 assigned cores. + NullAssignment, } /// Checks the crypto of an assignment cert. Failure conditions: /// * Validator index out of bounds /// * VRF signature check fails -/// * VRF output doesn't match assigned core +/// * VRF output doesn't match assigned cores /// * Core is not covered by extra data in signature /// * Core index out of bounds /// * Sample is out of bounds @@ -601,14 +637,17 @@ pub(crate) enum InvalidAssignmentReason { /// /// This function does not check whether the core is actually a valid assignment or not. That should be done /// outside the scope of this function. +/// +/// For v2 assignments of type `AssignmentCertKindV2::RelayVRFModuloCompact` we don't need to pass +/// `claimed_core_index` it won't be used in the check. pub(crate) fn check_assignment_cert( - claimed_core_indices: Vec, + claimed_core_index: Option, validator_index: ValidatorIndex, config: &Config, relay_vrf_story: RelayVRFStory, assignment: &AssignmentCertV2, backing_groups: Vec, -) -> Result { +) -> Result<(CoreBitfield, DelayTranche), InvalidAssignment> { use InvalidAssignmentReason as Reason; let validator_public = config @@ -619,20 +658,14 @@ pub(crate) fn check_assignment_cert( let public = schnorrkel::PublicKey::from_bytes(validator_public.as_slice()) .map_err(|_| InvalidAssignment(Reason::InvalidAssignmentKey))?; - // Check that we have all backing groups for claimed cores. - if claimed_core_indices.is_empty() && claimed_core_indices.len() != backing_groups.len() { - return Err(InvalidAssignment(Reason::InvalidArguments)) - } - - // Check that the validator was not part of the backing group + // For v1 assignments Check that the validator was not part of the backing group // and not already assigned. - for (claimed_core, backing_group) in claimed_core_indices.iter().zip(backing_groups.iter()) { - if claimed_core.0 >= config.n_cores { + if let Some(claimed_core_index) = claimed_core_index.as_ref() { + if claimed_core_index.0 >= config.n_cores { return Err(InvalidAssignment(Reason::CoreIndexOutOfBounds)) } - let is_in_backing = - is_in_backing_group(&config.validator_groups, validator_index, *backing_group); + is_in_backing_group(&config.validator_groups, validator_index, backing_groups[0]); if is_in_backing { return Err(InvalidAssignment(Reason::IsInBackingGroup)) @@ -641,72 +674,86 @@ pub(crate) fn check_assignment_cert( let &(ref vrf_output, ref vrf_proof) = &assignment.vrf; match &assignment.kind { - AssignmentCertKindV2::RelayVRFModuloCompact { sample, core_indices } => { - if *sample >= config.relay_vrf_modulo_samples { - return Err(InvalidAssignment(Reason::SampleOutOfBounds)) - } - + AssignmentCertKindV2::RelayVRFModuloCompact => { let (vrf_in_out, _) = public - .vrf_verify_extra( - relay_vrf_modulo_transcript(relay_vrf_story, *sample), + .vrf_verify( + relay_vrf_modulo_transcript_v2(relay_vrf_story), &vrf_output.0, &vrf_proof.0, - assigned_cores_transcript(core_indices), ) .map_err(|_| InvalidAssignment(Reason::VRFModuloOutputMismatch))?; - let resulting_cores = relay_vrf_modulo_cores(&vrf_in_out, *sample + 1, config.n_cores); - - // TODO: Enforce that all claimable cores are claimed, or include refused cores. - // Currently validators can opt out of checking specific cores. - // This is the same issue to how validator can opt out and not send their assignments in the first place. + // Get unique core assignments from the VRF wrt `config.n_cores`. + // Some of the core indices might be invalid, as there was no candidate included in the + // relay chain block for that core. + // + // The caller must check if the claimed candidate indices are valid + // and refer to the valid subset of cores outputed by the VRF here. + let vrf_unique_cores = relay_vrf_modulo_cores( + &vrf_in_out, + config.relay_vrf_modulo_samples, + config.n_cores, + ); - // Ensure that the `vrf_in_out` actually includes all of the claimed cores. - if claimed_core_indices + // Filter out cores in which the validator is in the backing group. + let resulting_cores = vrf_unique_cores .iter() - .fold(true, |cores_match, core| cores_match & resulting_cores.contains(core)) - { - Ok(0) - } else { - gum::debug!( - target: LOG_TARGET, - ?resulting_cores, - ?claimed_core_indices, - "Assignment claimed cores mismatch", - ); - Err(InvalidAssignment(Reason::VRFModuloCoreIndexMismatch)) - } + .zip(backing_groups.iter()) + .filter_map(|(core, backing_group)| { + if is_in_backing_group( + &config.validator_groups, + validator_index, + *backing_group, + ) { + None + } else { + Some(*core) + } + }) + .collect::>(); + + CoreBitfield::try_from(resulting_cores) + .map(|bitfield| (bitfield, 0)) + .map_err(|_| InvalidAssignment(Reason::NullAssignment)) }, AssignmentCertKindV2::RelayVRFModulo { sample } => { if *sample >= config.relay_vrf_modulo_samples { return Err(InvalidAssignment(Reason::SampleOutOfBounds)) } + // This is a v1 assignment for which we need the core index. + let claimed_core_index = + claimed_core_index.ok_or(InvalidAssignment(Reason::InvalidArguments))?; + let (vrf_in_out, _) = public .vrf_verify_extra( - relay_vrf_modulo_transcript(relay_vrf_story, *sample), + relay_vrf_modulo_transcript_v1(relay_vrf_story, *sample), &vrf_output.0, &vrf_proof.0, - assigned_core_transcript(claimed_core_indices[0]), + assigned_core_transcript(claimed_core_index), ) .map_err(|_| InvalidAssignment(Reason::VRFModuloOutputMismatch))?; let core = relay_vrf_modulo_core(&vrf_in_out, config.n_cores); // ensure that the `vrf_in_out` actually gives us the claimed core. - if core == claimed_core_indices[0] { - Ok(0) + if core == claimed_core_index { + Ok((core.into(), 0)) } else { gum::debug!( target: LOG_TARGET, ?core, - ?claimed_core_indices, - "Assignment claimed cores mismatch", + ?claimed_core_index, + "Assignment claimed core mismatch", ); Err(InvalidAssignment(Reason::VRFModuloCoreIndexMismatch)) } }, AssignmentCertKindV2::RelayVRFDelay { core_index } => { - if *core_index != claimed_core_indices[0] { + // This is a v1 assignment for which we need the core index. + let claimed_core_index = + claimed_core_index.ok_or(InvalidAssignment(Reason::InvalidArguments))?; + + if *core_index != claimed_core_index { return Err(InvalidAssignment(Reason::VRFDelayCoreIndexMismatch)) } @@ -718,10 +765,13 @@ pub(crate) fn check_assignment_cert( ) .map_err(|_| InvalidAssignment(Reason::VRFDelayOutputMismatch))?; - Ok(relay_vrf_delay_tranche( - &vrf_in_out, - config.n_delay_tranches, - config.zeroth_delay_tranche_width, + Ok(( + (*core_index).into(), + relay_vrf_delay_tranche( + &vrf_in_out, + config.n_delay_tranches, + config.zeroth_delay_tranche_width, + ), )) }, } diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index 9777bdab97f1..81f5708fdf86 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -24,8 +24,9 @@ use polkadot_node_jaeger as jaeger; use polkadot_node_primitives::{ approval::{ - AssignmentCertKindV2, AssignmentCertV2, BlockApprovalMeta, DelayTranche, - IndirectAssignmentCertV2, IndirectSignedApprovalVote, + v2::{BitfieldError, CandidateBitfield, CoreBitfield}, + AssignmentCertKindV2, BlockApprovalMeta, DelayTranche, IndirectAssignmentCertV2, + IndirectSignedApprovalVote, }, ValidationResult, APPROVAL_EXECUTION_TIMEOUT, }; @@ -76,6 +77,7 @@ use std::{ }; use approval_checking::RequiredTranches; +use bitvec::{order::Lsb0, vec::BitVec}; use criteria::{AssignmentCriteria, RealAssignmentCriteria}; use persisted_entries::{ApprovalEntry, BlockEntry, CandidateEntry}; use time::{slot_number_to_tick, Clock, ClockExt, SystemClock, Tick}; @@ -741,11 +743,11 @@ enum Action { tick: Tick, }, LaunchApproval { + claimed_core_indices: CoreBitfield, candidate_hash: CandidateHash, indirect_cert: IndirectAssignmentCertV2, assignment_tranche: DelayTranche, relay_block_hash: Hash, - candidate_index: CandidateIndex, session: SessionIndex, candidate: CandidateReceipt, backing_group: GroupIndex, @@ -956,11 +958,11 @@ async fn handle_actions( actions_iter = next_actions.into_iter(); }, Action::LaunchApproval { + claimed_core_indices, candidate_hash, indirect_cert, assignment_tranche, relay_block_hash, - candidate_index, session, candidate, backing_group, @@ -984,14 +986,27 @@ async fn handle_actions( }, }; - // Get all candidate indices in case this is a compact module vrf assignment. - let candidate_indices = - cores_to_candidate_indices(&block_entry, candidate_index, &indirect_cert.cert); - - ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment( - indirect_cert, - candidate_indices, - )); + // Get an assignment bitfield for the given claimed cores. + match cores_to_candidate_indices(&claimed_core_indices, &block_entry) { + Ok(bitfield) => { + ctx.send_unbounded_message( + ApprovalDistributionMessage::DistributeAssignment( + indirect_cert, + bitfield, + ), + ); + }, + Err(err) => { + // Never happens, it should only happen if no cores are claimed, which is a bug. + gum::warn!( + target: LOG_TARGET, + ?block_hash, + ?err, + "Failed to create assignment bitfield" + ); + continue + }, + }; match approvals_cache.get(&candidate_hash) { Some(ApprovalOutcome::Approved) => { @@ -1049,26 +1064,23 @@ async fn handle_actions( } fn cores_to_candidate_indices( + core_indices: &CoreBitfield, block_entry: &BlockEntry, - candidate_index: CandidateIndex, - cert: &AssignmentCertV2, -) -> Vec { +) -> Result { let mut candidate_indices = Vec::new(); - match &cert.kind { - AssignmentCertKindV2::RelayVRFModuloCompact { sample: _, core_indices } => { - for cert_core_index in core_indices { - if let Some(candidate_index) = block_entry - .candidates() - .iter() - .position(|(core_index, _)| core_index == cert_core_index) - { - candidate_indices.push(candidate_index as _) - } - } - }, - _ => candidate_indices.push(candidate_index as _), + + // Map from core index to candidate index. + for claimed_core_index in core_indices.iter_ones() { + if let Some(candidate_index) = block_entry + .candidates() + .iter() + .position(|(core_index, _)| core_index.0 == claimed_core_index as u32) + { + candidate_indices.push(candidate_index as CandidateIndex); + } } - candidate_indices + + CandidateBitfield::try_from(candidate_indices) } fn distribution_messages_for_activation( @@ -1119,24 +1131,59 @@ fn distribution_messages_for_activation( match approval_entry.local_statements() { (None, None) | (None, Some(_)) => {}, // second is impossible case. (Some(assignment), None) => { - messages.push(ApprovalDistributionMessage::DistributeAssignment( - IndirectAssignmentCertV2 { - block_hash, - validator: assignment.validator_index(), - cert: assignment.cert().clone(), + match cores_to_candidate_indices( + assignment.assignment_bitfield(), + &block_entry, + ) { + Ok(bitfield) => messages.push( + ApprovalDistributionMessage::DistributeAssignment( + IndirectAssignmentCertV2 { + block_hash, + validator: assignment.validator_index(), + cert: assignment.cert().clone(), + }, + bitfield, + ), + ), + Err(err) => { + // Should never happen. If we fail here it means the assignment is null (no cores claimed). + gum::warn!( + target: LOG_TARGET, + ?block_hash, + ?candidate_hash, + ?err, + "Failed to create assignment bitfield", + ); }, - cores_to_candidate_indices(&block_entry, i as _, assignment.cert()), - )); + } }, (Some(assignment), Some(approval_sig)) => { - messages.push(ApprovalDistributionMessage::DistributeAssignment( - IndirectAssignmentCertV2 { - block_hash, - validator: assignment.validator_index(), - cert: assignment.cert().clone(), + match cores_to_candidate_indices( + assignment.assignment_bitfield(), + &block_entry, + ) { + Ok(bitfield) => messages.push( + ApprovalDistributionMessage::DistributeAssignment( + IndirectAssignmentCertV2 { + block_hash, + validator: assignment.validator_index(), + cert: assignment.cert().clone(), + }, + bitfield, + ), + ), + Err(err) => { + gum::warn!( + target: LOG_TARGET, + ?block_hash, + ?candidate_hash, + ?err, + "Failed to create assignment bitfield", + ); + // If we didn't send assignment, we don't send approval. + continue }, - cores_to_candidate_indices(&block_entry, i as _, assignment.cert()), - )); + } messages.push(ApprovalDistributionMessage::DistributeApproval( IndirectSignedApprovalVote { @@ -1385,8 +1432,6 @@ async fn handle_approved_ancestor( const MAX_TRACING_WINDOW: usize = 200; const ABNORMAL_DEPTH_THRESHOLD: usize = 5; - use bitvec::{order::Lsb0, vec::BitVec}; - let mut span = jaeger::Span::new(&target, "approved-ancestor").with_stage(jaeger::Stage::ApprovalChecking); @@ -1712,7 +1757,7 @@ fn check_and_import_assignment( state: &State, db: &mut OverlayedBackend<'_, impl Backend>, assignment: IndirectAssignmentCertV2, - candidate_indices: Vec, + candidate_indices: CandidateBitfield, ) -> SubsystemResult<(AssignmentCheckResult, Vec)> { let tick_now = state.clock.tick_now(); @@ -1743,14 +1788,14 @@ fn check_and_import_assignment( let mut claimed_core_indices = Vec::new(); let mut assigned_candidate_hashes = Vec::new(); - for candidate_index in candidate_indices.iter() { + for candidate_index in candidate_indices.iter_ones() { let (claimed_core_index, assigned_candidate_hash) = - match block_entry.candidate(*candidate_index as usize) { + match block_entry.candidate(candidate_index) { Some((c, h)) => (*c, *h), None => return Ok(( AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidateIndex( - *candidate_index, + candidate_index as _, )), Vec::new(), )), // no candidate at core. @@ -1761,7 +1806,7 @@ fn check_and_import_assignment( None => return Ok(( AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate( - *candidate_index, + candidate_index as _, assigned_candidate_hash, )), Vec::new(), @@ -1785,9 +1830,18 @@ fn check_and_import_assignment( assigned_candidate_hashes.push(assigned_candidate_hash); } + let claimed_core_index = match assignment.cert.kind { + // TODO: remove CoreIndex from certificates completely. + // https://github.com/paritytech/polkadot/issues/6988 + AssignmentCertKindV2::RelayVRFDelay { .. } => Some(claimed_core_indices[0]), + AssignmentCertKindV2::RelayVRFModulo { .. } => Some(claimed_core_indices[0]), + // VRelayVRFModuloCompact assignment doesn't need the the claimed cores for checking. + AssignmentCertKindV2::RelayVRFModuloCompact => None, + }; + // Check the assignment certificate. let res = state.assignment_criteria.check_assignment_cert( - claimed_core_indices.clone(), + claimed_core_index, assignment.validator, &criteria::Config::from(session_info), block_entry.relay_vrf_story(), @@ -1795,7 +1849,7 @@ fn check_and_import_assignment( backing_groups, ); - let tranche = match res { + let (claimed_core_indices, tranche) = match res { Err(crate::criteria::InvalidAssignment(reason)) => return Ok(( AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCert( @@ -1804,7 +1858,7 @@ fn check_and_import_assignment( )), Vec::new(), )), - Ok(tranche) => { + Ok((claimed_core_indices, tranche)) => { let current_tranche = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot()); @@ -1814,7 +1868,7 @@ fn check_and_import_assignment( return Ok((AssignmentCheckResult::TooFarInFuture, Vec::new())) } - tranche + (claimed_core_indices, tranche) }, }; @@ -1823,14 +1877,14 @@ fn check_and_import_assignment( let mut is_duplicate = false; // Import the assignments for all cores in the cert. for (assigned_candidate_hash, candidate_index) in - assigned_candidate_hashes.iter().zip(candidate_indices) + assigned_candidate_hashes.iter().zip(candidate_indices.iter_ones()) { let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? { Some(c) => c, None => return Ok(( AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate( - candidate_index, + candidate_index as _, *assigned_candidate_hash, )), Vec::new(), @@ -2294,34 +2348,28 @@ fn process_wakeup( None }; - if let Some((cert, val_index, tranche)) = maybe_cert { + if let Some((claimed_core_indices, cert, val_index, tranche)) = maybe_cert { let indirect_cert = IndirectAssignmentCertV2 { block_hash: relay_block, validator: val_index, cert }; - let index_in_candidate = - block_entry.candidates().iter().position(|(_, h)| &candidate_hash == h); - - if let Some(i) = index_in_candidate { - gum::trace!( - target: LOG_TARGET, - ?candidate_hash, - para_id = ?candidate_receipt.descriptor.para_id, - block_hash = ?relay_block, - "Launching approval work.", - ); + gum::trace!( + target: LOG_TARGET, + ?candidate_hash, + para_id = ?candidate_receipt.descriptor.para_id, + block_hash = ?relay_block, + "Launching approval work.", + ); - // sanity: should always be present. - actions.push(Action::LaunchApproval { - candidate_hash, - indirect_cert, - assignment_tranche: tranche, - relay_block_hash: relay_block, - candidate_index: i as _, - session: block_entry.session(), - candidate: candidate_receipt, - backing_group, - }); - } + actions.push(Action::LaunchApproval { + claimed_core_indices, + candidate_hash, + indirect_cert, + assignment_tranche: tranche, + relay_block_hash: relay_block, + session: block_entry.session(), + candidate: candidate_receipt, + backing_group, + }); } // Although we checked approval earlier in this function, diff --git a/node/core/approval-voting/src/persisted_entries.rs b/node/core/approval-voting/src/persisted_entries.rs index 91e7a381d637..6c5cb0de53a3 100644 --- a/node/core/approval-voting/src/persisted_entries.rs +++ b/node/core/approval-voting/src/persisted_entries.rs @@ -20,16 +20,20 @@ //! Within that context, things are plain-old-data. Within this module, //! data and logic are intertwined. -use polkadot_node_primitives::approval::{AssignmentCertV2, DelayTranche, RelayVRFStory}; +use polkadot_node_primitives::approval::{ + v2::CoreBitfield, AssignmentCertV2, DelayTranche, RelayVRFStory, +}; use polkadot_primitives::{ BlockNumber, CandidateHash, CandidateReceipt, CoreIndex, GroupIndex, Hash, SessionIndex, ValidatorIndex, ValidatorSignature, }; use sp_consensus_slots::Slot; -use bitvec::{order::Lsb0 as BitOrderLsb0, slice::BitSlice, vec::BitVec}; +use bitvec::{order::Lsb0 as BitOrderLsb0, slice::BitSlice}; use std::collections::BTreeMap; +use crate::approval_db::v1::Bitfield; + use super::{criteria::OurAssignment, time::Tick}; /// Metadata regarding a specific tranche of assignments for a specific candidate. @@ -80,7 +84,7 @@ pub struct ApprovalEntry { our_assignment: Option, our_approval_sig: Option, // `n_validators` bits. - assignments: BitVec, + assigned_validators: Bitfield, approved: bool, } @@ -92,10 +96,17 @@ impl ApprovalEntry { our_assignment: Option, our_approval_sig: Option, // `n_validators` bits. - assignments: BitVec, + assigned_validators: Bitfield, approved: bool, ) -> Self { - Self { tranches, backing_group, our_assignment, our_approval_sig, assignments, approved } + Self { + tranches, + backing_group, + our_assignment, + our_approval_sig, + assigned_validators, + approved, + } } // Access our assignment for this approval entry. @@ -107,7 +118,7 @@ impl ApprovalEntry { pub fn trigger_our_assignment( &mut self, tick_now: Tick, - ) -> Option<(AssignmentCertV2, ValidatorIndex, DelayTranche)> { + ) -> Option<(CoreBitfield, AssignmentCertV2, ValidatorIndex, DelayTranche)> { let our = self.our_assignment.as_mut().and_then(|a| { if a.triggered() { return None @@ -120,7 +131,7 @@ impl ApprovalEntry { our.map(|a| { self.import_assignment(a.tranche(), a.validator_index(), tick_now); - (a.cert().clone(), a.validator_index(), a.tranche()) + (a.assignment_bitfield().clone(), a.cert().clone(), a.validator_index(), a.tranche()) }) } @@ -131,7 +142,10 @@ impl ApprovalEntry { /// Whether a validator is already assigned. pub fn is_assigned(&self, validator_index: ValidatorIndex) -> bool { - self.assignments.get(validator_index.0 as usize).map(|b| *b).unwrap_or(false) + self.assigned_validators + .get(validator_index.0 as usize) + .map(|b| *b) + .unwrap_or(false) } /// Import an assignment. No-op if already assigned on the same tranche. @@ -158,14 +172,14 @@ impl ApprovalEntry { }; self.tranches[idx].assignments.push((validator_index, tick_now)); - self.assignments.set(validator_index.0 as _, true); + self.assigned_validators.set(validator_index.0 as _, true); } // Produce a bitvec indicating the assignments of all validators up to and // including `tranche`. - pub fn assignments_up_to(&self, tranche: DelayTranche) -> BitVec { + pub fn assignments_up_to(&self, tranche: DelayTranche) -> Bitfield { self.tranches.iter().take_while(|e| e.tranche <= tranche).fold( - bitvec::bitvec![u8, BitOrderLsb0; 0; self.assignments.len()], + bitvec::bitvec![u8, BitOrderLsb0; 0; self.assigned_validators.len()], |mut a, e| { for &(v, _) in &e.assignments { a.set(v.0 as _, true); @@ -193,12 +207,12 @@ impl ApprovalEntry { /// Get the number of validators in this approval entry. pub fn n_validators(&self) -> usize { - self.assignments.len() + self.assigned_validators.len() } /// Get the number of assignments by validators, including the local validator. pub fn n_assignments(&self) -> usize { - self.assignments.count_ones() + self.assigned_validators.count_ones() } /// Get the backing group index of the approval entry. @@ -226,7 +240,7 @@ impl From for ApprovalEntry { backing_group: entry.backing_group, our_assignment: entry.our_assignment.map(Into::into), our_approval_sig: entry.our_approval_sig.map(Into::into), - assignments: entry.assignments, + assigned_validators: entry.assigned_validators, approved: entry.approved, } } @@ -239,7 +253,7 @@ impl From for crate::approval_db::v1::ApprovalEntry { backing_group: entry.backing_group, our_assignment: entry.our_assignment.map(Into::into), our_approval_sig: entry.our_approval_sig.map(Into::into), - assignments: entry.assignments, + assigned_validators: entry.assigned_validators, approved: entry.approved, } } @@ -253,7 +267,7 @@ pub struct CandidateEntry { // Assignments are based on blocks, so we need to track assignments separately // based on the block we are looking at. pub block_assignments: BTreeMap, - pub approvals: BitVec, + pub approvals: Bitfield, } impl CandidateEntry { @@ -336,7 +350,7 @@ pub struct BlockEntry { // A bitfield where the i'th bit corresponds to the i'th candidate in `candidates`. // The i'th bit is `true` iff the candidate has been approved in the context of this // block. The block can be considered approved if the bitfield has all bits set to `true`. - pub approved_bitfield: BitVec, + pub approved_bitfield: Bitfield, pub children: Vec, } diff --git a/node/network/approval-distribution/Cargo.toml b/node/network/approval-distribution/Cargo.toml index 3e1069334056..ef8bbfdb778c 100644 --- a/node/network/approval-distribution/Cargo.toml +++ b/node/network/approval-distribution/Cargo.toml @@ -15,6 +15,7 @@ itertools = "0.10.5" futures = "0.3.21" gum = { package = "tracing-gum", path = "../../gum" } +bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] } [dev-dependencies] sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/node/network/approval-distribution/src/lib.rs b/node/network/approval-distribution/src/lib.rs index 4d1854893651..8be931dee135 100644 --- a/node/network/approval-distribution/src/lib.rs +++ b/node/network/approval-distribution/src/lib.rs @@ -32,6 +32,7 @@ use polkadot_node_network_protocol::{ Versioned, View, }; use polkadot_node_primitives::approval::{ + v2::{AsBitIndex, CandidateBitfield}, BlockApprovalMeta, IndirectAssignmentCertV2, IndirectSignedApprovalVote, }; use polkadot_node_subsystem::{ @@ -104,8 +105,8 @@ struct ApprovalRouting { struct ApprovalEntry { // The assignment certificate. assignment: IndirectAssignmentCertV2, - // The candidates claimed by the certificate. - candidates: HashSet, + // The candidates claimed by the certificate. A mapping between bit index and candidate index. + candidates: CandidateBitfield, // The approval signatures for each `CandidateIndex` claimed by the assignment certificate. approvals: HashMap, // The validator index of the assignment signer. @@ -114,17 +115,25 @@ struct ApprovalEntry { routing_info: ApprovalRouting, } +#[derive(Debug)] +enum ApprovalEntryError { + InvalidValidatorIndex, + CandidateIndexOutOfBounds, + InvalidCandidateIndex, + DuplicateApproval, +} + impl ApprovalEntry { pub fn new( assignment: IndirectAssignmentCertV2, - candidates: Vec, + candidates: CandidateBitfield, routing_info: ApprovalRouting, ) -> ApprovalEntry { Self { validator_index: assignment.validator, assignment, approvals: HashMap::with_capacity(candidates.len()), - candidates: HashSet::from_iter(candidates.into_iter()), + candidates, routing_info, } } @@ -132,23 +141,19 @@ impl ApprovalEntry { // Create a `MessageSubject` to reference the assignment. pub fn create_assignment_knowledge(&self, block_hash: Hash) -> (MessageSubject, MessageKind) { ( - MessageSubject( - block_hash, - self.candidates.iter().cloned().collect::>(), - self.validator_index, - ), + MessageSubject(block_hash, self.candidates.clone(), self.validator_index), MessageKind::Assignment, ) } - // Create a `MessageSubject` to reference the assignment. + // Create a `MessageSubject` to reference the approval. pub fn create_approval_knowledge( &self, block_hash: Hash, candidate_index: CandidateIndex, ) -> (MessageSubject, MessageKind) { ( - MessageSubject(block_hash, vec![candidate_index], self.validator_index), + MessageSubject(block_hash, candidate_index.into(), self.validator_index), MessageKind::Approval, ) } @@ -169,28 +174,37 @@ impl ApprovalEntry { } // Records a new approval. Returns false if the claimed candidate is not found or we already have received the approval. - // TODO: use specific errors instead of `bool`. - pub fn note_approval(&mut self, approval: IndirectSignedApprovalVote) -> bool { + pub fn note_approval( + &mut self, + approval: IndirectSignedApprovalVote, + ) -> Result<(), ApprovalEntryError> { // First do some sanity checks: // - check validator index matches // - check claimed candidate // - check for duplicate approval if self.validator_index != approval.validator { - return false + return Err(ApprovalEntryError::InvalidValidatorIndex) } - if !self.candidates.contains(&approval.candidate_index) || - self.approvals.contains_key(&approval.candidate_index) - { - return false + if self.candidates.len() <= approval.candidate_index as usize { + return Err(ApprovalEntryError::CandidateIndexOutOfBounds) } - self.approvals.insert(approval.candidate_index, approval).is_none() + if !self.candidates.bit_at(approval.candidate_index.as_bit_index()) { + return Err(ApprovalEntryError::InvalidCandidateIndex) + } + + if self.approvals.contains_key(&approval.candidate_index) { + return Err(ApprovalEntryError::DuplicateApproval) + } + + self.approvals.insert(approval.candidate_index, approval); + Ok(()) } // Get the assignment certiticate and claimed candidates. - pub fn get_assignment(&self) -> (IndirectAssignmentCertV2, Vec) { - (self.assignment.clone(), self.candidates.iter().cloned().collect::>()) + pub fn get_assignment(&self) -> (IndirectAssignmentCertV2, CandidateBitfield) { + (self.assignment.clone(), self.candidates.clone()) } // Get all approvals for all candidates claimed by the assignment. @@ -248,7 +262,7 @@ enum MessageKind { // Assignments can span multiple candidates, while approvals refer to only one candidate. // #[derive(Debug, Clone, Hash, PartialEq, Eq)] -struct MessageSubject(Hash, pub Vec, ValidatorIndex); +struct MessageSubject(Hash, pub CandidateBitfield, ValidatorIndex); #[derive(Debug, Clone, Default)] struct Knowledge { @@ -290,14 +304,18 @@ impl Knowledge { // In case of succesful insertion of multiple candidate assignments create additional // entries for each assigned candidate. This fakes knowledge of individual assignments, but // we need to share the same `MessageSubject` with the followup approval candidate index. - if kind == MessageKind::Assignment && success && message.1.len() > 1 { - message.1.iter().fold(success, |success, candidate_index| { - success & - self.insert( - MessageSubject(message.0.clone(), vec![*candidate_index], message.2), - kind, - ) - }) + if kind == MessageKind::Assignment && success && message.1.count_ones() > 1 { + message + .1 + .iter_ones() + .map(|candidate_index| candidate_index as CandidateIndex) + .fold(success, |success, candidate_index| { + success & + self.insert( + MessageSubject(message.0, candidate_index.into(), message.2), + kind, + ) + }) } else { success } @@ -336,7 +354,7 @@ struct BlockEntry { pub session: SessionIndex, /// Approval entries for whole block. These also contain all approvals in the cae of multiple candidates /// being claimed by assignments. - approval_entries: HashMap<(ValidatorIndex, Vec), ApprovalEntry>, + approval_entries: HashMap<(ValidatorIndex, CandidateBitfield), ApprovalEntry>, } impl BlockEntry { @@ -349,13 +367,13 @@ impl BlockEntry { // First map one entry per candidate to the same key we will use in `approval_entries`. // Key is (Validator_index, Vec) that links the `ApprovalEntry` to the (K,V) // entry in `candidate_entry.messages`. - for claimed_candidate_index in &entry.candidates { - match self.candidates.get_mut(*claimed_candidate_index as usize) { + for claimed_candidate_index in entry.candidates.iter_ones() { + match self.candidates.get_mut(claimed_candidate_index) { Some(candidate_entry) => { candidate_entry .messages .entry(entry.get_validator_index()) - .or_insert(entry.candidates.iter().cloned().collect::>()); + .or_insert(entry.candidates.clone()); }, None => { // This should never happen, but if it happens, it means the subsystem is broken. @@ -370,10 +388,7 @@ impl BlockEntry { } self.approval_entries - .entry(( - entry.validator_index, - entry.candidates.clone().into_iter().collect::>(), - )) + .entry((entry.validator_index, entry.candidates.clone())) .or_insert(entry) } @@ -441,7 +456,7 @@ impl BlockEntry { #[derive(Debug, Default)] struct CandidateEntry { // The value represents part of the lookup key in `approval_entries` to fetch the assignment and existing votes. - messages: HashMap>, + messages: HashMap, } #[derive(Debug, Clone, PartialEq)] @@ -460,7 +475,7 @@ impl MessageSource { } enum PendingMessage { - Assignment(IndirectAssignmentCertV2, Vec), + Assignment(IndirectAssignmentCertV2, CandidateBitfield), Approval(IndirectSignedApprovalVote), } @@ -686,7 +701,7 @@ impl State { ctx: &mut Context, metrics: &Metrics, peer_id: PeerId, - assignments: Vec<(IndirectAssignmentCertV2, Vec)>, + assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>, rng: &mut R, ) where R: CryptoRng + Rng, @@ -761,7 +776,7 @@ impl State { peer_id, assignments .into_iter() - .map(|(cert, candidate)| (cert.into(), vec![candidate])) + .map(|(cert, candidate)| (cert.into(), candidate.into())) .collect::>(), rng, ) @@ -907,7 +922,7 @@ impl State { metrics: &Metrics, source: MessageSource, assignment: IndirectAssignmentCertV2, - claimed_candidate_indices: Vec, + claimed_candidate_indices: CandidateBitfield, rng: &mut R, ) where R: CryptoRng + Rng, @@ -955,6 +970,15 @@ impl State { "Duplicate assignment", ); modify_reputation(ctx.sender(), peer_id, COST_DUPLICATE_MESSAGE).await; + } else { + gum::trace!( + target: LOG_TARGET, + ?peer_id, + hash = ?block_hash, + ?validator_index, + ?message_subject, + "We sent the message to the peer while peer was sending it to us. Known race condition.", + ); } return } @@ -1112,7 +1136,7 @@ impl State { .as_ref() .map(|t| t.local_grid_neighbors().route_to_peer(required_routing, &peer)) { - peers.push(peer.clone()); + peers.push(peer); continue } @@ -1124,7 +1148,7 @@ impl State { if route_random { approval_entry.routing_info_mut().random_routing.inc_sent(); - peers.push(peer.clone()); + peers.push(peer); } } @@ -1149,9 +1173,7 @@ impl State { let peers = peers .iter() .filter_map(|peer_id| { - self.peer_views - .get(peer_id) - .map(|peer_entry| (peer_id.clone(), peer_entry.version)) + self.peer_views.get(peer_id).map(|peer_entry| (*peer_id, peer_entry.version)) }) .collect::>(); @@ -1183,7 +1205,7 @@ impl State { }; // compute metadata on the assignment. - let message_subject = MessageSubject(block_hash, vec![candidate_index], validator_index); + let message_subject = MessageSubject(block_hash, candidate_index.into(), validator_index); let message_kind = MessageKind::Approval; if let Some(peer_id) = source.peer_id() { @@ -1317,7 +1339,7 @@ impl State { // Invariant: to our knowledge, none of the peers except for the `source` know about the approval. metrics.on_approval_imported(); - if !approval_entry.note_approval(vote.clone()) { + if let Err(err) = approval_entry.note_approval(vote.clone()) { // this would indicate a bug in approval-voting: // - validator index mismatch // - candidate index mismatch @@ -1327,7 +1349,8 @@ impl State { hash = ?block_hash, ?candidate_index, ?validator_index, - "Possible bug: Vote import failed: validator/candidate index mismatch or duplicate", + ?err, + "Possible bug: Vote import failed", ); return @@ -1429,13 +1452,12 @@ impl State { let sigs = block_entry .get_approval_entries(index) .into_iter() - .map(|approval_entry| { + .flat_map(|approval_entry| { approval_entry .get_approvals() .into_iter() .map(|approval| (approval.validator, approval.signature)) }) - .flatten() .collect::>(); all_sigs.extend(sigs); } @@ -1942,7 +1964,7 @@ pub const MAX_APPROVAL_BATCH_SIZE: usize = ensure_size_not_zero( // Low level helper for sending assignments. async fn send_assignments_batched_inner( sender: &mut impl overseer::ApprovalDistributionSenderTrait, - batch: Vec<(IndirectAssignmentCertV2, Vec)>, + batch: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>, peers: &Vec, // TODO: use `ValidationVersion`. peer_version: u32, @@ -1962,7 +1984,18 @@ async fn send_assignments_batched_inner( // `IndirectAssignmentCertV2` -> `IndirectAssignmentCert` let batch = batch .into_iter() - .filter_map(|(cert, candidates)| cert.try_into().ok().map(|cert| (cert, candidates[0]))) + .filter_map(|(cert, candidates)| { + cert.try_into().ok().map(|cert| { + ( + cert, + // First 1 bit index is the candidate index. + candidates + .first_one() + .map(|index| index as CandidateIndex) + .expect("Assignment was checked for not being empty; qed"), + ) + }) + }) .collect(); sender .send_message(NetworkBridgeTxMessage::SendValidationMessage( @@ -1982,7 +2015,7 @@ async fn send_assignments_batched_inner( /// of assignments and can `select!` other tasks. pub(crate) async fn send_assignments_batched( sender: &mut impl overseer::ApprovalDistributionSenderTrait, - v2_assignments: Vec<(IndirectAssignmentCertV2, Vec)>, + v2_assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>, peers: &Vec<(PeerId, ProtocolVersion)>, ) { let v1_peers = filter_by_peer_version(peers, ValidationVersion::V1.into()); @@ -1991,7 +2024,7 @@ pub(crate) async fn send_assignments_batched( if v1_peers.len() > 0 { let mut v1_assignments = v2_assignments.clone(); // Older peers(v1) do not understand `AssignmentsV2` messages, so we have to filter these out. - v1_assignments.retain(|(_, candidates)| candidates.len() == 1); + v1_assignments.retain(|(_, candidates)| candidates.count_ones() == 1); let mut v1_batches = v1_assignments.into_iter().peekable(); diff --git a/node/network/bridge/src/rx/mod.rs b/node/network/bridge/src/rx/mod.rs index 7408a0b25c71..d480f9c4a781 100644 --- a/node/network/bridge/src/rx/mod.rs +++ b/node/network/bridge/src/rx/mod.rs @@ -755,7 +755,7 @@ fn update_our_view( shared .validation_peers .iter() - .map(|(peer_id, peer_data)| (peer_id.clone(), peer_data.version)) + .map(|(peer_id, peer_data)| (*peer_id, peer_data.version)) .collect::>(), shared.collation_peers.keys().cloned().collect::>(), ) diff --git a/node/network/protocol/src/lib.rs b/node/network/protocol/src/lib.rs index 90807558b255..46f432997c8b 100644 --- a/node/network/protocol/src/lib.rs +++ b/node/network/protocol/src/lib.rs @@ -428,11 +428,9 @@ impl_versioned_try_from!( pub mod vstaging { use parity_scale_codec::{Decode, Encode}; use polkadot_node_primitives::approval::{ - IndirectAssignmentCertV2, IndirectSignedApprovalVote, + v2::CandidateBitfield, IndirectAssignmentCertV2, IndirectSignedApprovalVote, }; - use polkadot_primitives::CandidateIndex; - // Re-export stuff that has not changed since v1. pub use crate::v1::{ declare_signature_payload, BitfieldDistributionMessage, CollationProtocol, @@ -461,10 +459,12 @@ pub mod vstaging { #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum ApprovalDistributionMessage { /// Assignments for candidates in recent, unfinalized blocks. + /// We use a bitfield to reference claimed candidates, where the bit index is equal to candidate index. /// /// Actually checking the assignment may yield a different result. + /// TODO: Look at getting rid of bitfield in the future. #[codec(index = 0)] - Assignments(Vec<(IndirectAssignmentCertV2, Vec)>), + Assignments(Vec<(IndirectAssignmentCertV2, CandidateBitfield)>), /// Approvals for candidates in some recent, unfinalized block. #[codec(index = 1)] Approvals(Vec), diff --git a/node/primitives/Cargo.toml b/node/primitives/Cargo.toml index c6812d2cc02c..0919a99d3ffb 100644 --- a/node/primitives/Cargo.toml +++ b/node/primitives/Cargo.toml @@ -20,6 +20,7 @@ polkadot-parachain = { path = "../../parachain", default-features = false } schnorrkel = "0.9.1" thiserror = "1.0.31" serde = { version = "1.0.137", features = ["derive"] } +bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] } [target.'cfg(not(target_os = "unknown"))'.dependencies] zstd = { version = "0.11.2", default-features = false } diff --git a/node/primitives/src/approval.rs b/node/primitives/src/approval.rs index e51111e6e4c8..d7e7be861aab 100644 --- a/node/primitives/src/approval.rs +++ b/node/primitives/src/approval.rs @@ -31,24 +31,168 @@ use sp_consensus_babe as babe_primitives; /// Earlier tranches of validators check first, with later tranches serving as backup. pub type DelayTranche = u32; -/// A static context used to compute the Relay VRF story based on the -/// VRF output included in the header-chain. -pub const RELAY_VRF_STORY_CONTEXT: &[u8] = b"A&V RC-VRF"; +/// Static contexts use to generate randomness for v1 assignments. +pub mod v1 { + /// A static context used to compute the Relay VRF story based on the + /// VRF output included in the header-chain. + pub const RELAY_VRF_STORY_CONTEXT: &[u8] = b"A&V RC-VRF"; -/// A static context used for all relay-vrf-modulo VRFs. -pub const RELAY_VRF_MODULO_CONTEXT: &[u8] = b"A&V MOD"; + /// A static context used for all relay-vrf-modulo VRFs. + pub const RELAY_VRF_MODULO_CONTEXT: &[u8] = b"A&V MOD"; -/// A static context used for all relay-vrf-modulo VRFs. -pub const RELAY_VRF_DELAY_CONTEXT: &[u8] = b"A&V DELAY"; + /// A static context used for all relay-vrf-modulo VRFs. + pub const RELAY_VRF_DELAY_CONTEXT: &[u8] = b"A&V DELAY"; -/// A static context used for transcripts indicating assigned availability core. -pub const ASSIGNED_CORE_CONTEXT: &[u8] = b"A&V ASSIGNED"; + /// A static context used for transcripts indicating assigned availability core. + pub const ASSIGNED_CORE_CONTEXT: &[u8] = b"A&V ASSIGNED"; -/// A static context associated with producing randomness for a core. -pub const CORE_RANDOMNESS_CONTEXT: &[u8] = b"A&V CORE"; + /// A static context associated with producing randomness for a core. + pub const CORE_RANDOMNESS_CONTEXT: &[u8] = b"A&V CORE"; -/// A static context associated with producing randomness for a tranche. -pub const TRANCHE_RANDOMNESS_CONTEXT: &[u8] = b"A&V TRANCHE"; + /// A static context associated with producing randomness for a tranche. + pub const TRANCHE_RANDOMNESS_CONTEXT: &[u8] = b"A&V TRANCHE"; +} + +/// A list of primitives introduced by v2. +pub mod v2 { + use parity_scale_codec::{Decode, Encode}; + use std::ops::BitOr; + + use super::{CandidateIndex, CoreIndex}; + use bitvec::{prelude::Lsb0, vec::BitVec}; + + /// A static context associated with producing randomness for a core. + pub const CORE_RANDOMNESS_CONTEXT: &[u8] = b"A&V CORE v2"; + /// A static context associated with producing randomness for v2 multi-core assignments. + pub const ASSIGNED_CORE_CONTEXT: &[u8] = b"A&V ASSIGNED v2"; + /// A static context used for all relay-vrf-modulo VRFs for v2 multi-core assignments. + pub const RELAY_VRF_MODULO_CONTEXT: &[u8] = b"A&V MOD v2"; + /// A read-only bitvec wrapper + #[derive(Clone, Debug, Encode, Decode, Hash, PartialEq, Eq)] + pub struct Bitfield(BitVec, std::marker::PhantomData); + + /// A `read-only`, `non-zero` bitfield. + /// Each 1 bit identifies a candidate by the bitfield bit index. + pub type CandidateBitfield = Bitfield; + /// A bitfield of core assignments. + pub type CoreBitfield = Bitfield; + + /// Errors that can occur when creating and manipulating bitfields. + #[derive(Debug)] + pub enum BitfieldError { + /// All bits are zero. + NullAssignment, + } + + /// A bit index in `Bitfield`. + #[cfg_attr(test, derive(PartialEq, Clone))] + pub struct BitIndex(pub usize); + + /// Helper trait to convert primitives to `BitIndex`. + pub trait AsBitIndex { + /// Returns the index of the corresponding bit in `Bitfield`. + fn as_bit_index(&self) -> BitIndex; + } + + impl Bitfield { + /// Returns the bit value at specified `index`. If `index` is greater than bitfield size, + /// returns `false`. + pub fn bit_at(&self, index: BitIndex) -> bool { + if self.0.len() <= index.0 { + false + } else { + self.0[index.0] + } + } + + /// Returns number of bits. + pub fn len(&self) -> usize { + self.0.len() + } + + /// Returns the number of 1 bits. + pub fn count_ones(&self) -> usize { + self.0.count_ones() + } + + /// Returns the index of the first 1 bit. + pub fn first_one(&self) -> Option { + self.0.first_one() + } + + /// Returns an iterator over inner bits. + pub fn iter_ones(&self) -> bitvec::slice::IterOnes { + self.0.iter_ones() + } + + /// For testing purpose, we want a inner mutable ref. + #[cfg(test)] + pub fn inner_mut(&mut self) -> &mut BitVec { + &mut self.0 + } + } + + impl AsBitIndex for CandidateIndex { + fn as_bit_index(&self) -> BitIndex { + BitIndex(*self as usize) + } + } + + impl AsBitIndex for CoreIndex { + fn as_bit_index(&self) -> BitIndex { + BitIndex(self.0 as usize) + } + } + + impl AsBitIndex for usize { + fn as_bit_index(&self) -> BitIndex { + BitIndex(*self) + } + } + + impl From for Bitfield + where + T: AsBitIndex, + { + fn from(value: T) -> Self { + Self( + { + let mut bv = bitvec::bitvec![u8, Lsb0; 0; value.as_bit_index().0 + 1]; + bv.set(value.as_bit_index().0, true); + bv + }, + Default::default(), + ) + } + } + + impl TryFrom> for Bitfield + where + T: Into>, + { + type Error = BitfieldError; + + fn try_from(mut value: Vec) -> Result { + if value.is_empty() { + return Err(BitfieldError::NullAssignment) + } + + let initial_bitfield = + value.pop().expect("Just checked above it's not empty; qed").into(); + + Ok(Self( + value.into_iter().fold(initial_bitfield.0, |initial_bitfield, element| { + let mut bitfield: Bitfield = element.into(); + bitfield + .0 + .resize(std::cmp::max(initial_bitfield.len(), bitfield.0.len()), false); + bitfield.0.bitor(initial_bitfield) + }), + Default::default(), + )) + } + } +} /// random bytes derived from the VRF submitted within the block by the /// block author as a credential and used as input to approval assignment criteria. @@ -84,25 +228,20 @@ pub enum AssignmentCertKindV2 { /// An assignment story based on the VRF that authorized the relay-chain block where the /// candidate was included combined with a sample number. /// - /// The context used to produce bytes is [`RELAY_VRF_MODULO_CONTEXT`] + /// The context used to produce bytes is [`v2::RELAY_VRF_MODULO_CONTEXT`] RelayVRFModulo { /// The sample number used in this cert. sample: u32, }, /// Multiple assignment stories based on the VRF that authorized the relay-chain block where the - /// candidate was included combined with a sample number. + /// candidates were included. /// - /// The context used to produce bytes is [`RELAY_VRF_MODULO_CONTEXT`] - RelayVRFModuloCompact { - /// The number of samples. - sample: u32, - /// The assigned cores. - core_indices: Vec, - }, + /// The context is [`v2::RELAY_VRF_MODULO_CONTEXT`] + RelayVRFModuloCompact, /// An assignment story based on the VRF that authorized the relay-chain block where the /// candidate was included combined with the index of a particular core. /// - /// The context is [`RELAY_VRF_DELAY_CONTEXT`] + /// The context is [`v2::RELAY_VRF_DELAY_CONTEXT`] RelayVRFDelay { /// The core index chosen in this cert. core_index: CoreIndex, @@ -288,7 +427,7 @@ impl UnsafeVRFOutput { .0 .attach_input_hash(&pubkey, transcript) .map_err(ApprovalError::SchnorrkelSignature)?; - Ok(RelayVRFStory(inout.make_bytes(RELAY_VRF_STORY_CONTEXT))) + Ok(RelayVRFStory(inout.make_bytes(v1::RELAY_VRF_STORY_CONTEXT))) } } @@ -318,3 +457,58 @@ pub fn babe_unsafe_vrf_info(header: &Header) -> Option { None } + +#[cfg(test)] +mod test { + use super::{ + v2::{BitIndex, Bitfield}, + *, + }; + + #[test] + fn test_assignment_bitfield_from_vec() { + let candidate_indices = vec![1u32, 7, 3, 10, 45, 8, 200, 2]; + let max_index = *candidate_indices.iter().max().unwrap(); + let bitfield = Bitfield::try_from(candidate_indices.clone()).unwrap(); + let candidate_indices = + candidate_indices.into_iter().map(|i| BitIndex(i as usize)).collect::>(); + + // Test 1 bits. + for index in candidate_indices.clone() { + assert!(bitfield.bit_at(index)); + } + + // Test 0 bits. + for index in 0..max_index { + if candidate_indices.contains(&BitIndex(index as usize)) { + continue + } + assert!(!bitfield.bit_at(BitIndex(index as usize))); + } + } + + #[test] + fn test_assignment_bitfield_invariant_msb() { + let core_indices = vec![CoreIndex(1), CoreIndex(3), CoreIndex(10), CoreIndex(20)]; + let mut bitfield = Bitfield::try_from(core_indices.clone()).unwrap(); + assert!(bitfield.inner_mut().pop().unwrap()); + + for i in 0..1024 { + assert!(Bitfield::try_from(CoreIndex(i)).unwrap().inner_mut().pop().unwrap()); + assert!(Bitfield::try_from(i).unwrap().inner_mut().pop().unwrap()); + } + } + + #[test] + fn test_assignment_bitfield_basic() { + let bitfield = Bitfield::try_from(CoreIndex(0)).unwrap(); + assert!(bitfield.bit_at(BitIndex(0))); + assert!(!bitfield.bit_at(BitIndex(1))); + assert_eq!(bitfield.len(), 1); + + let mut bitfield = Bitfield::try_from(20 as CandidateIndex).unwrap(); + assert!(bitfield.bit_at(BitIndex(20))); + assert_eq!(bitfield.inner_mut().count_ones(), 1); + assert_eq!(bitfield.len(), 21); + } +} diff --git a/node/service/src/parachains_db/upgrade.rs b/node/service/src/parachains_db/upgrade.rs index 01d4fb62f7f6..7aee05f7d7c5 100644 --- a/node/service/src/parachains_db/upgrade.rs +++ b/node/service/src/parachains_db/upgrade.rs @@ -15,6 +15,8 @@ #![cfg(feature = "full-node")] +use kvdb::DBTransaction; + use super::{columns, other_io_error, DatabaseKind, LOG_TARGET}; use std::{ fs, io, @@ -28,7 +30,8 @@ type Version = u32; const VERSION_FILE_NAME: &'static str = "parachain_db_version"; /// Current db version. -const CURRENT_VERSION: Version = 2; +/// Version 3 changes approval db format for `OurAssignment`. +const CURRENT_VERSION: Version = 3; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -58,6 +61,8 @@ pub(crate) fn try_upgrade_db(db_path: &Path, db_kind: DatabaseKind) -> Result<() Some(0) => migrate_from_version_0_to_1(db_path, db_kind)?, // 1 -> 2 migration Some(1) => migrate_from_version_1_to_2(db_path, db_kind)?, + // 2 -> 3 migration + Some(2) => migrate_from_version_2_to_3(db_path, db_kind)?, // Already at current version, do nothing. Some(CURRENT_VERSION) => (), // This is an arbitrary future version, we don't handle it. @@ -127,6 +132,19 @@ fn migrate_from_version_1_to_2(path: &Path, db_kind: DatabaseKind) -> Result<(), }) } +fn migrate_from_version_2_to_3(path: &Path, db_kind: DatabaseKind) -> Result<(), Error> { + gum::info!(target: LOG_TARGET, "Migrating parachains db from version 2 to version 3 ..."); + + match db_kind { + DatabaseKind::ParityDB => paritydb_migrate_from_version_2_to_3(path), + DatabaseKind::RocksDB => rocksdb_migrate_from_version_2_to_3(path), + } + .and_then(|result| { + gum::info!(target: LOG_TARGET, "Migration complete! "); + Ok(result) + }) +} + /// Migration from version 0 to version 1: /// * the number of columns has changed from 3 to 5; fn rocksdb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> { @@ -278,6 +296,41 @@ fn paritydb_migrate_from_version_1_to_2(path: &Path) -> Result<(), Error> { Ok(()) } +/// Migration from version 2 to version 3. +/// Clears the approval voting db column which changed format and cannot be migrated. +fn paritydb_migrate_from_version_2_to_3(path: &Path) -> Result<(), Error> { + parity_db::clear_column( + path, + super::columns::v2::COL_APPROVAL_DATA.try_into().expect("Invalid column ID"), + ) + .map_err(|e| other_io_error(format!("Error clearing column {:?}", e)))?; + + Ok(()) +} + +/// Migration from version 2 to version 3. +/// Clears the approval voting db column because `OurAssignment` changed format. Not all +/// instances of it can be converted to new version so we need to wipe it clean. +fn rocksdb_migrate_from_version_2_to_3(path: &Path) -> Result<(), Error> { + use kvdb::DBOp; + use kvdb_rocksdb::{Database, DatabaseConfig}; + + let db_path = path + .to_str() + .ok_or_else(|| super::other_io_error("Invalid database path".into()))?; + let db_cfg = DatabaseConfig::with_columns(super::columns::v2::NUM_COLUMNS); + let db = Database::open(&db_cfg, db_path)?; + + // Wipe all entries in one operation. + let ops = vec![DBOp::DeletePrefix { + col: super::columns::v2::COL_APPROVAL_DATA, + prefix: kvdb::DBKey::from_slice(b""), + }]; + + let transaction = DBTransaction { ops }; + db.write(transaction)?; + Ok(()) +} #[cfg(test)] mod tests { use super::{columns::v2::*, *}; diff --git a/node/subsystem-types/Cargo.toml b/node/subsystem-types/Cargo.toml index 22528503ccc4..78dfa86f7c85 100644 --- a/node/subsystem-types/Cargo.toml +++ b/node/subsystem-types/Cargo.toml @@ -22,3 +22,4 @@ smallvec = "1.8.0" substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" } thiserror = "1.0.31" async-trait = "0.1.57" +bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] } diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 078b9b5ed049..de24369af0db 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -33,7 +33,10 @@ use polkadot_node_network_protocol::{ UnifiedReputationChange, }; use polkadot_node_primitives::{ - approval::{BlockApprovalMeta, IndirectAssignmentCertV2, IndirectSignedApprovalVote}, + approval::{ + v2::CandidateBitfield, BlockApprovalMeta, IndirectAssignmentCertV2, + IndirectSignedApprovalVote, + }, AvailableData, BabeEpoch, BlockWeight, CandidateVotes, CollationGenerationConfig, CollationSecondedSignal, DisputeMessage, DisputeStatus, ErasureChunk, PoV, SignedDisputeStatement, SignedFullStatement, ValidationResult, @@ -770,7 +773,7 @@ pub enum ApprovalVotingMessage { /// Should not be sent unless the block hash is known. CheckAndImportAssignment( IndirectAssignmentCertV2, - Vec, + CandidateBitfield, oneshot::Sender, ), /// Check if the approval vote is valid and can be accepted by our view of the @@ -805,7 +808,7 @@ pub enum ApprovalDistributionMessage { NewBlocks(Vec), /// Distribute an assignment cert from the local validator. The cert is assumed /// to be valid, relevant, and for the given relay-parent and validator index. - DistributeAssignment(IndirectAssignmentCertV2, Vec), + DistributeAssignment(IndirectAssignmentCertV2, CandidateBitfield), /// Distribute an approval vote for the local validator. The approval vote is assumed to be /// valid, relevant, and the corresponding approval already issued. /// If not, the subsystem is free to drop the message. diff --git a/roadmap/implementers-guide/src/node/approval/approval-voting.md b/roadmap/implementers-guide/src/node/approval/approval-voting.md index 2761f21b1c2c..3f1eeedfc27a 100644 --- a/roadmap/implementers-guide/src/node/approval/approval-voting.md +++ b/roadmap/implementers-guide/src/node/approval/approval-voting.md @@ -54,6 +54,8 @@ struct OurAssignment { tranche: DelayTranche, validator_index: ValidatorIndex, triggered: bool, + /// A subset of the core indices obtained from the VRF output. + pub assignment_bitfield: AssignmentBitfield, } struct ApprovalEntry {