Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sanitise eth68 announcement #6222

Merged
merged 41 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
4a21d91
Driveby: Make sure cache doesn't fill with same peer
emhane Jan 25, 2024
ca622e8
Validate eth68 announcements
emhane Jan 25, 2024
d1e0b76
Plug validation into tx manager
emhane Jan 25, 2024
54db58e
Mark LayerOne struct non-exhaustive
emhane Jan 25, 2024
a5d9195
Minimise list re-ordering
emhane Jan 25, 2024
06f359e
Merge branch 'emhane/sanitise-eth68-announcement' of github.com:parad…
emhane Jan 25, 2024
e9ec0bf
fixup! Minimise list re-ordering
emhane Jan 27, 2024
e2921a9
Remove whitespace
emhane Jan 27, 2024
b82c059
Remove whitespace
emhane Jan 27, 2024
58fb52a
Remove unecessary check
emhane Jan 28, 2024
6ef18db
Set placholder low value for reputation change on bad eth68 announcement
emhane Jan 28, 2024
e1734df
Drive-by: fix bug, bad transactions metric
emhane Jan 28, 2024
b78a977
Fix bug, duplicate hashes not caught by spam filter
emhane Jan 28, 2024
5882251
Update comments
emhane Jan 28, 2024
2cb3561
Fix lint
emhane Jan 28, 2024
4c154e7
Merge branch 'emhane/sanitise-eth68-announcement' of github.com:parad…
emhane Jan 28, 2024
02a0092
Remove unused code
emhane Jan 28, 2024
d039a51
Validate eth66 messages too
emhane Jan 28, 2024
652a553
Fix test
emhane Jan 28, 2024
5427002
Pass whole announcement to all filters
emhane Jan 29, 2024
3984320
Fix typo
emhane Jan 29, 2024
d486637
Fix typo
emhane Jan 29, 2024
add991b
Fix typo
emhane Jan 29, 2024
63b2649
Merge branch 'main' into emhane/sanitise-eth68-announcement
emhane Jan 29, 2024
1211546
Replace sort with hash map in duplicate detection
emhane Jan 29, 2024
3766e4d
Merge branch 'emhane/sanitise-eth68-announcement' of github.com:parad…
emhane Jan 29, 2024
ed06555
Fix test
emhane Jan 29, 2024
f10c314
Debug test tx manager
emhane Jan 29, 2024
0f1bdb6
Add tests for eth66 validation
emhane Jan 29, 2024
9d21851
Fix test, hash map for sorting duplicates changes order of how entrie…
emhane Jan 29, 2024
f3447be
Addres review and make tx type encoded lenght limits lax
emhane Jan 30, 2024
44ee97c
Set reasonable max encoded tx length limit
emhane Jan 30, 2024
3551671
Fix tests and docs
emhane Jan 30, 2024
d78ea3c
Remove unnecessary op
emhane Jan 30, 2024
20c8493
Fix docs
emhane Jan 30, 2024
85d212a
Add docs for eth filter
emhane Jan 30, 2024
203b4b1
Add test for derive more's display impl for zst
emhane Jan 30, 2024
59ac1c2
Merge branch 'emhane/sanitise-eth68-announcement' of github.com:parad…
emhane Jan 30, 2024
6dbe077
Fix lint
emhane Jan 30, 2024
2e5062d
Merge branch 'main' into emhane/sanitise-eth68-announcement
emhane Jan 31, 2024
fd2db04
Fix lint
emhane Jan 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ timeout = -4096
bad_protocol = -2147483648
failed_to_connect = -25600
dropped = -4096
bad_announcement = -1204

[peers.backoff_durations]
low = '30s'
Expand Down
142 changes: 138 additions & 4 deletions crates/net/eth-wire/src/types/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use alloy_rlp::{
Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper,
};
use reth_codecs::derive_arbitrary;
use reth_primitives::{Block, Bytes, TransactionSigned, B256, U128};
use reth_primitives::{Block, Bytes, TransactionSigned, TxHash, B256, U128};

use std::sync::Arc;
use std::{collections::HashMap, mem, sync::Arc};

#[cfg(feature = "arbitrary")]
use proptest::prelude::*;
Expand Down Expand Up @@ -159,6 +159,14 @@ impl NewPooledTransactionHashes {
}
}

/// Returns an immutable reference to transaction hashes.
pub fn hashes(&self) -> &Vec<B256> {
match self {
NewPooledTransactionHashes::Eth66(msg) => &msg.0,
NewPooledTransactionHashes::Eth68(msg) => &msg.hashes,
}
}

/// Returns a mutable reference to transaction hashes.
pub fn hashes_mut(&mut self) -> &mut Vec<B256> {
match self {
Expand Down Expand Up @@ -212,14 +220,45 @@ impl NewPooledTransactionHashes {
}
}

/// Returns an iterator over tx hashes zipped with corresponding eth68 metadata if this is
/// an eth68 message.
/// Returns an immutable reference to the inner type if this an eth68 announcement.
pub fn as_eth68(&self) -> Option<&NewPooledTransactionHashes68> {
match self {
NewPooledTransactionHashes::Eth66(_) => None,
NewPooledTransactionHashes::Eth68(msg) => Some(msg),
}
}

/// Returns a mutable reference to the inner type if this an eth68 announcement.
pub fn as_eth68_mut(&mut self) -> Option<&mut NewPooledTransactionHashes68> {
match self {
NewPooledTransactionHashes::Eth66(_) => None,
NewPooledTransactionHashes::Eth68(msg) => Some(msg),
}
}

/// Returns a mutable reference to the inner type if this an eth66 announcement.
pub fn as_eth66_mut(&mut self) -> Option<&mut NewPooledTransactionHashes66> {
match self {
NewPooledTransactionHashes::Eth66(msg) => Some(msg),
NewPooledTransactionHashes::Eth68(_) => None,
}
}

/// Returns the inner type if this an eth68 announcement.
pub fn take_eth68(&mut self) -> Option<NewPooledTransactionHashes68> {
match self {
NewPooledTransactionHashes::Eth66(_) => None,
NewPooledTransactionHashes::Eth68(msg) => Some(mem::take(msg)),
}
}

/// Returns the inner type if this an eth66 announcement.
pub fn take_eth66(&mut self) -> Option<NewPooledTransactionHashes66> {
match self {
NewPooledTransactionHashes::Eth66(msg) => Some(mem::take(msg)),
NewPooledTransactionHashes::Eth68(_) => None,
}
}
}

impl From<NewPooledTransactionHashes> for EthMessage {
Expand All @@ -243,6 +282,57 @@ impl From<NewPooledTransactionHashes68> for NewPooledTransactionHashes {
}
}

/// Interface for handling an announcement.
pub trait HandleAnnouncement {
/// The announcement contains no entries.
fn is_empty(&self) -> bool;

/// Retain only entries for which the hash in the entry, satisfies a given predicate.
fn retain_by_hash(&mut self, f: impl FnMut(TxHash) -> bool);
}

impl HandleAnnouncement for NewPooledTransactionHashes {
fn is_empty(&self) -> bool {
self.is_empty()
}

fn retain_by_hash(&mut self, f: impl FnMut(TxHash) -> bool) {
match self {
NewPooledTransactionHashes::Eth66(msg) => msg.retain_by_hash(f),
NewPooledTransactionHashes::Eth68(msg) => msg.retain_by_hash(f),
}
}
}

/// Announcement data that has been validated according to the configured network. For an eth68
/// announcement, values of the map are `Some((u8, usize))` - the tx metadata. For an eth66
/// announcement, values of the map are `None`.
pub type ValidAnnouncementData = HashMap<TxHash, Option<(u8, usize)>>;

impl HandleAnnouncement for ValidAnnouncementData {
fn is_empty(&self) -> bool {
self.is_empty()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
self.retain(|&hash, _| f(hash))
}
}

/// Hashes extracted from valid announcement data. For an eth68 announcement, this means the eth68
/// metadata should have been cached already.
pub type ValidTxHashes = Vec<TxHash>;

impl HandleAnnouncement for ValidTxHashes {
fn is_empty(&self) -> bool {
self.is_empty()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
self.retain(|&hash| f(hash))
}
}

/// This informs peers of transaction hashes for transactions that have appeared on the network,
/// but have not been included in a block.
#[derive_arbitrary(rlp)]
Expand All @@ -261,6 +351,27 @@ impl From<Vec<B256>> for NewPooledTransactionHashes66 {
}
}

impl HandleAnnouncement for NewPooledTransactionHashes66 {
fn is_empty(&self) -> bool {
self.0.is_empty()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
let mut indices_to_remove = vec![];
for (i, &hash) in self.0.iter().enumerate() {
if !f(hash) {
indices_to_remove.push(i);
}
}

for (i, index) in indices_to_remove.into_iter().rev().enumerate() {
let index = index.saturating_sub(i);

self.0.remove(index);
}
}
}

/// Same as [`NewPooledTransactionHashes66`] but extends that that beside the transaction hashes,
/// the node sends the transaction types and their sizes (as defined in EIP-2718) as well.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
Expand Down Expand Up @@ -401,6 +512,29 @@ impl Decodable for NewPooledTransactionHashes68 {
}
}

impl HandleAnnouncement for NewPooledTransactionHashes68 {
fn is_empty(&self) -> bool {
self.hashes.is_empty()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
let mut indices_to_remove = vec![];
for (i, &hash) in self.hashes.iter().enumerate() {
if !f(hash) {
indices_to_remove.push(i);
}
}

for (i, index) in indices_to_remove.into_iter().rev().enumerate() {
let index = index.saturating_sub(i);

self.hashes.remove(index);
self.types.remove(index);
self.sizes.remove(index);
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
5 changes: 4 additions & 1 deletion crates/net/network-api/src/reputation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ pub enum ReputationChangeKind {
///
/// Note: this will we only used in pre-merge, pow consensus, since after no more block announcements are sent via devp2p: [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)
BadBlock,
/// Peer sent a bad transaction messages. E.g. Transactions which weren't recoverable.
/// Peer sent a bad transaction message. E.g. Transactions which weren't recoverable.
BadTransactions,
/// Peer sent a bad announcement message, e.g. invalid transaction type for the configured
/// network.
BadAnnouncement,
/// Peer sent a message that included a hash or transaction that we already received from the
/// peer.
///
Expand Down
4 changes: 2 additions & 2 deletions crates/net/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ reth-provider.workspace = true
reth-rpc-types.workspace = true
reth-tokio-util.workspace = true

# ethereum
enr = { workspace = true, features = ["rust-secp256k1"], optional = true }
alloy-rlp.workspace = true

# async/futures
Expand Down Expand Up @@ -66,8 +68,6 @@ secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recov
derive_more.workspace = true
schnellru.workspace = true
itertools.workspace = true

enr = { workspace = true, features = ["rust-secp256k1"], optional = true }
tempfile = { workspace = true, optional = true }

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions crates/net/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,6 @@ pub use session::{
PendingSessionHandle, PendingSessionHandshakeError, SessionCommand, SessionEvent, SessionId,
SessionLimits, SessionManager, SessionsConfig,
};
pub use transactions::{AnnouncementFilter, FilterAnnouncement, ValidateTx68};

pub use reth_eth_wire::{DisconnectReason, HelloMessageWithProtocols};
8 changes: 8 additions & 0 deletions crates/net/network/src/peers/reputation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ const ALREADY_SEEN_TRANSACTION_REPUTATION_CHANGE: i32 = 0;
/// The reputation change to apply to a peer which violates protocol rules: minimal reputation
const BAD_PROTOCOL_REPUTATION_CHANGE: i32 = i32::MIN;

/// The reputation change to apply to a peer that sent a bad announcement.
// todo: current value is a hint, needs to be set properly
const BAD_ANNOUNCEMENT_REPUTATION_CHANGE: i32 = REPUTATION_UNIT;

/// Returns `true` if the given reputation is below the [`BANNED_REPUTATION`] threshold
#[inline]
pub(crate) fn is_banned_reputation(reputation: i32) -> bool {
Expand All @@ -59,6 +63,8 @@ pub struct ReputationChangeWeights {
pub failed_to_connect: Reputation,
/// Weight for [`ReputationChangeKind::Dropped`]
pub dropped: Reputation,
/// Weight for [`ReputationChangeKind::BadMessage`]
pub bad_announcement: Reputation,
}

// === impl ReputationChangeWeights ===
Expand All @@ -78,6 +84,7 @@ impl ReputationChangeWeights {
ReputationChangeKind::Dropped => self.dropped.into(),
ReputationChangeKind::Reset => DEFAULT_REPUTATION.into(),
ReputationChangeKind::Other(val) => val.into(),
ReputationChangeKind::BadAnnouncement => self.bad_announcement.into(),
}
}
}
Expand All @@ -93,6 +100,7 @@ impl Default for ReputationChangeWeights {
bad_protocol: BAD_PROTOCOL_REPUTATION_CHANGE,
failed_to_connect: FAILED_TO_CONNECT_REPUTATION_CHANGE,
dropped: REMOTE_DISCONNECT_REPUTATION_CHANGE,
bad_announcement: BAD_ANNOUNCEMENT_REPUTATION_CHANGE,
}
}
}
Expand Down
25 changes: 15 additions & 10 deletions crates/net/network/src/transactions/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
};
use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
use pin_project::pin_project;
use reth_eth_wire::{EthVersion, GetPooledTransactions};
use reth_eth_wire::{EthVersion, GetPooledTransactions, HandleAnnouncement};
use reth_interfaces::p2p::error::{RequestError, RequestResult};
use reth_primitives::{PeerId, PooledTransactionsElement, TxHash};
use schnellru::{ByLength, Unlimited};
Expand All @@ -16,7 +16,9 @@ use std::{
use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
use tracing::{debug, trace};

use super::{Peer, PooledTransactions, FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT};
use super::{
AnnouncementFilter, Peer, PooledTransactions, FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT,
};

/// Maximum concurrent [`GetPooledTxRequest`]s to allow per peer.
pub(super) const MAX_CONCURRENT_TX_REQUESTS_PER_PEER: u8 = 1;
Expand Down Expand Up @@ -65,6 +67,8 @@ pub(super) struct TransactionFetcher {
pub(super) unknown_hashes: LruMap<TxHash, (u8, LruCache<PeerId>), Unlimited>,
/// Size metadata for unknown eth68 hashes.
pub(super) eth68_meta: LruMap<TxHash, usize, Unlimited>,
/// Filter for valid eth68 announcements.
pub(super) filter_valid_hashes: AnnouncementFilter,
}

// === impl TransactionFetcher ===
Expand Down Expand Up @@ -312,18 +316,18 @@ impl TransactionFetcher {
self.remove_from_unknown_hashes(hashes)
}

pub(super) fn filter_unseen_hashes(
pub(super) fn filter_unseen_hashes<T: HandleAnnouncement>(
&mut self,
new_announced_hashes: &mut Vec<TxHash>,
new_announced_hashes: &mut T,
peer_id: PeerId,
is_session_active: impl Fn(PeerId) -> bool,
) {
// filter out inflight hashes, and register the peer as fallback for all inflight hashes
new_announced_hashes.retain(|hash| {
new_announced_hashes.retain_by_hash(|hash| {
// occupied entry
if let Some((_retries, ref mut backups)) = self.unknown_hashes.peek_mut(hash) {
if let Some((_retries, ref mut backups)) = self.unknown_hashes.peek_mut(&hash) {
// hash has been seen but is not inflight
if self.buffered_hashes.remove(hash) {
if self.buffered_hashes.remove(&hash) {
return true
}
// hash has been seen and is in flight. store peer as fallback peer.
Expand All @@ -339,13 +343,13 @@ impl TransactionFetcher {
for peer_id in ended_sessions {
backups.remove(&peer_id);
}
backups.insert(peer_id);

return false
}

let msg_version = || self.eth68_meta.peek(hash).map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66);

// vacant entry
let msg_version = || self.eth68_meta.peek(&hash).map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66);

trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%hash,
Expand Down Expand Up @@ -718,6 +722,7 @@ impl Default for TransactionFetcher {
),
unknown_hashes: LruMap::new_unlimited(),
eth68_meta: LruMap::new_unlimited(),
filter_valid_hashes: Default::default(),
}
}
}
Expand Down
Loading
Loading