diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 9ee0b01df36..b6256c7827d 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -475,7 +475,11 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> { // MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance). // // We do not queue future attestations for later processing. - verify_propagation_slot_range(&chain.slot_clock, attestation, &chain.spec)?; + verify_attestation_propagation_slot_range::( + &chain.slot_clock, + attestation.data().slot, + &chain.spec, + )?; // Check the attestation's epoch matches its target. if attestation.data().slot.epoch(T::EthSpec::slots_per_epoch()) @@ -538,7 +542,12 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> { // // Attestations must be for a known block. If the block is unknown, we simply drop the // attestation and do not delay consideration for later. - let head_block = verify_head_block_is_known(chain, attestation, None)?; + let head_block = verify_head_block_is_known( + chain, + attestation.data().beacon_block_root, + attestation.data().slot, + None, + )?; // Check the attestation target root is consistent with the head root. // @@ -547,7 +556,11 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> { // // Whilst this attestation *technically* could be used to add value to a block, it is // invalid in the spirit of the protocol. Here we choose safety over profit. - verify_attestation_target_root::(&head_block, attestation)?; + verify_attestation_target_root::( + &head_block, + attestation.data().target.root, + attestation.data().slot, + )?; // Ensure that the attestation has participants. if attestation.is_aggregation_bits_zero() { @@ -807,7 +820,11 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { // MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance). // // We do not queue future attestations for later processing. - verify_propagation_slot_range(&chain.slot_clock, attestation, &chain.spec)?; + verify_attestation_propagation_slot_range::( + &chain.slot_clock, + attestation.data().slot, + &chain.spec, + )?; // Check to ensure that the attestation is "unaggregated". I.e., it has exactly one // aggregation bit set. @@ -823,11 +840,19 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { // attestation and do not delay consideration for later. // // Enforce a maximum skip distance for unaggregated attestations. - let head_block = - verify_head_block_is_known(chain, attestation, chain.config.import_max_skip_slots)?; + let head_block = verify_head_block_is_known( + chain, + attestation.data().beacon_block_root, + attestation.data().slot, + chain.config.import_max_skip_slots, + )?; // Check the attestation target root is consistent with the head root. - verify_attestation_target_root::(&head_block, attestation)?; + verify_attestation_target_root::( + &head_block, + attestation.data().target.root, + attestation.data().slot, + )?; Ok(()) } @@ -1072,36 +1097,37 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { /// Case (1) is the exact thing we're trying to detect. However case (2) is a little different, but /// it's still fine to reject here because there's no need for us to handle attestations that are /// already finalized. -fn verify_head_block_is_known( +pub fn verify_head_block_is_known( chain: &BeaconChain, - attestation: AttestationRef, + attestation_beacon_block_root: Hash256, + attestation_slot: Slot, max_skip_slots: Option, ) -> Result { let block_opt = chain .canonical_head .fork_choice_read_lock() - .get_block(&attestation.data().beacon_block_root) + .get_block(&attestation_beacon_block_root) .or_else(|| { chain .early_attester_cache - .get_proto_block(attestation.data().beacon_block_root) + .get_proto_block(attestation_beacon_block_root) }); if let Some(block) = block_opt { // Reject any block that exceeds our limit on skipped slots. if let Some(max_skip_slots) = max_skip_slots { - if attestation.data().slot > block.slot + max_skip_slots { + if attestation_slot > block.slot + max_skip_slots { return Err(Error::TooManySkippedSlots { head_block_slot: block.slot, - attestation_slot: attestation.data().slot, + attestation_slot, }); } } Ok(block) - } else if chain.is_pre_finalization_block(attestation.data().beacon_block_root)? { + } else if chain.is_pre_finalization_block(attestation_beacon_block_root)? { Err(Error::HeadBlockFinalized { - beacon_block_root: attestation.data().beacon_block_root, + beacon_block_root: attestation_beacon_block_root, }) } else { // The block is either: @@ -1111,21 +1137,20 @@ fn verify_head_block_is_known( // 2) A post-finalization block that we don't know about yet. We'll queue // the attestation until the block becomes available (or we time out). Err(Error::UnknownHeadBlock { - beacon_block_root: attestation.data().beacon_block_root, + beacon_block_root: attestation_beacon_block_root, }) } } -/// Verify that the `attestation` is within the acceptable gossip propagation range, with reference +/// Verify that the `attestation_slot` is within the acceptable gossip propagation range, with reference /// to the current slot of the `chain`. /// /// Accounts for `MAXIMUM_GOSSIP_CLOCK_DISPARITY`. -pub fn verify_propagation_slot_range( +pub fn verify_attestation_propagation_slot_range( slot_clock: &S, - attestation: AttestationRef, + attestation_slot: Slot, spec: &ChainSpec, ) -> Result<(), Error> { - let attestation_slot = attestation.data().slot; let latest_permissible_slot = slot_clock .now_with_future_tolerance(spec.maximum_gossip_clock_disparity()) .ok_or(BeaconChainError::UnableToReadSlot)?; @@ -1203,11 +1228,12 @@ pub fn verify_attestation_signature( /// `attestation.data.beacon_block_root`. pub fn verify_attestation_target_root( head_block: &ProtoBlock, - attestation: AttestationRef, + attestation_target_root: Hash256, + attestation_slot: Slot, ) -> Result<(), Error> { // Check the attestation target root. let head_block_epoch = head_block.slot.epoch(E::slots_per_epoch()); - let attestation_epoch = attestation.data().slot.epoch(E::slots_per_epoch()); + let attestation_epoch = attestation_slot.epoch(E::slots_per_epoch()); if head_block_epoch > attestation_epoch { // The epoch references an invalid head block from a future epoch. // @@ -1220,7 +1246,7 @@ pub fn verify_attestation_target_root( // Reference: // https://github.com/ethereum/eth2.0-specs/pull/2001#issuecomment-699246659 return Err(Error::InvalidTargetRoot { - attestation: attestation.data().target.root, + attestation: attestation_target_root, // It is not clear what root we should expect in this case, since the attestation is // fundamentally invalid. expected: None, @@ -1239,9 +1265,9 @@ pub fn verify_attestation_target_root( }; // Reject any attestation with an invalid target root. - if target_root != attestation.data().target.root { + if target_root != attestation_target_root { return Err(Error::InvalidTargetRoot { - attestation: attestation.data().target.root, + attestation: attestation_target_root, expected: Some(target_root), }); } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a78ae266e5a..eada2495de1 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -60,6 +60,7 @@ use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_B use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; +use crate::single_attestation_verification::SingleAttestationVerification; use crate::sync_committee_verification::{ Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution, }; @@ -72,6 +73,7 @@ use crate::{ kzg_utils, metrics, AvailabilityPendingExecutedBlock, BeaconChainError, BeaconForkChoiceStore, BeaconSnapshot, CachedHead, }; +use attestation::SingleAttestation; use eth2::types::{EventKind, SseBlobSidecar, SseBlock, SseExtendedPayloadAttributes}; use execution_layer::{ BlockProposalContents, BlockProposalContentsType, BuilderParams, ChainHealth, ExecutionLayer, @@ -2042,6 +2044,35 @@ impl BeaconChain { ) } + /// Accepts some `SingleAttestation` from the network and attempts to verify it, returning `Ok(_)` if + /// it is valid to be (re)broadcast on the gossip network. + pub fn verify_single_attestation_for_gossip( + &self, + single_attestation: &SingleAttestation, + subnet_id: Option, + ) -> Result { + metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_REQUESTS); + let _timer = + metrics::start_timer(&metrics::UNAGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES); + + let verify_attestation = + SingleAttestationVerification::verify(single_attestation, subnet_id, self)?; + + // This method is called for API and gossip attestations, so this covers all unaggregated attestation events + if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_single_attestation_subscribers() { + // TODO(single-attestation) we should also emit the old attestation event? + event_handler.register(EventKind::SingleAttestation(Box::new( + verify_attestation.single_attestation.clone(), + ))); + } + } + + metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_SUCCESSES); + + Ok(verify_attestation) + } + /// Performs the same validation as `Self::verify_aggregated_attestation_for_gossip`, but for /// multiple attestations using batch BLS verification. Batch verification can provide /// significant CPU-time savings compared to individual verification. @@ -2169,6 +2200,23 @@ impl BeaconChain { }) } + /// Accepts a `SingleATtestation` object and attempts to verify it in the context of fork + /// choice. If it is valid it is applied to `self.fork_choice`. + pub fn apply_single_attestation_to_fork_choice( + &self, + single_attestation: &SingleAttestation, + ) -> Result<(), Error> { + self.canonical_head + .fork_choice_write_lock() + .on_attestation( + self.slot()?, + single_attestation.data.clone(), + vec![single_attestation.attester_index as u64], + AttestationFromBlock::False, + ) + .map_err(Into::into) + } + /// Accepts some attestation-type object and attempts to verify it in the context of fork /// choice. If it is valid it is applied to `self.fork_choice`. /// @@ -2178,13 +2226,15 @@ impl BeaconChain { /// - `VerifiedAggregatedAttestation` pub fn apply_attestation_to_fork_choice( &self, - verified: &impl VerifiedAttestation, + attestation_data: AttestationData, + attesting_indices: Vec, ) -> Result<(), Error> { self.canonical_head .fork_choice_write_lock() .on_attestation( self.slot()?, - verified.indexed_attestation().to_ref(), + attestation_data, + attesting_indices, AttestationFromBlock::False, ) .map_err(Into::into) @@ -2200,12 +2250,10 @@ impl BeaconChain { /// and no error is returned. pub fn add_to_naive_aggregation_pool( &self, - unaggregated_attestation: &impl VerifiedAttestation, + attestation: AttestationRef, ) -> Result<(), AttestationError> { let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_APPLY_TO_AGG_POOL); - let attestation = unaggregated_attestation.attestation(); - match self.naive_aggregation_pool.write().insert(attestation) { Ok(outcome) => trace!( self.log, diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 3ae19430aad..c27625f724f 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1655,7 +1655,8 @@ impl ExecutionPendingBlock { match fork_choice.on_attestation( current_slot, - indexed_attestation, + indexed_attestation.data().clone(), + indexed_attestation.attesting_indices_to_vec(), AttestationFromBlock::True, ) { Ok(()) => Ok(()), diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 267d56220c9..93141ff1ba8 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -8,6 +8,7 @@ const DEFAULT_CHANNEL_CAPACITY: usize = 16; pub struct ServerSentEventHandler { attestation_tx: Sender>, + single_attestation_tx: Sender>, block_tx: Sender>, blob_sidecar_tx: Sender>, finalized_tx: Sender>, @@ -37,6 +38,7 @@ impl ServerSentEventHandler { pub fn new_with_capacity(log: Logger, capacity: usize) -> Self { let (attestation_tx, _) = broadcast::channel(capacity); + let (single_attestation_tx, _) = broadcast::channel(capacity); let (block_tx, _) = broadcast::channel(capacity); let (blob_sidecar_tx, _) = broadcast::channel(capacity); let (finalized_tx, _) = broadcast::channel(capacity); @@ -56,6 +58,7 @@ impl ServerSentEventHandler { Self { attestation_tx, + single_attestation_tx, block_tx, blob_sidecar_tx, finalized_tx, @@ -154,6 +157,10 @@ impl ServerSentEventHandler { .block_gossip_tx .send(kind) .map(|count| log_count("block gossip", count)), + EventKind::SingleAttestation(_) => self + .single_attestation_tx + .send(kind) + .map(|count| log_count("single_attestation", count)), }; if let Err(SendError(event)) = result { trace!(self.log, "No receivers registered to listen for event"; "event" => ?event); @@ -164,6 +171,10 @@ impl ServerSentEventHandler { self.attestation_tx.subscribe() } + pub fn subscribe_single_attestation(&self) -> Receiver> { + self.single_attestation_tx.subscribe() + } + pub fn subscribe_block(&self) -> Receiver> { self.block_tx.subscribe() } @@ -232,6 +243,10 @@ impl ServerSentEventHandler { self.attestation_tx.receiver_count() > 0 } + pub fn has_single_attestation_subscribers(&self) -> bool { + self.single_attestation_tx.receiver_count() > 0 + } + pub fn has_block_subscribers(&self) -> bool { self.block_tx.receiver_count() > 0 } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 2953516fb1a..43de3577d76 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -54,6 +54,7 @@ mod pre_finalization_cache; pub mod proposer_prep_service; pub mod schema_change; pub mod shuffling_cache; +pub mod single_attestation_verification; pub mod state_advance_timer; pub mod sync_committee_rewards; pub mod sync_committee_verification; diff --git a/beacon_node/beacon_chain/src/single_attestation_verification.rs b/beacon_node/beacon_chain/src/single_attestation_verification.rs new file mode 100644 index 00000000000..78f87bf7e50 --- /dev/null +++ b/beacon_node/beacon_chain/src/single_attestation_verification.rs @@ -0,0 +1,345 @@ +use std::borrow::Cow; + +use crate::attestation_verification::verify_attestation_target_root; +use crate::attestation_verification::verify_head_block_is_known; +use crate::attestation_verification::Error; +use crate::metrics; +use crate::{ + attestation_verification::verify_attestation_propagation_slot_range, BeaconChain, + BeaconChainError, BeaconChainTypes, +}; +use eth2::types::attestation::SingleAttestation; +use state_processing::signature_sets::single_attestation_signature_set_from_pubkeys; +use types::{EthSpec, SubnetId}; + +pub struct SingleAttestationVerification { + pub single_attestation: SingleAttestation, + pub subnet_id: SubnetId, +} + +impl SingleAttestationVerification { + pub fn verify_early_checks( + single_attestation: &SingleAttestation, + chain: &BeaconChain, + ) -> Result<(), Error> { + let attestation_epoch = single_attestation + .data + .slot + .epoch(T::EthSpec::slots_per_epoch()); + + // Check the attestation's epoch matches its target. + if attestation_epoch != single_attestation.data.target.epoch { + return Err(Error::InvalidTargetEpoch { + slot: single_attestation.data.slot, + epoch: single_attestation.data.target.epoch, + }); + } + + // MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance). + // + // We do not queue future attestations for later processing. + verify_attestation_propagation_slot_range::( + &chain.slot_clock, + single_attestation.data.slot, + &chain.spec, + )?; + + // Sanity check to ensure the attestation index is set to zero post Electra. + if single_attestation.data.index != 0 { + return Err(Error::CommitteeIndexNonZero( + single_attestation.data.index as usize, + )); + } + + // Attestations must be for a known block. If the block is unknown, we simply drop the + // attestation and do not delay consideration for later. + // + // Enforce a maximum skip distance for unaggregated attestations. + let head_block = verify_head_block_is_known( + chain, + single_attestation.data.beacon_block_root, + single_attestation.data.slot, + chain.config.import_max_skip_slots, + )?; + + // Check the attestation target root is consistent with the head root. + verify_attestation_target_root::( + &head_block, + single_attestation.data.target.root, + single_attestation.data.slot, + )?; + + Ok(()) + } + + /// Run the checks that apply to the indexed attestation before the signature is checked. + pub fn verify_middle_checks( + single_attestation: &SingleAttestation, + subnet_id: Option, + chain: &BeaconChain, + ) -> Result<(u64, SubnetId), Error> { + let attestation_epoch = single_attestation + .data + .slot + .epoch(T::EthSpec::slots_per_epoch()); + + let committees_per_slot = chain.with_committee_cache( + single_attestation.data.target.root, + attestation_epoch, + |committee_cache, _| { + let beacon_committee = committee_cache + .get_beacon_committee( + single_attestation.data.slot, + single_attestation.attester_index as u64, + ) + .ok_or(Error::NoCommitteeForSlotAndIndex { + slot: single_attestation.data.slot, + index: single_attestation.committee_index, + }) + .map_err(|_| BeaconChainError::AttestationCommitteeIndexNotSet)?; + + if !beacon_committee + .committee + .contains(&(single_attestation.committee_index as usize)) + { + // TODO(single-attestation) return a error + todo!() + } + Ok(committee_cache.committees_per_slot()) + }, + )?; + + let expected_subnet_id = SubnetId::compute_subnet_for_single_attestation::( + single_attestation, + committees_per_slot, + &chain.spec, + ) + .map_err(BeaconChainError::from)?; + + // If a subnet was specified, ensure that subnet is correct. + if let Some(subnet_id) = subnet_id { + if subnet_id != expected_subnet_id { + return Err(Error::InvalidSubnetId { + received: subnet_id, + expected: expected_subnet_id, + }); + } + }; + + /* + * The attestation is the first valid attestation received for the participating validator + * for the slot, attestation.data.slot. + */ + if chain + .observed_gossip_attesters + .read() + .validator_has_been_observed( + single_attestation.data.target.epoch, + single_attestation.attester_index, + ) + .map_err(BeaconChainError::from)? + { + return Err(Error::PriorAttestationKnown { + validator_index: single_attestation.attester_index as u64, + epoch: single_attestation.data.target.epoch, + }); + } + + Ok((single_attestation.attester_index as u64, expected_subnet_id)) + } + + /// Verify the attestation, producing extra information about whether it might be slashable. + pub fn verify_slashable<'a, T: BeaconChainTypes>( + single_attestation: &'a SingleAttestation, + subnet_id: Option, + chain: &BeaconChain, + ) -> Result> { + if let Err(e) = Self::verify_early_checks(single_attestation, chain) { + return Err(SingleAttestationSlashInfo::SignatureNotChecked( + single_attestation, + e, + )); + } + + if let Err(e) = Self::verify_signature(single_attestation, chain) { + return Err(SingleAttestationSlashInfo::SignatureInvalid(e)); + } + + // TODO(single-attestation) what to do with these? + let (_validator_index, expected_subnet_id) = + match Self::verify_middle_checks(single_attestation, subnet_id, chain) { + Ok(t) => t, + Err(e) => { + return Err(SingleAttestationSlashInfo::SignatureValid( + single_attestation, + e, + )) + } + }; + + if let Err(e) = Self::verify_late_checks(single_attestation, chain) { + return Err(SingleAttestationSlashInfo::SignatureValid( + single_attestation, + e, + )); + } + + Ok(SingleAttestationVerification { + single_attestation: single_attestation.clone(), + subnet_id: expected_subnet_id, + }) + } + + pub fn verify( + single_attestation: &SingleAttestation, + subnet_id: Option, + chain: &BeaconChain, + ) -> Result { + Self::verify_slashable(single_attestation, subnet_id, chain) + .inspect(|_verified_unaggregated| { + if let Some(_slasher) = chain.slasher.as_ref() { + // TODO(single-attestation) add to the slasher queue + // slasher.accept_attestation(verified_unaggregated.indexed_attestation.clone()); + } + }) + .map_err(|slash_info| process_slash_info(slash_info, chain)) + } + + pub fn verify_signature( + single_attestation: &SingleAttestation, + chain: &BeaconChain, + ) -> Result<(), Error> { + let signature_setup_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SIGNATURE_SETUP_TIMES); + let pubkey_cache = chain.validator_pubkey_cache.read(); + let pubkey = pubkey_cache + .get(single_attestation.attester_index) + .map(Cow::Borrowed) + .ok_or(Error::InvalidSignature)?; + + let fork = chain + .spec + .fork_at_epoch(single_attestation.data.target.epoch); + + let signature_set = single_attestation_signature_set_from_pubkeys( + pubkey, + single_attestation, + &fork, + chain.genesis_validators_root, + &chain.spec, + ) + .map_err(BeaconChainError::SignatureSetError)?; + + metrics::stop_timer(signature_setup_timer); + let _signature_verification_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SIGNATURE_TIMES); + + if signature_set.verify() { + Ok(()) + } else { + Err(Error::InvalidSignature) + } + } + + /// Run the checks that apply after the signature has been checked. + fn verify_late_checks( + single_attestation: &SingleAttestation, + chain: &BeaconChain, + ) -> Result<(), Error> { + // Now that the attestation has been fully verified, store that we have received a valid + // attestation from this validator. + // + // It's important to double check that the attestation still hasn't been observed, since + // there can be a race-condition if we receive two attestations at the same time and + // process them in different threads. + if chain + .observed_gossip_attesters + .write() + .observe_validator( + single_attestation.data.target.epoch, + single_attestation.attester_index, + ) + .map_err(BeaconChainError::from)? + { + return Err(Error::PriorAttestationKnown { + validator_index: single_attestation.attester_index as u64, + epoch: single_attestation.data.target.epoch, + }); + } + Ok(()) + } +} + +/// Information about invalid attestations which might still be slashable despite being invalid. +pub enum SingleAttestationSlashInfo<'a, TErr> { + /// The attestation is invalid, but its signature wasn't checked. + SignatureNotChecked(&'a SingleAttestation, TErr), + /// The attestation's signature is invalid, so it will never be slashable. + SignatureInvalid(TErr), + /// The signature is valid but the attestation is invalid in some other way. + SignatureValid(&'a SingleAttestation, TErr), +} + +/// After processing an attestation normally, optionally process it further for the slasher. +/// +/// This maps an `AttestationSlashInfo` error back into a regular `Error`, performing signature +/// checks on attestations that failed verification for other reasons. +/// +/// No substantial extra work will be done if there is no slasher configured. +fn process_slash_info( + _slash_info: SingleAttestationSlashInfo, + _chain: &BeaconChain, +) -> Error { + todo!() + // use AttestationSlashInfo::*; + + // if let Some(slasher) = chain.slasher.as_ref() { + // let (indexed_attestation, check_signature, err) = match slash_info { + // SignatureNotChecked(attestation, err) => { + // if let Error::UnknownHeadBlock { .. } = err { + // if attestation.data().beacon_block_root == attestation.data().target.root { + // return err; + // } + // } + // match obtain_indexed_attestation_and_committees_per_slot(chain, attestation) { + // Ok((indexed, _)) => (indexed, true, err), + // Err(e) => { + // debug!( + // chain.log, + // "Unable to obtain indexed form of attestation for slasher"; + // "attestation_root" => format!("{:?}", attestation.tree_hash_root()), + // "error" => format!("{:?}", e) + // ); + // return err; + // } + // } + // } + // SignatureNotCheckedIndexed(indexed, err) => (indexed, true, err), + // SignatureInvalid(e) => return e, + // SignatureValid(indexed, err) => (indexed, false, err), + // }; + + // if check_signature { + // if let Err(e) = verify_attestation_signature(chain, &indexed_attestation) { + // debug!( + // chain.log, + // "Signature verification for slasher failed"; + // "error" => format!("{:?}", e), + // ); + // return err; + // } + // } + + // // Supply to slasher. + // slasher.accept_attestation(indexed_attestation); + + // err + // } else { + // match slash_info { + // SignatureNotChecked(_, e) + // | SignatureNotCheckedIndexed(_, e) + // | SignatureInvalid(e) + // | SignatureValid(_, e) => e, + // } + // } +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 093ee0c44b4..9f7f115ce06 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1,3 +1,4 @@ +use crate::attestation_verification::VerifiedAttestation; use crate::block_verification_types::{AsBlock, RpcBlock}; use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::observed_operations::ObservationOutcome; @@ -2088,7 +2089,9 @@ where .unwrap() { let verified = result.unwrap(); - self.chain.add_to_naive_aggregation_pool(&verified).unwrap(); + self.chain + .add_to_naive_aggregation_pool(verified.attestation()) + .unwrap(); } for result in self @@ -2098,7 +2101,10 @@ where { let verified = result.unwrap(); self.chain - .apply_attestation_to_fork_choice(&verified) + .apply_attestation_to_fork_choice( + verified.attestation().data().clone(), + verified.indexed_attestation().attesting_indices_to_vec(), + ) .unwrap(); self.chain.add_to_block_inclusion_pool(verified).unwrap(); } diff --git a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs index f8a483c6214..95ace7c501f 100644 --- a/beacon_node/beacon_chain/src/validator_monitor.rs +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -25,10 +25,10 @@ use types::consts::altair::{ TIMELY_HEAD_FLAG_INDEX, TIMELY_SOURCE_FLAG_INDEX, TIMELY_TARGET_FLAG_INDEX, }; use types::{ - Attestation, AttestationData, AttesterSlashingRef, BeaconBlockRef, BeaconState, - BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, IndexedAttestation, - IndexedAttestationRef, ProposerSlashing, PublicKeyBytes, SignedAggregateAndProof, - SignedContributionAndProof, Slot, SyncCommitteeMessage, VoluntaryExit, + attestation::SingleAttestation, Attestation, AttestationData, AttesterSlashingRef, + BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, + IndexedAttestation, IndexedAttestationRef, ProposerSlashing, PublicKeyBytes, + SignedAggregateAndProof, SignedContributionAndProof, Slot, SyncCommitteeMessage, VoluntaryExit, }; /// Used for Prometheus labels. @@ -1201,13 +1201,15 @@ impl ValidatorMonitor { pub fn register_gossip_unaggregated_attestation( &self, seen_timestamp: Duration, - indexed_attestation: &IndexedAttestation, + attestation_data: &AttestationData, + attesting_indices: Vec, slot_clock: &S, ) { self.register_unaggregated_attestation( "gossip", seen_timestamp, - indexed_attestation, + attestation_data, + attesting_indices, slot_clock, ) } @@ -1222,7 +1224,24 @@ impl ValidatorMonitor { self.register_unaggregated_attestation( "api", seen_timestamp, - indexed_attestation, + indexed_attestation.data(), + indexed_attestation.attesting_indices(), + slot_clock, + ) + } + + /// Register an attestation seen on the HTTP API. + pub fn register_api_single_attestation( + &self, + seen_timestamp: Duration, + single_attestation: &SingleAttestation, + slot_clock: &S, + ) { + self.register_unaggregated_attestation( + "api", + seen_timestamp, + &single_attestation.data, + vec![single_attestation.attester_index as u64], slot_clock, ) } @@ -1231,10 +1250,10 @@ impl ValidatorMonitor { &self, src: &str, seen_timestamp: Duration, - indexed_attestation: &IndexedAttestation, + data: &AttestationData, + attesting_indices: Vec, slot_clock: &S, ) { - let data = indexed_attestation.data(); let epoch = data.slot.epoch(E::slots_per_epoch()); let delay = get_message_delay_ms( seen_timestamp, @@ -1243,7 +1262,7 @@ impl ValidatorMonitor { slot_clock, ); - indexed_attestation.attesting_indices_iter().for_each(|i| { + attesting_indices.iter().for_each(|i| { if let Some(validator) = self.get_validator(*i) { let id = &validator.id; diff --git a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs index a43310ac834..2bb218a6889 100644 --- a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs @@ -739,8 +739,8 @@ impl ReprocessQueue { "hint" => "system may be overloaded", "parent_root" => ?parent_root, "block_root" => ?block_root, - "failed_count" => failed_to_send_count, - "sent_count" => sent_count, + "failed_count" => ?failed_to_send_count, + "sent_count" => ?sent_count, ); } } @@ -782,8 +782,8 @@ impl ReprocessQueue { "Ignored scheduled sampling requests for block"; "hint" => "system may be overloaded", "block_root" => ?block_root, - "failed_count" => failed_to_send_count, - "sent_count" => sent_count, + "failed_count" => ?failed_to_send_count, + "sent_count" => ?sent_count, ); } } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index fe05f55a01a..dd75a102599 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -45,9 +45,9 @@ use builder_states::get_next_withdrawals; use bytes::Bytes; use directory::DEFAULT_ROOT_DIR; use eth2::types::{ - self as api_types, BroadcastValidation, EndpointVersion, ForkChoice, ForkChoiceNode, - LightClientUpdatesQuery, PublishBlockRequest, ValidatorBalancesRequestBody, ValidatorId, - ValidatorStatus, ValidatorsRequestBody, + self as api_types, attestation::SingleAttestation, BroadcastValidation, EndpointVersion, + ForkChoice, ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, + ValidatorBalancesRequestBody, ValidatorId, ValidatorStatus, ValidatorsRequestBody, }; use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage}; @@ -1825,6 +1825,12 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()); + let beacon_pool_path_v2 = eth_v2 + .and(warp::path("beacon")) + .and(warp::path("pool")) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()); + let beacon_pool_path_any = any_version .and(warp::path("beacon")) .and(warp::path("pool")) @@ -1832,20 +1838,19 @@ pub fn serve( .and(chain_filter.clone()); // POST beacon/pool/attestations - let post_beacon_pool_attestations = beacon_pool_path_any + let post_beacon_pool_attestations_v1 = beacon_pool_path .clone() .and(warp::path("attestations")) .and(warp::path::end()) .and(warp_utils::json::json()) .and(network_tx_filter.clone()) - .and(reprocess_send_filter) + .and(reprocess_send_filter.clone()) .and(log_filter.clone()) .then( // V1 and V2 are identical except V2 has a consensus version header in the request. // We only require this header for SSZ deserialization, which isn't supported for // this endpoint presently. - |_endpoint_version: EndpointVersion, - task_spawner: TaskSpawner, + |task_spawner: TaskSpawner, chain: Arc>, attestations: Vec>, network_tx: UnboundedSender>, @@ -1865,6 +1870,39 @@ pub fn serve( }, ); + // POST beacon/pool/attestations + let post_beacon_pool_attestations_v2 = beacon_pool_path_v2 + .clone() + .and(warp::path("attestations")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(network_tx_filter.clone()) + .and(reprocess_send_filter) + .and(log_filter.clone()) + .then( + // V1 and V2 are identical except V2 has a consensus version header in the request. + // We only require this header for SSZ deserialization, which isn't supported for + // this endpoint presently. + |task_spawner: TaskSpawner, + chain: Arc>, + attestations: Vec, + network_tx: UnboundedSender>, + reprocess_tx: Option>, + log: Logger| async move { + let result = crate::publish_attestations::publish_single_attestations( + task_spawner, + chain, + attestations, + network_tx, + reprocess_tx, + log, + ) + .await + .map(|()| warp::reply::json(&())); + convert_rejection(result).await + }, + ); + // GET beacon/pool/attestations?committee_index,slot let get_beacon_pool_attestations = beacon_pool_path_any .clone() @@ -3553,7 +3591,7 @@ pub fn serve( // Import aggregate attestations for (index, verified_aggregate) in verified_aggregates { - if let Err(e) = chain.apply_attestation_to_fork_choice(&verified_aggregate) { + if let Err(e) = chain.apply_attestation_to_fork_choice(verified_aggregate.attestation().data().clone(), verified_aggregate.indexed_attestation().attesting_indices_to_vec()) { error!(log, "Failure applying verified aggregate attestation to fork choice"; "error" => format!("{:?}", e), @@ -4505,6 +4543,9 @@ pub fn serve( api_types::EventTopic::Attestation => { event_handler.subscribe_attestation() } + api_types::EventTopic::SingleAttestation => { + event_handler.subscribe_single_attestation() + } api_types::EventTopic::VoluntaryExit => { event_handler.subscribe_exit() } @@ -4731,7 +4772,8 @@ pub fn serve( .uor(post_beacon_blinded_blocks) .uor(post_beacon_blocks_v2) .uor(post_beacon_blinded_blocks_v2) - .uor(post_beacon_pool_attestations) + .uor(post_beacon_pool_attestations_v1) + .uor(post_beacon_pool_attestations_v2) .uor(post_beacon_pool_attester_slashings) .uor(post_beacon_pool_proposer_slashings) .uor(post_beacon_pool_voluntary_exits) diff --git a/beacon_node/http_api/src/publish_attestations.rs b/beacon_node/http_api/src/publish_attestations.rs index 00654765325..1645f5c3fbd 100644 --- a/beacon_node/http_api/src/publish_attestations.rs +++ b/beacon_node/http_api/src/publish_attestations.rs @@ -50,7 +50,7 @@ use tokio::sync::{ mpsc::{Sender, UnboundedSender}, oneshot, }; -use types::Attestation; +use types::{attestation::SingleAttestation, Attestation, EthSpec}; // Error variants are only used in `Debug` and considered `dead_code` by the compiler. #[derive(Debug)] @@ -71,27 +71,131 @@ enum PublishAttestationResult { Failure(Error), } -fn verify_and_publish_attestation( +fn verify_and_publish_single_attestation( chain: &Arc>, - attestation: &Attestation, + attestation: &SingleAttestation, seen_timestamp: Duration, network_tx: &UnboundedSender>, log: &Logger, ) -> Result<(), Error> { let attestation = chain - .verify_unaggregated_attestation_for_gossip(attestation, None) + .verify_single_attestation_for_gossip(attestation, None) .map_err(Error::Validation)?; - // Publish. network_tx .send(NetworkMessage::Publish { - messages: vec![PubsubMessage::Attestation(Box::new(( - attestation.subnet_id(), - attestation.attestation().clone_as_attestation(), + messages: vec![PubsubMessage::SingleAttestation(Box::new(( + attestation.subnet_id, + attestation.single_attestation.clone(), )))], }) .map_err(|_| Error::Publication)?; + // Notify the validator monitor. + chain + .validator_monitor + .read() + .register_api_single_attestation( + seen_timestamp, + &attestation.single_attestation, + &chain.slot_clock, + ); + + let attestation_epoch = attestation + .single_attestation + .data + .slot + .epoch(T::EthSpec::slots_per_epoch()); + + // TODO(single-attestation) UNWRAP + chain + .with_committee_cache( + attestation.single_attestation.data.target.root, + attestation_epoch, + |committee_cache, _| { + let fc_result = + chain.apply_single_attestation_to_fork_choice(&attestation.single_attestation); + + let beacon_committees = committee_cache + .get_beacon_committees_at_slot(attestation.single_attestation.data.slot) + .unwrap_or_else(|_| vec![]); + + let Ok(attestation) = attestation + .single_attestation + .to_attestation(&beacon_committees) + else { + warn!( + log, + "Attestation invalid for fork choice"; + ); + // TODO(single-attestation) better log message, better error type + todo!() + //return Err(Error::Publication) + }; + + let naive_aggregation_result = + chain.add_to_naive_aggregation_pool(attestation.to_ref()); + + if let Err(e) = &fc_result { + warn!( + log, + "Attestation invalid for fork choice"; + "err" => ?e, + ); + } + if let Err(e) = &naive_aggregation_result { + warn!( + log, + "Attestation invalid for aggregation"; + "err" => ?e + ); + } + + if let Err(_e) = &fc_result { + todo!() + //Err(Error::ForkChoice(e)) + } else if let Err(_e) = naive_aggregation_result { + todo!() + //Err(Error::AggregationPool(e)) + } else { + Ok(()) + } + }, + ) + .unwrap(); + + Ok(()) +} + +fn verify_and_publish_attestation( + chain: &Arc>, + attestation: &Attestation, + seen_timestamp: Duration, + network_tx: &UnboundedSender>, + log: &Logger, +) -> Result<(), Error> { + let attestation = chain + .verify_unaggregated_attestation_for_gossip(attestation, None) + .map_err(Error::Validation)?; + + match attestation.attestation() { + types::AttestationRef::Base(_) => { + // Publish. + network_tx + .send(NetworkMessage::Publish { + messages: vec![PubsubMessage::Attestation(Box::new(( + attestation.subnet_id(), + attestation.attestation().clone_as_attestation(), + )))], + }) + .map_err(|_| Error::Publication)?; + } + types::AttestationRef::Electra(_) => { + // TODO(single-attestation) this should be a no-op or error + todo!() + } + }; + // Notify the validator monitor. chain .validator_monitor @@ -102,8 +206,11 @@ fn verify_and_publish_attestation( &chain.slot_clock, ); - let fc_result = chain.apply_attestation_to_fork_choice(&attestation); - let naive_aggregation_result = chain.add_to_naive_aggregation_pool(&attestation); + let fc_result = chain.apply_attestation_to_fork_choice( + attestation.attestation().data().clone(), + attestation.indexed_attestation().attesting_indices_to_vec(), + ); + let naive_aggregation_result = chain.add_to_naive_aggregation_pool(attestation.attestation()); if let Err(e) = &fc_result { warn!( @@ -129,6 +236,195 @@ fn verify_and_publish_attestation( } } +pub async fn publish_single_attestations( + task_spawner: TaskSpawner, + chain: Arc>, + attestations: Vec, + network_tx: UnboundedSender>, + reprocess_send: Option>, + log: Logger, +) -> Result<(), warp::Rejection> { + // Collect metadata about attestations which we'll use to report failures. We need to + // move the `attestations` vec into the blocking task, so this small overhead is unavoidable. + let attestation_metadata = attestations + .iter() + .map(|att| (att.data.slot, att.committee_index)) + .collect::>(); + + // Gossip validate and publish attestations that can be immediately processed. + let seen_timestamp = timestamp_now(); + let inner_log = log.clone(); + let mut prelim_results = task_spawner + .blocking_task(Priority::P0, move || { + Ok(attestations + .into_iter() + .map(|attestation| { + match verify_and_publish_single_attestation( + &chain, + &attestation, + seen_timestamp, + &network_tx, + &inner_log, + ) { + Ok(()) => PublishAttestationResult::Success, + Err(Error::Validation(AttestationError::UnknownHeadBlock { + beacon_block_root, + })) => { + let Some(reprocess_tx) = &reprocess_send else { + return PublishAttestationResult::Failure(Error::ReprocessDisabled); + }; + // Re-process. + let (tx, rx) = oneshot::channel(); + let reprocess_chain = chain.clone(); + let reprocess_network_tx = network_tx.clone(); + let reprocess_log = inner_log.clone(); + let reprocess_fn = move || { + let result = verify_and_publish_single_attestation( + &reprocess_chain, + &attestation, + seen_timestamp, + &reprocess_network_tx, + &reprocess_log, + ); + // Ignore failure on the oneshot that reports the result. This + // shouldn't happen unless some catastrophe befalls the waiting + // thread which causes it to drop. + let _ = tx.send(result); + }; + let reprocess_msg = + ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate { + beacon_block_root, + process_fn: Box::new(reprocess_fn), + }); + if reprocess_tx.try_send(reprocess_msg).is_err() { + PublishAttestationResult::Failure(Error::ReprocessFull) + } else { + PublishAttestationResult::Reprocessing(rx) + } + } + Err(Error::Validation(AttestationError::PriorAttestationKnown { + .. + })) => PublishAttestationResult::AlreadyKnown, + Err(e) => PublishAttestationResult::Failure(e), + } + }) + .map(Some) + .collect::>()) + }) + .await?; + + // Asynchronously wait for re-processing of attestations to unknown blocks. This avoids blocking + // any of the beacon processor workers while we wait for reprocessing. + let (reprocess_indices, reprocess_futures): (Vec<_>, Vec<_>) = prelim_results + .iter_mut() + .enumerate() + .filter_map(|(i, opt_result)| { + if let Some(PublishAttestationResult::Reprocessing(..)) = &opt_result { + let PublishAttestationResult::Reprocessing(rx) = opt_result.take()? else { + // Unreachable. + return None; + }; + Some((i, rx)) + } else { + None + } + }) + .unzip(); + let reprocess_results = futures::future::join_all(reprocess_futures).await; + + // Join everything back together and construct a response. + // This part should be quick so we just stay in the Tokio executor's async task. + for (i, reprocess_result) in reprocess_indices.into_iter().zip(reprocess_results) { + let Some(result_entry) = prelim_results.get_mut(i) else { + error!( + log, + "Unreachable case in attestation publishing"; + "case" => "prelim out of bounds", + "request_index" => i, + ); + continue; + }; + *result_entry = Some(match reprocess_result { + Ok(Ok(())) => PublishAttestationResult::Success, + // Attestation failed processing on re-process. + Ok(Err(Error::Validation(AttestationError::PriorAttestationKnown { .. }))) => { + PublishAttestationResult::AlreadyKnown + } + Ok(Err(e)) => PublishAttestationResult::Failure(e), + // Oneshot was dropped, indicating that the attestation either timed out in the + // reprocess queue or was dropped due to some error. + Err(_) => PublishAttestationResult::Failure(Error::ReprocessTimeout), + }); + } + + // Construct the response. + let mut failures = vec![]; + let mut num_already_known = 0; + + for (index, result) in prelim_results.iter().enumerate() { + match result { + Some(PublishAttestationResult::Success) => {} + Some(PublishAttestationResult::AlreadyKnown) => num_already_known += 1, + Some(PublishAttestationResult::Failure(e)) => { + if let Some((slot, committee_index)) = attestation_metadata.get(index) { + error!( + log, + "Failure verifying attestation for gossip"; + "error" => ?e, + "request_index" => index, + "committee_index" => committee_index, + "attestation_slot" => slot, + ); + failures.push(Failure::new(index, format!("{e:?}"))); + } else { + error!( + log, + "Unreachable case in attestation publishing"; + "case" => "out of bounds", + "request_index" => index + ); + failures.push(Failure::new(index, "metadata logic error".into())); + } + } + Some(PublishAttestationResult::Reprocessing(_)) => { + error!( + log, + "Unreachable case in attestation publishing"; + "case" => "reprocessing", + "request_index" => index + ); + failures.push(Failure::new(index, "reprocess logic error".into())); + } + None => { + error!( + log, + "Unreachable case in attestation publishing"; + "case" => "result is None", + "request_index" => index + ); + failures.push(Failure::new(index, "result logic error".into())); + } + } + } + + if num_already_known > 0 { + debug!( + log, + "Some unagg attestations already known"; + "count" => num_already_known + ); + } + + if failures.is_empty() { + Ok(()) + } else { + Err(warp_utils::reject::indexed_bad_request( + "error processing attestations".to_string(), + failures, + )) + } +} + pub async fn publish_attestations( task_spawner: TaskSpawner, chain: Arc>, diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 9f68278e284..38011626c72 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -6,15 +6,16 @@ use snap::raw::{decompress_len, Decoder, Encoder}; use ssz::{Decode, Encode}; use std::io::{Error, ErrorKind}; use std::sync::Arc; +use types::attestation::SingleAttestation; use types::{ - Attestation, AttestationBase, AttestationElectra, AttesterSlashing, AttesterSlashingBase, - AttesterSlashingElectra, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, - ForkContext, ForkName, LightClientFinalityUpdate, LightClientOptimisticUpdate, - ProposerSlashing, SignedAggregateAndProof, SignedAggregateAndProofBase, - SignedAggregateAndProofElectra, SignedBeaconBlock, SignedBeaconBlockAltair, - SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella, - SignedBeaconBlockDeneb, SignedBeaconBlockElectra, SignedBlsToExecutionChange, - SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, + Attestation, AttestationBase, AttesterSlashing, AttesterSlashingBase, AttesterSlashingElectra, + BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, ForkContext, ForkName, + LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, + SignedAggregateAndProof, SignedAggregateAndProofBase, SignedAggregateAndProofElectra, + SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix, + SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra, + SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, + SyncCommitteeMessage, SyncSubnetId, }; #[derive(Debug, Clone, PartialEq)] @@ -29,6 +30,8 @@ pub enum PubsubMessage { AggregateAndProofAttestation(Box>), /// Gossipsub message providing notification of a raw un-aggregated attestation with its shard id. Attestation(Box<(SubnetId, Attestation)>), + /// Gossipsub message providining notification of a `SingleAttestation` with its shard id. + SingleAttestation(Box<(SubnetId, SingleAttestation)>), /// Gossipsub message providing notification of a voluntary exit. VoluntaryExit(Box), /// Gossipsub message providing notification of a new proposer slashing. @@ -128,6 +131,9 @@ impl PubsubMessage { PubsubMessage::Attestation(attestation_data) => { GossipKind::Attestation(attestation_data.0) } + PubsubMessage::SingleAttestation(attestation_data) => { + GossipKind::Attestation(attestation_data.0) + } PubsubMessage::VoluntaryExit(_) => GossipKind::VoluntaryExit, PubsubMessage::ProposerSlashing(_) => GossipKind::ProposerSlashing, PubsubMessage::AttesterSlashing(_) => GossipKind::AttesterSlashing, @@ -192,10 +198,10 @@ impl PubsubMessage { match fork_context.from_context_bytes(gossip_topic.fork_digest) { Some(&fork_name) => { if fork_name.electra_enabled() { - Attestation::Electra( - AttestationElectra::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?, - ) + return Err(format!( + "Unknown gossipsub fork digest: {:?}", + gossip_topic.fork_digest + )); } else { Attestation::Base( AttestationBase::from_ssz_bytes(data) @@ -411,6 +417,7 @@ impl PubsubMessage { PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), PubsubMessage::AttesterSlashing(data) => data.as_ssz_bytes(), PubsubMessage::Attestation(data) => data.1.as_ssz_bytes(), + PubsubMessage::SingleAttestation(data) => data.1.as_ssz_bytes(), PubsubMessage::SignedContributionAndProof(data) => data.as_ssz_bytes(), PubsubMessage::SyncCommitteeMessage(data) => data.1.as_ssz_bytes(), PubsubMessage::BlsToExecutionChange(data) => data.as_ssz_bytes(), @@ -455,6 +462,11 @@ impl std::fmt::Display for PubsubMessage { data.1.data().slot, data.1.committee_index(), ), + PubsubMessage::SingleAttestation(data) => write!( + f, + "SingleAttestation: subnet_id: {}, attestation_slot: {}, attestation_index: {:?}", + *data.0, data.1.data.slot, data.1.committee_index + ), PubsubMessage::VoluntaryExit(_data) => write!(f, "Voluntary Exit"), PubsubMessage::ProposerSlashing(_data) => write!(f, "Proposer Slashing"), PubsubMessage::AttesterSlashing(_data) => write!(f, "Attester Slashing"), diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index e92f4504762..535f9873655 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -22,7 +22,7 @@ use beacon_chain::{ use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use operation_pool::ReceivedPreCapella; use slog::{crit, debug, error, info, trace, warn, Logger}; -use slot_clock::SlotClock; +use slot_clock::{ManualSlotClock, SlotClock}; use ssz::Encode; use std::fs; use std::io::Write; @@ -32,10 +32,10 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; use tokio::sync::mpsc; use types::{ - beacon_block::BlockImportSource, Attestation, AttestationRef, AttesterSlashing, BlobSidecar, - DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256, IndexedAttestation, - LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, - SignedAggregateAndProof, SignedBeaconBlock, SignedBlsToExecutionChange, + attestation::SingleAttestation, beacon_block::BlockImportSource, Attestation, AttestationData, + AttestationRef, AttesterSlashing, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, + Hash256, IndexedAttestation, LightClientFinalityUpdate, LightClientOptimisticUpdate, + ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; @@ -52,32 +52,32 @@ use beacon_processor::{ /// messages. const STRICT_LATE_MESSAGE_PENALTIES: bool = false; -/// An attestation that has been validated by the `BeaconChain`. -/// -/// Since this struct implements `beacon_chain::VerifiedAttestation`, it would be a logic error to -/// construct this from components which have not passed `BeaconChain` validation. -struct VerifiedUnaggregate { - attestation: Box>, - indexed_attestation: IndexedAttestation, -} +// /// An attestation that has been validated by the `BeaconChain`. +// /// +// /// Since this struct implements `beacon_chain::VerifiedAttestation`, it would be a logic error to +// /// construct this from components which have not passed `BeaconChain` validation. +// struct VerifiedUnaggregate { +// attestation: Box>, +// indexed_attestation: IndexedAttestation, +// } /// This implementation allows `Self` to be imported to fork choice and other functions on the /// `BeaconChain`. -impl VerifiedAttestation for VerifiedUnaggregate { - fn attestation(&self) -> AttestationRef { - self.attestation.to_ref() - } - - fn indexed_attestation(&self) -> &IndexedAttestation { - &self.indexed_attestation - } - - fn into_attestation_and_indices(self) -> (Attestation, Vec) { - let attestation = *self.attestation; - let attesting_indices = self.indexed_attestation.attesting_indices_to_vec(); - (attestation, attesting_indices) - } -} +// impl VerifiedAttestation for VerifiedUnaggregate { +// fn attestation(&self) -> AttestationRef { +// self.attestation.to_ref() +// } + +// fn indexed_attestation(&self) -> &IndexedAttestation { +// &self.indexed_attestation +// } + +// fn into_attestation_and_indices(self) -> (Attestation, Vec) { +// let attestation = *self.attestation; +// let attesting_indices = self.indexed_attestation.attesting_indices_to_vec(); +// (attestation, attesting_indices) +// } +// } /// An attestation that failed validation by the `BeaconChain`. struct RejectedUnaggregate { @@ -85,6 +85,12 @@ struct RejectedUnaggregate { error: AttnError, } +/// A `SingleAttestation` that failed validation by the `BeaconChain`. +struct RejectedSingleAttestation { + attestation: Box, + error: AttnError, +} + /// An aggregate that has been validated by the `BeaconChain`. /// /// Since this struct implements `beacon_chain::VerifiedAttestation`, it would be a logic error to @@ -127,6 +133,12 @@ enum FailedAtt { should_import: bool, seen_timestamp: Duration, }, + Single { + attestation: Box, + subnet_id: SubnetId, + should_import: bool, + seen_timestamp: Duration, + }, Aggregate { attestation: Box>, seen_timestamp: Duration, @@ -134,21 +146,23 @@ enum FailedAtt { } impl FailedAtt { - pub fn beacon_block_root(&self) -> &Hash256 { - &self.attestation().data().beacon_block_root + pub fn beacon_block_root(&self) -> Hash256 { + self.attestation_data().beacon_block_root } pub fn kind(&self) -> &'static str { match self { FailedAtt::Unaggregate { .. } => "unaggregated", + FailedAtt::Single { .. } => "single", FailedAtt::Aggregate { .. } => "aggregated", } } - pub fn attestation(&self) -> AttestationRef { + pub fn attestation_data(&self) -> &AttestationData { match self { - FailedAtt::Unaggregate { attestation, .. } => attestation.to_ref(), - FailedAtt::Aggregate { attestation, .. } => attestation.message().aggregate(), + FailedAtt::Unaggregate { attestation, .. } => attestation.data(), + FailedAtt::Single { attestation, .. } => &attestation.data, + FailedAtt::Aggregate { attestation, .. } => attestation.message().aggregate().data(), } } } @@ -187,6 +201,36 @@ impl NetworkBeaconProcessor { /* Processing functions */ + #[allow(clippy::too_many_arguments)] + pub fn process_gossip_single_attestation( + self: Arc, + message_id: MessageId, + peer_id: PeerId, + attestation: Box, + subnet_id: SubnetId, + should_import: bool, + reprocess_tx: Option>, + seen_timestamp: Duration, + ) { + let result = match self + .chain + .verify_single_attestation_for_gossip(&attestation, Some(subnet_id)) + { + Ok(_) => Ok(attestation), + Err(error) => Err(RejectedSingleAttestation { attestation, error }), + }; + + self.process_single_attestation_result( + result, + message_id, + peer_id, + subnet_id, + reprocess_tx, + should_import, + seen_timestamp, + ); + } + /// Process the unaggregated attestation received from the gossip network and: /// /// - If it passes gossip propagation criteria, tell the network thread to forward it. @@ -209,10 +253,7 @@ impl NetworkBeaconProcessor { .chain .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)) { - Ok(verified_attestation) => Ok(VerifiedUnaggregate { - indexed_attestation: verified_attestation.into_indexed_attestation(), - attestation, - }), + Ok(_) => Ok(attestation.to_ref()), Err(error) => Err(RejectedUnaggregate { attestation, error }), }; @@ -227,6 +268,46 @@ impl NetworkBeaconProcessor { ); } + /// Process the unaggregated attestation received from the gossip network and: + /// + /// - If it passes gossip propagation criteria, tell the network thread to forward it. + /// - Attempt to apply it to fork choice. + /// - Attempt to add it to the naive aggregation pool. + /// + /// Raises a log if there are errors. + // #[allow(clippy::too_many_arguments)] + // pub fn process_gossip_single_attestation( + // self: Arc, + // message_id: MessageId, + // peer_id: PeerId, + // attestation: Box, + // subnet_id: SubnetId, + // should_import: bool, + // reprocess_tx: Option>, + // seen_timestamp: Duration, + // ) { + // let result = match self + // .chain + // .verify_single_attestation_for_gossip(&attestation, Some(subnet_id)) + // { + // Ok(verified_attestation) => Ok(VerifiedUnaggregate { + // indexed_attestation: verified_attestation.into_indexed_attestation(), + // attestation, + // }), + // Err(error) => Err(RejectedUnaggregate { attestation, error }), + // }; + + // self.process_gossip_attestation_result( + // result, + // message_id, + // peer_id, + // subnet_id, + // reprocess_tx, + // should_import, + // seen_timestamp, + // ); + // } + pub fn process_gossip_attestation_batch( self: Arc, packages: Vec>, @@ -273,10 +354,7 @@ impl NetworkBeaconProcessor { for (result, package) in results.into_iter().zip(packages.into_iter()) { let result = match result { - Ok(indexed_attestation) => Ok(VerifiedUnaggregate { - indexed_attestation, - attestation: package.attestation, - }), + Ok(_) => Ok(package.attestation.to_ref()), Err(error) => Err(RejectedUnaggregate { attestation: package.attestation, error, @@ -295,12 +373,128 @@ impl NetworkBeaconProcessor { } } + // Clippy warning is is ignored since the arguments are all of a different type (i.e., they + // cant' be mixed-up) and creating a struct would result in more complexity. + #[allow(clippy::too_many_arguments)] + fn process_single_attestation_result( + self: &Arc, + result: Result, RejectedSingleAttestation>, + message_id: MessageId, + peer_id: PeerId, + subnet_id: SubnetId, + reprocess_tx: Option>, + should_import: bool, + seen_timestamp: Duration, + ) { + match result { + Ok(attestation) => { + let beacon_block_root = attestation.data.beacon_block_root; + + // Register the attestation with any monitored validators. + self.chain + .validator_monitor + .read() + .register_gossip_unaggregated_attestation( + seen_timestamp, + &attestation.data, + vec![attestation.attester_index as u64], + &self.chain.slot_clock, + ); + + // If the attestation is still timely, propagate it. + self.propagate_attestation_if_timely(attestation.data.clone(), message_id, peer_id); + + if !should_import { + return; + } + + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL, + ); + + if let Err(e) = self.chain.apply_attestation_to_fork_choice( + attestation.data.clone(), + vec![attestation.attester_index as u64], + ) { + match e { + BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation( + e, + )) => { + debug!( + self.log, + "Attestation invalid for fork choice"; + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root + ) + } + e => error!( + self.log, + "Error applying attestation to fork choice"; + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root + ), + } + } + + // with committee cache + + let _ = self.chain.with_committee_cache( + attestation.data.target.root, + attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()), + |committee_cache, _| { + let committees = committee_cache + .get_beacon_committees_at_slot(attestation.data.slot) + .unwrap_or_else(|_| vec![]); + + let attestation = attestation.to_attestation(&committees)?; + + if let Err(e) = self + .chain + .add_to_naive_aggregation_pool(attestation.to_ref()) + { + debug!( + self.log, + "Attestation invalid for agg pool"; + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root + ) + } + + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL, + ); + + Ok(()) + }, + ); + } + Err(RejectedSingleAttestation { attestation, error }) => { + self.handle_attestation_verification_failure( + peer_id, + message_id, + FailedAtt::Single { + attestation, + subnet_id, + should_import, + seen_timestamp, + }, + reprocess_tx, + error, + seen_timestamp, + ); + } + } + } + // Clippy warning is is ignored since the arguments are all of a different type (i.e., they // cant' be mixed-up) and creating a struct would result in more complexity. #[allow(clippy::too_many_arguments)] fn process_gossip_attestation_result( self: &Arc, - result: Result, RejectedUnaggregate>, + result: Result, RejectedUnaggregate>, message_id: MessageId, peer_id: PeerId, subnet_id: SubnetId, @@ -309,9 +503,8 @@ impl NetworkBeaconProcessor { seen_timestamp: Duration, ) { match result { - Ok(verified_attestation) => { - let indexed_attestation = &verified_attestation.indexed_attestation; - let beacon_block_root = indexed_attestation.data().beacon_block_root; + Ok(attestation) => { + let beacon_block_root = attestation.data().beacon_block_root; // Register the attestation with any monitored validators. self.chain @@ -319,13 +512,14 @@ impl NetworkBeaconProcessor { .read() .register_gossip_unaggregated_attestation( seen_timestamp, - indexed_attestation, + attestation.data(), + attestation.attesting_indices_to_vec(), &self.chain.slot_clock, ); // If the attestation is still timely, propagate it. self.propagate_attestation_if_timely( - verified_attestation.attestation(), + attestation.data().clone(), message_id, peer_id, ); @@ -338,10 +532,10 @@ impl NetworkBeaconProcessor { &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL, ); - if let Err(e) = self - .chain - .apply_attestation_to_fork_choice(&verified_attestation) - { + if let Err(e) = self.chain.apply_attestation_to_fork_choice( + attestation.data().clone(), + attestation.attesting_indices_to_vec(), + ) { match e { BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation( e, @@ -364,10 +558,7 @@ impl NetworkBeaconProcessor { } } - if let Err(e) = self - .chain - .add_to_naive_aggregation_pool(&verified_attestation) - { + if let Err(e) = self.chain.add_to_naive_aggregation_pool(attestation) { debug!( self.log, "Attestation invalid for agg pool"; @@ -521,7 +712,7 @@ impl NetworkBeaconProcessor { // If the attestation is still timely, propagate it. self.propagate_attestation_if_timely( - verified_aggregate.attestation(), + verified_aggregate.attestation().data().clone(), message_id, peer_id, ); @@ -541,10 +732,12 @@ impl NetworkBeaconProcessor { &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL, ); - if let Err(e) = self - .chain - .apply_attestation_to_fork_choice(&verified_aggregate) - { + if let Err(e) = self.chain.apply_attestation_to_fork_choice( + verified_aggregate.attestation().data().clone(), + verified_aggregate + .indexed_attestation() + .attesting_indices_to_vec(), + ) { match e { BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation( e, @@ -2185,9 +2378,12 @@ impl NetworkBeaconProcessor { // network. let seen_clock = &self.chain.slot_clock.freeze_at(seen_timestamp); let hindsight_verification = - attestation_verification::verify_propagation_slot_range( + attestation_verification::verify_attestation_propagation_slot_range::< + ManualSlotClock, + T::EthSpec, + >( seen_clock, - failed_att.attestation(), + failed_att.attestation_data().slot, &self.chain.spec, ); @@ -2417,6 +2613,31 @@ impl NetworkBeaconProcessor { }), }) } + FailedAtt::Single { + attestation, + subnet_id, + should_import, + seen_timestamp, + } => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_REQUEUED_TOTAL, + ); + let processor = self.clone(); + ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate { + beacon_block_root: *beacon_block_root, + process_fn: Box::new(move || { + processor.process_gossip_single_attestation( + message_id, + peer_id, + attestation, + subnet_id, + should_import, + None, // Do not allow this attestation to be re-processed beyond this point. + seen_timestamp, + ) + }), + }) + } FailedAtt::Unaggregate { attestation, subnet_id, @@ -2639,7 +2860,7 @@ impl NetworkBeaconProcessor { self.log, "Ignored attestation to finalized block"; "block_root" => ?beacon_block_root, - "attestation_slot" => failed_att.attestation().data().slot, + "attestation_slot" => failed_att.attestation_data().slot, ); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); @@ -2662,9 +2883,9 @@ impl NetworkBeaconProcessor { debug!( self.log, "Dropping attestation"; - "target_root" => ?failed_att.attestation().data().target.root, + "target_root" => ?failed_att.attestation_data().target.root, "beacon_block_root" => ?beacon_block_root, - "slot" => ?failed_att.attestation().data().slot, + "slot" => ?failed_att.attestation_data().slot, "type" => ?attestation_type, "error" => ?e, "peer_id" => % peer_id @@ -2683,7 +2904,7 @@ impl NetworkBeaconProcessor { self.log, "Unable to validate attestation"; "beacon_block_root" => ?beacon_block_root, - "slot" => ?failed_att.attestation().data().slot, + "slot" => ?failed_att.attestation_data().slot, "type" => ?attestation_type, "peer_id" => %peer_id, "error" => ?e, @@ -3080,13 +3301,16 @@ impl NetworkBeaconProcessor { /// timely), propagate it on gossip. Otherwise, ignore it. fn propagate_attestation_if_timely( &self, - attestation: AttestationRef, + attestation_data: AttestationData, message_id: MessageId, peer_id: PeerId, ) { - let is_timely = attestation_verification::verify_propagation_slot_range( + let is_timely = attestation_verification::verify_attestation_propagation_slot_range::< + T::SlotClock, + T::EthSpec, + >( &self.chain.slot_clock, - attestation, + attestation_data.slot, &self.chain.spec, ) .is_ok(); diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index d81d964e7cf..6068b4e1a77 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1,6 +1,7 @@ use crate::sync::manager::BlockProcessType; use crate::sync::SamplingId; use crate::{service::NetworkMessage, sync::manager::SyncMessage}; +use attestation::SingleAttestation; use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError}; @@ -84,6 +85,49 @@ impl NetworkBeaconProcessor { .map_err(Into::into) } + /// Create a new `Work` event for some unaggregated attestation. + pub fn send_single_attestation( + self: &Arc, + message_id: MessageId, + peer_id: PeerId, + single_attestation: SingleAttestation, + subnet_id: SubnetId, + should_import: bool, + seen_timestamp: Duration, + ) -> Result<(), Error> { + // Define a closure for processing individual attestations. + let result = self.chain.with_committee_cache( + single_attestation.data.target.root, + single_attestation + .data + .slot + .epoch(T::EthSpec::slots_per_epoch()), + |committee_cache, _| { + let committees = + committee_cache.get_beacon_committees_at_slot(single_attestation.data.slot)?; + + let attestation = single_attestation.to_attestation(&committees)?; + + Ok(self.send_unaggregated_attestation( + message_id.clone(), + peer_id, + attestation, + subnet_id, + should_import, + seen_timestamp, + )) + }, + ); + + match result { + Ok(result) => result, + Err(e) => { + warn!(self.log, "Failed to send SingleAttestation"; "error" => ?e); + Ok(()) + } + } + } + /// Create a new `Work` event for some unaggregated attestation. pub fn send_unaggregated_attestation( self: &Arc, diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index e1badfda9d5..5f0336f51a7 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -399,6 +399,17 @@ impl Router { timestamp_now(), ), ), + PubsubMessage::SingleAttestation(subnet_single_attestation) => self + .handle_beacon_processor_send_result( + self.network_beacon_processor.send_single_attestation( + message_id, + peer_id, + subnet_single_attestation.1, + subnet_single_attestation.0, + should_process, + timestamp_now(), + ), + ), PubsubMessage::BeaconBlock(block) => self.handle_beacon_processor_send_result( self.network_beacon_processor.send_gossip_beacon_block( message_id, diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 522c6414eae..53d298cafff 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -16,6 +16,7 @@ pub mod types; use self::mixin::{RequestAccept, ResponseOptional}; use self::types::{Error as ResponseError, *}; +use attestation::SingleAttestation; use derivative::Derivative; use futures::Stream; use futures_util::StreamExt; @@ -1316,7 +1317,7 @@ impl BeaconNodeHttpClient { /// `POST v2/beacon/pool/attestations` pub async fn post_beacon_pool_attestations_v2( &self, - attestations: &[Attestation], + attestations: &[SingleAttestation], fork_name: ForkName, ) -> Result<(), Error> { let mut path = self.eth_path(V2)?; diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index c187399ebd7..1483b933078 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -17,7 +17,7 @@ use std::str::{from_utf8, FromStr}; use std::sync::Arc; use std::time::Duration; use types::beacon_block_body::KzgCommitments; -pub use types::*; +pub use types::{attestation::SingleAttestation, *}; #[cfg(feature = "lighthouse")] use crate::lighthouse::BlockReward; @@ -1127,6 +1127,7 @@ pub enum EventKind { AttesterSlashing(Box>), BlsToExecutionChange(Box), BlockGossip(Box), + SingleAttestation(Box), } impl EventKind { @@ -1150,6 +1151,7 @@ impl EventKind { EventKind::AttesterSlashing(_) => "attester_slashing", EventKind::BlsToExecutionChange(_) => "bls_to_execution_change", EventKind::BlockGossip(_) => "block_gossip", + EventKind::SingleAttestation(_) => "single_attestation", } } @@ -1269,6 +1271,7 @@ pub enum EventTopic { Block, BlobSidecar, Attestation, + SingleAttestation, VoluntaryExit, FinalizedCheckpoint, ChainReorg, @@ -1320,6 +1323,7 @@ impl fmt::Display for EventTopic { EventTopic::Block => write!(f, "block"), EventTopic::BlobSidecar => write!(f, "blob_sidecar"), EventTopic::Attestation => write!(f, "attestation"), + EventTopic::SingleAttestation => write!(f, "single_attestation"), EventTopic::VoluntaryExit => write!(f, "voluntary_exit"), EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"), EventTopic::ChainReorg => write!(f, "chain_reorg"), diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 85704042df4..f4928b14b60 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -14,10 +14,10 @@ use std::collections::BTreeSet; use std::marker::PhantomData; use std::time::Duration; use types::{ - consts::bellatrix::INTERVALS_PER_SLOT, AbstractExecPayload, AttestationShufflingId, - AttesterSlashingRef, BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec, Checkpoint, - Epoch, EthSpec, ExecPayload, ExecutionBlockHash, FixedBytesExtended, Hash256, - IndexedAttestationRef, RelativeEpoch, SignedBeaconBlock, Slot, + consts::bellatrix::INTERVALS_PER_SLOT, AbstractExecPayload, AttestationData, + AttestationShufflingId, AttesterSlashingRef, BeaconBlockRef, BeaconState, BeaconStateError, + ChainSpec, Checkpoint, Epoch, EthSpec, ExecPayload, ExecutionBlockHash, FixedBytesExtended, + Hash256, IndexedAttestationRef, RelativeEpoch, SignedBeaconBlock, Slot, }; #[derive(Debug)] @@ -239,13 +239,13 @@ pub struct QueuedAttestation { target_epoch: Epoch, } -impl<'a, E: EthSpec> From> for QueuedAttestation { - fn from(a: IndexedAttestationRef<'a, E>) -> Self { +impl QueuedAttestation { + fn new(data: AttestationData, attesting_indices: Vec) -> Self { Self { - slot: a.data().slot, - attesting_indices: a.attesting_indices_to_vec(), - block_root: a.data().beacon_block_root, - target_epoch: a.data().target.epoch, + slot: data.slot, + attesting_indices, + block_root: data.beacon_block_root, + target_epoch: data.target.epoch, } } } @@ -948,7 +948,8 @@ where /// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/fork-choice.md#validate_on_attestation fn validate_on_attestation( &self, - indexed_attestation: IndexedAttestationRef, + attestation_data: &AttestationData, + attesting_indices: &[u64], is_from_block: AttestationFromBlock, ) -> Result<(), InvalidAttestation> { // There is no point in processing an attestation with an empty bitfield. Reject @@ -956,20 +957,20 @@ where // // This is not in the specification, however it should be transparent to other nodes. We // return early here to avoid wasting precious resources verifying the rest of it. - if indexed_attestation.attesting_indices_is_empty() { + if attesting_indices.is_empty() { return Err(InvalidAttestation::EmptyAggregationBitfield); } - let target = indexed_attestation.data().target; + let target = attestation_data.target; if matches!(is_from_block, AttestationFromBlock::False) { self.validate_target_epoch_against_current_time(target.epoch)?; } - if target.epoch != indexed_attestation.data().slot.epoch(E::slots_per_epoch()) { + if target.epoch != attestation_data.slot.epoch(E::slots_per_epoch()) { return Err(InvalidAttestation::BadTargetEpoch { target: target.epoch, - slot: indexed_attestation.data().slot, + slot: attestation_data.slot, }); } @@ -991,9 +992,9 @@ where // attestation and do not delay consideration for later. let block = self .proto_array - .get_block(&indexed_attestation.data().beacon_block_root) + .get_block(&attestation_data.beacon_block_root) .ok_or(InvalidAttestation::UnknownHeadBlock { - beacon_block_root: indexed_attestation.data().beacon_block_root, + beacon_block_root: attestation_data.beacon_block_root, })?; // If an attestation points to a block that is from an earlier slot than the attestation, @@ -1001,7 +1002,7 @@ where // is from a prior epoch to the attestation, then the target root must be equal to the root // of the block that is being attested to. let expected_target = if target.epoch > block.slot.epoch(E::slots_per_epoch()) { - indexed_attestation.data().beacon_block_root + attestation_data.beacon_block_root } else { block.target_root }; @@ -1015,10 +1016,10 @@ where // Attestations must not be for blocks in the future. If this is the case, the attestation // should not be considered. - if block.slot > indexed_attestation.data().slot { + if block.slot > attestation_data.slot { return Err(InvalidAttestation::AttestsToFutureBlock { block: block.slot, - attestation: indexed_attestation.data().slot, + attestation: attestation_data.slot, }); } @@ -1045,7 +1046,8 @@ where pub fn on_attestation( &mut self, system_time_current_slot: Slot, - attestation: IndexedAttestationRef, + attestation_data: AttestationData, + attesting_indices: Vec, is_from_block: AttestationFromBlock, ) -> Result<(), Error> { let _timer = metrics::start_timer(&metrics::FORK_CHOICE_ON_ATTESTATION_TIMES); @@ -1065,18 +1067,18 @@ where // (1) becomes weird once we hit finality and fork choice drops the genesis block. (2) is // fine because votes to the genesis block are not useful; all validators implicitly attest // to genesis just by being present in the chain. - if attestation.data().beacon_block_root == Hash256::zero() { + if attestation_data.beacon_block_root == Hash256::zero() { return Ok(()); } - self.validate_on_attestation(attestation, is_from_block)?; + self.validate_on_attestation(&attestation_data, &attesting_indices, is_from_block)?; - if attestation.data().slot < self.fc_store.get_current_slot() { - for validator_index in attestation.attesting_indices_iter() { + if attestation_data.slot < self.fc_store.get_current_slot() { + for validator_index in attesting_indices.iter() { self.proto_array.process_attestation( *validator_index as usize, - attestation.data().beacon_block_root, - attestation.data().target.epoch, + attestation_data.beacon_block_root, + attestation_data.target.epoch, )?; } } else { @@ -1087,7 +1089,7 @@ where // Delay consideration in the fork choice until their slot is in the past. // ``` self.queued_attestations - .push(QueuedAttestation::from(attestation)); + .push(QueuedAttestation::new(attestation_data, attesting_indices)); } Ok(()) diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 29265e34e4d..676b561c863 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -463,10 +463,12 @@ impl ForkChoiceTest { &self.harness.chain, ); - let result = self - .harness - .chain - .apply_attestation_to_fork_choice(&verified_attestation); + let result = self.harness.chain.apply_attestation_to_fork_choice( + verified_attestation.attestation().data().clone(), + verified_attestation + .indexed_attestation() + .attesting_indices_to_vec(), + ); comparison_func(result); diff --git a/consensus/state_processing/src/per_block_processing/signature_sets.rs b/consensus/state_processing/src/per_block_processing/signature_sets.rs index 2e00ee03418..611939f6896 100644 --- a/consensus/state_processing/src/per_block_processing/signature_sets.rs +++ b/consensus/state_processing/src/per_block_processing/signature_sets.rs @@ -7,12 +7,12 @@ use ssz::DecodeError; use std::borrow::Cow; use tree_hash::TreeHash; use types::{ - AbstractExecPayload, AggregateSignature, AttesterSlashingRef, BeaconBlockRef, BeaconState, - BeaconStateError, ChainSpec, DepositData, Domain, Epoch, EthSpec, Fork, Hash256, - InconsistentFork, IndexedAttestation, IndexedAttestationRef, ProposerSlashing, PublicKey, - PublicKeyBytes, Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockHeader, - SignedBlsToExecutionChange, SignedContributionAndProof, SignedRoot, SignedVoluntaryExit, - SigningData, Slot, SyncAggregate, SyncAggregatorSelectionData, Unsigned, + attestation::SingleAttestation, AbstractExecPayload, AggregateSignature, AttesterSlashingRef, + BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec, DepositData, Domain, Epoch, EthSpec, + Fork, Hash256, InconsistentFork, IndexedAttestation, IndexedAttestationRef, ProposerSlashing, + PublicKey, PublicKeyBytes, Signature, SignedAggregateAndProof, SignedBeaconBlock, + SignedBeaconBlockHeader, SignedBlsToExecutionChange, SignedContributionAndProof, SignedRoot, + SignedVoluntaryExit, SigningData, Slot, SyncAggregate, SyncAggregatorSelectionData, Unsigned, }; pub type Result = std::result::Result; @@ -298,6 +298,29 @@ where Ok(SignatureSet::multiple_pubkeys(signature, pubkeys, message)) } +pub fn single_attestation_signature_set_from_pubkeys<'a>( + pubkey: Cow<'a, PublicKey>, + single_attestation: &'a SingleAttestation, + fork: &Fork, + genesis_validators_root: Hash256, + spec: &'a ChainSpec, +) -> Result> { + let domain = spec.get_domain( + single_attestation.data.target.epoch, + Domain::BeaconAttester, + fork, + genesis_validators_root, + ); + + let message = single_attestation.data.signing_root(domain); + + Ok(SignatureSet::single_pubkey( + &single_attestation.signature, + pubkey, + message, + )) +} + /// Returns the signature set for the given `indexed_attestation` but pubkeys are supplied directly /// instead of from the state. pub fn indexed_attestation_signature_set_from_pubkeys<'a, 'b, E, F>( diff --git a/consensus/types/src/attestation.rs b/consensus/types/src/attestation.rs index 3801a2b5d2b..f75ed6e0f8c 100644 --- a/consensus/types/src/attestation.rs +++ b/consensus/types/src/attestation.rs @@ -1,6 +1,6 @@ use crate::slot_data::SlotData; use crate::{test_utils::TestRandom, Hash256, Slot}; -use crate::{Checkpoint, ForkVersionDeserialize}; +use crate::{BeaconCommittee, Checkpoint, ForkVersionDeserialize}; use derivative::Derivative; use safe_arith::ArithError; use serde::{Deserialize, Serialize}; @@ -24,6 +24,7 @@ pub enum Error { IncorrectStateVariant, InvalidCommitteeLength, InvalidCommitteeIndex, + InvalidAggregationBit, } impl From for Error { @@ -257,8 +258,8 @@ impl<'a, E: EthSpec> AttestationRef<'a, E> { pub fn committee_index(&self) -> Option { match self { - AttestationRef::Base(att) => Some(att.data.index), - AttestationRef::Electra(att) => att.committee_index(), + Self::Base(att) => Some(att.data.index), + Self::Electra(att) => att.committee_index(), } } @@ -280,6 +281,13 @@ impl<'a, E: EthSpec> AttestationRef<'a, E> { .collect::>(), } } + + pub fn attesting_indices_to_vec(&self) -> Vec { + match self { + Self::Base(_att) => todo!(), + Self::Electra(_att) => todo!(), + } + } } impl AttestationElectra { @@ -287,6 +295,14 @@ impl AttestationElectra { self.get_committee_indices().first().cloned() } + pub fn get_aggregation_bits(&self) -> Vec { + self.aggregation_bits + .iter() + .enumerate() + .filter_map(|(index, bit)| if bit { Some(index as u64) } else { None }) + .collect() + } + pub fn get_committee_indices(&self) -> Vec { self.committee_bits .iter() @@ -527,6 +543,122 @@ impl ForkVersionDeserialize for Vec> { } } +#[derive( + Debug, + Clone, + Serialize, + Deserialize, + Decode, + Encode, + TestRandom, + Derivative, + arbitrary::Arbitrary, + TreeHash, + PartialEq, +)] +pub struct SingleAttestation { + pub committee_index: u64, + pub attester_index: usize, + pub data: AttestationData, + pub signature: AggregateSignature, +} + +impl SingleAttestation { + /// Produces a `SingleAttestation` with empty signature and empty attester index. + /// ONLY USE IN ELECTRA + pub fn empty_for_signing( + committee_index: u64, + slot: Slot, + beacon_block_root: Hash256, + source: Checkpoint, + target: Checkpoint, + ) -> Self { + Self { + committee_index, + attester_index: 0, + data: AttestationData { + slot, + index: 0, + beacon_block_root, + source, + target, + }, + signature: AggregateSignature::infinity(), + } + } + + pub fn add_signature(&mut self, signature: &Signature, committee_position: usize) { + self.attester_index = committee_position; + self.signature.add_assign(signature); + } + + pub fn to_attestation( + &self, + committees: &[BeaconCommittee], + ) -> Result, Error> { + let beacon_committee = committees + .get(self.committee_index as usize) + .ok_or(Error::InvalidAggregationBit)?; + let aggregation_bits = beacon_committee + .committee + .iter() + .enumerate() + .filter_map(|(i, &validator_index)| { + if self.attester_index == validator_index { + return Some(i); + } + None + }) + .collect::>(); + + if aggregation_bits.len() != 1 { + return Err(Error::InvalidAggregationBit); + } + + let aggregation_bit = aggregation_bits.first().unwrap(); + + let mut committee_bits: BitVector = BitVector::default(); + committee_bits + .set(self.committee_index as usize, true) + .map_err(|_| Error::InvalidCommitteeIndex)?; + + let mut aggregation_bits = BitList::with_capacity(beacon_committee.committee.len()) + .map_err(|_| Error::InvalidCommitteeLength)?; + + aggregation_bits.set(*aggregation_bit, true)?; + + Ok(Attestation::Electra(AttestationElectra { + aggregation_bits, + committee_bits, + data: self.data.clone(), + signature: self.signature.clone(), + })) + } + + /// Signs `self`, setting the `committee_position`'th bit of `aggregation_bits` to `true`. + /// + /// Returns an `AlreadySigned` error if the `committee_position`'th bit is already `true`. + pub fn sign( + &mut self, + secret_key: &SecretKey, + committee_position: usize, + fork: &Fork, + genesis_validators_root: Hash256, + spec: &ChainSpec, + ) { + let domain = spec.get_domain( + self.data.target.epoch, + Domain::BeaconAttester, + fork, + genesis_validators_root, + ); + + let message = self.data.signing_root(domain); + + self.add_signature(&secret_key.sign(message), committee_position); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/consensus/types/src/indexed_attestation.rs b/consensus/types/src/indexed_attestation.rs index 9274600ed2c..4cbd6650202 100644 --- a/consensus/types/src/indexed_attestation.rs +++ b/consensus/types/src/indexed_attestation.rs @@ -107,6 +107,13 @@ impl IndexedAttestation { } } + pub fn attesting_indices(&self) -> Vec { + match self { + IndexedAttestation::Base(att) => att.attesting_indices.to_vec(), + IndexedAttestation::Electra(att) => att.attesting_indices.to_vec(), + } + } + pub fn attesting_indices_first(&self) -> Option<&u64> { match self { IndexedAttestation::Base(att) => att.attesting_indices.first(), diff --git a/consensus/types/src/subnet_id.rs b/consensus/types/src/subnet_id.rs index 187b070d29f..822248d98eb 100644 --- a/consensus/types/src/subnet_id.rs +++ b/consensus/types/src/subnet_id.rs @@ -1,4 +1,5 @@ //! Identifies each shard by an integer identifier. +use crate::attestation::SingleAttestation; use crate::{AttestationRef, ChainSpec, CommitteeIndex, EthSpec, Slot}; use alloy_primitives::{bytes::Buf, U256}; use safe_arith::{ArithError, SafeArith}; @@ -40,6 +41,21 @@ impl SubnetId { id.into() } + /// Compute the subnet for a `SingleAttestation` where each slot in the + /// attestation epoch contains `committee_count_per_slot` committees. + pub fn compute_subnet_for_single_attestation( + single_attestation: &SingleAttestation, + committee_count_per_slot: u64, + spec: &ChainSpec, + ) -> Result { + Self::compute_subnet::( + single_attestation.data.slot, + single_attestation.committee_index, + committee_count_per_slot, + spec, + ) + } + /// Compute the subnet for an attestation where each slot in the /// attestation epoch contains `committee_count_per_slot` committees. pub fn compute_subnet_for_attestation( diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index 33ae132e8a2..3c053c32b67 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -609,7 +609,12 @@ impl Tester { self.harness .chain - .apply_attestation_to_fork_choice(&verified_attestation) + .apply_attestation_to_fork_choice( + verified_attestation.attestation().data().clone(), + verified_attestation + .indexed_attestation() + .attesting_indices_to_vec(), + ) .map_err(|e| Error::InternalError(format!("attestation import failed with {:?}", e))) } diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index e31ad4f661b..f56d1acb2ce 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -370,6 +370,7 @@ impl AttestationService { return None; } + // TODO(single-attestation) need a branch to SingleATtestation::EmptyForSigning let mut attestation = match Attestation::::empty_for_signing( duty.committee_index, duty.committee_length as usize, @@ -457,9 +458,12 @@ impl AttestationService { &[validator_metrics::ATTESTATIONS_HTTP_POST], ); if fork_name.electra_enabled() { - beacon_node - .post_beacon_pool_attestations_v2(attestations, fork_name) - .await + // TODO(single-attestation) post single attestation + + // beacon_node + // .post_beacon_pool_attestations_v2::(todo!(), fork_name) + // .await + todo!() } else { beacon_node .post_beacon_pool_attestations_v1(attestations)